Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/query/sql/src/planner/metadata/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub type MetadataRef = Arc<RwLock<Metadata>>;
pub struct Metadata {
tables: Vec<TableEntry>,
columns: Vec<ColumnEntry>,
removed_mark_indexes: ColumnSet,
/// Table column indexes that are lazy materialized.
table_lazy_columns: HashMap<IndexType, ColumnSet>,
table_source: HashMap<IndexType, DataSourcePlan>,
Expand Down Expand Up @@ -134,6 +135,14 @@ impl Metadata {
self.columns.as_slice()
}

/// Records `index` in the `removed_mark_indexes` set.
///
/// `convert_mark_to_semi_join` calls this with the join's marker index, and
/// `RuleEagerAggregation` later consults the set through `is_removed_mark_index`.
pub fn add_removed_mark_index(&mut self, index: IndexType) {
self.removed_mark_indexes.insert(index);
}

/// Returns `true` when `index` is present in the `removed_mark_indexes` set,
/// i.e. it was previously passed to `add_removed_mark_index`.
pub fn is_removed_mark_index(&self, index: IndexType) -> bool {
self.removed_mark_indexes.contains(&index)
}

/// Inserts `index` into the `retained_columns` set.
pub fn add_retained_column(&mut self, index: IndexType) {
self.retained_columns.insert(index);
}
Expand Down
3 changes: 3 additions & 0 deletions src/query/sql/src/planner/optimizer/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::optimizer::ir::SExpr;
use crate::optimizer::optimizers::CTEFilterPushdownOptimizer;
use crate::optimizer::optimizers::CascadesOptimizer;
use crate::optimizer::optimizers::DPhpyOptimizer;
use crate::optimizer::optimizers::EliminateSelfJoinOptimizer;
use crate::optimizer::optimizers::distributed::BroadcastToShuffleOptimizer;
use crate::optimizer::optimizers::operator::CleanupUnusedCTEOptimizer;
use crate::optimizer::optimizers::operator::DeduplicateJoinConditionOptimizer;
Expand Down Expand Up @@ -265,6 +266,8 @@ pub async fn optimize_query(opt_ctx: Arc<OptimizerContext>, s_expr: SExpr) -> Re
]))
// Apply DPhyp algorithm for cost-based join reordering
.add(DPhpyOptimizer::new(opt_ctx.clone()))
// Eliminate self joins when possible
.add(EliminateSelfJoinOptimizer::new(opt_ctx.clone()))
// After join reorder, Convert some single join to inner join.
.add(SingleToInnerOptimizer::new())
// Deduplicate join conditions.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::Instant;

use databend_common_exception::Result;
use log::info;

use crate::optimizer::Optimizer;
use crate::optimizer::OptimizerContext;
use crate::optimizer::ir::SExpr;
use crate::optimizer::optimizers::DPhpyOptimizer;
use crate::optimizer::optimizers::recursive::RecursiveRuleOptimizer;
use crate::optimizer::optimizers::rule::RuleID;

/// Optimizer pass that eliminates self joins when possible.
pub struct EliminateSelfJoinOptimizer {
// Shared optimizer context; cloned into the rule optimizers and the DPhyp
// optimizer that this pass runs internally.
opt_ctx: Arc<OptimizerContext>,
}

impl EliminateSelfJoinOptimizer {
pub fn new(opt_ctx: Arc<OptimizerContext>) -> Self {
Self { opt_ctx }
}
}

#[async_trait::async_trait]
impl Optimizer for EliminateSelfJoinOptimizer {
    /// Returns the fixed display name of this optimizer.
    fn name(&self) -> String {
        String::from("EliminateSelfJoinOptimizer")
    }

    /// Tries to eliminate self joins in `s_expr`.
    ///
    /// The pass first applies `EagerAggregation` recursively, then applies
    /// `EliminateSelfJoin` on that result. When the second step leaves the plan
    /// unchanged, a clone of the original input is returned; otherwise DPhyp is
    /// run on the rewritten plan and its output is returned.
    async fn optimize(&mut self, s_expr: &SExpr) -> Result<SExpr> {
        static EAGER_AGGREGATION_RULES: &[RuleID] = &[RuleID::EagerAggregation];
        static ELIMINATE_SELF_JOIN_RULES: &[RuleID] = &[RuleID::EliminateSelfJoin];

        let timer = Instant::now();

        // `EagerAggregation` serves purely as a speculative pre-rewrite that may expose
        // patterns `EliminateSelfJoin` can match. It is never kept as a standalone
        // optimization: if no self-join ends up being eliminated, the untouched input
        // plan is handed back.
        let eager_plan =
            RecursiveRuleOptimizer::new(self.opt_ctx.clone(), EAGER_AGGREGATION_RULES)
                .optimize_sync(s_expr)?;

        let eliminated_plan =
            RecursiveRuleOptimizer::new(self.opt_ctx.clone(), ELIMINATE_SELF_JOIN_RULES)
                .optimize_sync(&eager_plan)?;

        // Only the two rule passes above are timed; the DPhyp re-run is not included.
        let elapsed = timer.elapsed();

        if eliminated_plan != eager_plan {
            // Ideally this optimizer would run before Dphyp in the pipeline. Because of
            // issues in the current EagerAggregation implementation, doing so makes
            // TPC-H Q18 optimization fail, so the optimizer sits after Dphyp and Dphyp
            // is run once more here to reorder joins after self-join elimination.
            let reordered_plan = DPhpyOptimizer::new(self.opt_ctx.clone())
                .optimize_async(&eliminated_plan)
                .await?;

            info!("EliminateSelfJoinOptimizer: {}ms", elapsed.as_millis());

            Ok(reordered_plan)
        } else {
            Ok(s_expr.clone())
        }
    }
}
2 changes: 2 additions & 0 deletions src/query/sql/src/planner/optimizer/optimizers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
mod cascades;
pub mod cte_filter_pushdown;
pub mod distributed;
mod eliminate_self_join;
mod hyper_dp;
pub mod operator;
pub mod recursive;
pub mod rule;

pub use cascades::CascadesOptimizer;
pub use cte_filter_pushdown::CTEFilterPushdownOptimizer;
pub use eliminate_self_join::EliminateSelfJoinOptimizer;
pub use hyper_dp::DPhpyOptimizer;
pub use operator::CleanupUnusedCTEOptimizer;
Original file line number Diff line number Diff line change
Expand Up @@ -692,20 +692,22 @@ impl SubqueryDecorrelatorOptimizer {
SubqueryType::Any => {
let output_column = subquery.output_column.clone();
let column_name = format!("subquery_{}", output_column.index);
let left_condition = wrap_cast(
&ScalarExpr::BoundColumnRef(BoundColumnRef {
span: subquery.span,
column: ColumnBindingBuilder::new(
column_name,
output_column.index,
output_column.data_type,
Visibility::Visible,
)
.table_index(output_column.table_index)
.build(),
}),
&subquery.data_type,
);
let left_condition_base = ScalarExpr::BoundColumnRef(BoundColumnRef {
span: subquery.span,
column: ColumnBindingBuilder::new(
column_name,
output_column.index,
output_column.data_type,
Visibility::Visible,
)
.table_index(output_column.table_index)
.build(),
});
let left_condition = if left_condition_base.data_type()? == *subquery.data_type {
left_condition_base
} else {
wrap_cast(&left_condition_base, &subquery.data_type)
};
let child_expr = *subquery.child_expr.as_ref().unwrap().clone();
let op = subquery.compare_op.as_ref().unwrap().clone();
let (right_condition, is_non_equi_condition) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,12 @@ impl Rule for RuleEagerAggregation {
if extra_eval_scalar_expr.is_some() {
for eval_item in &extra_eval_scalar.items {
let eval_used_columns = eval_item.scalar.used_columns();
if eval_used_columns
.iter()
.any(|column| self.metadata.read().is_removed_mark_index(*column))
{
continue;
}
let mut resolved_by_one_child = false;
join_columns.for_each_mut(|side, columns_set| {
if eval_used_columns.is_subset(columns_set) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::optimizer::optimizers::rule::RuleDeduplicateSort;
use crate::optimizer::optimizers::rule::RuleEagerAggregation;
use crate::optimizer::optimizers::rule::RuleEliminateEvalScalar;
use crate::optimizer::optimizers::rule::RuleEliminateFilter;
use crate::optimizer::optimizers::rule::RuleEliminateSelfJoin;
use crate::optimizer::optimizers::rule::RuleEliminateSort;
use crate::optimizer::optimizers::rule::RuleEliminateUnion;
use crate::optimizer::optimizers::rule::RuleFilterFlattenOr;
Expand Down Expand Up @@ -124,6 +125,7 @@ impl RuleFactory {
RuleID::EagerAggregation => Ok(Box::new(RuleEagerAggregation::new(metadata))),
RuleID::PushDownPrewhere => Ok(Box::new(RulePushDownPrewhere::new(metadata))),
RuleID::TryApplyAggIndex => Ok(Box::new(RuleTryApplyAggIndex::new(metadata))),
RuleID::EliminateSelfJoin => Ok(Box::new(RuleEliminateSelfJoin::new(ctx))),
RuleID::EliminateSort => Ok(Box::new(RuleEliminateSort::new())),
RuleID::DeduplicateSort => Ok(Box::new(RuleDeduplicateSort::new())),
RuleID::SemiToInnerJoin => Ok(Box::new(RuleSemiToInnerJoin::new())),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,28 @@ impl Rule for RuleEliminateFilter {
true
}
}
ScalarExpr::FunctionCall(func)
if func.func_name == "is_true"
&& func.arguments.len() == 1
&& matches!(func.arguments[0], ScalarExpr::FunctionCall(_)) =>
{
let ScalarExpr::FunctionCall(inner) = &func.arguments[0] else {
return true;
};
if inner.func_name != "eq" || inner.arguments.len() != 2 {
return true;
}
if let (
ScalarExpr::BoundColumnRef(left_col),
ScalarExpr::BoundColumnRef(right_col),
) = (&inner.arguments[0], &inner.arguments[1])
{
left_col.column.index != right_col.column.index
|| left_col.column.data_type.is_nullable()
} else {
true
}
}
_ => true,
})
.collect::<Vec<ScalarExpr>>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl Rule for RulePushDownFilterJoin {
let (s_expr, outer_to_inner) = outer_join_to_inner_join(s_expr, self.metadata.clone())?;

// Second, check if can convert mark join to semi join
let (s_expr, mark_to_semi) = convert_mark_to_semi_join(&s_expr)?;
let (s_expr, mark_to_semi) = convert_mark_to_semi_join(&s_expr, self.metadata.clone())?;
if s_expr.plan().rel_op() != RelOp::Filter {
state.add_result(s_expr);
return Ok(());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
mod push_down_filter_join;
mod rule_commute_join;
mod rule_commute_join_base_table;
mod rule_eliminate_self_join;
mod rule_left_exchange_join;
mod rule_semi_to_inner_join;
mod util;

pub use push_down_filter_join::*;
pub use rule_commute_join::RuleCommuteJoin;
pub use rule_commute_join_base_table::RuleCommuteJoinBaseTable;
pub use rule_eliminate_self_join::RuleEliminateSelfJoin;
pub use rule_left_exchange_join::RuleLeftExchangeJoin;
pub use rule_semi_to_inner_join::RuleSemiToInnerJoin;
pub use util::get_join_predicates;
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ use std::sync::Arc;

use databend_common_exception::Result;

use crate::MetadataRef;
use crate::ScalarExpr;
use crate::optimizer::ir::SExpr;
use crate::plans::Filter;
use crate::plans::Join;
use crate::plans::JoinType;

pub fn convert_mark_to_semi_join(s_expr: &SExpr) -> Result<(SExpr, bool)> {
pub fn convert_mark_to_semi_join(s_expr: &SExpr, metadata: MetadataRef) -> Result<(SExpr, bool)> {
let mut filter: Filter = s_expr.plan().clone().try_into()?;
let mut join: Join = s_expr.child(0)?.plan().clone().try_into()?;

Expand All @@ -34,7 +35,9 @@ pub fn convert_mark_to_semi_join(s_expr: &SExpr) -> Result<(SExpr, bool)> {
return Ok((s_expr.clone(), false));
}

let mark_index = join.marker_index.unwrap();
let Some(mark_index) = join.marker_index else {
return Ok((s_expr.clone(), false));
};
let mut find_mark_index = false;

// remove mark index filter
Expand Down Expand Up @@ -65,6 +68,8 @@ pub fn convert_mark_to_semi_join(s_expr: &SExpr) -> Result<(SExpr, bool)> {
_ => unreachable!(),
};

metadata.write().add_removed_mark_index(mark_index);

// clear is null equal sign
join.equi_conditions.iter_mut().for_each(|c| {
c.is_null_equal = false;
Expand Down
Loading
Loading