diff --git a/src/query/sql/src/planner/metadata/metadata.rs b/src/query/sql/src/planner/metadata/metadata.rs
index 37be5574b551e..90e82d503762a 100644
--- a/src/query/sql/src/planner/metadata/metadata.rs
+++ b/src/query/sql/src/planner/metadata/metadata.rs
@@ -59,6 +59,7 @@ pub type MetadataRef = Arc<RwLock<Metadata>>;
 pub struct Metadata {
     tables: Vec<TableEntry>,
     columns: Vec<ColumnEntry>,
+    removed_mark_indexes: ColumnSet,
     /// Table column indexes that are lazy materialized.
     table_lazy_columns: HashMap<IndexType, ColumnSet>,
     table_source: HashMap<IndexType, DataSourcePlan>,
@@ -134,6 +135,14 @@ impl Metadata {
         self.columns.as_slice()
     }
 
+    pub fn add_removed_mark_index(&mut self, index: IndexType) {
+        self.removed_mark_indexes.insert(index);
+    }
+
+    pub fn is_removed_mark_index(&self, index: IndexType) -> bool {
+        self.removed_mark_indexes.contains(&index)
+    }
+
     pub fn add_retained_column(&mut self, index: IndexType) {
         self.retained_columns.insert(index);
     }
diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs
index 2b9b8da721b62..4c203d87cc902 100644
--- a/src/query/sql/src/planner/optimizer/optimizer.rs
+++ b/src/query/sql/src/planner/optimizer/optimizer.rs
@@ -30,6 +30,7 @@ use crate::optimizer::ir::SExpr;
 use crate::optimizer::optimizers::CTEFilterPushdownOptimizer;
 use crate::optimizer::optimizers::CascadesOptimizer;
 use crate::optimizer::optimizers::DPhpyOptimizer;
+use crate::optimizer::optimizers::EliminateSelfJoinOptimizer;
 use crate::optimizer::optimizers::distributed::BroadcastToShuffleOptimizer;
 use crate::optimizer::optimizers::operator::CleanupUnusedCTEOptimizer;
 use crate::optimizer::optimizers::operator::DeduplicateJoinConditionOptimizer;
@@ -265,6 +266,8 @@ pub async fn optimize_query(opt_ctx: Arc<OptimizerContext>, s_expr: SExpr) -> Re
         ]))
         // Apply DPhyp algorithm for cost-based join reordering
         .add(DPhpyOptimizer::new(opt_ctx.clone()))
+        // Eliminate self joins when possible
+        .add(EliminateSelfJoinOptimizer::new(opt_ctx.clone()))
         // After join reorder, Convert some single join to inner join.
         .add(SingleToInnerOptimizer::new())
         // Deduplicate join conditions.
diff --git a/src/query/sql/src/planner/optimizer/optimizers/eliminate_self_join.rs b/src/query/sql/src/planner/optimizer/optimizers/eliminate_self_join.rs
new file mode 100644
index 0000000000000..e0b712017d333
--- /dev/null
+++ b/src/query/sql/src/planner/optimizer/optimizers/eliminate_self_join.rs
@@ -0,0 +1,79 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
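+
+// Illustrative sketch of the pattern this optimizer targets (an example query,
+// not taken from this patch): two aggregations over the same table, joined on
+// the shared group key, where one side carries an extra filter (a HAVING):
+//
+//   SELECT a.k, a.s
+//   FROM (SELECT k, sum(v) AS s FROM t GROUP BY k HAVING sum(v) = 1) AS a
+//   JOIN (SELECT k, sum(v) AS s FROM t GROUP BY k) AS b
+//   ON a.k = b.k
+//
+// Both sides produce unique `k` values, so the inner join acts purely as a
+// filter and the stricter side subsumes the looser one; the whole plan can
+// collapse to the filtered aggregate alone.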
+
+use std::sync::Arc;
+use std::time::Instant;
+
+use databend_common_exception::Result;
+use log::info;
+
+use crate::optimizer::Optimizer;
+use crate::optimizer::OptimizerContext;
+use crate::optimizer::ir::SExpr;
+use crate::optimizer::optimizers::DPhpyOptimizer;
+use crate::optimizer::optimizers::recursive::RecursiveRuleOptimizer;
+use crate::optimizer::optimizers::rule::RuleID;
+
+pub struct EliminateSelfJoinOptimizer {
+    opt_ctx: Arc<OptimizerContext>,
+}
+
+impl EliminateSelfJoinOptimizer {
+    pub fn new(opt_ctx: Arc<OptimizerContext>) -> Self {
+        Self { opt_ctx }
+    }
+}
+
+#[async_trait::async_trait]
+impl Optimizer for EliminateSelfJoinOptimizer {
+    fn name(&self) -> String {
+        "EliminateSelfJoinOptimizer".to_string()
+    }
+
+    async fn optimize(&mut self, s_expr: &SExpr) -> Result<SExpr> {
+        // `EagerAggregation` is used here as a speculative pre-rewrite to expose patterns that
+        // `EliminateSelfJoin` can match. If no self-join is actually eliminated, we intentionally
+        // return the original input plan to avoid keeping the eager-aggregation rewrite as a
+        // standalone optimization.
+        let start = Instant::now();
+        static RULES_EAGER_AGGREGATION: &[RuleID] = &[RuleID::EagerAggregation];
+        let optimizer = RecursiveRuleOptimizer::new(self.opt_ctx.clone(), RULES_EAGER_AGGREGATION);
+        let s_expr_after_eager_aggregation = optimizer.optimize_sync(s_expr)?;
+
+        static RULES_ELIMINATE_SELF_JOIN: &[RuleID] = &[RuleID::EliminateSelfJoin];
+        let optimizer =
+            RecursiveRuleOptimizer::new(self.opt_ctx.clone(), RULES_ELIMINATE_SELF_JOIN);
+        let s_expr_after_eliminate_self_join =
+            optimizer.optimize_sync(&s_expr_after_eager_aggregation)?;
+
+        let duration = start.elapsed();
+
+        if s_expr_after_eliminate_self_join == s_expr_after_eager_aggregation {
+            return Ok(s_expr.clone());
+        }
+
+        // EliminateSelfJoinOptimizer should ideally run before Dphyp in the optimizer pipeline.
+        // However, due to issues with the current EagerAggregation implementation, running it
+        // before Dphyp causes TPC-H Q18 optimization to fail. Therefore, EliminateSelfJoinOptimizer
+        // is placed after Dphyp, and we run Dphyp again here to ensure join reordering after
+        // eliminating self-joins.
+ let s_expr_after_dphyp = DPhpyOptimizer::new(self.opt_ctx.clone()) + .optimize_async(&s_expr_after_eliminate_self_join) + .await?; + + info!("EliminateSelfJoinOptimizer: {}ms", duration.as_millis()); + + Ok(s_expr_after_dphyp) + } +} diff --git a/src/query/sql/src/planner/optimizer/optimizers/mod.rs b/src/query/sql/src/planner/optimizer/optimizers/mod.rs index 6fa380d8d997b..59a16423134c0 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/mod.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/mod.rs @@ -15,6 +15,7 @@ mod cascades; pub mod cte_filter_pushdown; pub mod distributed; +mod eliminate_self_join; mod hyper_dp; pub mod operator; pub mod recursive; @@ -22,5 +23,6 @@ pub mod rule; pub use cascades::CascadesOptimizer; pub use cte_filter_pushdown::CTEFilterPushdownOptimizer; +pub use eliminate_self_join::EliminateSelfJoinOptimizer; pub use hyper_dp::DPhpyOptimizer; pub use operator::CleanupUnusedCTEOptimizer; diff --git a/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs b/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs index 8cc561f33a1b8..ce4b515e3affb 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs @@ -692,20 +692,22 @@ impl SubqueryDecorrelatorOptimizer { SubqueryType::Any => { let output_column = subquery.output_column.clone(); let column_name = format!("subquery_{}", output_column.index); - let left_condition = wrap_cast( - &ScalarExpr::BoundColumnRef(BoundColumnRef { - span: subquery.span, - column: ColumnBindingBuilder::new( - column_name, - output_column.index, - output_column.data_type, - Visibility::Visible, - ) - .table_index(output_column.table_index) - .build(), - }), - &subquery.data_type, - ); + let left_condition_base = ScalarExpr::BoundColumnRef(BoundColumnRef { + span: subquery.span, + column: ColumnBindingBuilder::new( + column_name, + output_column.index, + output_column.data_type, + Visibility::Visible, + ) + .table_index(output_column.table_index) + .build(), + }); + let left_condition = if left_condition_base.data_type()? 
== *subquery.data_type { + left_condition_base + } else { + wrap_cast(&left_condition_base, &subquery.data_type) + }; let child_expr = *subquery.child_expr.as_ref().unwrap().clone(); let op = subquery.compare_op.as_ref().unwrap().clone(); let (right_condition, is_non_equi_condition) = diff --git a/src/query/sql/src/planner/optimizer/optimizers/rule/agg_rules/rule_eager_aggregation.rs b/src/query/sql/src/planner/optimizer/optimizers/rule/agg_rules/rule_eager_aggregation.rs index 198310ddef8d1..f1952586b067b 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/rule/agg_rules/rule_eager_aggregation.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/rule/agg_rules/rule_eager_aggregation.rs @@ -337,6 +337,12 @@ impl Rule for RuleEagerAggregation { if extra_eval_scalar_expr.is_some() { for eval_item in &extra_eval_scalar.items { let eval_used_columns = eval_item.scalar.used_columns(); + if eval_used_columns + .iter() + .any(|column| self.metadata.read().is_removed_mark_index(*column)) + { + continue; + } let mut resolved_by_one_child = false; join_columns.for_each_mut(|side, columns_set| { if eval_used_columns.is_subset(columns_set) { diff --git a/src/query/sql/src/planner/optimizer/optimizers/rule/factory.rs b/src/query/sql/src/planner/optimizer/optimizers/rule/factory.rs index d1b700f98fc18..125564d0939cc 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/rule/factory.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/rule/factory.rs @@ -23,6 +23,7 @@ use crate::optimizer::optimizers::rule::RuleDeduplicateSort; use crate::optimizer::optimizers::rule::RuleEagerAggregation; use crate::optimizer::optimizers::rule::RuleEliminateEvalScalar; use crate::optimizer::optimizers::rule::RuleEliminateFilter; +use crate::optimizer::optimizers::rule::RuleEliminateSelfJoin; use crate::optimizer::optimizers::rule::RuleEliminateSort; use crate::optimizer::optimizers::rule::RuleEliminateUnion; use crate::optimizer::optimizers::rule::RuleFilterFlattenOr; @@ -124,6 +125,7 @@ impl RuleFactory { RuleID::EagerAggregation => Ok(Box::new(RuleEagerAggregation::new(metadata))), RuleID::PushDownPrewhere => Ok(Box::new(RulePushDownPrewhere::new(metadata))), RuleID::TryApplyAggIndex => Ok(Box::new(RuleTryApplyAggIndex::new(metadata))), + RuleID::EliminateSelfJoin => Ok(Box::new(RuleEliminateSelfJoin::new(ctx))), RuleID::EliminateSort => Ok(Box::new(RuleEliminateSort::new())), RuleID::DeduplicateSort => Ok(Box::new(RuleDeduplicateSort::new())), RuleID::SemiToInnerJoin => Ok(Box::new(RuleSemiToInnerJoin::new())), diff --git a/src/query/sql/src/planner/optimizer/optimizers/rule/filter_rules/rule_eliminate_filter.rs b/src/query/sql/src/planner/optimizer/optimizers/rule/filter_rules/rule_eliminate_filter.rs index 40ca36677edf5..858d4b660b324 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/rule/filter_rules/rule_eliminate_filter.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/rule/filter_rules/rule_eliminate_filter.rs @@ -111,6 +111,28 @@ impl Rule for RuleEliminateFilter { true } } + ScalarExpr::FunctionCall(func) + if func.func_name == "is_true" + && func.arguments.len() == 1 + && matches!(func.arguments[0], ScalarExpr::FunctionCall(_)) => + { + let ScalarExpr::FunctionCall(inner) = &func.arguments[0] else { + return true; + }; + if inner.func_name != "eq" || inner.arguments.len() != 2 { + return true; + } + if let ( + ScalarExpr::BoundColumnRef(left_col), + ScalarExpr::BoundColumnRef(right_col), + ) = (&inner.arguments[0], &inner.arguments[1]) + { + left_col.column.index != 
right_col.column.index
+                        || left_col.column.data_type.is_nullable()
+                } else {
+                    true
+                }
+            }
             _ => true,
         })
         .collect::<Vec<_>>();
diff --git a/src/query/sql/src/planner/optimizer/optimizers/rule/filter_rules/rule_push_down_filter_join.rs b/src/query/sql/src/planner/optimizer/optimizers/rule/filter_rules/rule_push_down_filter_join.rs
index 986d2e157e23e..99f0fd9d32694 100644
--- a/src/query/sql/src/planner/optimizer/optimizers/rule/filter_rules/rule_push_down_filter_join.rs
+++ b/src/query/sql/src/planner/optimizer/optimizers/rule/filter_rules/rule_push_down_filter_join.rs
@@ -82,7 +82,7 @@ impl Rule for RulePushDownFilterJoin {
         let (s_expr, outer_to_inner) = outer_join_to_inner_join(s_expr, self.metadata.clone())?;
 
         // Second, check if can convert mark join to semi join
-        let (s_expr, mark_to_semi) = convert_mark_to_semi_join(&s_expr)?;
+        let (s_expr, mark_to_semi) = convert_mark_to_semi_join(&s_expr, self.metadata.clone())?;
         if s_expr.plan().rel_op() != RelOp::Filter {
             state.add_result(s_expr);
             return Ok(());
diff --git a/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/mod.rs b/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/mod.rs
index fba55fa30df63..9454d18f3d79b 100644
--- a/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/mod.rs
+++ b/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/mod.rs
@@ -15,6 +15,7 @@
 mod push_down_filter_join;
 mod rule_commute_join;
 mod rule_commute_join_base_table;
+mod rule_eliminate_self_join;
 mod rule_left_exchange_join;
 mod rule_semi_to_inner_join;
 mod util;
@@ -22,6 +23,7 @@ mod util;
 
 pub use push_down_filter_join::*;
 pub use rule_commute_join::RuleCommuteJoin;
 pub use rule_commute_join_base_table::RuleCommuteJoinBaseTable;
+pub use rule_eliminate_self_join::RuleEliminateSelfJoin;
 pub use rule_left_exchange_join::RuleLeftExchangeJoin;
 pub use rule_semi_to_inner_join::RuleSemiToInnerJoin;
 pub use util::get_join_predicates;
diff --git a/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/push_down_filter_join/mark_join_to_semi_join.rs b/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/push_down_filter_join/mark_join_to_semi_join.rs
index 233f7bd71d6e8..cb5e61a42db7c 100644
--- a/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/push_down_filter_join/mark_join_to_semi_join.rs
+++ b/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/push_down_filter_join/mark_join_to_semi_join.rs
@@ -16,13 +16,14 @@ use std::sync::Arc;
 
 use databend_common_exception::Result;
 
+use crate::MetadataRef;
 use crate::ScalarExpr;
 use crate::optimizer::ir::SExpr;
 use crate::plans::Filter;
 use crate::plans::Join;
 use crate::plans::JoinType;
 
-pub fn convert_mark_to_semi_join(s_expr: &SExpr) -> Result<(SExpr, bool)> {
+pub fn convert_mark_to_semi_join(s_expr: &SExpr, metadata: MetadataRef) -> Result<(SExpr, bool)> {
     let mut filter: Filter = s_expr.plan().clone().try_into()?;
     let mut join: Join = s_expr.child(0)?.plan().clone().try_into()?;
 
@@ -34,7 +35,9 @@ pub fn convert_mark_to_semi_join(s_expr: &SExpr) -> Result<(SExpr, bool)> {
         return Ok((s_expr.clone(), false));
     }
 
-    let mark_index = join.marker_index.unwrap();
+    let Some(mark_index) = join.marker_index else {
+        return Ok((s_expr.clone(), false));
+    };
 
     let mut find_mark_index = false;
     // remove mark index filter
@@ -65,6 +68,8 @@
         _ => unreachable!(),
     };
 
+    metadata.write().add_removed_mark_index(mark_index);
+
     // clear is null equal sign
     join.equi_conditions.iter_mut().for_each(|c| {
         c.is_null_equal = false;
diff --git a/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/rule_eliminate_self_join.rs b/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/rule_eliminate_self_join.rs
new file mode 100644
index 0000000000000..82fce0447fbd4
--- /dev/null
+++ b/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/rule_eliminate_self_join.rs
@@ -0,0 +1,705 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+use std::collections::HashSet;
+use std::sync::Arc;
+
+use databend_common_exception::Result;
+use databend_common_expression::Scalar;
+
+use crate::ColumnBinding;
+use crate::ColumnBindingBuilder;
+use crate::ColumnEntry;
+use crate::ColumnSet;
+use crate::IndexType;
+use crate::Metadata;
+use crate::MetadataRef;
+use crate::Visibility;
+use crate::optimizer::OptimizerContext;
+use crate::optimizer::ir::Matcher;
+use crate::optimizer::ir::SExpr;
+use crate::optimizer::optimizers::recursive::RecursiveRuleOptimizer;
+use crate::optimizer::optimizers::rule::Rule;
+use crate::optimizer::optimizers::rule::RuleID;
+use crate::optimizer::optimizers::rule::TransformResult;
+use crate::plans::ComparisonOp;
+use crate::plans::Filter;
+use crate::plans::FunctionCall;
+use crate::plans::Join;
+use crate::plans::JoinEquiCondition;
+use crate::plans::JoinType;
+use crate::plans::RelOp;
+use crate::plans::RelOperator;
+use crate::plans::ScalarExpr;
+use crate::plans::ScalarItem;
+use crate::plans::Scan;
+
+#[derive(Clone, Debug)]
+struct MultiJoin {
+    relations: Vec<Arc<SExpr>>,
+    equi_conditions: Vec<JoinEquiCondition>,
+    non_equi_conditions: Vec<ScalarExpr>,
+}
+
+impl MultiJoin {
+    fn build_equivalence_classes(&self) -> Result<UnionFind> {
+        let mut uf = UnionFind::default();
+        for cond in self.equi_conditions.iter() {
+            if let (ScalarExpr::BoundColumnRef(l), ScalarExpr::BoundColumnRef(r)) =
+                (&cond.left, &cond.right)
+            {
+                uf.union(l.column.index, r.column.index);
+            }
+        }
+        Ok(uf)
+    }
+}
+
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+struct GroupKeyItemSignature {
+    column_id: Option<u32>,
+    column_position: Option<usize>,
+    column_name: String,
+}
+
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+struct AggFuncSignature {
+    func_name: String,
+    distinct: bool,
+    params: Vec<Scalar>,
+    args: Vec<GroupKeyItemSignature>,
+}
+
+#[derive(Clone, Debug)]
+struct Candidate {
+    table_id: u64,
+    group_key: GroupKeyItemSignature,
+    group_key_index: IndexType,
+    agg_func_indexes: Vec<(AggFuncSignature, IndexType)>,
+    /// Scalar items introduced above the (Filter +) Aggregate chain.
+    ///
+    /// If the corresponding relation is eliminated, these derived columns must be
+    /// recreated on top of the kept relation; otherwise parent operators may
+    /// reference missing columns.
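+    ///
+    /// Illustrative case (an example for exposition, mirroring the regression
+    /// test below): if the removed side computed `c2 + 1 AS c3` above its
+    /// aggregate, `c3` must be re-derived from the kept side's `sum(c2)` so that
+    /// references to it still resolve.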
+    extra_scalar_items: Vec<ScalarItem>,
+    strict: bool,
+    relation_idx: usize,
+}
+
+pub struct RuleEliminateSelfJoin {
+    id: RuleID,
+    matchers: Vec<Matcher>,
+    opt_ctx: Arc<OptimizerContext>,
+}
+
+impl RuleEliminateSelfJoin {
+    pub fn new(opt_ctx: Arc<OptimizerContext>) -> Self {
+        Self {
+            id: RuleID::EliminateSelfJoin,
+            matchers: vec![Matcher::MatchOp {
+                op_type: RelOp::Join,
+                children: vec![Matcher::Leaf, Matcher::Leaf],
+            }],
+            opt_ctx,
+        }
+    }
+
+    fn core_candidate_matcher(&self) -> Matcher {
+        Matcher::MatchOp {
+            op_type: RelOp::Aggregate,
+            children: vec![Matcher::MatchOp {
+                op_type: RelOp::Aggregate,
+                children: vec![Matcher::MatchOp {
+                    op_type: RelOp::EvalScalar,
+                    children: vec![Matcher::MatchOp {
+                        op_type: RelOp::Scan,
+                        children: vec![],
+                    }],
+                }],
+            }],
+        }
+    }
+
+    fn extract_multi_join(&self, s_expr: &SExpr) -> Result<Option<MultiJoin>> {
+        match s_expr.plan() {
+            RelOperator::Join(join)
+                if matches!(join.join_type, JoinType::Inner) && !join.has_null_equi_condition() =>
+            {
+                let mut relations = Vec::new();
+                let mut equi_conditions = join.equi_conditions.clone();
+                let mut non_equi_conditions = join.non_equi_conditions.clone();
+
+                match self.extract_multi_join(s_expr.child(0)?)? {
+                    Some(mut left) => {
+                        relations.append(&mut left.relations);
+                        equi_conditions.append(&mut left.equi_conditions);
+                        non_equi_conditions.append(&mut left.non_equi_conditions);
+                    }
+                    None => {
+                        relations.push(Arc::new(s_expr.child(0)?.clone()));
+                    }
+                }
+
+                match self.extract_multi_join(s_expr.child(1)?)? {
+                    Some(mut right) => {
+                        relations.append(&mut right.relations);
+                        equi_conditions.append(&mut right.equi_conditions);
+                        non_equi_conditions.append(&mut right.non_equi_conditions);
+                    }
+                    None => {
+                        relations.push(Arc::new(s_expr.child(1)?.clone()));
+                    }
+                }
+
+                Ok(Some(MultiJoin {
+                    relations,
+                    equi_conditions,
+                    non_equi_conditions,
+                }))
+            }
+            _ => Ok(None),
+        }
+    }
+
+    fn eliminate_multi_join(&self, multi_join: MultiJoin, original: &SExpr) -> Result<SExpr> {
+        let mut eq_classes = multi_join.build_equivalence_classes()?;
+        let metadata_ref: MetadataRef = self.opt_ctx.get_metadata();
+        let metadata = metadata_ref.read();
+
+        let mut candidates = Vec::new();
+        for (relation_idx, relation) in multi_join.relations.iter().enumerate() {
+            if let Some(candidate) = self.try_parse_candidate(relation, relation_idx, &metadata) {
+                candidates.push(candidate);
+            }
+        }
+
+        let mut groups: HashMap<(u64, GroupKeyItemSignature), Vec<Candidate>> = HashMap::new();
+        for candidate in candidates.into_iter() {
+            groups
+                .entry((candidate.table_id, candidate.group_key.clone()))
+                .or_default()
+                .push(candidate);
+        }
+
+        let mut remove_relations: HashSet<usize> = HashSet::new();
+        let mut removed_to_keep_column_mapping: HashMap<IndexType, IndexType> = HashMap::new();
+        let mut extra_scalar_items: Vec<ScalarItem> = Vec::new();
+        for (_key, group_candidates) in groups.iter() {
+            let has_strict = group_candidates.iter().any(|c| c.strict);
+            let has_loose = group_candidates.iter().any(|c| !c.strict);
+            if !has_strict || !has_loose {
+                continue;
+            }
+
+            let Some(keep) = group_candidates.iter().find(|c| c.strict) else {
+                continue;
+            };
+
+            // If there is at least one strict candidate, the loose candidates are redundant:
+            // both sides are inner-joined on unique group keys (each side is an Aggregate),
+            // so the join acts purely as a filter, and the strict side already filters more
+            // strictly than the loose side.
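+            // For instance, in the first sqllogictest below, `t3` (guarded by a
+            // HAVING filter) is the strict side that is kept, while the unfiltered
+            // `t4` is the loose side that is removed and remapped onto `t3`.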
+            for c in group_candidates.iter().filter(|c| !c.strict) {
+                let Some(mapping) = Self::build_removed_to_keep_index_mapping(c, keep) else {
+                    continue;
+                };
+
+                if !eq_classes.same(c.group_key_index, keep.group_key_index) {
+                    continue;
+                }
+
+                let Some(rewritten_extra_items) =
+                    Self::rewrite_extra_scalar_items(&c.extra_scalar_items, &mapping, &metadata)
+                else {
+                    continue;
+                };
+                remove_relations.insert(c.relation_idx);
+                removed_to_keep_column_mapping.extend(mapping);
+                extra_scalar_items.extend(rewritten_extra_items);
+            }
+        }
+
+        if remove_relations.is_empty() {
+            return Ok(original.clone());
+        }
+
+        let Some(rewritten) = self.build_new_join_tree(
+            &multi_join.relations,
+            &multi_join.equi_conditions,
+            &multi_join.non_equi_conditions,
+            &remove_relations,
+            &removed_to_keep_column_mapping,
+            &metadata,
+        )?
+        else {
+            return Ok(original.clone());
+        };
+
+        let rewritten =
+            Self::project_columns(rewritten, &removed_to_keep_column_mapping, &metadata)?;
+        let rewritten = Self::add_extra_scalar_items(rewritten, &extra_scalar_items, &metadata)?;
+
+        Ok(rewritten)
+    }
+
+    fn rewrite_extra_scalar_items(
+        items: &[ScalarItem],
+        mapping: &HashMap<IndexType, IndexType>,
+        metadata: &Metadata,
+    ) -> Option<Vec<ScalarItem>> {
+        if items.is_empty() {
+            return Some(vec![]);
+        }
+
+        let mut available_columns: ColumnSet = mapping.values().copied().collect();
+
+        let mut rewritten = Vec::with_capacity(items.len());
+        for item in items.iter() {
+            let mut scalar = item.scalar.clone();
+            for (old, new) in mapping.iter() {
+                if old == new {
+                    continue;
+                }
+                let new_column = Self::make_column_binding(metadata, *new);
+                if scalar.replace_column_binding(*old, &new_column).is_err() {
+                    return None;
+                }
+            }
+
+            // After rewriting, the expression should be evaluable solely from the kept side's
+            // mapped columns. If not, eliminating this relation would drop required inputs.
+            if !scalar.used_columns().is_subset(&available_columns) {
+                return None;
+            }
+            rewritten.push(ScalarItem {
+                scalar,
+                index: item.index,
+            });
+            available_columns.insert(item.index);
+        }
+        Some(rewritten)
+    }
+
+    fn try_parse_candidate(
+        &self,
+        relation: &SExpr,
+        relation_idx: usize,
+        metadata: &Metadata,
+    ) -> Option<Candidate> {
+        let mut strict = false;
+        let mut node = relation;
+        let mut extra_scalar_items = Vec::new();
+
+        if matches!(node.plan(), RelOperator::EvalScalar(_)) {
+            let eval_scalar = node.plan().as_eval_scalar().unwrap();
+            let child = node.unary_child();
+            let child_output_columns = child.derive_relational_prop().ok()?.output_columns.clone();
+
+            // Collect only those items that introduce new output column indices.
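+            // For example (illustrative), in `SELECT c1, c2, c2 + 1 AS c3 FROM (...)`
+            // only the item for `c3` carries an index the child does not already output.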
+            let mut new_items = Vec::new();
+            for item in eval_scalar.items.iter() {
+                if !child_output_columns.contains(&item.index) {
+                    new_items.push(item.clone());
+                }
+            }
+            extra_scalar_items.extend(new_items);
+            node = node.unary_child();
+        }
+
+        while matches!(node.plan(), RelOperator::Filter(_)) {
+            strict = true;
+            node = node.unary_child();
+        }
+
+        if !self.core_candidate_matcher().matches(node) {
+            return None;
+        }
+
+        let final_agg = node.plan().as_aggregate().unwrap();
+
+        if final_agg.group_items.len() != 1 {
+            return None;
+        }
+
+        let group_key = Self::group_key_signature(&final_agg.group_items[0], metadata)?;
+        let group_key_index = final_agg.group_items[0].index;
+
+        let agg_func_indexes = Self::agg_func_indexes(&final_agg.aggregate_functions, metadata)?;
+
+        let scan = node
+            .unary_child()
+            .unary_child()
+            .unary_child()
+            .plan()
+            .as_scan()
+            .unwrap();
+
+        if scan
+            .push_down_predicates
+            .as_ref()
+            .is_some_and(|v| !v.is_empty())
+            || scan.limit.is_some()
+            || scan.order_by.as_ref().is_some_and(|v| !v.is_empty())
+            || scan.prewhere.is_some()
+            || scan.agg_index.is_some()
+            || scan.change_type.is_some()
+            || scan.update_stream_columns
+            || scan.inverted_index.is_some()
+            || scan.vector_index.is_some()
+            || scan.is_lazy_table
+        {
+            return None;
+        }
+
+        let table_id = self.table_id_from_scan(scan, metadata)?;
+
+        Some(Candidate {
+            table_id,
+            group_key,
+            group_key_index,
+            agg_func_indexes,
+            extra_scalar_items,
+            strict,
+            relation_idx,
+        })
+    }
+
+    fn group_key_signature(
+        item: &crate::plans::ScalarItem,
+        metadata: &Metadata,
+    ) -> Option<GroupKeyItemSignature> {
+        let ScalarExpr::BoundColumnRef(col) = &item.scalar else {
+            return None;
+        };
+        let ColumnEntry::BaseTableColumn(base_col) = metadata.column(col.column.index) else {
+            return None;
+        };
+        Some(GroupKeyItemSignature {
+            column_id: base_col.column_id,
+            column_position: base_col.column_position,
+            column_name: base_col.column_name.clone(),
+        })
+    }
+
+    fn agg_func_signature(item: &ScalarItem, metadata: &Metadata) -> Option<AggFuncSignature> {
+        let ScalarExpr::AggregateFunction(agg) = &item.scalar else {
+            return None;
+        };
+
+        let mut args = Vec::with_capacity(agg.args.len());
+        for arg in agg.args.iter() {
+            let ScalarExpr::BoundColumnRef(col) = arg else {
+                return None;
+            };
+            let ColumnEntry::BaseTableColumn(base_col) = metadata.column(col.column.index) else {
+                return None;
+            };
+            args.push(GroupKeyItemSignature {
+                column_id: base_col.column_id,
+                column_position: base_col.column_position,
+                column_name: base_col.column_name.clone(),
+            });
+        }
+
+        Some(AggFuncSignature {
+            func_name: agg.func_name.clone(),
+            distinct: agg.distinct,
+            params: agg.params.clone(),
+            args,
+        })
+    }
+
+    fn agg_func_indexes(
+        agg_items: &[ScalarItem],
+        metadata: &Metadata,
+    ) -> Option<Vec<(AggFuncSignature, IndexType)>> {
+        let mut result = Vec::with_capacity(agg_items.len());
+        for item in agg_items.iter() {
+            result.push((Self::agg_func_signature(item, metadata)?, item.index));
+        }
+        Some(result)
+    }
+
+    fn table_id_from_scan(&self, scan: &Scan, metadata: &Metadata) -> Option<u64> {
+        let table_entry = metadata.table(scan.table_index);
+        Some(table_entry.table().get_table_info().ident.table_id)
+    }
+
+    fn build_removed_to_keep_index_mapping(
+        remove: &Candidate,
+        keep: &Candidate,
+    ) -> Option<HashMap<IndexType, IndexType>> {
+        let mut mapping = HashMap::new();
+
+        mapping.insert(remove.group_key_index, keep.group_key_index);
+
+        let mut keep_agg_by_sig: HashMap<&AggFuncSignature, Vec<IndexType>> = HashMap::new();
+        for (sig, idx) in keep.agg_func_indexes.iter() {
+            keep_agg_by_sig.entry(sig).or_default().push(*idx);
+        }
+        let mut keep_agg_pos: HashMap<&AggFuncSignature, usize> = HashMap::new();
+
+        for (sig, old_index) in remove.agg_func_indexes.iter() {
+            let list = keep_agg_by_sig.get(sig)?;
+            let pos = keep_agg_pos.entry(sig).or_insert(0);
+            if *pos >= list.len() {
+                return None;
+            }
+            mapping.insert(*old_index, list[*pos]);
+            *pos += 1;
+        }
+
+        Some(mapping)
+    }
+
+    fn make_column_binding(metadata: &Metadata, index: IndexType) -> ColumnBinding {
+        let entry = metadata.column(index);
+        ColumnBindingBuilder::new(
+            entry.name(),
+            index,
+            Box::new(entry.data_type()),
+            Visibility::Visible,
+        )
+        .build()
+    }
+
+    fn make_bound_column_ref(metadata: &Metadata, index: IndexType) -> ScalarExpr {
+        let binding = Self::make_column_binding(metadata, index);
+        crate::plans::BoundColumnRef {
+            span: None,
+            column: binding,
+        }
+        .into()
+    }
+
+    fn project_columns(
+        rewritten: SExpr,
+        column_mapping: &HashMap<IndexType, IndexType>,
+        metadata: &Metadata,
+    ) -> Result<SExpr> {
+        let output_columns = rewritten.derive_relational_prop()?.output_columns.clone();
+        let mut items = Vec::new();
+        let mut has_mapping = false;
+
+        for col in output_columns.iter() {
+            let scalar = Self::make_bound_column_ref(metadata, *col);
+
+            for (old, new) in column_mapping.iter() {
+                if new == col {
+                    has_mapping = true;
+                    items.push(ScalarItem {
+                        scalar: scalar.clone(),
+                        index: *old,
+                    });
+                }
+            }
+
+            items.push(ScalarItem {
+                scalar,
+                index: *col,
+            });
+        }
+
+        if !has_mapping {
+            return Ok(rewritten);
+        }
+
+        Ok(SExpr::create_unary(
+            Arc::new(RelOperator::EvalScalar(crate::plans::EvalScalar { items })),
+            Arc::new(rewritten),
+        ))
+    }
+
+    fn add_extra_scalar_items(
+        rewritten: SExpr,
+        extra_scalar_items: &[ScalarItem],
+        metadata: &Metadata,
+    ) -> Result<SExpr> {
+        if extra_scalar_items.is_empty() {
+            return Ok(rewritten);
+        }
+
+        let output_columns = rewritten.derive_relational_prop()?.output_columns.clone();
+        let mut items = Vec::with_capacity(output_columns.len() + extra_scalar_items.len());
+        for col in output_columns.iter() {
+            items.push(ScalarItem {
+                scalar: Self::make_bound_column_ref(metadata, *col),
+                index: *col,
+            });
+        }
+
+        for item in extra_scalar_items.iter() {
+            if output_columns.contains(&item.index) {
+                continue;
+            }
+            items.push(item.clone());
+        }
+
+        Ok(SExpr::create_unary(
+            Arc::new(RelOperator::EvalScalar(crate::plans::EvalScalar { items })),
+            Arc::new(rewritten),
+        ))
+    }
+
+    fn build_new_join_tree(
+        &self,
+        relations: &[Arc<SExpr>],
+        equi_conditions: &[JoinEquiCondition],
+        non_equi_conditions: &[ScalarExpr],
+        remove_relations: &HashSet<usize>,
+        mapping: &HashMap<IndexType, IndexType>,
+        metadata: &Metadata,
+    ) -> Result<Option<SExpr>> {
+        let mut kept_relations: Vec<Arc<SExpr>> = relations
+            .iter()
+            .enumerate()
+            .filter_map(|(idx, rel)| {
+                if remove_relations.contains(&idx) {
+                    None
+                } else {
+                    Some(rel.clone())
+                }
+            })
+            .collect();
+
+        if kept_relations.is_empty() {
+            return Ok(None);
+        }
+
+        // Step 1: replace column bindings in join conditions.
+        let mut equi_conditions = equi_conditions.to_vec();
+        let mut non_equi_conditions = non_equi_conditions.to_vec();
+        for (old, new) in mapping.iter() {
+            if old == new {
+                continue;
+            }
+            let new_column = Self::make_column_binding(metadata, *new);
+            for cond in equi_conditions.iter_mut() {
+                cond.left.replace_column_binding(*old, &new_column)?;
+                cond.right.replace_column_binding(*old, &new_column)?;
+            }
+            for cond in non_equi_conditions.iter_mut() {
+                cond.replace_column_binding(*old, &new_column)?;
+            }
+        }
+
+        // Step 2: rebuild a join tree from remaining relations (structure only).
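+        // The placeholder `Join::default()` nodes carry no conditions; Step 3 below
+        // funnels every predicate into one Filter, and `push_down_filter` then
+        // re-derives the join conditions by pushing that Filter back down the tree.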
+        let mut join_tree = kept_relations.remove(0);
+        for rel in kept_relations.into_iter() {
+            join_tree = Arc::new(SExpr::create_binary(
+                Arc::new(RelOperator::Join(Join::default())),
+                join_tree,
+                rel,
+            ));
+        }
+
+        // Step 3: put all conditions into a single Filter above the join tree.
+        let mut predicates: Vec<ScalarExpr> = Vec::with_capacity(
+            equi_conditions
+                .len()
+                .saturating_add(non_equi_conditions.len()),
+        );
+        for cond in equi_conditions.into_iter() {
+            predicates.push(ScalarExpr::FunctionCall(FunctionCall {
+                span: None,
+                func_name: String::from(ComparisonOp::Equal.to_func_name()),
+                params: vec![],
+                arguments: vec![cond.left, cond.right],
+            }));
+        }
+        predicates.extend(non_equi_conditions);
+
+        // Safety check: all predicates must be evaluable from the rebuilt tree.
+        if !predicates.is_empty() {
+            let output_columns = join_tree.derive_relational_prop()?.output_columns.clone();
+            for pred in predicates.iter() {
+                if !pred.used_columns().is_subset(&output_columns) {
+                    return Ok(None);
+                }
+            }
+        }
+
+        let root = if predicates.is_empty() {
+            join_tree.as_ref().clone()
+        } else {
+            SExpr::create_unary(
+                Arc::new(RelOperator::Filter(Filter { predicates })),
+                join_tree,
+            )
+        };
+
+        let root = self.push_down_filter(root)?;
+        Ok(Some(root))
+    }
+
+    fn push_down_filter(&self, root: SExpr) -> Result<SExpr> {
+        static RULES: &[RuleID] = &[RuleID::PushDownFilterJoin, RuleID::EliminateFilter];
+        let optimizer = RecursiveRuleOptimizer::new(self.opt_ctx.clone(), RULES);
+        optimizer.optimize_sync(&root)
+    }
+}
+
+#[derive(Default, Clone, Debug)]
+struct UnionFind {
+    parent: HashMap<IndexType, IndexType>,
+}
+
+impl UnionFind {
+    fn find(&mut self, x: IndexType) -> IndexType {
+        let parent = *self.parent.get(&x).unwrap_or(&x);
+        if parent == x {
+            self.parent.insert(x, x);
+            return x;
+        }
+        let root = self.find(parent);
+        self.parent.insert(x, root);
+        root
+    }
+
+    fn union(&mut self, a: IndexType, b: IndexType) {
+        let ra = self.find(a);
+        let rb = self.find(b);
+        if ra == rb {
+            return;
+        }
+        self.parent.insert(rb, ra);
+    }
+
+    fn same(&mut self, a: IndexType, b: IndexType) -> bool {
+        self.find(a) == self.find(b)
+    }
+}
+
+impl Rule for RuleEliminateSelfJoin {
+    fn id(&self) -> RuleID {
+        self.id
+    }
+
+    fn apply(&self, s_expr: &SExpr, state: &mut TransformResult) -> Result<()> {
+        if let Some(multi_join) = self.extract_multi_join(s_expr)?
{ + let result = self.eliminate_multi_join(multi_join, s_expr)?; + if result.ne(s_expr) { + let mut result = result; + result.set_applied_rule(&self.id); + state.add_result(result); + } + } + + Ok(()) + } + + fn matchers(&self) -> &[Matcher] { + &self.matchers + } +} diff --git a/src/query/sql/src/planner/optimizer/optimizers/rule/rule.rs b/src/query/sql/src/planner/optimizer/optimizers/rule/rule.rs index 9eece3b4e9a92..2c690428ce4e3 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/rule/rule.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/rule/rule.rs @@ -134,6 +134,7 @@ pub enum RuleID { FoldCountAggregate, PushDownPrewhere, TryApplyAggIndex, + EliminateSelfJoin, CommuteJoin, // Exploration rules @@ -184,6 +185,7 @@ impl Display for RuleID { RuleID::SplitAggregate => write!(f, "SplitAggregate"), RuleID::FoldCountAggregate => write!(f, "FoldCountAggregate"), RuleID::PushDownPrewhere => write!(f, "PushDownPrewhere"), + RuleID::EliminateSelfJoin => write!(f, "EliminateSelfJoin"), RuleID::CommuteJoin => write!(f, "CommuteJoin"), RuleID::CommuteJoinBaseTable => write!(f, "CommuteJoinBaseTable"), diff --git a/src/query/sql/src/planner/plans/scalar_expr.rs b/src/query/sql/src/planner/plans/scalar_expr.rs index edccf83755409..7a1fe6cd3147b 100644 --- a/src/query/sql/src/planner/plans/scalar_expr.rs +++ b/src/query/sql/src/planner/plans/scalar_expr.rs @@ -270,6 +270,30 @@ impl ScalarExpr { Ok(()) } + pub fn replace_column_binding( + &mut self, + old: IndexType, + new_column: &ColumnBinding, + ) -> Result<()> { + struct ReplaceColumnBindingVisitor<'a> { + old: IndexType, + new_column: &'a ColumnBinding, + } + + impl VisitorMut<'_> for ReplaceColumnBindingVisitor<'_> { + fn visit_bound_column_ref(&mut self, col: &mut BoundColumnRef) -> Result<()> { + if col.column.index == self.old { + col.column = self.new_column.clone(); + } + Ok(()) + } + } + + let mut visitor = ReplaceColumnBindingVisitor { old, new_column }; + visitor.visit(self)?; + Ok(()) + } + pub fn replace_sub_scalar( &mut self, from_scalar: ScalarExpr, diff --git a/tests/sqllogictests/suites/mode/standalone/explain/eliminate_self_join.test b/tests/sqllogictests/suites/mode/standalone/explain/eliminate_self_join.test new file mode 100644 index 0000000000000..70ad481b12702 --- /dev/null +++ b/tests/sqllogictests/suites/mode/standalone/explain/eliminate_self_join.test @@ -0,0 +1,504 @@ +statement ok +drop database if exists eliminate_self_join + +statement ok +create database eliminate_self_join + +statement ok +use eliminate_self_join + +statement ok +create table t1(c1 int not null, c2 int not null) as select number, number % 10 from numbers(100) + +statement ok +create table t2(c1 int not null, c2 int not null) as select number, number % 7 from numbers(100) + +query T +explain +select t3.c1, t3.c2 +from ( + select c1, sum(c2) as c2 + from t1 + group by c1 + having sum(c2) = 1 +) as t3 +join ( + select c1, sum(c2) as c2 + from t1 + group by c1 +) as t4 +on t3.c1 = t4.c1 +---- +Filter +├── output columns: [sum(c2) (#2), t1.c1 (#0)] +├── filters: [is_true(sum(c2) (#2) = 1)] +├── estimated rows: 50.00 +└── AggregateFinal + ├── output columns: [sum(c2) (#2), t1.c1 (#0)] + ├── group by: [c1] + ├── aggregate functions: [sum(c2)] + ├── estimated rows: 100.00 + └── AggregatePartial + ├── group by: [c1] + ├── aggregate functions: [sum(c2)] + ├── estimated rows: 100.00 + └── TableScan + ├── table: default.eliminate_self_join.t1 + ├── scan id: 0 + ├── output columns: [c1 (#0), c2 (#1)] + ├── read rows: 100 + ├── read size: < 1 KiB 
+ ├── partitions total: 1 + ├── partitions scanned: 1 + ├── pruning stats: [segments: , blocks: ] + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 100.00 + + +statement ok +create table t_nullable(c1 int null, c2 int not null) + +statement ok +insert into t_nullable values (1, 10), (2, 20), (3, 30), (null, 40), (null, 50) + + +query T +explain +select t3.c1, t3.c2 +from ( + select c1, sum(c2) as c2 + from t_nullable + group by c1 + having sum(c2) = 10 +) as t3 +join ( + select c1, sum(c2) as c2 + from t_nullable + group by c1 +) as t4 +on t3.c1 = t4.c1 +---- +Filter +├── output columns: [sum(c2) (#2), t_nullable.c1 (#0)] +├── filters: [is_true(c1 (#0) = t3.c1 (#0))] +├── estimated rows: 0.30 +└── Filter + ├── output columns: [sum(c2) (#2), t_nullable.c1 (#0)] + ├── filters: [is_true(sum(c2) (#2) = 10)] + ├── estimated rows: 1.50 + └── AggregateFinal + ├── output columns: [sum(c2) (#2), t_nullable.c1 (#0)] + ├── group by: [c1] + ├── aggregate functions: [sum(c2)] + ├── estimated rows: 3.00 + └── AggregatePartial + ├── group by: [c1] + ├── aggregate functions: [sum(c2)] + ├── estimated rows: 3.00 + └── TableScan + ├── table: default.eliminate_self_join.t_nullable + ├── scan id: 0 + ├── output columns: [c1 (#0), c2 (#1)] + ├── read rows: 5 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── pruning stats: [segments: , blocks: ] + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 5.00 + +query II +select t3.c1, t3.c2 +from ( + select c1, sum(c2) as c2 + from t_nullable + group by c1 + having sum(c2) = 10 +) as t3 +join ( + select c1, sum(c2) as c2 + from t_nullable + group by c1 +) as t4 +on t3.c1 = t4.c1 +---- +1 10 + +query II +select t3.c1, t3.c2 +from ( + select c1, sum(c2) as c2 + from t_nullable + group by c1 + having sum(c2) > 0 +) as t3 +join ( + select c1, sum(c2) as c2 + from t_nullable + group by c1 +) as t4 +on t3.c1 = t4.c1 +order by t3.c1 +---- +1 10 +2 20 +3 30 + +query T +explain +select t3.c1, t3.c2 +from ( + select c1, sum(c2) as c2 + from t_nullable + group by c1 + having sum(c2) = 10 +) as t3 +join ( + select c1, sum(c2) as c2 + from t_nullable + group by c1 + having sum(c2) > 0 +) as t4 +on t3.c1 = t4.c1 +---- +HashJoin +├── output columns: [sum(c2) (#2), t_nullable.c1 (#0)] +├── join type: INNER +├── build keys: [t4.c1 (#3)] +├── probe keys: [t3.c1 (#0)] +├── keys is null equal: [false] +├── filters: [] +├── build join filters: +│ └── filter id:0, build key:t4.c1 (#3), probe targets:[t3.c1 (#0)@scan0], filter type:bloom,inlist,min_max +├── estimated rows: 1.12 +├── Filter(Build) +│ ├── output columns: [t_nullable.c1 (#3)] +│ ├── filters: [is_true(sum(c2) (#5) > 0)] +│ ├── estimated rows: 1.50 +│ └── AggregateFinal +│ ├── output columns: [sum(c2) (#5), t_nullable.c1 (#3)] +│ ├── group by: [c1] +│ ├── aggregate functions: [sum(c2)] +│ ├── estimated rows: 3.00 +│ └── AggregatePartial +│ ├── group by: [c1] +│ ├── aggregate functions: [sum(c2)] +│ ├── estimated rows: 3.00 +│ └── TableScan +│ ├── table: default.eliminate_self_join.t_nullable +│ ├── scan id: 1 +│ ├── output columns: [c1 (#3), c2 (#4)] +│ ├── read rows: 5 +│ ├── read size: < 1 KiB +│ ├── partitions total: 1 +│ ├── partitions scanned: 1 +│ ├── pruning stats: [segments: , blocks: ] +│ ├── push downs: [filters: [], limit: NONE] +│ └── estimated rows: 5.00 +└── Filter(Probe) + ├── output columns: [sum(c2) (#2), t_nullable.c1 (#0)] + ├── filters: [is_true(sum(c2) (#2) = 10)] + ├── estimated rows: 1.50 + └── AggregateFinal + ├── output columns: [sum(c2) 
(#2), t_nullable.c1 (#0)] + ├── group by: [c1] + ├── aggregate functions: [sum(c2)] + ├── estimated rows: 3.00 + └── AggregatePartial + ├── group by: [c1] + ├── aggregate functions: [sum(c2)] + ├── estimated rows: 3.00 + └── TableScan + ├── table: default.eliminate_self_join.t_nullable + ├── scan id: 0 + ├── output columns: [c1 (#0), c2 (#1)] + ├── read rows: 5 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── pruning stats: [segments: , blocks: ] + ├── push downs: [filters: [], limit: NONE] + ├── apply join filters: [#0] + └── estimated rows: 5.00 + +query II +select t3.c1, t3.c2 +from ( + select c1, sum(c2) as c2 + from t_nullable + group by c1 + having sum(c2) = 10 +) as t3 +join ( + select c1, sum(c2) as c2 + from t_nullable + group by c1 + having sum(c2) > 0 +) as t4 +on t3.c1 = t4.c1 +---- +1 10 + +# Test case: operators above Join reference columns from the eliminated table (t4) +# When t4 is eliminated, t4.c1 and t4.c2 should be mapped to t3.c1 and t3.c2 +query T +explain +select t3.c1, t3.c2, t4.c1, t4.c2 +from ( + select c1, sum(c2) as c2 + from t1 + group by c1 + having sum(c2) = 1 +) as t3 +join ( + select c1, sum(c2) as c2 + from t1 + group by c1 +) as t4 +on t3.c1 = t4.c1 +---- +EvalScalar +├── output columns: [sum(c2) (#2), t1.c1 (#0), t1.c1 (#3), sum(c2) (#5)] +├── expressions: [c1 (#0), sum(c2) (#2)] +├── estimated rows: 50.00 +└── Filter + ├── output columns: [sum(c2) (#2), t1.c1 (#0)] + ├── filters: [is_true(sum(c2) (#2) = 1)] + ├── estimated rows: 50.00 + └── AggregateFinal + ├── output columns: [sum(c2) (#2), t1.c1 (#0)] + ├── group by: [c1] + ├── aggregate functions: [sum(c2)] + ├── estimated rows: 100.00 + └── AggregatePartial + ├── group by: [c1] + ├── aggregate functions: [sum(c2)] + ├── estimated rows: 100.00 + └── TableScan + ├── table: default.eliminate_self_join.t1 + ├── scan id: 0 + ├── output columns: [c1 (#0), c2 (#1)] + ├── read rows: 100 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── pruning stats: [segments: , blocks: ] + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 100.00 + +# Verify execution result: t4.c1 and t4.c2 should equal t3.c1 and t3.c2 +query IIII +select t3.c1, t3.c2, t4.c1, t4.c2 +from ( + select c1, sum(c2) as c2 + from t1 + group by c1 + having sum(c2) = 1 +) as t3 +join ( + select c1, sum(c2) as c2 + from t1 + group by c1 +) as t4 +on t3.c1 = t4.c1 +order by t3.c1 +---- +1 1 1 1 +11 1 11 1 +21 1 21 1 +31 1 31 1 +41 1 41 1 +51 1 51 1 +61 1 61 1 +71 1 71 1 +81 1 81 1 +91 1 91 1 + +# Regression: if the eliminated side has an EvalScalar above the Aggregate that +# introduces new columns (e.g. `sum(c2)+1`), the elimination rewrite currently +# must recreate those derived column indices; otherwise parent operators may +# reference missing columns. 
+query T +explain select t3.c1, t3.c2, t4.c3 +from ( + select c1, sum(c2) as c2 + from t1 + group by c1 + having sum(c2) = 1 +) as t3 +join ( + select c1, c2, c2 + 1 as c3 + from ( + select c1, sum(c2) as c2 + from t1 + group by c1 + ) as s +) as t4 +on t3.c1 = t4.c1 +order by t3.c1 +---- +Sort(Single) +├── output columns: [sum(c2) (#2), t1.c1 (#0), c3 (#6)] +├── sort keys: [c1 ASC NULLS LAST] +├── estimated rows: 50.00 +└── EvalScalar + ├── output columns: [sum(c2) (#2), t1.c1 (#0), c3 (#6)] + ├── expressions: [sum(c2) (#2) + 1] + ├── estimated rows: 50.00 + └── Filter + ├── output columns: [sum(c2) (#2), t1.c1 (#0)] + ├── filters: [is_true(sum(c2) (#2) = 1)] + ├── estimated rows: 50.00 + └── AggregateFinal + ├── output columns: [sum(c2) (#2), t1.c1 (#0)] + ├── group by: [c1] + ├── aggregate functions: [sum(c2)] + ├── estimated rows: 100.00 + └── AggregatePartial + ├── group by: [c1] + ├── aggregate functions: [sum(c2)] + ├── estimated rows: 100.00 + └── TableScan + ├── table: default.eliminate_self_join.t1 + ├── scan id: 0 + ├── output columns: [c1 (#0), c2 (#1)] + ├── read rows: 100 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── pruning stats: [segments: , blocks: ] + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 100.00 + + +query III +select t3.c1, t3.c2, t4.c3 +from ( + select c1, sum(c2) as c2 + from t1 + group by c1 + having sum(c2) = 1 +) as t3 +join ( + select c1, c2, c2 + 1 as c3 + from ( + select c1, sum(c2) as c2 + from t1 + group by c1 + ) as s +) as t4 +on t3.c1 = t4.c1 +order by t3.c1 +---- +1 1 2 +11 1 2 +21 1 2 +31 1 2 +41 1 2 +51 1 2 +61 1 2 +71 1 2 +81 1 2 +91 1 2 + +query T +explain +select t3.c1, t3.c2, t4.c3 +from ( + select c1, sum(c2) as c2 + from t1 + group by c1 + having sum(c2) = 1 +) as t3 +join ( + select c1, sum(c2) as c2, max(c2) as c3 + from t1 + group by c1 +) as t4 +on t3.c1 = t4.c1 +---- +HashJoin +├── output columns: [max(c2) (#6), sum(c2) (#2), t1.c1 (#0)] +├── join type: INNER +├── build keys: [t3.c1 (#0)] +├── probe keys: [t4.c1 (#3)] +├── keys is null equal: [false] +├── filters: [] +├── build join filters: +│ └── filter id:0, build key:t3.c1 (#0), probe targets:[t4.c1 (#3)@scan1], filter type:bloom,inlist,min_max +├── estimated rows: 50.00 +├── Filter(Build) +│ ├── output columns: [sum(c2) (#2), t1.c1 (#0)] +│ ├── filters: [is_true(sum(c2) (#2) = 1)] +│ ├── estimated rows: 50.00 +│ └── AggregateFinal +│ ├── output columns: [sum(c2) (#2), t1.c1 (#0)] +│ ├── group by: [c1] +│ ├── aggregate functions: [sum(c2)] +│ ├── estimated rows: 100.00 +│ └── AggregatePartial +│ ├── group by: [c1] +│ ├── aggregate functions: [sum(c2)] +│ ├── estimated rows: 100.00 +│ └── TableScan +│ ├── table: default.eliminate_self_join.t1 +│ ├── scan id: 0 +│ ├── output columns: [c1 (#0), c2 (#1)] +│ ├── read rows: 100 +│ ├── read size: < 1 KiB +│ ├── partitions total: 1 +│ ├── partitions scanned: 1 +│ ├── pruning stats: [segments: , blocks: ] +│ ├── push downs: [filters: [], limit: NONE] +│ └── estimated rows: 100.00 +└── AggregateFinal(Probe) + ├── output columns: [max(c2) (#6), t1.c1 (#3)] + ├── group by: [c1] + ├── aggregate functions: [max(c2)] + ├── estimated rows: 100.00 + └── AggregatePartial + ├── group by: [c1] + ├── aggregate functions: [max(c2)] + ├── estimated rows: 100.00 + └── TableScan + ├── table: default.eliminate_self_join.t1 + ├── scan id: 1 + ├── output columns: [c1 (#3), c2 (#4)] + ├── read rows: 100 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── pruning stats: 
[segments: , blocks: ] + ├── push downs: [filters: [], limit: NONE] + ├── apply join filters: [#0] + └── estimated rows: 100.00 + +query III +select t3.c1, t3.c2, t4.c3 +from ( + select c1, sum(c2) as c2 + from t1 + group by c1 + having sum(c2) = 1 +) as t3 +join ( + select c1, sum(c2) as c2, max(c2) as c3 + from t1 + group by c1 +) as t4 +on t3.c1 = t4.c1 +order by t3.c1 +---- +1 1 1 +11 1 1 +21 1 1 +31 1 1 +41 1 1 +51 1 1 +61 1 1 +71 1 1 +81 1 1 +91 1 1 + +statement ok +drop database eliminate_self_join diff --git a/tests/sqllogictests/suites/tpch/join_order.test b/tests/sqllogictests/suites/tpch/join_order.test index 9113f51ca95db..ac6b0ae715164 100644 --- a/tests/sqllogictests/suites/tpch/join_order.test +++ b/tests/sqllogictests/suites/tpch/join_order.test @@ -32,7 +32,7 @@ statement ok analyze table supplier # Q1 -query I +query T explain join select l_returnflag, l_linestatus, @@ -58,7 +58,7 @@ order by Scan: default.tpch_test.lineitem (#0) (read rows: 6001215) # Q2 -query I +query T explain join select s_acctbal, s_name, @@ -138,7 +138,7 @@ HashJoin: INNER └── Scan: default.tpch_test.partsupp (#5) (read rows: 800000) # Q3 -query I +query T explain join select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, @@ -174,7 +174,7 @@ HashJoin: INNER └── Scan: default.tpch_test.lineitem (#2) (read rows: 6001215) # Q4 -query I +query T explain join select o_orderpriority, count(*) as order_count @@ -204,7 +204,7 @@ HashJoin: RIGHT SEMI └── Scan: default.tpch_test.lineitem (#1) (read rows: 6001215) # Q5 -query I +query T explain join select n_name, ceil(sum(l_extendedprice * (1 - l_discount)) / 10) as revenue @@ -253,7 +253,7 @@ HashJoin: INNER └── Scan: default.tpch_test.lineitem (#2) (read rows: 6001215) # Q6 -query I +query T explain join select truncate(sum(l_extendedprice * l_discount),3) as revenue from @@ -267,7 +267,7 @@ where Scan: default.tpch_test.lineitem (#0) (read rows: 6001215) # Q7 -query I +query T explain join select supp_nation, cust_nation, @@ -331,7 +331,7 @@ HashJoin: INNER └── Scan: default.tpch_test.lineitem (#1) (read rows: 6001215) # Q8 -query I +query T explain join select o_year, truncate(sum(case @@ -401,7 +401,7 @@ HashJoin: INNER └── Scan: default.tpch_test.supplier (#1) (read rows: 10000) # Q9 -query I +query T explain join select nation, o_year, @@ -458,7 +458,7 @@ HashJoin: INNER └── Scan: default.tpch_test.partsupp (#3) (read rows: 800000) # Q10 -query I +query T explain join select c_custkey, c_name, @@ -506,7 +506,7 @@ HashJoin: INNER └── Scan: default.tpch_test.lineitem (#2) (read rows: 6001215) # Q11 -query I +query T explain join select ps_partkey, sum(ps_supplycost * ps_availqty) as value @@ -558,7 +558,7 @@ HashJoin: INNER └── Scan: default.tpch_test.partsupp (#0) (read rows: 800000) # Q12 -query I +query T explain join select l_shipmode, sum(case @@ -595,7 +595,7 @@ HashJoin: INNER └── Scan: default.tpch_test.orders (#0) (read rows: 1500000) # Q13 -query I +query T explain join select c_count, count(*) as custdist @@ -627,7 +627,7 @@ HashJoin: RIGHT OUTER └── Scan: default.tpch_test.orders (#1) (read rows: 1500000) # Q14 -query I +query T explain join select TRUNCATE(100.00 * sum(case when p_type like 'PROMO%' @@ -736,7 +736,7 @@ HashJoin: INNER # Q16 -query I +query T explain join select p_brand, p_type, @@ -780,7 +780,7 @@ HashJoin: RIGHT MARK └── Scan: default.tpch_test.partsupp (#0) (read rows: 800000) #Q17 -query I +query T explain join select truncate(sum(l_extendedprice) / 7.0,8) as avg_yearly from @@ -810,7 +810,7 @@ HashJoin: 
INNER └── Scan: default.tpch_test.lineitem (#0) (read rows: 6001215) #Q18 -query I +query T explain join select c_name, c_custkey, @@ -846,20 +846,16 @@ order by ---- HashJoin: INNER ├── Build -│ └── HashJoin: INNER -│ ├── Build -│ │ └── Scan: default.tpch_test.lineitem (#3) (read rows: 6001215) -│ └── Probe -│ └── HashJoin: INNER -│ ├── Build -│ │ └── Scan: default.tpch_test.customer (#0) (read rows: 150000) -│ └── Probe -│ └── Scan: default.tpch_test.orders (#1) (read rows: 1500000) +│ └── Scan: default.tpch_test.lineitem (#3) (read rows: 6001215) └── Probe - └── Scan: default.tpch_test.lineitem (#2) (read rows: 6001215) + └── HashJoin: INNER + ├── Build + │ └── Scan: default.tpch_test.customer (#0) (read rows: 150000) + └── Probe + └── Scan: default.tpch_test.orders (#1) (read rows: 1500000) # Q19 -query I +query T explain join select truncate(sum(l_extendedprice* (1 - l_discount)),3) as revenue from @@ -937,7 +933,7 @@ HashJoin: INNER └── Scan: default.tpch_test.lineitem (#0) (read rows: 6001215) # Q20 -query I +query T explain join select s_name, s_address @@ -995,7 +991,7 @@ HashJoin: RIGHT SEMI └── Scan: default.tpch_test.lineitem (#4) (read rows: 6001215) # Q21 -query I +query T explain join select s_name, truncate(count(*),4) as numwait @@ -1059,7 +1055,7 @@ HashJoin: RIGHT ANTI └── Scan: default.tpch_test.lineitem (#5) (read rows: 6001215) # Q22 -query I +query T explain join select cntrycode, count(*) as numcust,