Skip to content

Commit db548e2

Browse files
committed
fix column id
1 parent ceb6b54 commit db548e2

File tree

2 files changed

+177
-24
lines changed

2 files changed

+177
-24
lines changed

src/query/sql/src/planner/optimizer/optimizers/eliminate_self_join.rs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,29 +32,25 @@ impl EliminateSelfJoinOptimizer {
3232
}
3333

3434
fn optimize_sync(&self, s_expr: &SExpr) -> Result<SExpr> {
35-
println!(
36-
"before eager aggregation: {}",
37-
s_expr.pretty_format(&self.opt_ctx.get_metadata().read())?
38-
);
35+
// `EagerAggregation` is used here as a speculative pre-rewrite to expose patterns that
36+
// `EliminateSelfJoin` can match. If no self-join is actually eliminated, we intentionally
37+
// return the original input plan to avoid keeping the eager-aggregation rewrite as a
38+
// standalone optimization.
3939
static RULES_EAGER_AGGREGATION: &[RuleID] = &[RuleID::EagerAggregation];
4040
let optimizer = RecursiveRuleOptimizer::new(self.opt_ctx.clone(), RULES_EAGER_AGGREGATION);
41-
let s_expr = optimizer.optimize_sync(s_expr)?;
41+
let s_expr_after_eager_aggregation = optimizer.optimize_sync(s_expr)?;
4242

43-
println!(
44-
"after eager aggregation: {}",
45-
s_expr.pretty_format(&self.opt_ctx.get_metadata().read())?
46-
);
4743
static RULES_ELIMINATE_SELF_JOIN: &[RuleID] = &[RuleID::EliminateSelfJoin];
4844
let optimizer =
4945
RecursiveRuleOptimizer::new(self.opt_ctx.clone(), RULES_ELIMINATE_SELF_JOIN);
50-
let s_expr = optimizer.optimize_sync(&s_expr)?;
46+
let s_expr_after_eliminate_self_join =
47+
optimizer.optimize_sync(&s_expr_after_eager_aggregation)?;
5148

52-
println!(
53-
"after eliminate self join: {}",
54-
s_expr.pretty_format(&self.opt_ctx.get_metadata().read())?
55-
);
49+
if s_expr_after_eager_aggregation.eq(&s_expr_after_eliminate_self_join) {
50+
return Ok(s_expr.clone());
51+
}
5652

57-
Ok(s_expr)
53+
Ok(s_expr_after_eliminate_self_join)
5854
}
5955
}
6056

src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/rule_eliminate_self_join.rs

Lines changed: 166 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,14 @@ use std::collections::HashSet;
1717
use std::sync::Arc;
1818

1919
use databend_common_exception::Result;
20+
use databend_common_expression::Scalar;
2021

22+
use crate::ColumnBindingBuilder;
2123
use crate::ColumnEntry;
24+
use crate::IndexType;
2225
use crate::Metadata;
2326
use crate::MetadataRef;
27+
use crate::Visibility;
2428
use crate::optimizer::ir::Matcher;
2529
use crate::optimizer::ir::SExpr;
2630
use crate::optimizer::optimizers::rule::Rule;
@@ -31,6 +35,7 @@ use crate::plans::JoinType;
3135
use crate::plans::RelOp;
3236
use crate::plans::RelOperator;
3337
use crate::plans::ScalarExpr;
38+
use crate::plans::ScalarItem;
3439
use crate::plans::Scan;
3540

3641
#[derive(Clone, Debug)]
@@ -54,10 +59,20 @@ struct GroupKeyItemSignature {
5459
column_name: String,
5560
}
5661

62+
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
63+
struct AggFuncSignature {
64+
func_name: String,
65+
distinct: bool,
66+
params: Vec<Scalar>,
67+
args: Vec<GroupKeyItemSignature>,
68+
}
69+
5770
#[derive(Clone, Debug)]
5871
struct Candidate {
5972
table_id: u64,
6073
group_key: Vec<GroupKeyItemSignature>,
74+
group_key_indexes: HashMap<GroupKeyItemSignature, IndexType>,
75+
agg_func_indexes: Vec<(AggFuncSignature, IndexType)>,
6176
strict: bool,
6277
path: Vec<usize>,
6378
}
@@ -175,18 +190,27 @@ impl RuleEliminateSelfJoin {
175190
}
176191

177192
let mut remove_paths: HashSet<Vec<usize>> = HashSet::new();
193+
let mut removed_to_keep_column_mapping: HashMap<IndexType, IndexType> = HashMap::new();
178194
for (_key, group_candidates) in groups.iter() {
179195
let has_strict = group_candidates.iter().any(|c| c.strict);
180196
let has_loose = group_candidates.iter().any(|c| !c.strict);
181197
if !has_strict || !has_loose {
182198
continue;
183199
}
184200

201+
let Some(keep) = group_candidates.iter().find(|c| c.strict) else {
202+
continue;
203+
};
204+
185205
// If there is at least one strict candidate, loose candidates are redundant:
186206
// both sides are INNER JOIN on unique group keys (Aggregate), so the join acts
187207
// as a filter, and the strict side already filters stronger than the loose side.
188208
for c in group_candidates.iter().filter(|c| !c.strict) {
209+
let Some(mapping) = Self::build_removed_to_keep_index_mapping(c, keep) else {
210+
continue;
211+
};
189212
remove_paths.insert(c.path.clone());
213+
removed_to_keep_column_mapping.extend(mapping);
190214
}
191215
}
192216

@@ -195,10 +219,38 @@ impl RuleEliminateSelfJoin {
195219
}
196220

197221
let mut path = Vec::new();
198-
let rewritten = self
222+
let mut rewritten = self
199223
.remove_paths_from_inner_join_tree(original, &mut path, &remove_paths)?
200224
.unwrap_or_else(|| original.clone());
201225

226+
if !removed_to_keep_column_mapping.is_empty() {
227+
let output_columns = rewritten.derive_relational_prop()?.output_columns.clone();
228+
let mut items = Vec::new();
229+
for (old, new) in removed_to_keep_column_mapping.iter() {
230+
if old == new {
231+
continue;
232+
}
233+
if output_columns.contains(old) {
234+
continue;
235+
}
236+
if !output_columns.contains(new) {
237+
continue;
238+
}
239+
let scalar = Self::make_bound_column_ref(&metadata, *new);
240+
items.push(ScalarItem {
241+
scalar,
242+
index: *old,
243+
});
244+
}
245+
246+
if !items.is_empty() {
247+
rewritten = SExpr::create_unary(
248+
Arc::new(RelOperator::EvalScalar(crate::plans::EvalScalar { items })),
249+
Arc::new(rewritten),
250+
);
251+
}
252+
}
253+
202254
Ok(rewritten)
203255
}
204256

@@ -233,8 +285,11 @@ impl RuleEliminateSelfJoin {
233285
return None;
234286
}
235287

288+
let group_key_indexes = Self::group_key_indexes(&final_agg.group_items, metadata)?;
236289
let group_key = Self::group_key_signature(&final_agg.group_items, metadata)?;
237290

291+
let agg_func_indexes = Self::agg_func_indexes(&final_agg.aggregate_functions, metadata)?;
292+
238293
let scan = match node.child(0).ok()?.child(0).ok()?.child(0).ok()?.plan() {
239294
RelOperator::Scan(scan) => scan,
240295
_ => return None,
@@ -245,40 +300,142 @@ impl RuleEliminateSelfJoin {
245300
Some(Candidate {
246301
table_id,
247302
group_key,
303+
group_key_indexes,
304+
agg_func_indexes,
248305
strict,
249306
path: path.to_vec(),
250307
})
251308
}
252309

310+
fn group_key_item_signature(
311+
item: &crate::plans::ScalarItem,
312+
metadata: &Metadata,
313+
) -> Option<GroupKeyItemSignature> {
314+
let ScalarExpr::BoundColumnRef(col) = &item.scalar else {
315+
return None;
316+
};
317+
let ColumnEntry::BaseTableColumn(base_col) = metadata.column(col.column.index) else {
318+
return None;
319+
};
320+
Some(GroupKeyItemSignature {
321+
column_id: base_col.column_id,
322+
column_position: base_col.column_position,
323+
column_name: base_col.column_name.clone(),
324+
})
325+
}
326+
253327
fn group_key_signature(
254328
group_items: &[crate::plans::ScalarItem],
255329
metadata: &Metadata,
256330
) -> Option<Vec<GroupKeyItemSignature>> {
257331
let mut sig = Vec::with_capacity(group_items.len());
258332
for item in group_items.iter() {
259-
let used = item.scalar.used_columns();
260-
if used.len() != 1 {
333+
sig.push(Self::group_key_item_signature(item, metadata)?);
334+
}
335+
Some(sig)
336+
}
337+
338+
fn group_key_indexes(
339+
group_items: &[crate::plans::ScalarItem],
340+
metadata: &Metadata,
341+
) -> Option<HashMap<GroupKeyItemSignature, IndexType>> {
342+
let mut sig = HashMap::with_capacity(group_items.len());
343+
for item in group_items.iter() {
344+
sig.insert(Self::group_key_item_signature(item, metadata)?, item.index);
345+
}
346+
Some(sig)
347+
}
348+
349+
fn agg_func_signature(item: &ScalarItem, metadata: &Metadata) -> Option<AggFuncSignature> {
350+
let ScalarExpr::AggregateFunction(agg) = &item.scalar else {
351+
return None;
352+
};
353+
354+
let mut args = Vec::with_capacity(agg.args.len());
355+
for arg in agg.args.iter() {
356+
let ScalarExpr::BoundColumnRef(col) = arg else {
261357
return None;
262-
}
263-
let col_idx = *used.iter().next()?;
264-
let ColumnEntry::BaseTableColumn(base_col) = metadata.column(col_idx) else {
358+
};
359+
let ColumnEntry::BaseTableColumn(base_col) = metadata.column(col.column.index) else {
265360
return None;
266361
};
267-
268-
sig.push(GroupKeyItemSignature {
362+
args.push(GroupKeyItemSignature {
269363
column_id: base_col.column_id,
270364
column_position: base_col.column_position,
271365
column_name: base_col.column_name.clone(),
272366
});
273367
}
274-
Some(sig)
368+
369+
Some(AggFuncSignature {
370+
func_name: agg.func_name.clone(),
371+
distinct: agg.distinct,
372+
params: agg.params.clone(),
373+
args,
374+
})
375+
}
376+
377+
fn agg_func_indexes(
378+
agg_items: &[ScalarItem],
379+
metadata: &Metadata,
380+
) -> Option<Vec<(AggFuncSignature, IndexType)>> {
381+
let mut result = Vec::with_capacity(agg_items.len());
382+
for item in agg_items.iter() {
383+
result.push((Self::agg_func_signature(item, metadata)?, item.index));
384+
}
385+
Some(result)
275386
}
276387

277388
fn table_id_from_scan(&self, scan: &Scan, metadata: &Metadata) -> Option<u64> {
278389
let table_entry = metadata.table(scan.table_index);
279390
Some(table_entry.table().get_table_info().ident.table_id)
280391
}
281392

393+
fn build_removed_to_keep_index_mapping(
394+
remove: &Candidate,
395+
keep: &Candidate,
396+
) -> Option<HashMap<IndexType, IndexType>> {
397+
let mut mapping = HashMap::new();
398+
399+
for (sig, old_index) in remove.group_key_indexes.iter() {
400+
let new_index = *keep.group_key_indexes.get(sig)?;
401+
mapping.insert(*old_index, new_index);
402+
}
403+
404+
let mut keep_agg_by_sig: HashMap<&AggFuncSignature, Vec<IndexType>> = HashMap::new();
405+
for (sig, idx) in keep.agg_func_indexes.iter() {
406+
keep_agg_by_sig.entry(sig).or_default().push(*idx);
407+
}
408+
let mut keep_agg_pos: HashMap<&AggFuncSignature, usize> = HashMap::new();
409+
410+
for (sig, old_index) in remove.agg_func_indexes.iter() {
411+
let list = keep_agg_by_sig.get(sig)?;
412+
let pos = keep_agg_pos.entry(sig).or_insert(0);
413+
if *pos >= list.len() {
414+
return None;
415+
}
416+
mapping.insert(*old_index, list[*pos]);
417+
*pos += 1;
418+
}
419+
420+
Some(mapping)
421+
}
422+
423+
fn make_bound_column_ref(metadata: &Metadata, index: IndexType) -> ScalarExpr {
424+
let entry = metadata.column(index);
425+
let binding = ColumnBindingBuilder::new(
426+
entry.name(),
427+
index,
428+
Box::new(entry.data_type()),
429+
Visibility::Visible,
430+
)
431+
.build();
432+
crate::plans::BoundColumnRef {
433+
span: None,
434+
column: binding,
435+
}
436+
.into()
437+
}
438+
282439
fn remove_paths_from_inner_join_tree(
283440
&self,
284441
s_expr: &SExpr,

0 commit comments

Comments
 (0)