Skip to content

Commit ff31e6a

Browse files
committed
don't combine if input has multiple partitions - preserve distributed aggregation
1 parent 047d7fb commit ff31e6a

File tree

2 files changed

+33
-6
lines changed

2 files changed

+33
-6
lines changed

datafusion/physical-optimizer/src/combine_partial_final_agg.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use datafusion_common::error::Result;
2424
use datafusion_physical_plan::aggregates::{
2525
AggregateExec, AggregateMode, PhysicalGroupBy,
2626
};
27+
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
2728
use datafusion_physical_plan::ExecutionPlan;
2829

2930
use crate::PhysicalOptimizerRule;
@@ -85,7 +86,10 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
8586
input_agg_exec.aggr_expr(),
8687
input_agg_exec.filter_expr(),
8788
),
88-
) {
89+
)
90+
// Don't combine if input has multiple partitions - preserve distributed aggregation
91+
&& !has_multi_partition_coalesce(input_agg_exec.input())
92+
{
8993
let mode = if agg_exec.mode() == &AggregateMode::Final {
9094
AggregateMode::Single
9195
} else {
@@ -161,4 +165,28 @@ fn can_combine(final_agg: GroupExprsRef, partial_agg: GroupExprsRef) -> bool {
161165
)
162166
}
163167

168+
/// Check if the plan subtree contains a CoalescePartitionsExec with multiple input partitions.
169+
fn has_multi_partition_coalesce(plan: &Arc<dyn ExecutionPlan>) -> bool {
170+
// Check if this node is CoalescePartitionsExec with multiple inputs
171+
if plan.as_any().is::<CoalescePartitionsExec>() {
172+
let partition_count = plan
173+
.children()
174+
.first()
175+
.map(|child| child.properties().partitioning.partition_count())
176+
.unwrap_or(0);
177+
if partition_count > 1 {
178+
return true;
179+
}
180+
}
181+
182+
// Recursively check all children
183+
for child in plan.children() {
184+
if has_multi_partition_coalesce(child) {
185+
return true;
186+
}
187+
}
188+
189+
false
190+
}
191+
164192
// See tests in datafusion/core/tests/physical_optimizer

datafusion/sql/src/parser.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -279,15 +279,14 @@ impl<'a> DFParser<'a> {
279279
sql: &str,
280280
dialect: &'a dyn Dialect,
281281
) -> Result<Self, ParserError> {
282-
let tokens = Tokenizer::new(dialect, sql).into_tokens().collect::<Result<_, _>>()?;
282+
let tokens = Tokenizer::new(dialect, sql)
283+
.into_tokens()
284+
.collect::<Result<_, _>>()?;
283285
Ok(Self::from_dialect_and_tokens(dialect, tokens))
284286
}
285287

286288
/// Create a new parser from specified dialect and tokens.
287-
pub fn from_dialect_and_tokens(
288-
dialect: &'a dyn Dialect,
289-
tokens: Vec<Token>,
290-
) -> Self {
289+
pub fn from_dialect_and_tokens(dialect: &'a dyn Dialect, tokens: Vec<Token>) -> Self {
291290
let parser = Parser::new(dialect).with_tokens(tokens);
292291
DFParser { parser }
293292
}

0 commit comments

Comments
 (0)