Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/query/sql/src/planner/metadata/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub type MetadataRef = Arc<RwLock<Metadata>>;
pub struct Metadata {
tables: Vec<TableEntry>,
columns: Vec<ColumnEntry>,
removed_mark_indexes: ColumnSet,
/// Table column indexes that are lazy materialized.
table_lazy_columns: HashMap<IndexType, ColumnSet>,
table_source: HashMap<IndexType, DataSourcePlan>,
Expand Down Expand Up @@ -134,6 +135,14 @@ impl Metadata {
self.columns.as_slice()
}

pub fn add_removed_mark_index(&mut self, index: IndexType) {
self.removed_mark_indexes.insert(index);
}

pub fn is_removed_mark_index(&self, index: IndexType) -> bool {
self.removed_mark_indexes.contains(&index)
}

pub fn add_retained_column(&mut self, index: IndexType) {
self.retained_columns.insert(index);
}
Expand Down
3 changes: 3 additions & 0 deletions src/query/sql/src/planner/optimizer/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::optimizer::ir::SExpr;
use crate::optimizer::optimizers::CTEFilterPushdownOptimizer;
use crate::optimizer::optimizers::CascadesOptimizer;
use crate::optimizer::optimizers::DPhpyOptimizer;
use crate::optimizer::optimizers::EliminateSelfJoinOptimizer;
use crate::optimizer::optimizers::distributed::BroadcastToShuffleOptimizer;
use crate::optimizer::optimizers::operator::CleanupUnusedCTEOptimizer;
use crate::optimizer::optimizers::operator::DeduplicateJoinConditionOptimizer;
Expand Down Expand Up @@ -265,6 +266,8 @@ pub async fn optimize_query(opt_ctx: Arc<OptimizerContext>, s_expr: SExpr) -> Re
]))
// Apply DPhyp algorithm for cost-based join reordering
.add(DPhpyOptimizer::new(opt_ctx.clone()))
// Eliminate self joins when possible
.add(EliminateSelfJoinOptimizer::new(opt_ctx.clone()))
// After join reorder, Convert some single join to inner join.
.add(SingleToInnerOptimizer::new())
// Deduplicate join conditions.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_exception::Result;

use crate::optimizer::Optimizer;
use crate::optimizer::OptimizerContext;
use crate::optimizer::ir::SExpr;
use crate::optimizer::optimizers::recursive::RecursiveRuleOptimizer;
use crate::optimizer::optimizers::rule::RuleID;

pub struct EliminateSelfJoinOptimizer {
opt_ctx: Arc<OptimizerContext>,
}

impl EliminateSelfJoinOptimizer {
pub fn new(opt_ctx: Arc<OptimizerContext>) -> Self {
Self { opt_ctx }
}

fn optimize_sync(&self, s_expr: &SExpr) -> Result<SExpr> {
// `EagerAggregation` is used here as a speculative pre-rewrite to expose patterns that
// `EliminateSelfJoin` can match. If no self-join is actually eliminated, we intentionally
// return the original input plan to avoid keeping the eager-aggregation rewrite as a
// standalone optimization.
static RULES_EAGER_AGGREGATION: &[RuleID] = &[RuleID::EagerAggregation];
let optimizer = RecursiveRuleOptimizer::new(self.opt_ctx.clone(), RULES_EAGER_AGGREGATION);
let s_expr_after_eager_aggregation = optimizer.optimize_sync(s_expr)?;

static RULES_ELIMINATE_SELF_JOIN: &[RuleID] = &[RuleID::EliminateSelfJoin];
let optimizer =
RecursiveRuleOptimizer::new(self.opt_ctx.clone(), RULES_ELIMINATE_SELF_JOIN);
let s_expr_after_eliminate_self_join =
optimizer.optimize_sync(&s_expr_after_eager_aggregation)?;

if s_expr_after_eager_aggregation.eq(&s_expr_after_eliminate_self_join) {
return Ok(s_expr.clone());
}

Ok(s_expr_after_eliminate_self_join)
}
}

#[async_trait::async_trait]
impl Optimizer for EliminateSelfJoinOptimizer {
fn name(&self) -> String {
"EliminateSelfJoinOptimizer".to_string()
}

async fn optimize(&mut self, s_expr: &SExpr) -> Result<SExpr> {
self.optimize_sync(s_expr)
}
}
2 changes: 2 additions & 0 deletions src/query/sql/src/planner/optimizer/optimizers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
mod cascades;
pub mod cte_filter_pushdown;
pub mod distributed;
mod eliminate_self_join;
mod hyper_dp;
pub mod operator;
pub mod recursive;
pub mod rule;

pub use cascades::CascadesOptimizer;
pub use cte_filter_pushdown::CTEFilterPushdownOptimizer;
pub use eliminate_self_join::EliminateSelfJoinOptimizer;
pub use hyper_dp::DPhpyOptimizer;
pub use operator::CleanupUnusedCTEOptimizer;
Original file line number Diff line number Diff line change
Expand Up @@ -692,20 +692,22 @@ impl SubqueryDecorrelatorOptimizer {
SubqueryType::Any => {
let output_column = subquery.output_column.clone();
let column_name = format!("subquery_{}", output_column.index);
let left_condition = wrap_cast(
&ScalarExpr::BoundColumnRef(BoundColumnRef {
span: subquery.span,
column: ColumnBindingBuilder::new(
column_name,
output_column.index,
output_column.data_type,
Visibility::Visible,
)
.table_index(output_column.table_index)
.build(),
}),
&subquery.data_type,
);
let left_condition_base = ScalarExpr::BoundColumnRef(BoundColumnRef {
span: subquery.span,
column: ColumnBindingBuilder::new(
column_name,
output_column.index,
output_column.data_type,
Visibility::Visible,
)
.table_index(output_column.table_index)
.build(),
});
let left_condition = if left_condition_base.data_type()? == *subquery.data_type {
left_condition_base
} else {
wrap_cast(&left_condition_base, &subquery.data_type)
};
let child_expr = *subquery.child_expr.as_ref().unwrap().clone();
let op = subquery.compare_op.as_ref().unwrap().clone();
let (right_condition, is_non_equi_condition) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,12 @@ impl Rule for RuleEagerAggregation {
if extra_eval_scalar_expr.is_some() {
for eval_item in &extra_eval_scalar.items {
let eval_used_columns = eval_item.scalar.used_columns();
if eval_used_columns
.iter()
.any(|column| self.metadata.read().is_removed_mark_index(*column))
{
continue;
}
let mut resolved_by_one_child = false;
join_columns.for_each_mut(|side, columns_set| {
if eval_used_columns.is_subset(columns_set) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::optimizer::optimizers::rule::RuleDeduplicateSort;
use crate::optimizer::optimizers::rule::RuleEagerAggregation;
use crate::optimizer::optimizers::rule::RuleEliminateEvalScalar;
use crate::optimizer::optimizers::rule::RuleEliminateFilter;
use crate::optimizer::optimizers::rule::RuleEliminateSelfJoin;
use crate::optimizer::optimizers::rule::RuleEliminateSort;
use crate::optimizer::optimizers::rule::RuleEliminateUnion;
use crate::optimizer::optimizers::rule::RuleFilterFlattenOr;
Expand Down Expand Up @@ -124,6 +125,7 @@ impl RuleFactory {
RuleID::EagerAggregation => Ok(Box::new(RuleEagerAggregation::new(metadata))),
RuleID::PushDownPrewhere => Ok(Box::new(RulePushDownPrewhere::new(metadata))),
RuleID::TryApplyAggIndex => Ok(Box::new(RuleTryApplyAggIndex::new(metadata))),
RuleID::EliminateSelfJoin => Ok(Box::new(RuleEliminateSelfJoin::new(metadata))),
RuleID::EliminateSort => Ok(Box::new(RuleEliminateSort::new())),
RuleID::DeduplicateSort => Ok(Box::new(RuleDeduplicateSort::new())),
RuleID::SemiToInnerJoin => Ok(Box::new(RuleSemiToInnerJoin::new())),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl Rule for RulePushDownFilterJoin {
let (s_expr, outer_to_inner) = outer_join_to_inner_join(s_expr, self.metadata.clone())?;

// Second, check if can convert mark join to semi join
let (s_expr, mark_to_semi) = convert_mark_to_semi_join(&s_expr)?;
let (s_expr, mark_to_semi) = convert_mark_to_semi_join(&s_expr, self.metadata.clone())?;
if s_expr.plan().rel_op() != RelOp::Filter {
state.add_result(s_expr);
return Ok(());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
mod push_down_filter_join;
mod rule_commute_join;
mod rule_commute_join_base_table;
mod rule_eliminate_self_join;
mod rule_left_exchange_join;
mod rule_semi_to_inner_join;
mod util;

pub use push_down_filter_join::*;
pub use rule_commute_join::RuleCommuteJoin;
pub use rule_commute_join_base_table::RuleCommuteJoinBaseTable;
pub use rule_eliminate_self_join::RuleEliminateSelfJoin;
pub use rule_left_exchange_join::RuleLeftExchangeJoin;
pub use rule_semi_to_inner_join::RuleSemiToInnerJoin;
pub use util::get_join_predicates;
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ use std::sync::Arc;

use databend_common_exception::Result;

use crate::MetadataRef;
use crate::ScalarExpr;
use crate::optimizer::ir::SExpr;
use crate::plans::Filter;
use crate::plans::Join;
use crate::plans::JoinType;

pub fn convert_mark_to_semi_join(s_expr: &SExpr) -> Result<(SExpr, bool)> {
pub fn convert_mark_to_semi_join(s_expr: &SExpr, metadata: MetadataRef) -> Result<(SExpr, bool)> {
let mut filter: Filter = s_expr.plan().clone().try_into()?;
let mut join: Join = s_expr.child(0)?.plan().clone().try_into()?;

Expand All @@ -34,7 +35,9 @@ pub fn convert_mark_to_semi_join(s_expr: &SExpr) -> Result<(SExpr, bool)> {
return Ok((s_expr.clone(), false));
}

let mark_index = join.marker_index.unwrap();
let Some(mark_index) = join.marker_index else {
return Ok((s_expr.clone(), false));
};
let mut find_mark_index = false;

// remove mark index filter
Expand Down Expand Up @@ -65,6 +68,8 @@ pub fn convert_mark_to_semi_join(s_expr: &SExpr) -> Result<(SExpr, bool)> {
_ => unreachable!(),
};

metadata.write().add_removed_mark_index(mark_index);

// clear is null equal sign
join.equi_conditions.iter_mut().for_each(|c| {
c.is_null_equal = false;
Expand Down
Loading
Loading