Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unnest_column to DataFrame #5106

Merged
merged 20 commits into from
Feb 6, 2023
Merged

Add unnest_column to DataFrame #5106

merged 20 commits into from
Feb 6, 2023

Conversation

vincev
Copy link
Contributor

@vincev vincev commented Jan 29, 2023

Which issue does this PR close?

This PR introduces changes to DataFrame to add support for unnest/explode functionality as requested in #212.

I would like to get feedback on these changes and then create another PR to try to add support to the SQL parser.

Rationale for this change

This PR add an unnest_column method to DataFrame to unnest list types columns (see tests), given the following data frame:

+----------+------------------------------------------------------------+--------------------+
| shape_id | points                                                     | tags               |
+----------+------------------------------------------------------------+--------------------+
| 1        | [{"x": -3, "y": -4}, {"x": -3, "y": 6}, {"x": 2, "y": -2}] | [tag1]             |
| 2        |                                                            | [tag1, tag2]       |
| 3        | [{"x": -9, "y": 2}, {"x": -10, "y": -4}]                   |                    |
| 4        | [{"x": -3, "y": 5}, {"x": 2, "y": -1}]                     | [tag1, tag2, tag3] |
+----------+------------------------------------------------------------+--------------------+

The call df.unnest_column("tags") produces:

+----------+------------------------------------------------------------+------+
| shape_id | points                                                     | tags |
+----------+------------------------------------------------------------+------+
| 1        | [{"x": -3, "y": -4}, {"x": -3, "y": 6}, {"x": 2, "y": -2}] | tag1 |
| 2        |                                                            | tag1 |
| 2        |                                                            | tag2 |
| 3        | [{"x": -9, "y": 2}, {"x": -10, "y": -4}]                   |      |
| 4        | [{"x": -3, "y": 5}, {"x": 2, "y": -1}]                     | tag1 |
| 4        | [{"x": -3, "y": 5}, {"x": 2, "y": -1}]                     | tag2 |
| 4        | [{"x": -3, "y": 5}, {"x": 2, "y": -1}]                     | tag3 |
+----------+------------------------------------------------------------+------+

calling df.unnest_column("points") produces:

+----------+---------------------+--------------------+
| shape_id | points              | tags               |
+----------+---------------------+--------------------+
| 1        | {"x": -3, "y": -4}  | [tag1]             |
| 1        | {"x": -3, "y": 6}   | [tag1]             |
| 1        | {"x": 2, "y": -2}   | [tag1]             |
| 2        |                     | [tag1, tag2]       |
| 3        | {"x": -9, "y": 2}   |                    |
| 3        | {"x": -10, "y": -4} |                    |
| 4        | {"x": -3, "y": 5}   | [tag1, tag2, tag3] |
| 4        | {"x": 2, "y": -1}   | [tag1, tag2, tag3] |
+----------+---------------------+--------------------+

and calling df.unnest_column("points").unnest_column("tags") produces:

+----------+---------------------+------+
| shape_id | points              | tags |
+----------+---------------------+------+
| 1        | {"x": -3, "y": -4}  | tag1 |
| 1        | {"x": -3, "y": 6}   | tag1 |
| 1        | {"x": 2, "y": -2}   | tag1 |
| 2        |                     | tag1 |
| 2        |                     | tag2 |
| 3        | {"x": -9, "y": 2}   |      |
| 3        | {"x": -10, "y": -4} |      |
| 4        | {"x": -3, "y": 5}   | tag1 |
| 4        | {"x": -3, "y": 5}   | tag2 |
| 4        | {"x": -3, "y": 5}   | tag3 |
| 4        | {"x": 2, "y": -1}   | tag1 |
| 4        | {"x": 2, "y": -1}   | tag2 |
| 4        | {"x": 2, "y": -1}   | tag3 |
+----------+---------------------+------+

What changes are included in this PR?

This PR add the following changes:

  • Add unnest_column to DataFrame
  • Add an Unnest variant to LogicalPlan that produces a new schema for the unnested column
  • Add UnnestExec to the execution plan
  • Add tests for DataFrame for unnest operation and aggregate count on unnested column
  • Add test for LogicalPlan

Are these changes tested?

Added some initial tests for DataFrame here, and for LogicalPlan here I am happy to add more tests following feedback.

I also run some test on parquet files with 1.9M rows that unnest to 15M:

Data set rows: 1941258 elapsed: 0.443
Dataset unnested rows: 15939196 elapsed: 0.799

Are there any user-facing changes?

Add an unnest_column method to DataFrame.

@github-actions github-actions bot added core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules labels Jan 29, 2023
@vincev vincev marked this pull request as ready for review February 2, 2023 16:56
@alamb alamb requested a review from andygrove February 2, 2023 17:03
@alamb
Copy link
Contributor

alamb commented Feb 2, 2023

Thanks @vincev -- I hope someone else will be able to review this PR, otherwise I hope to have time to do so this weekend or early next week.

@alamb
Copy link
Contributor

alamb commented Feb 5, 2023

I plan to review this PR tomorrow

@vincev
Copy link
Contributor Author

vincev commented Feb 5, 2023

I plan to review this PR tomorrow

Thank you @alamb

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much @vincev -- this was a pleasure to review. I think the code is well commented, tested and structured.

I went through this code carefully and I think we could improve the performance of the physical operator, but that can also be done as a follow on PR when better performance is desired.

datafusion/core/src/physical_plan/unnest.rs Outdated Show resolved Hide resolved
datafusion/core/src/physical_plan/unnest.rs Show resolved Hide resolved
datafusion/core/src/physical_plan/unnest.rs Show resolved Hide resolved
datafusion/core/tests/dataframe.rs Show resolved Hide resolved
datafusion/core/tests/dataframe.rs Show resolved Hide resolved
Comment on lines +743 to +770
// Update schema with unnested column type.
let input = Arc::new(inputs[0].clone());
let nested_field = input.schema().field_from_column(column)?;
let unnested_field = schema.field_from_column(column)?;
let fields = input
.schema()
.fields()
.iter()
.map(|f| {
if f == nested_field {
unnested_field.clone()
} else {
f.clone()
}
})
.collect::<Vec<_>>();

let schema = Arc::new(DFSchema::new_with_metadata(
fields,
input.schema().metadata().clone(),
)?);

Ok(LogicalPlan::Unnest(Unnest {
input,
column: column.clone(),
schema,
}))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the intent of from_plan is to be mechanical construction rather than potentially changing the schema, so I was surprised this code is necessary.

However, when I tried changing this code to be something more like

            Ok(LogicalPlan::Unnest(Unnest {
                input: inputs[0].clone().into(),
                column: column.clone(),
                schema: schema.clone(),
            }))

The test fails:


---- unnest_columns stdout ----
Error: Internal("PhysicalExpr Column references column 'tags' at index 2 (zero-based) but input schema only has 1 columns: [\"tags\"]")


failures:
    unnest_columns

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, I had to add this code because tests were failing due to a projection push-down when calling an aggregate function like count. At this point the child schema has only the count field while the unnest schema has all the fields in the initial projection.

@alamb
Copy link
Contributor

alamb commented Feb 6, 2023

I'll plan to merge this PR later today or tomorrow unless there are other comments or people need more time to review

@vincev
Copy link
Contributor Author

vincev commented Feb 6, 2023

I'll plan to merge this PR later today or tomorrow unless there are other comments or people need more time to review

Thank you @alamb

@alamb alamb merged commit 0dfc66d into apache:master Feb 6, 2023
@ursabot
Copy link

ursabot commented Feb 6, 2023

Benchmark runs are scheduled for baseline = 916ec5d and contender = 0dfc66d. 0dfc66d is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants