feat: support unnest multiple arrays #10044

Merged: 13 commits, Apr 15, 2024
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

(Generated file; diff not rendered.)

9 changes: 5 additions & 4 deletions datafusion/core/src/physical_planner.rs
@@ -1168,12 +1168,13 @@ impl DefaultPhysicalPlanner {

Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch)))
}
LogicalPlan::Unnest(Unnest { input, column, schema, options }) => {
LogicalPlan::Unnest(Unnest { input, columns, schema, options }) => {
let input = self.create_initial_plan(input, session_state).await?;
let column_exec = schema.index_of_column(column)
.map(|idx| Column::new(&column.name, idx))?;
let column_execs = columns.iter().map(|column| {
schema.index_of_column(column).map(|idx| Column::new(&column.name, idx))
}).collect::<Result<_>>()?;
let schema = SchemaRef::new(schema.as_ref().to_owned().into());
Ok(Arc::new(UnnestExec::new(input, column_exec, schema, options.clone())))
Ok(Arc::new(UnnestExec::new(input, column_execs, schema, options.clone())))
}
LogicalPlan::Ddl(ddl) => {
// There is no default plan for DDl statements --
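A minimal sketch (not part of the PR) of the fallible column-resolution pattern the planner now uses for multiple unnest columns: each logical column is mapped to its schema index and collected into a `Result`, so the first missing column aborts planning. It assumes `datafusion-common`, `datafusion-physical-expr`, and `arrow` as dependencies; the helper name `to_physical_columns` is made up for illustration.

```rust
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{Column, DFSchema, Result};
use datafusion_physical_expr::expressions::Column as PhysicalColumn;

/// Resolve each logical column to a physical column by schema index,
/// stopping at the first column that cannot be found.
fn to_physical_columns(schema: &DFSchema, columns: &[Column]) -> Result<Vec<PhysicalColumn>> {
    columns
        .iter()
        .map(|c| {
            schema
                .index_of_column(c)
                .map(|idx| PhysicalColumn::new(&c.name, idx))
        })
        .collect()
}

fn main() -> Result<()> {
    let schema = DFSchema::try_from(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Utf8, true),
    ]))?;
    let cols = vec![Column::from_name("a"), Column::from_name("b")];
    for pc in to_physical_columns(&schema, &cols)? {
        println!("{} -> index {}", pc.name(), pc.index());
    }
    Ok(())
}
```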
20 changes: 16 additions & 4 deletions datafusion/expr/src/expr.rs
@@ -186,7 +186,16 @@ pub enum Expr {

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Unnest {
pub exprs: Vec<Expr>,
pub expr: Box<Expr>,
[Review comment from jonahgao (Member Author)]: In the expression position, only one argument can be accepted.
}

impl Unnest {
/// Create a new Unnest expression.
pub fn new(expr: Expr) -> Self {
Self {
expr: Box::new(expr),
}
}
}

/// Alias expression
@@ -1567,8 +1576,8 @@ impl fmt::Display for Expr {
}
},
Expr::Placeholder(Placeholder { id, .. }) => write!(f, "{id}"),
Expr::Unnest(Unnest { exprs }) => {
write!(f, "UNNEST({exprs:?})")
Expr::Unnest(Unnest { expr }) => {
write!(f, "UNNEST({expr:?})")
}
}
}
@@ -1757,7 +1766,10 @@ fn create_name(e: &Expr) -> Result<String> {
}
}
}
Expr::Unnest(Unnest { exprs }) => create_function_name("unnest", false, exprs),
Expr::Unnest(Unnest { expr }) => {
let expr_name = create_name(expr)?;
Ok(format!("unnest({expr_name})"))
}
Expr::ScalarFunction(fun) => create_function_name(fun.name(), false, &fun.args),
Expr::WindowFunction(WindowFunction {
fun,
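A small sketch (not part of the PR) of the new single-argument expression form, assuming the `datafusion-expr` and `datafusion-common` crates at this revision. The exact `Display` and `display_name` output shown in the comments is illustrative, based on the branches in the diff above.

```rust
use datafusion_common::Result;
use datafusion_expr::expr::Unnest;
use datafusion_expr::{col, Expr};

fn main() -> Result<()> {
    // Expression-position UNNEST now wraps exactly one argument as a boxed expression.
    let e = Expr::Unnest(Unnest::new(col("tags")));

    // Display formats the boxed inner expression with its Debug representation,
    // e.g. `UNNEST(Column(Column { relation: None, name: "tags" }))`.
    println!("{e}");

    // The generated schema name should follow the new create_name branch,
    // e.g. `unnest(tags)`.
    println!("{}", e.display_name()?);
    Ok(())
}
```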
6 changes: 3 additions & 3 deletions datafusion/expr/src/expr_rewriter/mod.rs
@@ -82,13 +82,13 @@ pub fn normalize_col_with_schemas_and_ambiguity_check(
using_columns: &[HashSet<Column>],
) -> Result<Expr> {
// Normalize column inside Unnest
if let Expr::Unnest(Unnest { exprs }) = expr {
if let Expr::Unnest(Unnest { expr }) = expr {
let e = normalize_col_with_schemas_and_ambiguity_check(
exprs[0].clone(),
expr.as_ref().clone(),
schemas,
using_columns,
)?;
return Ok(Expr::Unnest(Unnest { exprs: vec![e] }));
return Ok(Expr::Unnest(Unnest { expr: Box::new(e) }));
}

expr.transform(&|expr| {
8 changes: 2 additions & 6 deletions datafusion/expr/src/expr_schema.rs
@@ -115,12 +115,8 @@ impl ExprSchemable for Expr {
Expr::Case(case) => case.when_then_expr[0].1.get_type(schema),
Expr::Cast(Cast { data_type, .. })
| Expr::TryCast(TryCast { data_type, .. }) => Ok(data_type.clone()),
Expr::Unnest(Unnest { exprs }) => {
let arg_data_types = exprs
.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
let arg_data_type = arg_data_types[0].clone();
Expr::Unnest(Unnest { expr }) => {
let arg_data_type = expr.get_type(schema)?;
// Unnest's output type is the inner type of the list
match arg_data_type{
DataType::List(field) | DataType::LargeList(field) | DataType::FixedSizeList(field, _) =>{
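A short sketch (not part of the PR) of the simplified type derivation: the unnest expression reports the list's element type rather than the list type itself. It assumes `datafusion-expr`, `datafusion-common`, and `arrow` as dependencies.

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{DFSchema, Result};
use datafusion_expr::expr::Unnest;
use datafusion_expr::{col, Expr, ExprSchemable};

fn main() -> Result<()> {
    // A schema with one list column; the unnest expression should report the
    // element type (Int64), not the list type.
    let schema = Schema::new(vec![Field::new(
        "a",
        DataType::List(Arc::new(Field::new("item", DataType::Int64, true))),
        true,
    )]);
    let df_schema = DFSchema::try_from(schema)?;

    let unnest = Expr::Unnest(Unnest::new(col("a")));
    assert_eq!(unnest.get_type(&df_schema)?, DataType::Int64);
    Ok(())
}
```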
77 changes: 46 additions & 31 deletions datafusion/expr/src/logical_plan/builder.rs
@@ -1112,7 +1112,7 @@ impl LogicalPlanBuilder {

/// Unnest the given column.
pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
Ok(Self::from(unnest(self.plan, column.into())?))
Ok(Self::from(unnest(self.plan, vec![column.into()])?))
}

/// Unnest the given column given [`UnnestOptions`]
@@ -1123,10 +1123,21 @@
) -> Result<Self> {
Ok(Self::from(unnest_with_options(
self.plan,
column.into(),
vec![column.into()],
options,
)?))
}

/// Unnest the given columns with the given [`UnnestOptions`]
pub fn unnest_columns_with_options(
self,
columns: Vec<Column>,
options: UnnestOptions,
) -> Result<Self> {
Ok(Self::from(unnest_with_options(
self.plan, columns, options,
)?))
}
}
pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
let mut name_map = HashMap::new();
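A usage sketch (not part of the PR) of the new `unnest_columns_with_options` builder method, assuming `datafusion-expr`, `datafusion-common`, and `arrow` at this revision; the table and column names are made up.

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{Column, Result, UnnestOptions};
use datafusion_expr::logical_plan::builder::{LogicalPlanBuilder, LogicalTableSource};

fn main() -> Result<()> {
    // A table with two list columns.
    let item = |dt| Arc::new(Field::new("item", dt, true));
    let schema = Schema::new(vec![
        Field::new("ints", DataType::List(item(DataType::Int64)), true),
        Field::new("strs", DataType::List(item(DataType::Utf8)), true),
    ]);
    let source = Arc::new(LogicalTableSource::new(Arc::new(schema)));

    // Unnest both list columns in a single Unnest node.
    let plan = LogicalPlanBuilder::scan("t", source, None)?
        .unnest_columns_with_options(
            vec![Column::from_name("ints"), Column::from_name("strs")],
            UnnestOptions::new(),
        )?
        .build()?;

    // Expected to print something like `Unnest: t.ints, t.strs` above the scan.
    println!("{}", plan.display_indent());
    Ok(())
}
```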
@@ -1534,44 +1545,50 @@ impl TableSource for LogicalTableSource {
}

/// Create a [`LogicalPlan::Unnest`] plan
pub fn unnest(input: LogicalPlan, column: Column) -> Result<LogicalPlan> {
unnest_with_options(input, column, UnnestOptions::new())
pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
unnest_with_options(input, columns, UnnestOptions::new())
}

/// Create a [`LogicalPlan::Unnest`] plan with options
pub fn unnest_with_options(
input: LogicalPlan,
column: Column,
columns: Vec<Column>,
options: UnnestOptions,
) -> Result<LogicalPlan> {
let (unnest_qualifier, unnest_field) =
input.schema().qualified_field_from_column(&column)?;

// Extract the type of the nested field in the list.
let unnested_field = match unnest_field.data_type() {
DataType::List(field)
| DataType::FixedSizeList(field, _)
| DataType::LargeList(field) => Arc::new(Field::new(
unnest_field.name(),
field.data_type().clone(),
unnest_field.is_nullable(),
)),
_ => {
// If the unnest field is not a list type return the input plan.
return Ok(input);
}
};
let mut unnested_fields: HashMap<usize, _> = HashMap::with_capacity(columns.len());
[Review comment from a Contributor]: 💯
// Add qualifiers to the columns.
let mut qualified_columns = Vec::with_capacity(columns.len());
for c in &columns {
let index = input.schema().index_of_column(c)?;
let (unnest_qualifier, unnest_field) = input.schema().qualified_field(index);
let unnested_field = match unnest_field.data_type() {
DataType::List(field)
| DataType::FixedSizeList(field, _)
| DataType::LargeList(field) => Arc::new(Field::new(
unnest_field.name(),
field.data_type().clone(),
// Unnesting may produce NULLs even if the list is not null.
// For example: unnest([1], []) -> 1, null
[Review thread on the nullability comment above]
jayzhan211 (Contributor, Apr 13, 2024): Although the test does not cover this case, sadly I don't think there is currently a way to create a list with nullable as false 🤔 Maybe we can write a simple Rust test for this?
jonahgao (Member Author): I think so too. true should be equivalent to the previous unnest_field.is_nullable(). I planned to write some tests related to DataFrame, which could potentially implement it. But the changes in this PR are larger than I expected, so those tests have been delayed.
Contributor: We can do it in another PR!
jonahgao (Member Author, Apr 15, 2024): 👌
true,
)),
_ => {
// If the unnest field is not a list type return the input plan.
return Ok(input);
}
};
qualified_columns.push(Column::from((unnest_qualifier, unnested_field.as_ref())));
unnested_fields.insert(index, unnested_field);
}

// Update the schema with the unnest column type changed to contain the nested type.
// Update the schema with the unnest column types changed to contain the nested types.
let input_schema = input.schema();
let fields = input_schema
.iter()
.map(|(q, f)| {
if f.as_ref() == unnest_field && q == unnest_qualifier {
(unnest_qualifier.cloned(), unnested_field.clone())
} else {
(q.cloned(), f.clone())
}
.enumerate()
.map(|(index, (q, f))| match unnested_fields.get(&index) {
Some(unnested_field) => (q.cloned(), unnested_field.clone()),
None => (q.cloned(), f.clone()),
})
.collect::<Vec<_>>();

@@ -1580,11 +1597,9 @@ pub fn unnest_with_options(
// We can use the existing functional dependencies:
let deps = input_schema.functional_dependencies().clone();
let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);
let column = Column::from((unnest_qualifier, unnested_field.as_ref()));

Ok(LogicalPlan::Unnest(Unnest {
input: Arc::new(input),
column,
columns: qualified_columns,
schema,
options,
}))
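A sketch (not part of the PR) of what `unnest_with_options` is expected to produce for two list columns under the new index-keyed schema rewrite: each unnested list column becomes its element type, while columns not listed keep their original types and positions. Crate paths and the `DFSchema::fields()` accessor are assumptions based on this revision.

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{Column, Result, UnnestOptions};
use datafusion_expr::logical_plan::builder::{
    unnest_with_options, LogicalPlanBuilder, LogicalTableSource,
};

fn main() -> Result<()> {
    let item = |dt| Arc::new(Field::new("item", dt, true));
    let schema = Schema::new(vec![
        Field::new("ints", DataType::List(item(DataType::Int64)), true),
        Field::new("tag", DataType::Utf8, true),
        Field::new("strs", DataType::List(item(DataType::Utf8)), true),
    ]);
    let source = Arc::new(LogicalTableSource::new(Arc::new(schema)));
    let input = LogicalPlanBuilder::scan("t", source, None)?.build()?;

    // Unnest two of the three columns; the column in between is left untouched.
    let plan = unnest_with_options(
        input,
        vec![Column::from_name("ints"), Column::from_name("strs")],
        UnnestOptions::new(),
    )?;

    // The output schema replaces each unnested list with its element type,
    // keyed by field index, so column order is preserved.
    let types: Vec<DataType> = plan
        .schema()
        .fields()
        .iter()
        .map(|f| f.data_type().clone())
        .collect();
    assert_eq!(types, vec![DataType::Int64, DataType::Utf8, DataType::Utf8]);
    Ok(())
}
```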
4 changes: 2 additions & 2 deletions datafusion/expr/src/logical_plan/display.rs
@@ -638,10 +638,10 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
"Node Type": "DescribeTable"
})
}
LogicalPlan::Unnest(Unnest { column, .. }) => {
LogicalPlan::Unnest(Unnest { columns, .. }) => {
json!({
"Node Type": "Unnest",
"Column": format!("{}", column)
"Column": expr_vec_fmt!(columns),
})
}
}
56 changes: 8 additions & 48 deletions datafusion/expr/src/logical_plan/plan.rs
@@ -24,7 +24,7 @@ use std::sync::Arc;

use super::dml::CopyTo;
use super::DdlStatement;
use crate::builder::change_redundant_column;
use crate::builder::{change_redundant_column, unnest_with_options};
use crate::expr::{Alias, Placeholder, Sort as SortExpr, WindowFunction};
use crate::expr_rewriter::{create_col_from_scalar_expr, normalize_cols};
use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
@@ -807,51 +807,11 @@
}
LogicalPlan::DescribeTable(_) => Ok(self.clone()),
LogicalPlan::Unnest(Unnest {
column,
schema,
options,
..
columns, options, ..
}) => {
// Update schema with unnested column type.
let input = Arc::new(inputs.swap_remove(0));
let (nested_qualifier, nested_field) =
input.schema().qualified_field_from_column(column)?;
let (unnested_qualifier, unnested_field) =
schema.qualified_field_from_column(column)?;
let qualifiers_and_fields = input
.schema()
.iter()
.map(|(qualifier, field)| {
if qualifier.eq(&nested_qualifier)
&& field.as_ref() == nested_field
{
(
unnested_qualifier.cloned(),
Arc::new(unnested_field.clone()),
)
} else {
(qualifier.cloned(), field.clone())
}
})
.collect::<Vec<_>>();

let schema = Arc::new(
DFSchema::new_with_metadata(
qualifiers_and_fields,
input.schema().metadata().clone(),
)?
// We can use the existing functional dependencies as is:
.with_functional_dependencies(
input.schema().functional_dependencies().clone(),
)?,
);

Ok(LogicalPlan::Unnest(Unnest {
input,
column: column.clone(),
schema,
options: options.clone(),
}))
let input = inputs.swap_remove(0);
unnest_with_options(input, columns.clone(), options.clone())
}
}
}
@@ -1604,8 +1564,8 @@ impl LogicalPlan {
LogicalPlan::DescribeTable(DescribeTable { .. }) => {
write!(f, "DescribeTable")
}
LogicalPlan::Unnest(Unnest { column, .. }) => {
write!(f, "Unnest: {column}")
LogicalPlan::Unnest(Unnest { columns, .. }) => {
write!(f, "Unnest: {}", expr_vec_fmt!(columns))
}
}
}
@@ -2579,8 +2539,8 @@ pub enum Partitioning {
pub struct Unnest {
/// The incoming logical plan
pub input: Arc<LogicalPlan>,
/// The column to unnest
pub column: Column,
/// The columns to unnest
pub columns: Vec<Column>,
/// The output schema, containing the unnested field column.
pub schema: DFSchemaRef,
/// Options
27 changes: 9 additions & 18 deletions datafusion/expr/src/logical_plan/tree_node.rs
@@ -311,13 +311,13 @@ impl TreeNode for LogicalPlan {
}
LogicalPlan::Unnest(Unnest {
input,
column,
columns,
schema,
options,
}) => rewrite_arc(input, f)?.update_data(|input| {
LogicalPlan::Unnest(Unnest {
input,
column,
columns,
schema,
options,
})
@@ -504,8 +504,12 @@ impl LogicalPlan {
LogicalPlan::TableScan(TableScan { filters, .. }) => {
filters.iter().apply_until_stop(f)
}
LogicalPlan::Unnest(Unnest { column, .. }) => {
f(&Expr::Column(column.clone()))
LogicalPlan::Unnest(Unnest { columns, .. }) => {
let exprs = columns
.iter()
.map(|c| Expr::Column(c.clone()))
.collect::<Vec<_>>();
exprs.iter().apply_until_stop(f)
}
LogicalPlan::Distinct(Distinct::On(DistinctOn {
on_expr,
@@ -701,20 +705,6 @@
fetch,
})
}),
LogicalPlan::Unnest(Unnest {
input,
column,
schema,
options,
}) => f(Expr::Column(column))?.map_data(|column| match column {
Expr::Column(column) => Ok(LogicalPlan::Unnest(Unnest {
input,
column,
schema,
options,
})),
_ => internal_err!("Transformation should return Column"),
})?,
LogicalPlan::Distinct(Distinct::On(DistinctOn {
on_expr,
select_expr,
@@ -739,6 +729,7 @@
}),
// plans without expressions
LogicalPlan::EmptyRelation(_)
| LogicalPlan::Unnest(_)
| LogicalPlan::RecursiveQuery(_)
| LogicalPlan::Subquery(_)
| LogicalPlan::SubqueryAlias(_)
2 changes: 1 addition & 1 deletion datafusion/expr/src/tree_node.rs
@@ -36,6 +36,7 @@ impl TreeNode for Expr {
) -> Result<TreeNodeRecursion> {
let children = match self {
Expr::Alias(Alias{expr,..})
| Expr::Unnest(Unnest{expr})
| Expr::Not(expr)
| Expr::IsNotNull(expr)
| Expr::IsTrue(expr)
@@ -60,7 +61,6 @@
GetFieldAccess::NamedStructField { .. } => vec![expr],
}
}
Expr::Unnest(Unnest { exprs }) |
Expr::GroupingSet(GroupingSet::Rollup(exprs))
| Expr::GroupingSet(GroupingSet::Cube(exprs)) => exprs.iter().collect(),
Expr::ScalarFunction (ScalarFunction{ args, .. } ) => {
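A sketch (not part of the PR) showing that generic expression traversals reach the column inside `Expr::Unnest` now that it shares the single-child TreeNode branch. `expr_to_columns` is used here only as a convenient traversal-based helper, assumed available in `datafusion_expr::utils` at this revision.

```rust
use std::collections::HashSet;

use datafusion_common::{Column, Result};
use datafusion_expr::expr::Unnest;
use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::{col, Expr};

fn main() -> Result<()> {
    // Unnest is listed with the other single-child expressions, so a generic
    // TreeNode traversal (here via expr_to_columns) visits the inner column.
    let e = Expr::Unnest(Unnest::new(col("tags")));

    let mut columns = HashSet::new();
    expr_to_columns(&e, &mut columns)?;
    assert!(columns.contains(&Column::from_name("tags")));
    Ok(())
}
```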