Skip to content

Commit d66b393

Browse files
Eliminate consecutive repartitions and update tests
1 parent e969500 commit d66b393

23 files changed

+407
-548
lines changed

datafusion/core/tests/dataframe/mod.rs

Lines changed: 24 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2863,10 +2863,9 @@ async fn test_count_wildcard_on_sort() -> Result<()> {
28632863
| | ProjectionExec: expr=[b@0 as b, count(Int64(1))@1 as count(*), count(Int64(1))@1 as count(Int64(1))] |
28642864
| | AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[count(Int64(1))] |
28652865
| | CoalesceBatchesExec: target_batch_size=8192 |
2866-
| | RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=4 |
2867-
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
2868-
| | AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[count(Int64(1))] |
2869-
| | DataSourceExec: partitions=1, partition_sizes=[1] |
2866+
| | RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=1 |
2867+
| | AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[count(Int64(1))] |
2868+
| | DataSourceExec: partitions=1, partition_sizes=[1] |
28702869
| | |
28712870
+---------------+------------------------------------------------------------------------------------------------------------+
28722871
"###
@@ -2875,22 +2874,21 @@ async fn test_count_wildcard_on_sort() -> Result<()> {
28752874
assert_snapshot!(
28762875
pretty_format_batches(&df_results).unwrap(),
28772876
@r###"
2878-
+---------------+--------------------------------------------------------------------------------+
2879-
| plan_type | plan |
2880-
+---------------+--------------------------------------------------------------------------------+
2881-
| logical_plan | Sort: count(*) ASC NULLS LAST |
2882-
| | Aggregate: groupBy=[[t1.b]], aggr=[[count(Int64(1)) AS count(*)]] |
2883-
| | TableScan: t1 projection=[b] |
2884-
| physical_plan | SortPreservingMergeExec: [count(*)@1 ASC NULLS LAST] |
2885-
| | SortExec: expr=[count(*)@1 ASC NULLS LAST], preserve_partitioning=[true] |
2886-
| | AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[count(*)] |
2887-
| | CoalesceBatchesExec: target_batch_size=8192 |
2888-
| | RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=4 |
2889-
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
2890-
| | AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[count(*)] |
2891-
| | DataSourceExec: partitions=1, partition_sizes=[1] |
2892-
| | |
2893-
+---------------+--------------------------------------------------------------------------------+
2877+
+---------------+----------------------------------------------------------------------------+
2878+
| plan_type | plan |
2879+
+---------------+----------------------------------------------------------------------------+
2880+
| logical_plan | Sort: count(*) ASC NULLS LAST |
2881+
| | Aggregate: groupBy=[[t1.b]], aggr=[[count(Int64(1)) AS count(*)]] |
2882+
| | TableScan: t1 projection=[b] |
2883+
| physical_plan | SortPreservingMergeExec: [count(*)@1 ASC NULLS LAST] |
2884+
| | SortExec: expr=[count(*)@1 ASC NULLS LAST], preserve_partitioning=[true] |
2885+
| | AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[count(*)] |
2886+
| | CoalesceBatchesExec: target_batch_size=8192 |
2887+
| | RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=1 |
2888+
| | AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[count(*)] |
2889+
| | DataSourceExec: partitions=1, partition_sizes=[1] |
2890+
| | |
2891+
+---------------+----------------------------------------------------------------------------+
28942892
"###
28952893
);
28962894
Ok(())
@@ -3200,10 +3198,9 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
32003198
| | ProjectionExec: expr=[count(Int64(1))@1 as count(*), a@0 as a, true as __always_true] |
32013199
| | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(Int64(1))] |
32023200
| | CoalesceBatchesExec: target_batch_size=8192 |
3203-
| | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 |
3204-
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
3205-
| | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))] |
3206-
| | DataSourceExec: partitions=1, partition_sizes=[1] |
3201+
| | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 |
3202+
| | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))] |
3203+
| | DataSourceExec: partitions=1, partition_sizes=[1] |
32073204
| | |
32083205
+---------------+---------------------------------------------------------------------------------------------------------------------------+
32093206
"
@@ -3257,10 +3254,9 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
32573254
| | ProjectionExec: expr=[count(*)@1 as count(*), a@0 as a, true as __always_true] |
32583255
| | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(*)] |
32593256
| | CoalesceBatchesExec: target_batch_size=8192 |
3260-
| | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 |
3261-
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
3262-
| | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)] |
3263-
| | DataSourceExec: partitions=1, partition_sizes=[1] |
3257+
| | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 |
3258+
| | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)] |
3259+
| | DataSourceExec: partitions=1, partition_sizes=[1] |
32643260
| | |
32653261
+---------------+---------------------------------------------------------------------------------------------------------------------------+
32663262
"

0 commit comments

Comments
 (0)