diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 9e192b0be0f1..7ddc0af4306c 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1373,6 +1373,7 @@ dependencies = [ "arrow", "arrow-array", "arrow-buffer", + "arrow-ord", "arrow-schema", "async-trait", "chrono", diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index c25523c5ae33..f5e937bb56a0 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1168,12 +1168,13 @@ impl DefaultPhysicalPlanner { Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch))) } - LogicalPlan::Unnest(Unnest { input, column, schema, options }) => { + LogicalPlan::Unnest(Unnest { input, columns, schema, options }) => { let input = self.create_initial_plan(input, session_state).await?; - let column_exec = schema.index_of_column(column) - .map(|idx| Column::new(&column.name, idx))?; + let column_execs = columns.iter().map(|column| { + schema.index_of_column(column).map(|idx| Column::new(&column.name, idx)) + }).collect::>()?; let schema = SchemaRef::new(schema.as_ref().to_owned().into()); - Ok(Arc::new(UnnestExec::new(input, column_exec, schema, options.clone()))) + Ok(Arc::new(UnnestExec::new(input, column_execs, schema, options.clone()))) } LogicalPlan::Ddl(ddl) => { // There is no default plan for DDl statements -- diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index c7c50d871902..cffb58dadd8e 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -186,7 +186,16 @@ pub enum Expr { #[derive(Clone, PartialEq, Eq, Hash, Debug)] pub struct Unnest { - pub exprs: Vec, + pub expr: Box, +} + +impl Unnest { + /// Create a new Unnest expression. + pub fn new(expr: Expr) -> Self { + Self { + expr: Box::new(expr), + } + } } /// Alias expression @@ -1567,8 +1576,8 @@ impl fmt::Display for Expr { } }, Expr::Placeholder(Placeholder { id, .. }) => write!(f, "{id}"), - Expr::Unnest(Unnest { exprs }) => { - write!(f, "UNNEST({exprs:?})") + Expr::Unnest(Unnest { expr }) => { + write!(f, "UNNEST({expr:?})") } } } @@ -1757,7 +1766,10 @@ fn create_name(e: &Expr) -> Result { } } } - Expr::Unnest(Unnest { exprs }) => create_function_name("unnest", false, exprs), + Expr::Unnest(Unnest { expr }) => { + let expr_name = create_name(expr)?; + Ok(format!("unnest({expr_name})")) + } Expr::ScalarFunction(fun) => create_function_name(fun.name(), false, &fun.args), Expr::WindowFunction(WindowFunction { fun, diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index d678fe7ee39c..c11619fc0ea2 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -82,13 +82,13 @@ pub fn normalize_col_with_schemas_and_ambiguity_check( using_columns: &[HashSet], ) -> Result { // Normalize column inside Unnest - if let Expr::Unnest(Unnest { exprs }) = expr { + if let Expr::Unnest(Unnest { expr }) = expr { let e = normalize_col_with_schemas_and_ambiguity_check( - exprs[0].clone(), + expr.as_ref().clone(), schemas, using_columns, )?; - return Ok(Expr::Unnest(Unnest { exprs: vec![e] })); + return Ok(Expr::Unnest(Unnest { expr: Box::new(e) })); } expr.transform(&|expr| { diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index 39892d9e0c0d..466fd13ce207 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -115,12 +115,8 @@ impl ExprSchemable for Expr { Expr::Case(case) => case.when_then_expr[0].1.get_type(schema), Expr::Cast(Cast { data_type, .. }) | Expr::TryCast(TryCast { data_type, .. }) => Ok(data_type.clone()), - Expr::Unnest(Unnest { exprs }) => { - let arg_data_types = exprs - .iter() - .map(|e| e.get_type(schema)) - .collect::>>()?; - let arg_data_type = arg_data_types[0].clone(); + Expr::Unnest(Unnest { expr }) => { + let arg_data_type = expr.get_type(schema)?; // Unnest's output type is the inner type of the list match arg_data_type{ DataType::List(field) | DataType::LargeList(field) | DataType::FixedSizeList(field, _) =>{ diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index beac5a7f4eb7..f7c0fbac537b 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1112,7 +1112,7 @@ impl LogicalPlanBuilder { /// Unnest the given column. pub fn unnest_column(self, column: impl Into) -> Result { - Ok(Self::from(unnest(self.plan, column.into())?)) + Ok(Self::from(unnest(self.plan, vec![column.into()])?)) } /// Unnest the given column given [`UnnestOptions`] @@ -1123,10 +1123,21 @@ impl LogicalPlanBuilder { ) -> Result { Ok(Self::from(unnest_with_options( self.plan, - column.into(), + vec![column.into()], options, )?)) } + + /// Unnest the given columns with the given [`UnnestOptions`] + pub fn unnest_columns_with_options( + self, + columns: Vec, + options: UnnestOptions, + ) -> Result { + Ok(Self::from(unnest_with_options( + self.plan, columns, options, + )?)) + } } pub fn change_redundant_column(fields: &Fields) -> Vec { let mut name_map = HashMap::new(); @@ -1534,44 +1545,50 @@ impl TableSource for LogicalTableSource { } /// Create a [`LogicalPlan::Unnest`] plan -pub fn unnest(input: LogicalPlan, column: Column) -> Result { - unnest_with_options(input, column, UnnestOptions::new()) +pub fn unnest(input: LogicalPlan, columns: Vec) -> Result { + unnest_with_options(input, columns, UnnestOptions::new()) } /// Create a [`LogicalPlan::Unnest`] plan with options pub fn unnest_with_options( input: LogicalPlan, - column: Column, + columns: Vec, options: UnnestOptions, ) -> Result { - let (unnest_qualifier, unnest_field) = - input.schema().qualified_field_from_column(&column)?; - // Extract the type of the nested field in the list. - let unnested_field = match unnest_field.data_type() { - DataType::List(field) - | DataType::FixedSizeList(field, _) - | DataType::LargeList(field) => Arc::new(Field::new( - unnest_field.name(), - field.data_type().clone(), - unnest_field.is_nullable(), - )), - _ => { - // If the unnest field is not a list type return the input plan. - return Ok(input); - } - }; + let mut unnested_fields: HashMap = HashMap::with_capacity(columns.len()); + // Add qualifiers to the columns. + let mut qualified_columns = Vec::with_capacity(columns.len()); + for c in &columns { + let index = input.schema().index_of_column(c)?; + let (unnest_qualifier, unnest_field) = input.schema().qualified_field(index); + let unnested_field = match unnest_field.data_type() { + DataType::List(field) + | DataType::FixedSizeList(field, _) + | DataType::LargeList(field) => Arc::new(Field::new( + unnest_field.name(), + field.data_type().clone(), + // Unnesting may produce NULLs even if the list is not null. + // For example: unnset([1], []) -> 1, null + true, + )), + _ => { + // If the unnest field is not a list type return the input plan. + return Ok(input); + } + }; + qualified_columns.push(Column::from((unnest_qualifier, unnested_field.as_ref()))); + unnested_fields.insert(index, unnested_field); + } - // Update the schema with the unnest column type changed to contain the nested type. + // Update the schema with the unnest column types changed to contain the nested types. let input_schema = input.schema(); let fields = input_schema .iter() - .map(|(q, f)| { - if f.as_ref() == unnest_field && q == unnest_qualifier { - (unnest_qualifier.cloned(), unnested_field.clone()) - } else { - (q.cloned(), f.clone()) - } + .enumerate() + .map(|(index, (q, f))| match unnested_fields.get(&index) { + Some(unnested_field) => (q.cloned(), unnested_field.clone()), + None => (q.cloned(), f.clone()), }) .collect::>(); @@ -1580,11 +1597,9 @@ pub fn unnest_with_options( // We can use the existing functional dependencies: let deps = input_schema.functional_dependencies().clone(); let schema = Arc::new(df_schema.with_functional_dependencies(deps)?); - let column = Column::from((unnest_qualifier, unnested_field.as_ref())); - Ok(LogicalPlan::Unnest(Unnest { input: Arc::new(input), - column, + columns: qualified_columns, schema, options, })) diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs index edc3afd55d63..3a2ed9ffc2d8 100644 --- a/datafusion/expr/src/logical_plan/display.rs +++ b/datafusion/expr/src/logical_plan/display.rs @@ -638,10 +638,10 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> { "Node Type": "DescribeTable" }) } - LogicalPlan::Unnest(Unnest { column, .. }) => { + LogicalPlan::Unnest(Unnest { columns, .. }) => { json!({ "Node Type": "Unnest", - "Column": format!("{}", column) + "Column": expr_vec_fmt!(columns), }) } } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 91c8670f3805..dbff5046013b 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use super::dml::CopyTo; use super::DdlStatement; -use crate::builder::change_redundant_column; +use crate::builder::{change_redundant_column, unnest_with_options}; use crate::expr::{Alias, Placeholder, Sort as SortExpr, WindowFunction}; use crate::expr_rewriter::{create_col_from_scalar_expr, normalize_cols}; use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor}; @@ -807,51 +807,11 @@ impl LogicalPlan { } LogicalPlan::DescribeTable(_) => Ok(self.clone()), LogicalPlan::Unnest(Unnest { - column, - schema, - options, - .. + columns, options, .. }) => { // Update schema with unnested column type. - let input = Arc::new(inputs.swap_remove(0)); - let (nested_qualifier, nested_field) = - input.schema().qualified_field_from_column(column)?; - let (unnested_qualifier, unnested_field) = - schema.qualified_field_from_column(column)?; - let qualifiers_and_fields = input - .schema() - .iter() - .map(|(qualifier, field)| { - if qualifier.eq(&nested_qualifier) - && field.as_ref() == nested_field - { - ( - unnested_qualifier.cloned(), - Arc::new(unnested_field.clone()), - ) - } else { - (qualifier.cloned(), field.clone()) - } - }) - .collect::>(); - - let schema = Arc::new( - DFSchema::new_with_metadata( - qualifiers_and_fields, - input.schema().metadata().clone(), - )? - // We can use the existing functional dependencies as is: - .with_functional_dependencies( - input.schema().functional_dependencies().clone(), - )?, - ); - - Ok(LogicalPlan::Unnest(Unnest { - input, - column: column.clone(), - schema, - options: options.clone(), - })) + let input = inputs.swap_remove(0); + unnest_with_options(input, columns.clone(), options.clone()) } } } @@ -1581,8 +1541,8 @@ impl LogicalPlan { LogicalPlan::DescribeTable(DescribeTable { .. }) => { write!(f, "DescribeTable") } - LogicalPlan::Unnest(Unnest { column, .. }) => { - write!(f, "Unnest: {column}") + LogicalPlan::Unnest(Unnest { columns, .. }) => { + write!(f, "Unnest: {}", expr_vec_fmt!(columns)) } } } @@ -2556,8 +2516,8 @@ pub enum Partitioning { pub struct Unnest { /// The incoming logical plan pub input: Arc, - /// The column to unnest - pub column: Column, + /// The columns to unnest + pub columns: Vec, /// The output schema, containing the unnested field column. pub schema: DFSchemaRef, /// Options diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 3644f89e8b42..48f047c070dd 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -311,13 +311,13 @@ impl TreeNode for LogicalPlan { } LogicalPlan::Unnest(Unnest { input, - column, + columns, schema, options, }) => rewrite_arc(input, f)?.update_data(|input| { LogicalPlan::Unnest(Unnest { input, - column, + columns, schema, options, }) @@ -507,8 +507,12 @@ impl LogicalPlan { LogicalPlan::TableScan(TableScan { filters, .. }) => { filters.iter().apply_until_stop(f) } - LogicalPlan::Unnest(Unnest { column, .. }) => { - f(&Expr::Column(column.clone())) + LogicalPlan::Unnest(Unnest { columns, .. }) => { + let exprs = columns + .iter() + .map(|c| Expr::Column(c.clone())) + .collect::>(); + exprs.iter().apply_until_stop(f) } LogicalPlan::Distinct(Distinct::On(DistinctOn { on_expr, @@ -706,20 +710,6 @@ impl LogicalPlan { fetch, }) }), - LogicalPlan::Unnest(Unnest { - input, - column, - schema, - options, - }) => f(Expr::Column(column))?.map_data(|column| match column { - Expr::Column(column) => Ok(LogicalPlan::Unnest(Unnest { - input, - column, - schema, - options, - })), - _ => internal_err!("Transformation should return Column"), - })?, LogicalPlan::Distinct(Distinct::On(DistinctOn { on_expr, select_expr, @@ -744,6 +734,7 @@ impl LogicalPlan { }), // plans without expressions LogicalPlan::EmptyRelation(_) + | LogicalPlan::Unnest(_) | LogicalPlan::RecursiveQuery(_) | LogicalPlan::Subquery(_) | LogicalPlan::SubqueryAlias(_) diff --git a/datafusion/expr/src/tree_node.rs b/datafusion/expr/src/tree_node.rs index 85097f6249e1..35fec509c95a 100644 --- a/datafusion/expr/src/tree_node.rs +++ b/datafusion/expr/src/tree_node.rs @@ -36,6 +36,7 @@ impl TreeNode for Expr { ) -> Result { let children = match self { Expr::Alias(Alias{expr,..}) + | Expr::Unnest(Unnest{expr}) | Expr::Not(expr) | Expr::IsNotNull(expr) | Expr::IsTrue(expr) @@ -60,7 +61,6 @@ impl TreeNode for Expr { GetFieldAccess::NamedStructField { .. } => vec![expr], } } - Expr::Unnest(Unnest { exprs }) | Expr::GroupingSet(GroupingSet::Rollup(exprs)) | Expr::GroupingSet(GroupingSet::Cube(exprs)) => exprs.iter().collect(), Expr::ScalarFunction (ScalarFunction{ args, .. } ) => { diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 6a78bd596a46..6863f2646000 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -39,6 +39,7 @@ ahash = { version = "0.8", default-features = false, features = [ arrow = { workspace = true } arrow-array = { workspace = true } arrow-buffer = { workspace = true } +arrow-ord = { workspace = true } arrow-schema = { workspace = true } async-trait = { workspace = true } chrono = { workspace = true } diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 6ea1b3c40c83..45b848112ba9 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -15,8 +15,9 @@ // specific language governing permissions and limitations // under the License. -//! Defines the unnest column plan for unnesting values in a column that contains a list -//! type, conceptually is like joining each row with all the values in the list column. +//! Define a plan for unnesting values in columns that contain a list type. + +use std::collections::HashMap; use std::{any::Any, sync::Arc}; use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; @@ -27,15 +28,17 @@ use crate::{ }; use arrow::array::{ - Array, ArrayRef, ArrowPrimitiveType, FixedSizeListArray, GenericListArray, - LargeListArray, ListArray, OffsetSizeTrait, PrimitiveArray, -}; -use arrow::compute::kernels; -use arrow::datatypes::{ - ArrowNativeType, DataType, Int32Type, Int64Type, Schema, SchemaRef, + Array, ArrayRef, AsArray, FixedSizeListArray, LargeListArray, ListArray, + PrimitiveArray, }; +use arrow::compute::kernels::length::length; +use arrow::compute::kernels::zip::zip; +use arrow::compute::{cast, is_not_null, kernels, sum}; +use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; -use datafusion_common::{exec_err, Result, UnnestOptions}; +use arrow_array::{Int64Array, Scalar}; +use arrow_ord::cmp::lt; +use datafusion_common::{exec_datafusion_err, exec_err, Result, UnnestOptions}; use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; @@ -43,7 +46,7 @@ use async_trait::async_trait; use futures::{Stream, StreamExt}; use log::trace; -/// Unnest the given column by joining the row with each value in the +/// Unnest the given columns by joining the row with each value in the /// nested type. /// /// See [`UnnestOptions`] for more details and an example. @@ -53,8 +56,8 @@ pub struct UnnestExec { input: Arc, /// The schema once the unnest is applied schema: SchemaRef, - /// The unnest column - column: Column, + /// The unnest columns + columns: Vec, /// Options options: UnnestOptions, /// Execution metrics @@ -67,7 +70,7 @@ impl UnnestExec { /// Create a new [UnnestExec]. pub fn new( input: Arc, - column: Column, + columns: Vec, schema: SchemaRef, options: UnnestOptions, ) -> Self { @@ -75,7 +78,7 @@ impl UnnestExec { UnnestExec { input, schema, - column, + columns, options, metrics: Default::default(), cache, @@ -134,7 +137,7 @@ impl ExecutionPlan for UnnestExec { ) -> Result> { Ok(Arc::new(UnnestExec::new( children[0].clone(), - self.column.clone(), + self.columns.clone(), self.schema.clone(), self.options.clone(), ))) @@ -155,7 +158,7 @@ impl ExecutionPlan for UnnestExec { Ok(Box::pin(UnnestStream { input, schema: self.schema.clone(), - column: self.column.clone(), + columns: self.columns.clone(), options: self.options.clone(), metrics, })) @@ -210,8 +213,8 @@ struct UnnestStream { input: SendableRecordBatchStream, /// Unnested schema schema: Arc, - /// The unnest column - column: Column, + /// The unnest columns + columns: Vec, /// Options options: UnnestOptions, /// Metrics @@ -249,7 +252,7 @@ impl UnnestStream { Some(Ok(batch)) => { let timer = self.metrics.elapsed_compute.timer(); let result = - build_batch(&batch, &self.schema, &self.column, &self.options); + build_batch(&batch, &self.schema, &self.columns, &self.options); self.metrics.input_batches.add(1); self.metrics.input_rows.add(batch.num_rows()); if let Ok(ref batch) = result { @@ -276,270 +279,265 @@ impl UnnestStream { } } +/// For each row in a `RecordBatch`, some list columns need to be unnested. +/// We will expand the values in each list into multiple rows, +/// taking the longest length among these lists, and shorter lists are padded with NULLs. +// +/// For columns that don't need to be unnested, repeat their values until reaching the longest length. fn build_batch( batch: &RecordBatch, schema: &SchemaRef, - column: &Column, + columns: &[Column], options: &UnnestOptions, ) -> Result { - let list_array = column.evaluate(batch)?.into_array(batch.num_rows())?; - match list_array.data_type() { - DataType::List(_) => { - let list_array = list_array.as_any().downcast_ref::().unwrap(); - build_batch_generic_list::( - batch, - schema, - column.index(), - list_array, - options, - ) - } - DataType::LargeList(_) => { - let list_array = list_array - .as_any() - .downcast_ref::() - .unwrap(); - build_batch_generic_list::( - batch, - schema, - column.index(), - list_array, - options, - ) - } - DataType::FixedSizeList(_, _) => { - let list_array = list_array - .as_any() - .downcast_ref::() - .unwrap(); - build_batch_fixedsize_list(batch, schema, column.index(), list_array, options) - } - _ => exec_err!("Invalid unnest column {column}"), + let list_arrays: Vec = columns + .iter() + .map(|column| column.evaluate(batch)?.into_array(batch.num_rows())) + .collect::>()?; + + let longest_length = find_longest_length(&list_arrays, options)?; + let unnested_length = longest_length.as_primitive::(); + let total_length = if unnested_length.is_empty() { + 0 + } else { + sum(unnested_length).ok_or_else(|| { + exec_datafusion_err!("Failed to calculate the total unnested length") + })? as usize + }; + if total_length == 0 { + return Ok(RecordBatch::new_empty(schema.clone())); } -} -fn build_batch_generic_list>( - batch: &RecordBatch, - schema: &SchemaRef, - unnest_column_idx: usize, - list_array: &GenericListArray, - options: &UnnestOptions, -) -> Result { - let unnested_array = unnest_generic_list::(list_array, options)?; - - let take_indicies = - create_take_indicies_generic::(list_array, unnested_array.len(), options); - - batch_from_indices( - batch, - schema, - unnest_column_idx, - &unnested_array, - &take_indicies, - ) + // Unnest all the list arrays + let unnested_arrays = + unnest_list_arrays(&list_arrays, unnested_length, total_length)?; + let unnested_array_map: HashMap<_, _> = unnested_arrays + .into_iter() + .zip(columns.iter()) + .map(|(array, column)| (column.index(), array)) + .collect(); + + // Create the take indices array for other columns + let take_indicies = create_take_indicies(unnested_length, total_length); + + batch_from_indices(batch, schema, &unnested_array_map, &take_indicies) } -/// Given this `GenericList` list_array: +/// Find the longest list length among the given list arrays for each row. +/// +/// For example if we have the following two list arrays: /// /// ```ignore -/// [1], null, [2, 3, 4], null, [5, 6] +/// l1: [1, 2, 3], null, [], [3] +/// l2: [4,5], [], null, [6, 7] /// ``` -/// Its values array is represented like this: +/// +/// If `preserve_nulls` is false, the longest length array will be: /// /// ```ignore -/// [1, 2, 3, 4, 5, 6] +/// longest_length: [3, 0, 0, 2] /// ``` /// -/// So if there are no null values or `UnnestOptions.preserve_nulls` is false -/// we can return the values array without any copying. +/// whereas if `preserve_nulls` is true, the longest length array will be: /// -/// Otherwise we'll transfrom the values array using the take kernel and the following take indicies: /// /// ```ignore -/// 0, null, 1, 2, 3, null, 4, 5 +/// longest_length: [3, 1, 1, 2] /// ``` /// -fn unnest_generic_list>( - list_array: &GenericListArray, +fn find_longest_length( + list_arrays: &[ArrayRef], options: &UnnestOptions, -) -> Result> { - let values = list_array.values(); - if list_array.null_count() == 0 { - return Ok(values.clone()); +) -> Result { + // The length of a NULL list + let null_length = if options.preserve_nulls { + Scalar::new(Int64Array::from_value(1, 1)) + } else { + Scalar::new(Int64Array::from_value(0, 1)) + }; + let list_lengths: Vec = list_arrays + .iter() + .map(|list_array| { + let mut length_array = length(list_array)?; + // Make sure length arrays have the same type. Int64 is the most general one. + length_array = cast(&length_array, &DataType::Int64)?; + length_array = + zip(&is_not_null(&length_array)?, &length_array, &null_length)?; + Ok(length_array) + }) + .collect::>()?; + + let longest_length = list_lengths.iter().skip(1).try_fold( + list_lengths[0].clone(), + |longest, current| { + let is_lt = lt(&longest, ¤t)?; + zip(&is_lt, ¤t, &longest) + }, + )?; + Ok(longest_length) +} + +/// Trait defining common methods used for unnesting, implemented by list array types. +trait ListArrayType: Array { + /// Returns a reference to the values of this list. + fn values(&self) -> &ArrayRef; + + /// Returns the start and end offset of the values for the given row. + fn value_offsets(&self, row: usize) -> (i64, i64); +} + +impl ListArrayType for ListArray { + fn values(&self) -> &ArrayRef { + self.values() } - let mut take_indicies_builder = - PrimitiveArray::

::builder(values.len() + list_array.null_count()); - let offsets = list_array.value_offsets(); - for row in 0..list_array.len() { - if list_array.is_null(row) { - if options.preserve_nulls { - take_indicies_builder.append_null(); - } - } else { - let start = offsets[row].as_usize(); - let end = offsets[row + 1].as_usize(); - for idx in start..end { - take_indicies_builder.append_value(P::Native::from_usize(idx).unwrap()); - } - } + fn value_offsets(&self, row: usize) -> (i64, i64) { + let offsets = self.value_offsets(); + (offsets[row].into(), offsets[row + 1].into()) } - Ok(kernels::take::take( - &values, - &take_indicies_builder.finish(), - None, - )?) } -fn build_batch_fixedsize_list( - batch: &RecordBatch, - schema: &SchemaRef, - unnest_column_idx: usize, - list_array: &FixedSizeListArray, - options: &UnnestOptions, -) -> Result { - let unnested_array = unnest_fixed_list(list_array, options)?; - - let take_indicies = - create_take_indicies_fixed(list_array, unnested_array.len(), options); - - batch_from_indices( - batch, - schema, - unnest_column_idx, - &unnested_array, - &take_indicies, - ) +impl ListArrayType for LargeListArray { + fn values(&self) -> &ArrayRef { + self.values() + } + + fn value_offsets(&self, row: usize) -> (i64, i64) { + let offsets = self.value_offsets(); + (offsets[row], offsets[row + 1]) + } } -/// Given this `FixedSizeListArray` list_array: +impl ListArrayType for FixedSizeListArray { + fn values(&self) -> &ArrayRef { + self.values() + } + + fn value_offsets(&self, row: usize) -> (i64, i64) { + let start = self.value_offset(row) as i64; + (start, start + self.value_length() as i64) + } +} + +/// Unnest multiple list arrays according to the length array. +fn unnest_list_arrays( + list_arrays: &[ArrayRef], + length_array: &PrimitiveArray, + capacity: usize, +) -> Result> { + let typed_arrays = list_arrays + .iter() + .map(|list_array| match list_array.data_type() { + DataType::List(_) => Ok(list_array.as_list::() as &dyn ListArrayType), + DataType::LargeList(_) => { + Ok(list_array.as_list::() as &dyn ListArrayType) + } + DataType::FixedSizeList(_, _) => { + Ok(list_array.as_fixed_size_list() as &dyn ListArrayType) + } + other => exec_err!("Invalid unnest datatype {other }"), + }) + .collect::>>()?; + + // If there is only one list column to unnest and it doesn't contain any NULL lists, + // we can return the values array directly without any copying. + if typed_arrays.len() == 1 && typed_arrays[0].null_count() == 0 { + Ok(vec![typed_arrays[0].values().clone()]) + } else { + typed_arrays + .iter() + .map(|list_array| unnest_list_array(*list_array, length_array, capacity)) + .collect::>() + } +} + +/// Unnest a list array according the target length array. /// -/// ```ignore -/// [1, 2], null, [3, 4], null, [5, 6] -/// ``` -/// Its values array is represented like this: +/// Consider a list array like this: /// /// ```ignore -/// [1, 2, null, null 3, 4, null, null, 5, 6] +/// [1], [2, 3, 4], null, [5], [], /// ``` /// -/// So if there are no null values -/// we can return the values array without any copying. -/// -/// Otherwise we'll transfrom the values array using the take kernel. -/// -/// If `UnnestOptions.preserve_nulls` is true the take indicies will look like this: +/// and the length array is: /// /// ```ignore -/// 0, 1, null, 4, 5, null, 8, 9 +/// [2, 3, 2, 1, 2] /// ``` -/// Otherwise we drop the nulls and take indicies will look like this: +/// +/// If the length of a certain list is less than the target length, pad with NULLs. +/// So the unnested array will look like this: /// /// ```ignore -/// 0, 1, 4, 5, 8, 9 +/// [1, null, 2, 3, 4, null, null, 5, null, null] /// ``` /// -fn unnest_fixed_list( - list_array: &FixedSizeListArray, - options: &UnnestOptions, -) -> Result> { +fn unnest_list_array( + list_array: &dyn ListArrayType, + length_array: &PrimitiveArray, + capacity: usize, +) -> Result { let values = list_array.values(); - - if list_array.null_count() == 0 { - Ok(values.clone()) - } else { - let len_without_nulls = - values.len() - list_array.null_count() * list_array.value_length() as usize; - let null_count = if options.preserve_nulls { - list_array.null_count() - } else { - 0 - }; - let mut builder = - PrimitiveArray::::builder(len_without_nulls + null_count); - let mut take_offset = 0; - let fixed_value_length = list_array.value_length() as usize; - list_array.iter().for_each(|elem| match elem { - Some(_) => { - for i in 0..fixed_value_length { - //take_offset + i is always positive - let take_index = take_offset + i; - builder.append_value(take_index as i32); - } - take_offset += fixed_value_length; - } - None => { - if options.preserve_nulls { - builder.append_null(); - } - take_offset += fixed_value_length; + let mut take_indicies_builder = PrimitiveArray::::builder(capacity); + for row in 0..list_array.len() { + let mut value_length = 0; + if !list_array.is_null(row) { + let (start, end) = list_array.value_offsets(row); + value_length = end - start; + for i in start..end { + take_indicies_builder.append_value(i) } - }); - Ok(kernels::take::take(&values, &builder.finish(), None)?) + } + let target_length = length_array.value(row); + debug_assert!( + value_length <= target_length, + "value length is beyond the longest length" + ); + // Pad with NULL values + for _ in value_length..target_length { + take_indicies_builder.append_null(); + } } + Ok(kernels::take::take( + &values, + &take_indicies_builder.finish(), + None, + )?) } -/// Creates take indicies to be used to expand all other column's data. -/// Every column value needs to be repeated as many times as many elements there is in each corresponding array value. +/// Creates take indicies that will be used to expand all columns except for the unnest [`columns`](UnnestExec::columns). +/// Every column value needs to be repeated multiple times according to the length array. /// -/// If the column being unnested looks like this: +/// If the length array looks like this: /// /// ```ignore -/// [1], null, [2, 3, 4], null, [5, 6] +/// [2, 3, 1] /// ``` -/// Then `create_take_indicies_generic` will return an array like this +/// Then `create_take_indicies` will return an array like this /// /// ```ignore -/// [1, null, 2, 2, 2, null, 4, 4] +/// [0, 0, 1, 1, 1, 2] /// ``` /// -fn create_take_indicies_generic>( - list_array: &GenericListArray, +fn create_take_indicies( + length_array: &PrimitiveArray, capacity: usize, - options: &UnnestOptions, -) -> PrimitiveArray

{ - let mut builder = PrimitiveArray::

::builder(capacity); - let null_repeat: usize = if options.preserve_nulls { 1 } else { 0 }; - - for row in 0..list_array.len() { - let repeat = if list_array.is_null(row) { - null_repeat - } else { - list_array.value(row).len() - }; - - // `index` is a positive interger. - let index = P::Native::from_usize(row).unwrap(); - (0..repeat).for_each(|_| builder.append_value(index)); +) -> PrimitiveArray { + // `find_longest_length()` guarantees this. + debug_assert!( + length_array.null_count() == 0, + "length array should not contain nulls" + ); + let mut builder = PrimitiveArray::::builder(capacity); + for (index, repeat) in length_array.iter().enumerate() { + // The length array should not contain nulls, so unwrap is safe + let repeat = repeat.unwrap(); + (0..repeat).for_each(|_| builder.append_value(index as i64)); } - builder.finish() } -fn create_take_indicies_fixed( - list_array: &FixedSizeListArray, - capacity: usize, - options: &UnnestOptions, -) -> PrimitiveArray { - let mut builder = PrimitiveArray::::builder(capacity); - let null_repeat: usize = if options.preserve_nulls { 1 } else { 0 }; - - for row in 0..list_array.len() { - let repeat = if list_array.is_null(row) { - null_repeat - } else { - list_array.value_length() as usize - }; - - // `index` is a positive interger. - let index = ::Native::from_usize(row).unwrap(); - (0..repeat).for_each(|_| builder.append_value(index)); - } - - builder.finish() -} - -/// Create the final batch given the unnested column array and a `indices` array +/// Create the final batch given the unnested column arrays and a `indices` array /// that is used by the take kernel to copy values. /// /// For example if we have the following `RecordBatch`: @@ -549,8 +547,8 @@ fn create_take_indicies_fixed( /// c2: 'a', 'b', 'c', null, 'd' /// ``` /// -/// then the `unnested_array` contains the unnest column that will replace `c1` in -/// the final batch: +/// then the `unnested_list_arrays` contains the unnest column that will replace `c1` in +/// the final batch if `preserve_nulls` is true: /// /// ```ignore /// c1: 1, null, 2, 3, 4, null, 5, 6 @@ -570,26 +568,19 @@ fn create_take_indicies_fixed( /// c2: 'a', 'b', 'c', 'c', 'c', null, 'd', 'd' /// ``` /// -fn batch_from_indices( +fn batch_from_indices( batch: &RecordBatch, schema: &SchemaRef, - unnest_column_idx: usize, - unnested_array: &ArrayRef, - indices: &PrimitiveArray, -) -> Result -where - T: ArrowPrimitiveType, -{ + unnested_list_arrays: &HashMap, + indices: &PrimitiveArray, +) -> Result { let arrays = batch .columns() .iter() .enumerate() - .map(|(col_idx, arr)| { - if col_idx == unnest_column_idx { - Ok(unnested_array.clone()) - } else { - Ok(kernels::take::take(&arr, indices, None)?) - } + .map(|(col_idx, arr)| match unnested_list_arrays.get(&col_idx) { + Some(unnested_array) => Ok(unnested_array.clone()), + None => Ok(kernels::take::take(arr, indices, None)?), }) .collect::>>()?; @@ -599,51 +590,51 @@ where #[cfg(test)] mod tests { use super::*; - use arrow::{ - array::AsArray, - datatypes::{DataType, Field}, - }; - use arrow_array::StringArray; + use arrow::datatypes::{DataType, Field}; + use arrow_array::{GenericListArray, OffsetSizeTrait, StringArray}; use arrow_buffer::{BooleanBufferBuilder, NullBuffer, OffsetBuffer}; - // Create a ListArray with the following list values: + // Create a GenericListArray with the following list values: // [A, B, C], [], NULL, [D], NULL, [NULL, F] - fn make_test_array() -> ListArray { + fn make_generic_array() -> GenericListArray + where + OffsetSize: OffsetSizeTrait, + { let mut values = vec![]; - let mut offsets = vec![0]; - let mut valid = BooleanBufferBuilder::new(2); + let mut offsets: Vec = vec![OffsetSize::zero()]; + let mut valid = BooleanBufferBuilder::new(6); // [A, B, C] values.extend_from_slice(&[Some("A"), Some("B"), Some("C")]); - offsets.push(values.len() as i32); + offsets.push(OffsetSize::from_usize(values.len()).unwrap()); valid.append(true); // [] - offsets.push(values.len() as i32); + offsets.push(OffsetSize::from_usize(values.len()).unwrap()); valid.append(true); // NULL with non-zero value length // Issue https://github.com/apache/arrow-datafusion/issues/9932 values.push(Some("?")); - offsets.push(values.len() as i32); + offsets.push(OffsetSize::from_usize(values.len()).unwrap()); valid.append(false); // [D] values.push(Some("D")); - offsets.push(values.len() as i32); + offsets.push(OffsetSize::from_usize(values.len()).unwrap()); valid.append(true); // Another NULL with zero value length - offsets.push(values.len() as i32); + offsets.push(OffsetSize::from_usize(values.len()).unwrap()); valid.append(false); // [NULL, F] values.extend_from_slice(&[None, Some("F")]); - offsets.push(values.len() as i32); + offsets.push(OffsetSize::from_usize(values.len()).unwrap()); valid.append(true); let field = Arc::new(Field::new("item", DataType::Utf8, true)); - ListArray::new( + GenericListArray::::new( field, OffsetBuffer::new(offsets.into()), Arc::new(StringArray::from(values)), @@ -651,43 +642,141 @@ mod tests { ) } - #[test] - fn test_unnest_generic_list() -> datafusion_common::Result<()> { - let list_array = make_test_array(); - - // Test with preserve_nulls = false - let options = UnnestOptions { - preserve_nulls: false, - }; - let unnested_array = - unnest_generic_list::(&list_array, &options)?; - let strs = unnested_array.as_string::().iter().collect::>(); - assert_eq!( - strs, - vec![Some("A"), Some("B"), Some("C"), Some("D"), None, Some("F")] - ); + // Create a FixedSizeListArray with the following list values: + // [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL] + fn make_fixed_list() -> FixedSizeListArray { + let values = Arc::new(StringArray::from_iter([ + Some("A"), + Some("B"), + None, + None, + Some("C"), + Some("D"), + None, + None, + None, + Some("F"), + None, + None, + ])); + let field = Arc::new(Field::new("item", DataType::Utf8, true)); + let valid = NullBuffer::from(vec![true, false, true, false, true, true]); + FixedSizeListArray::new(field, 2, values, Some(valid)) + } - // Test with preserve_nulls = true - let options = UnnestOptions { - preserve_nulls: true, - }; - let unnested_array = - unnest_generic_list::(&list_array, &options)?; + fn verify_unnest_list_array( + list_array: &dyn ListArrayType, + lengths: Vec, + expected: Vec>, + ) -> datafusion_common::Result<()> { + let length_array = Int64Array::from(lengths); + let unnested_array = unnest_list_array(list_array, &length_array, 3 * 6)?; let strs = unnested_array.as_string::().iter().collect::>(); - assert_eq!( - strs, + assert_eq!(strs, expected); + Ok(()) + } + + #[test] + fn test_unnest_list_array() -> datafusion_common::Result<()> { + // [A, B, C], [], NULL, [D], NULL, [NULL, F] + let list_array = make_generic_array::(); + verify_unnest_list_array( + &list_array, + vec![3, 2, 1, 2, 0, 3], vec![ Some("A"), Some("B"), Some("C"), None, + None, + None, Some("D"), None, None, - Some("F") - ] + Some("F"), + None, + ], + )?; + + // [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL] + let list_array = make_fixed_list(); + verify_unnest_list_array( + &list_array, + vec![3, 1, 2, 0, 2, 3], + vec![ + Some("A"), + Some("B"), + None, + None, + Some("C"), + Some("D"), + None, + Some("F"), + None, + None, + None, + ], + )?; + + Ok(()) + } + + fn verify_longest_length( + list_arrays: &[ArrayRef], + preserve_nulls: bool, + expected: Vec, + ) -> datafusion_common::Result<()> { + let options = UnnestOptions { preserve_nulls }; + let longest_length = find_longest_length(list_arrays, &options)?; + let expected_array = Int64Array::from(expected); + assert_eq!( + longest_length + .as_any() + .downcast_ref::() + .unwrap(), + &expected_array ); + Ok(()) + } + + #[test] + fn test_longest_list_length() -> datafusion_common::Result<()> { + // Test with single ListArray + // [A, B, C], [], NULL, [D], NULL, [NULL, F] + let list_array = Arc::new(make_generic_array::()) as ArrayRef; + verify_longest_length(&[list_array.clone()], false, vec![3, 0, 0, 1, 0, 2])?; + verify_longest_length(&[list_array.clone()], true, vec![3, 0, 1, 1, 1, 2])?; + + // Test with single LargeListArray + // [A, B, C], [], NULL, [D], NULL, [NULL, F] + let list_array = Arc::new(make_generic_array::()) as ArrayRef; + verify_longest_length(&[list_array.clone()], false, vec![3, 0, 0, 1, 0, 2])?; + verify_longest_length(&[list_array.clone()], true, vec![3, 0, 1, 1, 1, 2])?; + + // Test with single FixedSizeListArray + // [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL] + let list_array = Arc::new(make_fixed_list()) as ArrayRef; + verify_longest_length(&[list_array.clone()], false, vec![2, 0, 2, 0, 2, 2])?; + verify_longest_length(&[list_array.clone()], true, vec![2, 1, 2, 1, 2, 2])?; + + // Test with multiple list arrays + // [A, B, C], [], NULL, [D], NULL, [NULL, F] + // [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL] + let list1 = Arc::new(make_generic_array::()) as ArrayRef; + let list2 = Arc::new(make_fixed_list()) as ArrayRef; + let list_arrays = vec![list1.clone(), list2.clone()]; + verify_longest_length(&list_arrays, false, vec![3, 0, 2, 1, 2, 2])?; + verify_longest_length(&list_arrays, true, vec![3, 1, 2, 1, 2, 2])?; Ok(()) } + + #[test] + fn test_create_take_indicies() -> datafusion_common::Result<()> { + let length_array = Int64Array::from(vec![2, 3, 1]); + let take_indicies = create_take_indicies(&length_array, 6); + let expected = Int64Array::from(vec![0, 0, 1, 1, 1, 2]); + assert_eq!(take_indicies, expected); + Ok(()) + } } diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 057690aacee6..01935efb0ac8 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -1260,8 +1260,11 @@ pub fn parse_expr( parse_required_expr(negative.expr.as_deref(), registry, "expr", codec)?, ))), ExprType::Unnest(unnest) => { - let exprs = parse_exprs(&unnest.exprs, registry, codec)?; - Ok(Expr::Unnest(Unnest { exprs })) + let mut exprs = parse_exprs(&unnest.exprs, registry, codec)?; + if exprs.len() != 1 { + return Err(proto_error("Unnest must have exactly one expression")); + } + Ok(Expr::Unnest(Unnest::new(exprs.swap_remove(0)))) } ExprType::InList(in_list) => Ok(Expr::InList(InList::new( Box::new(parse_required_expr( diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 358eea785713..61d57fa4e32b 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -963,9 +963,9 @@ pub fn serialize_expr( expr_type: Some(ExprType::Negative(expr)), } } - Expr::Unnest(Unnest { exprs }) => { + Expr::Unnest(Unnest { expr }) => { let expr = protobuf::Unnest { - exprs: serialize_exprs(exprs, codec)?, + exprs: vec![serialize_expr(expr.as_ref(), codec)?], }; protobuf::LogicalExprNode { expr_type: Some(ExprType::Unnest(expr)), diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index e680a1b2ff1e..eee15008fbbb 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -1599,7 +1599,7 @@ fn roundtrip_inlist() { #[test] fn roundtrip_unnest() { let test_expr = Expr::Unnest(Unnest { - exprs: vec![lit(1), lit(2), lit(3)], + expr: Box::new(col("col")), }); let ctx = SessionContext::new(); diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs index 4bf0906685ca..afa653b329aa 100644 --- a/datafusion/sql/src/expr/function.rs +++ b/datafusion/sql/src/expr/function.rs @@ -119,10 +119,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // Build Unnest expression if name.eq("unnest") { - let exprs = + let mut exprs = self.function_args_to_expr(args.clone(), schema, planner_context)?; - Self::check_unnest_args(&exprs, schema)?; - return Ok(Expr::Unnest(Unnest { exprs })); + if exprs.len() != 1 { + return plan_err!("unnest() requires exactly one argument"); + } + let expr = exprs.swap_remove(0); + Self::check_unnest_arg(&expr, schema)?; + return Ok(Expr::Unnest(Unnest::new(expr))); } // next, scalar built-in @@ -353,17 +357,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .collect::>>() } - pub(crate) fn check_unnest_args(args: &[Expr], schema: &DFSchema) -> Result<()> { - // Currently only one argument is supported - let arg = match args.len() { - 0 => { - return plan_err!("unnest() requires at least one argument"); - } - 1 => &args[0], - _ => { - return not_impl_err!("unnest() does not support multiple arguments yet"); - } - }; + pub(crate) fn check_unnest_arg(arg: &Expr, schema: &DFSchema) -> Result<()> { // Check argument type, array types are supported match arg.get_type(schema)? { DataType::List(_) diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index 1e01205ba618..9380e569f2e4 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -105,15 +105,24 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // Unnest table factor has empty input let schema = DFSchema::empty(); let input = LogicalPlanBuilder::empty(true).build()?; - let exprs = array_exprs + // Unnest table factor can have multiple arugments. + // We treat each argument as a separate unnest expression. + let unnest_exprs = array_exprs .into_iter() - .map(|expr| { - self.sql_expr_to_logical_expr(expr, &schema, planner_context) + .map(|sql_expr| { + let expr = self.sql_expr_to_logical_expr( + sql_expr, + &schema, + planner_context, + )?; + Self::check_unnest_arg(&expr, &schema)?; + Ok(Expr::Unnest(Unnest::new(expr))) }) .collect::>>()?; - Self::check_unnest_args(&exprs, &schema)?; - let unnest_expr = Expr::Unnest(Unnest { exprs }); - let logical_plan = self.try_process_unnest(input, vec![unnest_expr])?; + if unnest_exprs.is_empty() { + return plan_err!("UNNEST must have at least one argument"); + } + let logical_plan = self.try_process_unnest(input, unnest_exprs)?; (logical_plan, alias) } TableFactor::UNNEST { .. } => { diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 1bfd60a8ce1a..30eacdb44c4a 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -294,13 +294,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { transformed, tnr: _, } = expr.transform_up_mut(&mut |expr: Expr| { - if let Expr::Unnest(Unnest { ref exprs }) = expr { + if let Expr::Unnest(Unnest { expr: ref arg }) = expr { let column_name = expr.display_name()?; unnest_columns.push(column_name.clone()); // Add alias for the argument expression, to avoid naming conflicts with other expressions // in the select list. For example: `select unnest(col1), col1 from t`. inner_projection_exprs - .push(exprs[0].clone().alias(column_name.clone())); + .push(arg.clone().alias(column_name.clone())); Ok(Transformed::yes(Expr::Column(Column::from_name( column_name, )))) @@ -332,15 +332,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .project(inner_projection_exprs)? .build() } else { - if unnest_columns.len() > 1 { - return not_impl_err!("Only support single unnest expression for now"); - } - let unnest_column = unnest_columns.pop().unwrap(); + let columns = unnest_columns.into_iter().map(|col| col.into()).collect(); // Set preserve_nulls to false to ensure compatibility with DuckDB and PostgreSQL let unnest_options = UnnestOptions::new().with_preserve_nulls(false); LogicalPlanBuilder::from(input) .project(inner_projection_exprs)? - .unnest_column_with_options(unnest_column, unnest_options)? + .unnest_columns_with_options(columns, unnest_options)? .project(outer_projection_exprs)? .build() } diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index 5c178bb392b1..38207fa7d1d6 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -22,12 +22,12 @@ statement ok CREATE TABLE unnest_table AS VALUES - ([1,2,3], [7], 1), - ([4,5], [8,9,10], 2), - ([6], [11,12], 3), - ([12], [null, 42, null], null), + ([1,2,3], [7], 1, [13, 14]), + ([4,5], [8,9,10], 2, [15, 16]), + ([6], [11,12], 3, null), + ([12], [null, 42, null], null, null), -- null array to verify the `preserve_nulls` option - (null, null, 4) + (null, null, 4, [17, 18]) ; ## Basic unnest expression in select list @@ -93,6 +93,20 @@ NULL 42 NULL +## Unnest single column and filter out null lists +query I +select unnest(column2) from unnest_table where column2 is not null; +---- +7 +8 +9 +10 +11 +12 +NULL +42 +NULL + ## Unnest with additional column ## Issue: https://github.com/apache/arrow-datafusion/issues/9349 query II @@ -135,9 +149,48 @@ select array_remove(column1, 4), unnest(column2), column3 * 10 from unnest_table query error DataFusion error: Error during planning: unnest\(\) can only be applied to array, struct and null select unnest(column3) from unnest_table; +## Unnest doesn't work with untyped nulls +query error DataFusion error: This feature is not implemented: unnest\(\) does not support null yet +select unnest(null) from unnest_table; + ## Multiple unnest functions in selection -query error DataFusion error: This feature is not implemented: Only support single unnest expression for now -select unnest(column1), unnest(column2) from unnest_table; +query ?I +select unnest([]), unnest(NULL::int[]); +---- + +query III +select + unnest(column1), + unnest(arrow_cast(column2, 'LargeList(Int64)')), + unnest(arrow_cast(column4, 'FixedSizeList(2, Int64)')) +from unnest_table where column4 is not null; +---- +1 7 13 +2 NULL 14 +3 NULL NULL +4 8 15 +5 9 16 +NULL 10 NULL +NULL NULL 17 +NULL NULL 18 + +query IIII +select + unnest(column1), unnest(column2) + 2, + column3 * 10, unnest(array_remove(column1, '4')) +from unnest_table; +---- +1 9 10 1 +2 NULL 10 2 +3 NULL 10 3 +4 10 20 5 +5 11 20 NULL +NULL 12 20 NULL +6 13 30 6 +NULL 14 30 NULL +12 NULL NULL 12 +NULL 44 NULL NULL +NULL NULL NULL NULL ## Unnest scalar in select list query error DataFusion error: Error during planning: unnest\(\) can only be applied to array, struct and null @@ -149,7 +202,7 @@ select * from unnest(1); ## Unnest empty expression in select list -query error DataFusion error: Error during planning: unnest\(\) requires at least one argument +query error DataFusion error: Error during planning: unnest\(\) requires exactly one argument select unnest(); ## Unnest empty expression in from clause @@ -157,13 +210,26 @@ query error DataFusion error: SQL error: ParserError\("Expected an expression:, select * from unnest(); -## Unnest multiple expressions in select list -query error DataFusion error: This feature is not implemented: unnest\(\) does not support multiple arguments yet +## Unnest multiple expressions in select list. This form is only allowed in a query's FROM clause. +query error DataFusion error: Error during planning: unnest\(\) requires exactly one argument select unnest([1,2], [2,3]); ## Unnest multiple expressions in from clause -query error DataFusion error: This feature is not implemented: unnest\(\) does not support multiple arguments yet -select * from unnest([1,2], [2,3]); +query ITII +select * from unnest( + [1,2], + arrow_cast(['a','b', 'c'], 'LargeList(Utf8)'), + arrow_cast([4, NULL], 'FixedSizeList(2, Int64)'), + NULL::int[] +) as t(a, b, c, d); +---- +1 a 4 NULL +2 b NULL NULL +NULL c NULL NULL + +query ?I +select * from unnest([], NULL::int[]); +---- ## Unnest struct expression in select list