Skip to content

Commit 2759cad

Browse files
okhsunrog0x501D
authored andcommitted
don't combine if input has multiple partitions - preserve distributed aggregation
1 parent 047d7fb commit 2759cad

File tree

2 files changed

+34
-7
lines changed

2 files changed

+34
-7
lines changed

datafusion/physical-optimizer/src/combine_partial_final_agg.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,14 @@ use datafusion_common::error::Result;
2424
use datafusion_physical_plan::aggregates::{
2525
AggregateExec, AggregateMode, PhysicalGroupBy,
2626
};
27+
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
2728
use datafusion_physical_plan::ExecutionPlan;
2829

2930
use crate::PhysicalOptimizerRule;
3031
use datafusion_common::config::ConfigOptions;
31-
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
32+
use datafusion_common::tree_node::{
33+
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
34+
};
3235
use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
3336
use datafusion_physical_expr::{physical_exprs_equal, PhysicalExpr};
3437

@@ -85,7 +88,10 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
8588
input_agg_exec.aggr_expr(),
8689
input_agg_exec.filter_expr(),
8790
),
88-
) {
91+
)
92+
// Don't combine if input has multiple partitions - preserve distributed aggregation
93+
&& !has_multi_partition_coalesce(input_agg_exec.input())
94+
{
8995
let mode = if agg_exec.mode() == &AggregateMode::Final {
9096
AggregateMode::Single
9197
} else {
@@ -161,4 +167,26 @@ fn can_combine(final_agg: GroupExprsRef, partial_agg: GroupExprsRef) -> bool {
161167
)
162168
}
163169

170+
/// Check if the plan subtree contains a CoalescePartitionsExec with multiple input partitions.
171+
fn has_multi_partition_coalesce(plan: &Arc<dyn ExecutionPlan>) -> bool {
172+
plan.apply(|node| {
173+
// Check if this node is CoalescePartitionsExec with multiple inputs
174+
if node.as_any().is::<CoalescePartitionsExec>() {
175+
let partition_count = node
176+
.children()
177+
.first()
178+
.map(|child| child.properties().partitioning.partition_count())
179+
.unwrap_or(0);
180+
if partition_count > 1 {
181+
// Found a multi-partition coalesce, stop traversal
182+
return Ok(TreeNodeRecursion::Stop);
183+
}
184+
}
185+
// Continue traversing children
186+
Ok(TreeNodeRecursion::Continue)
187+
})
188+
.map(|recursion| matches!(recursion, TreeNodeRecursion::Stop))
189+
.unwrap_or(false)
190+
}
191+
164192
// See tests in datafusion/core/tests/physical_optimizer

datafusion/sql/src/parser.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -279,15 +279,14 @@ impl<'a> DFParser<'a> {
279279
sql: &str,
280280
dialect: &'a dyn Dialect,
281281
) -> Result<Self, ParserError> {
282-
let tokens = Tokenizer::new(dialect, sql).into_tokens().collect::<Result<_, _>>()?;
282+
let tokens = Tokenizer::new(dialect, sql)
283+
.into_tokens()
284+
.collect::<Result<_, _>>()?;
283285
Ok(Self::from_dialect_and_tokens(dialect, tokens))
284286
}
285287

286288
/// Create a new parser from specified dialect and tokens.
287-
pub fn from_dialect_and_tokens(
288-
dialect: &'a dyn Dialect,
289-
tokens: Vec<Token>,
290-
) -> Self {
289+
pub fn from_dialect_and_tokens(dialect: &'a dyn Dialect, tokens: Vec<Token>) -> Self {
291290
let parser = Parser::new(dialect).with_tokens(tokens);
292291
DFParser { parser }
293292
}

0 commit comments

Comments
 (0)