@@ -33,11 +33,10 @@ use datafusion::{
33
33
physical_plan:: FileScanConfig ,
34
34
} ,
35
35
error:: DataFusionError ,
36
- execution:: context:: ExecutionProps ,
37
- logical_expr:: { BuiltinScalarFunction , ColumnarValue , Operator } ,
36
+ logical_expr:: { ColumnarValue , Operator , ScalarUDF , Volatility } ,
38
37
physical_expr:: {
39
38
expressions:: { in_list, LikeExpr , SCAndExpr , SCOrExpr } ,
40
- functions , ScalarFunctionExpr ,
39
+ ScalarFunctionExpr ,
41
40
} ,
42
41
physical_plan:: {
43
42
expressions as phys_expr,
@@ -48,6 +47,7 @@ use datafusion::{
48
47
union:: UnionExec ,
49
48
ColumnStatistics , ExecutionPlan , Partitioning , PhysicalExpr , Statistics ,
50
49
} ,
50
+ prelude:: create_udf,
51
51
} ;
52
52
use datafusion_ext_commons:: downcast_any;
53
53
use datafusion_ext_exprs:: {
@@ -116,7 +116,7 @@ fn bind(
116
116
let new_children = expr_in
117
117
. children ( )
118
118
. iter ( )
119
- . map ( |child_expr| bind ( child_expr. clone ( ) , input_schema) )
119
+ . map ( |& child_expr| bind ( child_expr. clone ( ) , input_schema) )
120
120
. collect :: < Result < Vec < _ > , DataFusionError > > ( ) ?;
121
121
Ok ( expr_in. with_new_children ( new_children) ?)
122
122
}
@@ -804,74 +804,75 @@ impl From<&protobuf::BoundReference> for Column {
804
804
}
805
805
}
806
806
807
- impl From < & protobuf:: ScalarFunction > for BuiltinScalarFunction {
808
- fn from ( f : & protobuf:: ScalarFunction ) -> BuiltinScalarFunction {
807
+ impl From < protobuf:: ScalarFunction > for Arc < ScalarUDF > {
808
+ fn from ( f : protobuf:: ScalarFunction ) -> Self {
809
+ use datafusion:: functions as f;
809
810
use protobuf:: ScalarFunction ;
811
+
810
812
match f {
811
- ScalarFunction :: Sqrt => Self :: Sqrt ,
812
- ScalarFunction :: Sin => Self :: Sin ,
813
- ScalarFunction :: Cos => Self :: Cos ,
814
- ScalarFunction :: Tan => Self :: Tan ,
815
- ScalarFunction :: Asin => Self :: Asin ,
816
- ScalarFunction :: Acos => Self :: Acos ,
817
- ScalarFunction :: Atan => Self :: Atan ,
818
- ScalarFunction :: Exp => Self :: Exp ,
819
- ScalarFunction :: Log => Self :: Log ,
820
- ScalarFunction :: Ln => Self :: Ln ,
821
- ScalarFunction :: Log10 => Self :: Log10 ,
822
- ScalarFunction :: Floor => Self :: Floor ,
823
- ScalarFunction :: Ceil => Self :: Ceil ,
824
- ScalarFunction :: Round => Self :: Round ,
825
- ScalarFunction :: Trunc => Self :: Trunc ,
826
- ScalarFunction :: Abs => Self :: Abs ,
827
- ScalarFunction :: OctetLength => Self :: OctetLength ,
828
- ScalarFunction :: Concat => Self :: Concat ,
829
- ScalarFunction :: Lower => Self :: Lower ,
830
- ScalarFunction :: Upper => Self :: Upper ,
831
- ScalarFunction :: Trim => Self :: Trim ,
832
- ScalarFunction :: Ltrim => Self :: Ltrim ,
833
- ScalarFunction :: Rtrim => Self :: Rtrim ,
834
- ScalarFunction :: ToTimestamp => Self :: ToTimestamp ,
835
- ScalarFunction :: Array => Self :: MakeArray ,
836
- // ScalarFunction::NullIf => todo!(),
837
- ScalarFunction :: DatePart => Self :: DatePart ,
838
- ScalarFunction :: DateTrunc => Self :: DateTrunc ,
839
- ScalarFunction :: Md5 => Self :: MD5 ,
840
- ScalarFunction :: Sha224 => Self :: SHA224 ,
841
- ScalarFunction :: Sha256 => Self :: SHA256 ,
842
- ScalarFunction :: Sha384 => Self :: SHA384 ,
843
- ScalarFunction :: Sha512 => Self :: SHA512 ,
844
- ScalarFunction :: Digest => Self :: Digest ,
845
- ScalarFunction :: ToTimestampMillis => Self :: ToTimestampMillis ,
846
- ScalarFunction :: Log2 => Self :: Log2 ,
847
- ScalarFunction :: Signum => Self :: Signum ,
848
- ScalarFunction :: Ascii => Self :: Ascii ,
849
- ScalarFunction :: BitLength => Self :: BitLength ,
850
- ScalarFunction :: Btrim => Self :: Btrim ,
851
- ScalarFunction :: CharacterLength => Self :: CharacterLength ,
852
- ScalarFunction :: Chr => Self :: Chr ,
853
- ScalarFunction :: ConcatWithSeparator => Self :: ConcatWithSeparator ,
854
- ScalarFunction :: InitCap => Self :: InitCap ,
855
- ScalarFunction :: Left => Self :: Left ,
856
- ScalarFunction :: Lpad => Self :: Lpad ,
857
- ScalarFunction :: Random => Self :: Random ,
858
- ScalarFunction :: RegexpReplace => Self :: RegexpReplace ,
859
- ScalarFunction :: Repeat => Self :: Repeat ,
860
- ScalarFunction :: Replace => Self :: Replace ,
861
- ScalarFunction :: Reverse => Self :: Reverse ,
862
- ScalarFunction :: Right => Self :: Right ,
863
- ScalarFunction :: Rpad => Self :: Rpad ,
864
- ScalarFunction :: SplitPart => Self :: SplitPart ,
865
- ScalarFunction :: StartsWith => Self :: StartsWith ,
866
- ScalarFunction :: Strpos => Self :: Strpos ,
867
- ScalarFunction :: Substr => Self :: Substr ,
868
- ScalarFunction :: ToHex => Self :: ToHex ,
869
- ScalarFunction :: ToTimestampMicros => Self :: ToTimestampMicros ,
870
- ScalarFunction :: ToTimestampSeconds => Self :: ToTimestampSeconds ,
871
- ScalarFunction :: Now => Self :: Now ,
872
- ScalarFunction :: Translate => Self :: Translate ,
873
- ScalarFunction :: RegexpMatch => Self :: RegexpMatch ,
874
- ScalarFunction :: Coalesce => Self :: Coalesce ,
813
+ ScalarFunction :: Sqrt => f:: math:: sqrt ( ) ,
814
+ ScalarFunction :: Sin => f:: math:: sin ( ) ,
815
+ ScalarFunction :: Cos => f:: math:: cos ( ) ,
816
+ ScalarFunction :: Tan => f:: math:: tan ( ) ,
817
+ ScalarFunction :: Asin => f:: math:: asin ( ) ,
818
+ ScalarFunction :: Acos => f:: math:: acos ( ) ,
819
+ ScalarFunction :: Atan => f:: math:: atan ( ) ,
820
+ ScalarFunction :: Exp => f:: math:: exp ( ) ,
821
+ ScalarFunction :: Log => f:: math:: log ( ) ,
822
+ ScalarFunction :: Ln => f:: math:: ln ( ) ,
823
+ ScalarFunction :: Log10 => f:: math:: log10 ( ) ,
824
+ ScalarFunction :: Floor => f:: math:: floor ( ) ,
825
+ ScalarFunction :: Ceil => f:: math:: ceil ( ) ,
826
+ ScalarFunction :: Round => f:: math:: round ( ) ,
827
+ ScalarFunction :: Trunc => f:: math:: trunc ( ) ,
828
+ ScalarFunction :: Abs => f:: math:: abs ( ) ,
829
+ ScalarFunction :: OctetLength => f:: string:: octet_length ( ) ,
830
+ ScalarFunction :: Concat => f:: string:: concat ( ) ,
831
+ ScalarFunction :: Lower => f:: string:: lower ( ) ,
832
+ ScalarFunction :: Upper => f:: string:: upper ( ) ,
833
+ ScalarFunction :: Trim => f:: string:: btrim ( ) ,
834
+ ScalarFunction :: Ltrim => f:: string:: ltrim ( ) ,
835
+ ScalarFunction :: Rtrim => f:: string:: rtrim ( ) ,
836
+ ScalarFunction :: ToTimestamp => f:: datetime:: to_timestamp ( ) ,
837
+ ScalarFunction :: NullIf => f:: core:: nullif ( ) ,
838
+ ScalarFunction :: DatePart => f:: datetime:: date_part ( ) ,
839
+ ScalarFunction :: DateTrunc => f:: datetime:: date_trunc ( ) ,
840
+ ScalarFunction :: Md5 => f:: crypto:: md5 ( ) ,
841
+ ScalarFunction :: Sha224 => f:: crypto:: sha224 ( ) ,
842
+ ScalarFunction :: Sha256 => f:: crypto:: sha256 ( ) ,
843
+ ScalarFunction :: Sha384 => f:: crypto:: sha384 ( ) ,
844
+ ScalarFunction :: Sha512 => f:: crypto:: sha512 ( ) ,
845
+ ScalarFunction :: Digest => f:: crypto:: digest ( ) ,
846
+ ScalarFunction :: ToTimestampMillis => f:: datetime:: to_timestamp_millis ( ) ,
847
+ ScalarFunction :: Log2 => f:: math:: log2 ( ) ,
848
+ ScalarFunction :: Signum => f:: math:: signum ( ) ,
849
+ ScalarFunction :: Ascii => f:: string:: ascii ( ) ,
850
+ ScalarFunction :: BitLength => f:: string:: bit_length ( ) ,
851
+ ScalarFunction :: Btrim => f:: string:: btrim ( ) ,
852
+ ScalarFunction :: CharacterLength => f:: unicode:: character_length ( ) ,
853
+ ScalarFunction :: Chr => f:: string:: chr ( ) ,
854
+ ScalarFunction :: ConcatWithSeparator => f:: string:: concat_ws ( ) ,
855
+ ScalarFunction :: InitCap => f:: string:: initcap ( ) ,
856
+ ScalarFunction :: Left => f:: unicode:: left ( ) ,
857
+ ScalarFunction :: Lpad => f:: unicode:: lpad ( ) ,
858
+ ScalarFunction :: Random => f:: math:: random ( ) ,
859
+ ScalarFunction :: RegexpReplace => f:: regex:: regexp_replace ( ) ,
860
+ ScalarFunction :: Repeat => f:: string:: repeat ( ) ,
861
+ ScalarFunction :: Replace => f:: string:: replace ( ) ,
862
+ ScalarFunction :: Reverse => f:: unicode:: reverse ( ) ,
863
+ ScalarFunction :: Right => f:: unicode:: right ( ) ,
864
+ ScalarFunction :: Rpad => f:: unicode:: rpad ( ) ,
865
+ ScalarFunction :: SplitPart => f:: string:: split_part ( ) ,
866
+ ScalarFunction :: StartsWith => f:: string:: starts_with ( ) ,
867
+ ScalarFunction :: Strpos => f:: unicode:: strpos ( ) ,
868
+ ScalarFunction :: Substr => f:: unicode:: substr ( ) ,
869
+ ScalarFunction :: ToHex => f:: string:: to_hex ( ) ,
870
+ ScalarFunction :: ToTimestampMicros => f:: datetime:: to_timestamp_micros ( ) ,
871
+ ScalarFunction :: ToTimestampSeconds => f:: datetime:: to_timestamp_seconds ( ) ,
872
+ ScalarFunction :: Now => f:: datetime:: now ( ) ,
873
+ ScalarFunction :: Translate => f:: unicode:: translate ( ) ,
874
+ ScalarFunction :: RegexpMatch => f:: regex:: regexp_match ( ) ,
875
+ ScalarFunction :: Coalesce => f:: core:: coalesce ( ) ,
875
876
ScalarFunction :: SparkExtFunctions => {
876
877
unreachable ! ( )
877
878
}
@@ -998,20 +999,26 @@ fn try_parse_physical_expr(
998
999
. map ( |x| try_parse_physical_expr ( x, input_schema) )
999
1000
. collect :: < Result < Vec < _ > , _ > > ( ) ?;
1000
1001
1001
- let execution_props = ExecutionProps :: new ( ) ;
1002
- let fun_expr = if scalar_function == protobuf:: ScalarFunction :: SparkExtFunctions {
1003
- datafusion_ext_functions:: create_spark_ext_function ( & e. name ) ?
1002
+ let scalar_udf = if scalar_function == protobuf:: ScalarFunction :: SparkExtFunctions {
1003
+ let fun = datafusion_ext_functions:: create_spark_ext_function ( & e. name ) ?;
1004
+ Arc :: new ( create_udf (
1005
+ "spark_ext_function" ,
1006
+ args. iter ( )
1007
+ . map ( |e| e. data_type ( input_schema) )
1008
+ . collect :: < Result < Vec < _ > , _ > > ( ) ?,
1009
+ Arc :: new ( convert_required ! ( e. return_type) ?) ,
1010
+ Volatility :: Volatile ,
1011
+ fun,
1012
+ ) )
1004
1013
} else {
1005
- functions:: create_physical_fun ( & ( & scalar_function) . into ( ) , & execution_props) ?
1014
+ let scalar_udf: Arc < ScalarUDF > = scalar_function. into ( ) ;
1015
+ scalar_udf
1006
1016
} ;
1007
-
1008
1017
Arc :: new ( ScalarFunctionExpr :: new (
1009
- & e . name ,
1010
- fun_expr ,
1018
+ scalar_udf . name ( ) ,
1019
+ scalar_udf . clone ( ) ,
1011
1020
args,
1012
1021
convert_required ! ( e. return_type) ?,
1013
- None ,
1014
- false ,
1015
1022
) )
1016
1023
}
1017
1024
ExprType :: SparkUdfWrapperExpr ( e) => Arc :: new ( SparkUDFWrapperExpr :: try_new (
@@ -1153,6 +1160,7 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
1153
1160
. map ( |v| v. try_into ( ) )
1154
1161
. collect :: < Result < Vec < _ > , _ > > ( ) ?,
1155
1162
range : val. range . as_ref ( ) . map ( |v| v. try_into ( ) ) . transpose ( ) ?,
1163
+ statistics : None ,
1156
1164
extensions : None ,
1157
1165
} )
1158
1166
}
0 commit comments