fix: Eliminate consecutive repartitions #18521
Conversation
alamb left a comment:
This looks amazing @gene-bordegaray -- thank you 🙏
I kicked off some benchmarks to make sure it doesn't impact performance. Assuming it doesn't, I'll take a closer look.
@NGA-TRAN and @gabotechs could you also please help review this PR?
NGA-TRAN left a comment:
Wonderful work. Way to go @gene-bordegaray
 }
 Distribution::HashPartitioned(exprs) => {
-    if add_roundrobin {
+    if add_roundrobin && !hash_necessary {
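For illustration, here is a minimal, self-contained sketch of the decision the changed condition implements, using simplified stand-in types rather than the actual DataFusion API: a round-robin repartition is only worth inserting when a hash repartition will not be stacked directly on top of it.

#[derive(Debug, PartialEq)]
enum Partitioning {
    RoundRobin(usize),
    Hash(Vec<String>, usize),
}

/// Which repartitions to insert above a child, listed bottom-up
/// (hypothetical helper for illustration only).
fn plan_repartitions(
    add_roundrobin: bool,
    hash_necessary: bool,
    hash_exprs: Vec<String>,
    target_partitions: usize,
) -> Vec<Partitioning> {
    let mut inserted = Vec::new();
    // Before the fix this branch was `if add_roundrobin { .. }`, which could
    // stack a RoundRobin repartition directly beneath a Hash repartition.
    if add_roundrobin && !hash_necessary {
        inserted.push(Partitioning::RoundRobin(target_partitions));
    }
    if hash_necessary {
        inserted.push(Partitioning::Hash(hash_exprs, target_partitions));
    }
    inserted
}

fn main() {
    // The case that previously produced two consecutive repartitions:
    let plan = plan_repartitions(true, true, vec!["o_orderpriority".into()], 14);
    // Only the hash repartition remains.
    assert_eq!(plan, vec![Partitioning::Hash(vec!["o_orderpriority".into()], 14)]);
    println!("{plan:?}");
}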
Could we permalink to these in the code?
@adriangb: Do you mean adding a comment with links right at the fix for future reference?
If we link these, I would like to finish the docs so they are complete. If you notice, I cut off at the end of the explanation of enforce_distribution because it was thorough enough for this bug.
🤖: Benchmark completed
2010YOUY01 left a comment:
Thank you for the amazing work.
Removing unnecessary consecutive RepartitionExec makes sense to me. However, I have gone through half of the tests and found 2 plan changes that are not "2 RepartitionExec -> 1 RepartitionExec"; I'm wondering, is that expected? Do we have other plan changes besides removing one of the consecutive RepartitionExecs?
The first one: see the review comment.
The second one is tpch-q4. I saw a 20% speedup in the benchmark result, so I checked the query plan, and the difference is that it removes a round-robin repartition above the parquet reader:
// before
> explain select
o_orderpriority,
count(*) as order_count
from
orders
where
o_orderdate >= '1993-07-01'
and o_orderdate < date '1993-07-01' + interval '3' month
and exists (
select
*
from
lineitem
where
l_orderkey = o_orderkey
and l_commitdate < l_receiptdate
)
group by
o_orderpriority
order by
o_orderpriority;
+---------------+------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------+
| physical_plan | ┌───────────────────────────┐ |
| | │ SortPreservingMergeExec │ |
| | │ -------------------- │ |
| | │ o_orderpriority ASC NULLS │ |
| | │ LAST │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ SortExec │ |
| | │ -------------------- │ |
| | │ o_orderpriority@0 ASC │ |
| | │ NULLS LAST │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ ProjectionExec │ |
| | │ -------------------- │ |
| | │ o_orderpriority: │ |
| | │ o_orderpriority │ |
| | │ │ |
| | │ order_count: │ |
| | │ count(Int64(1)) │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ AggregateExec │ |
| | │ -------------------- │ |
| | │ aggr: count(1) │ |
| | │ │ |
| | │ group_by: │ |
| | │ o_orderpriority │ |
| | │ │ |
| | │ mode: │ |
| | │ FinalPartitioned │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ CoalesceBatchesExec │ |
| | │ -------------------- │ |
| | │ target_batch_size: │ |
| | │ 8192 │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ RepartitionExec │ |
| | │ -------------------- │ |
| | │ partition_count(in->out): │ |
| | │ 14 -> 14 │ |
| | │ │ |
| | │ partitioning_scheme: │ |
| | │ Hash([o_orderpriority@0], │ |
| | │ 14) │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ AggregateExec │ |
| | │ -------------------- │ |
| | │ aggr: count(1) │ |
| | │ │ |
| | │ group_by: │ |
| | │ o_orderpriority │ |
| | │ │ |
| | │ mode: Partial │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ CoalesceBatchesExec │ |
| | │ -------------------- │ |
| | │ target_batch_size: │ |
| | │ 8192 │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ HashJoinExec │ |
| | │ -------------------- │ |
| | │ join_type: RightSemi │ |
| | │ ├──────────────┐ |
| | │ on: │ │ |
| | │ (l_orderkey = o_orderkey) │ │ |
| | └─────────────┬─────────────┘ │ |
| | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
| | │ CoalescePartitionsExec ││ CoalesceBatchesExec │ |
| | │ ││ -------------------- │ |
| | │ ││ target_batch_size: │ |
| | │ ││ 8192 │ |
| | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
| | │ CoalesceBatchesExec ││ FilterExec │ |
| | │ -------------------- ││ -------------------- │ |
| | │ target_batch_size: ││ predicate: │ |
| | │ 8192 ││ o_orderdate >= 1993-07-01 │ |
| | │ ││ AND o_orderdate < 1993 │ |
| | │ ││ -10-01 │ |
| | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
| | │ FilterExec ││ RepartitionExec │ |
| | │ -------------------- ││ -------------------- │ |
| | │ predicate: ││ partition_count(in->out): │ |
| | │ l_receiptdate > ││ 1 -> 14 │ |
| | │ l_commitdate ││ │ |
| | │ ││ partitioning_scheme: │ |
| | │ ││ RoundRobinBatch(14) │ |
| | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
| | │ RepartitionExec ││ DataSourceExec │ |
| | │ -------------------- ││ -------------------- │ |
| | │ partition_count(in->out): ││ files: 1 │ |
| | │ 1 -> 14 ││ format: parquet │ |
| | │ ││ │ |
| | │ partitioning_scheme: ││ predicate: │ |
| | │ RoundRobinBatch(14) ││ o_orderdate >= 1993-07-01 │ |
| | │ ││ AND o_orderdate < 1993 │ |
| | │ ││ -10-01 │ |
| | └─────────────┬─────────────┘└───────────────────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ DataSourceExec │ |
| | │ -------------------- │ |
| | │ files: 1 │ |
| | │ format: parquet │ |
| | │ │ |
| | │ predicate: │ |
| | │ l_receiptdate > │ |
| | │ l_commitdate │ |
| | └───────────────────────────┘ |
| | |
+---------------+------------------------------------------------------------+
1 row(s) fetched.
Elapsed 0.011 seconds.
// PR
+---------------+------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------+
| physical_plan | ┌───────────────────────────┐ |
| | │ SortPreservingMergeExec │ |
| | │ -------------------- │ |
| | │ o_orderpriority ASC NULLS │ |
| | │ LAST │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ SortExec │ |
| | │ -------------------- │ |
| | │ o_orderpriority@0 ASC │ |
| | │ NULLS LAST │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ ProjectionExec │ |
| | │ -------------------- │ |
| | │ o_orderpriority: │ |
| | │ o_orderpriority │ |
| | │ │ |
| | │ order_count: │ |
| | │ count(Int64(1)) │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ AggregateExec │ |
| | │ -------------------- │ |
| | │ aggr: count(1) │ |
| | │ │ |
| | │ group_by: │ |
| | │ o_orderpriority │ |
| | │ │ |
| | │ mode: │ |
| | │ FinalPartitioned │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ CoalesceBatchesExec │ |
| | │ -------------------- │ |
| | │ target_batch_size: │ |
| | │ 8192 │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ RepartitionExec │ |
| | │ -------------------- │ |
| | │ partition_count(in->out): │ |
| | │ 14 -> 14 │ |
| | │ │ |
| | │ partitioning_scheme: │ |
| | │ Hash([o_orderpriority@0], │ |
| | │ 14) │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ AggregateExec │ |
| | │ -------------------- │ |
| | │ aggr: count(1) │ |
| | │ │ |
| | │ group_by: │ |
| | │ o_orderpriority │ |
| | │ │ |
| | │ mode: Partial │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ CoalesceBatchesExec │ |
| | │ -------------------- │ |
| | │ target_batch_size: │ |
| | │ 8192 │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ HashJoinExec │ |
| | │ -------------------- │ |
| | │ join_type: RightSemi │ |
| | │ ├──────────────┐ |
| | │ on: │ │ |
| | │ (l_orderkey = o_orderkey) │ │ |
| | └─────────────┬─────────────┘ │ |
| | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
| | │ CoalescePartitionsExec ││ CoalesceBatchesExec │ |
| | │ ││ -------------------- │ |
| | │ ││ target_batch_size: │ |
| | │ ││ 8192 │ |
| | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
| | │ CoalesceBatchesExec ││ FilterExec │ |
| | │ -------------------- ││ -------------------- │ |
| | │ target_batch_size: ││ predicate: │ |
| | │ 8192 ││ o_orderdate >= 1993-07-01 │ |
| | │ ││ AND o_orderdate < 1993 │ |
| | │ ││ -10-01 │ |
| | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
| | │ FilterExec ││ RepartitionExec │ |
| | │ -------------------- ││ -------------------- │ |
| | │ predicate: ││ partition_count(in->out): │ |
| | │ l_receiptdate > ││ 1 -> 14 │ |
| | │ l_commitdate ││ │ |
| | │ ││ partitioning_scheme: │ |
| | │ ││ RoundRobinBatch(14) │ |
| | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
| | │ DataSourceExec ││ DataSourceExec │ |
| | │ -------------------- ││ -------------------- │ |
| | │ files: 14 ││ files: 1 │ |
| | │ format: parquet ││ format: parquet │ |
| | │ ││ │ |
| | │ predicate: ││ predicate: │ |
| | │ l_receiptdate > ││ o_orderdate >= 1993-07-01 │ |
| | │ l_commitdate ││ AND o_orderdate < 1993 │ |
| | │ ││ -10-01 │ |
| | └───────────────────────────┘└───────────────────────────┘ |
| | |
+---------------+------------------------------------------------------------+
1 row(s) fetched.
Elapsed 0.013 seconds.
15)--------------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
16)----------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
17)------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true
02)--SortMergeJoin: join_type=Inner, on=[(a@1, a@1)]
Here it removed a SortExec; is that expected?
See my three comments below. If the data in each corresponding partition/stream of the join input is already sorted, we can skip the re-sorting step and simply perform a merge join on each matching partition/stream.
Can you clarify the data you used to create the tables? Thank you.
09)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1
10)--------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
11)----------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
12)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true
Data is sorted on a, b, c
08)----CoalesceBatchesExec: target_batch_size=2
09)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1
10)--------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
11)----------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
Data is sorted on a, b, c, rn1
06)----------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true
08)----CoalesceBatchesExec: target_batch_size=2
09)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1
This is the key check: we need to confirm whether the data remains sorted after Hash Repartition. We're using it to split one partition/stream into two, and if we simply apply the hash function and stream each row forward, the data should stay sorted within each resulting partition/stream on a, b, c, rn1. I strongly suspect this holds, but we need to verify.
Could you test this across different datasets to confirm both correctness and sort order?
Most importantly:
i. Check whether the execution plan marks the data as sorted per partition after Hash Repartition. If it does, please file a new ticket to ensure we display the sort order post-repartition in future work.
ii. Investigate whether the data is actually sorted per partition, even when it's marked as such.
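To illustrate the intuition behind this check, here is a tiny self-contained simulation (a sketch, not DataFusion code): when one sorted input stream is split by a hash function and rows are forwarded in input order, each output partition is a subsequence of a sorted sequence and therefore stays sorted. This demonstrates only the single-input-partition case discussed here, not the actual RepartitionExec implementation.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Assign a row to one of `partitions` buckets by hashing its key.
fn bucket(key: i32, partitions: u64) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() % partitions) as usize
}

fn main() {
    let sorted_input: Vec<i32> = (0..1_000).collect();
    let mut parts: Vec<Vec<i32>> = vec![Vec::new(); 2];
    for row in sorted_input {
        // Stream each row forward, in input order, to its hash bucket.
        parts[bucket(row, 2)].push(row);
    }
    for (i, p) in parts.iter().enumerate() {
        // Each partition is a subsequence of the sorted input.
        assert!(p.windows(2).all(|w| w[0] <= w[1]));
        println!("partition {i} stays sorted ({} rows)", p.len());
    }
}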
And if this is the case, this would be another great optimization we have for free with this fix. I actually ran into this. See Even More Suboptimal Plan here.
Ok great, yes I can investigate this.
I have recreated the query from join.slt and checked the results. Here are the recreated commands:
-- These settings are set in join.slt prior to the query
SET datafusion.optimizer.prefer_hash_join = false;
SET datafusion.explain.format = 'indent';
CREATE EXTERNAL TABLE annotated_data (
a0 INTEGER,
a INTEGER,
b INTEGER,
c INTEGER,
d INTEGER
)
STORED AS CSV
WITH ORDER (a ASC NULLS FIRST, b ASC, c ASC)
LOCATION 'datafusion/core/tests/data/window_2.csv'
OPTIONS ('format.has_header' 'true');
EXPLAIN SELECT *
FROM (SELECT *, ROW_NUMBER() OVER() as rn1
FROM annotated_data) as l_table
JOIN (SELECT *, ROW_NUMBER() OVER() as rn1
FROM annotated_data) as r_table
ON l_table.a = r_table.a
ORDER BY l_table.a ASC NULLS FIRST, l_table.b, l_table.c, r_table.rn1;
SELECT *
FROM (SELECT *, ROW_NUMBER() OVER() as rn1
FROM annotated_data) as l_table
JOIN (SELECT *, ROW_NUMBER() OVER() as rn1
FROM annotated_data) as r_table
ON l_table.a = r_table.a
ORDER BY l_table.a ASC NULLS FIRST, l_table.b, l_table.c, r_table.rn1
LIMIT 30;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: l_table.a ASC NULLS FIRST, l_table.b ASC NULLS LAST, l_table.c ASC NULLS LAST, r_table.rn1 ASC NULLS LAST |
| | Inner Join: l_table.a = r_table.a |
| | SubqueryAlias: l_table |
| | Projection: annotated_data.a0, annotated_data.a, annotated_data.b, annotated_data.c, annotated_data.d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS rn1 |
| | WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] |
| | TableScan: annotated_data projection=[a0, a, b, c, d] |
| | SubqueryAlias: r_table |
| | Projection: annotated_data.a0, annotated_data.a, annotated_data.b, annotated_data.c, annotated_data.d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS rn1 |
| | WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] |
| | TableScan: annotated_data projection=[a0, a, b, c, d] |
| physical_plan | SortPreservingMergeExec: [a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST, rn1@11 ASC NULLS LAST] |
| | SortMergeJoin: join_type=Inner, on=[(a@1, a@1)] |
| | CoalesceBatchesExec: target_batch_size=2 |
| | RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1 |
| | ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1] |
| | BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted] |
| | DataSourceExec: file_groups={1 group: [[Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true |
| | CoalesceBatchesExec: target_batch_size=2 |
| | RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1 |
| | ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1] |
| | BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted] |
| | DataSourceExec: file_groups={1 group: [[Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true |
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 row(s) fetched.
Elapsed 0.003 seconds.
+----+---+---+---+---+-----+----+---+---+----+---+-----+
| a0 | a | b | c | d | rn1 | a0 | a | b | c | d | rn1 |
+----+---+---+---+---+-----+----+---+---+----+---+-----+
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 0 | 0 | 1 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 1 | 2 | 2 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 2 | 0 | 3 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 3 | 0 | 4 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 4 | 1 | 5 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 5 | 1 | 6 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 6 | 0 | 7 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 7 | 2 | 8 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 8 | 1 | 9 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 9 | 4 | 10 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 10 | 4 | 11 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 11 | 2 | 12 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 12 | 2 | 13 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 13 | 1 | 14 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 14 | 2 | 15 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 15 | 3 | 16 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 16 | 3 | 17 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 17 | 2 | 18 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 18 | 1 | 19 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 19 | 4 | 20 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 20 | 0 | 21 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 21 | 3 | 22 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 22 | 0 | 23 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 23 | 0 | 24 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 24 | 4 | 25 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 1 | 25 | 0 | 26 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 1 | 26 | 2 | 27 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 1 | 27 | 0 | 28 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 1 | 28 | 1 | 29 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 1 | 29 | 1 | 30 |
+----+---+---+---+---+-----+----+---+---+----+---+-----+
30 row(s) fetched.
Elapsed 0.003 seconds.
- The physical plan here has a Hash Repartition and no SortExec node above it. Despite this, as seen above, the results are still sorted, meaning the sort order was preserved.
- The metadata for when order is preserved is not shown in the above plan. The EXPLAIN output should display this but does not; it is tracked in the field maintains_input_order. I can create an issue for this and link it here.
- To recreate this I had to have explicit ordering in the WITH ORDER and ORDER BY clauses, and set the prefer_hash_join flag to false to force SortMergeJoin. This means that with the default config, if a user is using pre-sorted files they might miss speedups by keeping unneeded SortExec nodes. It might be worth further discussion whether there is a better way to handle this, or opening an issue to look into it further.
Let me know what you think
@2010YOUY01
Yes, this seems to be a different issue that is triggered by the change; it would be great to investigate in the future.
09)--------WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
10)----------TableScan: annotated_data projection=[a0, a, b, c, d]
physical_plan
01)SortPreservingMergeExec: [a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST, rn1@11 ASC NULLS LAST]
And because there are 2 sorted partitions/streams after the join, we need this SortPreservingMergeExec to merge them
This is the conclusion I came to as well based on intuition, but I will confirm 😄
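For reference, a tiny self-contained sketch (not the actual operator) of the k-way merge that SortPreservingMergeExec conceptually performs over already-sorted partitions:

use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Merge k already-sorted streams into one sorted stream.
fn merge_sorted(streams: Vec<Vec<i32>>) -> Vec<i32> {
    // Min-heap of (next value, stream index, position within that stream).
    let mut heap = BinaryHeap::new();
    for (s, stream) in streams.iter().enumerate() {
        if let Some(&v) = stream.first() {
            heap.push(Reverse((v, s, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, s, i))) = heap.pop() {
        out.push(v);
        if let Some(&next) = streams[s].get(i + 1) {
            heap.push(Reverse((next, s, i + 1)));
        }
    }
    out
}

fn main() {
    let merged = merge_sorted(vec![vec![1, 4, 7], vec![2, 3, 9]]);
    assert_eq!(merged, vec![1, 2, 3, 4, 7, 9]);
}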
@2010YOUY01: The reason Gene asked you to verify your test data is in this explanation. It seems that on the build side your lineitem table is too small to repartition, so the plan after the fix does not repartition it. That is the reason for the speedup.
Yes, I suspect that it is due to the size of the tables, because the logic for repartitioning at the file level lives above, and is independent of, the repartitioning logic changes I have made.
That's likely the case: there's a predicate on the lineitem table, and we've pushed down a kind-of bloom filter to prune data during the scan. As a result, the dataset could be quite small. I wouldn't worry about this scenario; it's a reasonable outcome and one of the motivations behind the fix. Plus, it clearly improves query performance.
I'm using the tpch-sf0.1 dataset generated by https://github.com/clflushopt/tpchgen-rs/tree/main/tpchgen-cli. This PR collects the HJ build side into a single partition, while previously it didn't. I checked, and the statistics estimate the scan output row count to be small, so I think the result is good; nothing to worry about.
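If it helps reproduce this, the optimizer's row-count estimates can be surfaced directly in EXPLAIN output. A sketch, assuming the show_statistics option from the DataFusion configuration reference and the TPC-H lineitem table:

-- Show per-operator statistics estimates alongside the plan
SET datafusion.explain.show_statistics = true;
EXPLAIN SELECT l_orderkey
FROM lineitem
WHERE l_commitdate < l_receiptdate;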
2010YOUY01 left a comment:
LGTM, thank you.
I've reviewed all the test changes; everything looks good except several SortExec additions and removals, which appear to be redundant. It's not a correctness issue, but it could affect performance, so it might be worth investigating further in the future.
02)--ProjectionExec: expr=[c3@0 as c3, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as sum1, sum(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum2]
03)----BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
04)------SortExec: expr=[c3@0 ASC NULLS LAST, c9@1 DESC], preserve_partitioning=[true]
02)--SortExec: TopK(fetch=5), expr=[c3@0 ASC NULLS LAST], preserve_partitioning=[true]
Here it similarly adds a SortExec; this might also be worth investigating later.
BTW, I think this is a great opportunity to add a section to the tuning guide https://datafusion.apache.org/user-guide/configs.html#tuning-guide explaining the default repartitioning behavior and how related configuration options (such as the minimum file size for repartitioning, the minimum repartition size for hash joins, etc.) can affect it.
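For example, such a section could walk through settings like these (a sketch; the option names are taken from the DataFusion configuration reference, and defaults may vary by version):

-- Number of partitions the planner repartitions to
SET datafusion.execution.target_partitions = 14;
-- Whether round-robin repartitioning may be inserted at all
SET datafusion.optimizer.enable_round_robin_repartition = true;
-- Minimum file size (in bytes) before a scan is repartitioned
SET datafusion.optimizer.repartition_file_min_size = 10485760;
-- Below this estimated size (in bytes), a hash join build side is kept in one partition
SET datafusion.optimizer.hash_join_single_partition_threshold = 1048576;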
Which issue does this PR close?
Rationale for this change
There are cases where two RepartitionExec operators appear consecutively in the plan. This is unneeded overhead; eliminating it provides speedups.
Full Report: The Physical Optimizer and Fixing Consecutive Repartitions In the Enforce Distribution Rule.pdf
Issue Report: Fixing Consecutive Repartitions In the Enforce Distribution Rule.pdf
What changes are included in this PR?
- Change to the repartition-adding logic in enforce_distribution.rs
- A ton of test and bench updates to mirror the new behavior
Are these changes tested?
Yes, benchmarked and tested; check the report for benchmarks.
Are there any user-facing changes?