diff --git a/src/query/service/tests/it/sql/planner/optimizer/optimizers/operator/filter/deduplicate_join_condition_test.rs b/src/query/service/tests/it/sql/planner/optimizer/optimizers/operator/filter/deduplicate_join_condition_test.rs index 688d23c6ae13c..e05a5a98b85fa 100644 --- a/src/query/service/tests/it/sql/planner/optimizer/optimizers/operator/filter/deduplicate_join_condition_test.rs +++ b/src/query/service/tests/it/sql/planner/optimizer/optimizers/operator/filter/deduplicate_join_condition_test.rs @@ -1471,3 +1471,283 @@ Join [t2.id = t3.id, t1.id = t3.id] Ok(()) } + +// Deduplicate redundant equi-conditions on a semi join +// Inner child builds t1.id = t2.id; upper LEFT SEMI has both t2.id = t3.id +// and t1.id = t3.id. One of them should be removed. +#[test] +fn test_left_semi_deduplication() -> Result<()> { + let mut builder = ExprBuilder::new(); + + let t1_id = builder.column( + "t1.id", + 0, + "id", + DataType::Number(NumberDataType::Int64), + "t1", + 0, + ); + let t2_id = builder.column( + "t2.id", + 1, + "id", + DataType::Number(NumberDataType::Int64), + "t2", + 1, + ); + let t3_id = builder.column( + "t3.id", + 2, + "id", + DataType::Number(NumberDataType::Int64), + "t3", + 2, + ); + + let t1 = builder.table_scan(0, "t1"); + let t2 = builder.table_scan(1, "t2"); + let t3 = builder.table_scan(2, "t3"); + + let cond_t1_t2 = builder.join_condition(t1_id.clone(), t2_id.clone(), false); + let cond_t2_t3 = builder.join_condition(t2_id.clone(), t3_id.clone(), false); + let cond_t1_t3 = builder.join_condition(t1_id.clone(), t3_id.clone(), false); // redundant + + let join_t1_t2 = builder.join(t1, t2, vec![cond_t1_t2], JoinType::Inner); + let join_tree = builder.join( + join_t1_t2, + t3, + vec![cond_t2_t3.clone(), cond_t1_t3], + JoinType::LeftSemi, + ); + + let before_patterns = [r#" +Join [t2.id = t3.id, t1.id = t3.id] + Join [t1.id = t2.id] + Table t0 + Table t1 + Table t2 +"#]; + + let after_patterns = [r#" +Join [t2.id = t3.id] + Join [t1.id = t2.id] + Table t0 + Table t1 + Table t2 +"#]; + + let optimized = run_optimizer(join_tree.clone())?; + compare_trees(&join_tree, &optimized, &before_patterns, &after_patterns)?; + + Ok(()) +} + +// Deduplicate redundant equi-conditions on an anti join +// Inner child builds t1.id = t2.id; upper RIGHT ANTI has both t3.id = t1.id +// and t3.id = t2.id. One of them should be removed. +#[test] +fn test_right_anti_deduplication() -> Result<()> { + let mut builder = ExprBuilder::new(); + + let t1_id = builder.column( + "t1.id", + 0, + "id", + DataType::Number(NumberDataType::Int64), + "t1", + 0, + ); + let t2_id = builder.column( + "t2.id", + 1, + "id", + DataType::Number(NumberDataType::Int64), + "t2", + 1, + ); + let t3_id = builder.column( + "t3.id", + 2, + "id", + DataType::Number(NumberDataType::Int64), + "t3", + 2, + ); + + let t1 = builder.table_scan(0, "t1"); + let t2 = builder.table_scan(1, "t2"); + let t3 = builder.table_scan(2, "t3"); + + let cond_t1_t2 = builder.join_condition(t1_id.clone(), t2_id.clone(), false); + let cond_t3_t1 = builder.join_condition(t3_id.clone(), t1_id.clone(), false); + let cond_t3_t2 = builder.join_condition(t3_id.clone(), t2_id.clone(), false); // redundant + + let join_t1_t2 = builder.join(t1, t2, vec![cond_t1_t2], JoinType::Inner); + let join_tree = builder.join( + t3, + join_t1_t2, + vec![cond_t3_t1.clone(), cond_t3_t2], + JoinType::RightAnti, + ); + + let before_patterns = [r#" +Join [t3.id = t1.id, t3.id = t2.id] + Table t2 + Join [t1.id = t2.id] + Table t0 + Table t1 +"#]; + + let after_patterns = [r#" +Join [t3.id = t1.id] + Table t2 + Join [t1.id = t2.id] + Table t0 + Table t1 +"#]; + + let optimized = run_optimizer(join_tree.clone())?; + compare_trees(&join_tree, &optimized, &before_patterns, &after_patterns)?; + + Ok(()) +} + +// Ensure anti join equivalence does not leak upward to remove parent join predicates. +// Child LEFT ANTI has t1.id = t2.id, parent INNER joins on both t1.id = t3.id and t2.id = t3.id. +// These two predicates must not be deduplicated using child's equality. +#[test] +fn test_anti_equivalence_not_leaking() -> Result<()> { + let mut builder = ExprBuilder::new(); + + let t1_id = builder.column( + "t1.id", + 0, + "id", + DataType::Number(NumberDataType::Int64), + "t1", + 0, + ); + let t2_id = builder.column( + "t2.id", + 1, + "id", + DataType::Number(NumberDataType::Int64), + "t2", + 1, + ); + let t3_id = builder.column( + "t3.id", + 2, + "id", + DataType::Number(NumberDataType::Int64), + "t3", + 2, + ); + + let t1 = builder.table_scan(0, "t1"); + let t2 = builder.table_scan(1, "t2"); + let t3 = builder.table_scan(2, "t3"); + + let cond_t1_t2 = builder.join_condition(t1_id.clone(), t2_id.clone(), false); + let cond_t1_t3 = builder.join_condition(t1_id.clone(), t3_id.clone(), false); + let cond_t2_t3 = builder.join_condition(t2_id.clone(), t3_id.clone(), false); + + let anti = builder.join(t1, t2, vec![cond_t1_t2], JoinType::LeftAnti); + let join_tree = builder.join( + anti, + t3, + vec![cond_t1_t3.clone(), cond_t2_t3.clone()], + JoinType::Inner, + ); + + let before_patterns = [r#" +Join [t1.id = t3.id, t2.id = t3.id] + Join [t1.id = t2.id] + Table t0 + Table t1 + Table t2 +"#]; + + let after_patterns = [r#" +Join [t1.id = t3.id, t2.id = t3.id] + Join [t1.id = t2.id] + Table t0 + Table t1 + Table t2 +"#]; + + let optimized = run_optimizer(join_tree.clone())?; + compare_trees(&join_tree, &optimized, &before_patterns, &after_patterns)?; + + Ok(()) +} + +// Ensure semi join equivalence does not leak upward to remove parent join predicates. +// Child LEFT SEMI has t1.id = t2.id, parent INNER joins on both t1.id = t3.id and t2.id = t3.id. +// These two predicates must not be deduplicated using child's equality. +#[test] +fn test_semi_equivalence_not_leaking() -> Result<()> { + let mut builder = ExprBuilder::new(); + + let t1_id = builder.column( + "t1.id", + 0, + "id", + DataType::Number(NumberDataType::Int64), + "t1", + 0, + ); + let t2_id = builder.column( + "t2.id", + 1, + "id", + DataType::Number(NumberDataType::Int64), + "t2", + 1, + ); + let t3_id = builder.column( + "t3.id", + 2, + "id", + DataType::Number(NumberDataType::Int64), + "t3", + 2, + ); + + let t1 = builder.table_scan(0, "t1"); + let t2 = builder.table_scan(1, "t2"); + let t3 = builder.table_scan(2, "t3"); + + let cond_t1_t2 = builder.join_condition(t1_id.clone(), t2_id.clone(), false); + let cond_t1_t3 = builder.join_condition(t1_id.clone(), t3_id.clone(), false); + let cond_t2_t3 = builder.join_condition(t2_id.clone(), t3_id.clone(), false); + + let semi = builder.join(t1, t2, vec![cond_t1_t2], JoinType::LeftSemi); + let join_tree = builder.join( + semi, + t3, + vec![cond_t1_t3.clone(), cond_t2_t3.clone()], + JoinType::Inner, + ); + + let before_patterns = [r#" +Join [t1.id = t3.id, t2.id = t3.id] + Join [t1.id = t2.id] + Table t0 + Table t1 + Table t2 +"#]; + + let after_patterns = [r#" +Join [t1.id = t3.id, t2.id = t3.id] + Join [t1.id = t2.id] + Table t0 + Table t1 + Table t2 +"#]; + + let optimized = run_optimizer(join_tree.clone())?; + compare_trees(&join_tree, &optimized, &before_patterns, &after_patterns)?; + + Ok(()) +} diff --git a/src/query/sql/src/planner/optimizer/optimizers/operator/filter/deduplicate_join_condition.rs b/src/query/sql/src/planner/optimizer/optimizers/operator/filter/deduplicate_join_condition.rs index 5120ed7eabcac..ae2181708fee8 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/operator/filter/deduplicate_join_condition.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/operator/filter/deduplicate_join_condition.rs @@ -106,18 +106,18 @@ impl DeduplicateJoinConditionOptimizer { #[recursive::recursive] pub fn deduplicate(&mut self, s_expr: &SExpr) -> Result { match s_expr.plan.as_ref() { - // Only optimize inner joins - RelOperator::Join(join) if matches!(join.join_type, JoinType::Inner) => { - self.optimize_inner_join(s_expr, join) + // Only optimize filtering joins that don't preserve nulls + RelOperator::Join(join) if join.join_type.is_filtering_join() => { + self.optimize_filtering_join(s_expr, join) } // Recursively process other nodes _ => self.deduplicate_children(s_expr), } } - /// Optimize inner join by removing redundant conditions - fn optimize_inner_join(&mut self, s_expr: &SExpr, join: &Join) -> Result { - debug_assert!(matches!(join.join_type, JoinType::Inner)); + /// Optimize filtering joins (inner/semi/anti) by removing redundant equi-conditions + fn optimize_filtering_join(&mut self, s_expr: &SExpr, join: &Join) -> Result { + debug_assert!(join.join_type.is_filtering_join()); // Recursively optimize left and right subtrees let left = self.deduplicate(s_expr.child(0)?)?; @@ -125,6 +125,15 @@ impl DeduplicateJoinConditionOptimizer { let mut join = join.clone(); let mut non_redundant_conditions = Vec::new(); + // Anti / Semi joins should not contribute new equivalence to ancestor nodes. + let snapshot = if matches!( + join.join_type, + JoinType::LeftAnti | JoinType::RightAnti | JoinType::LeftSemi | JoinType::RightSemi + ) { + Some(self.snapshot()) + } else { + None + }; // Check each equi-join condition for condition in &join.equi_conditions { @@ -149,6 +158,11 @@ impl DeduplicateJoinConditionOptimizer { join.equi_conditions = non_redundant_conditions; } + // Restore union-find state for anti joins to avoid leaking equivalence upward. + if let Some(snapshot) = snapshot { + self.restore(snapshot); + } + // Create new expression let new_plan = Arc::new(RelOperator::Join(join)); let new_children = vec![Arc::new(left), Arc::new(right)]; @@ -233,6 +247,28 @@ impl DeduplicateJoinConditionOptimizer { fn union(&mut self, idx1: usize, idx2: usize) { self.column_group.insert(idx2, idx1); } + + /// Snapshot the current union-find state so we can rollback after + /// optimizing an anti join. + fn snapshot(&self) -> UfSnapshot { + UfSnapshot { + expr_to_index: self.expr_to_index.clone(), + column_group: self.column_group.clone(), + next_index: self.next_index, + } + } + + fn restore(&mut self, snapshot: UfSnapshot) { + self.expr_to_index = snapshot.expr_to_index; + self.column_group = snapshot.column_group; + self.next_index = snapshot.next_index; + } +} + +struct UfSnapshot { + expr_to_index: HashMap, + column_group: HashMap, + next_index: usize, } #[async_trait::async_trait] diff --git a/src/query/sql/src/planner/plans/join.rs b/src/query/sql/src/planner/plans/join.rs index 7cc07b8b3909d..83a4320113f30 100644 --- a/src/query/sql/src/planner/plans/join.rs +++ b/src/query/sql/src/planner/plans/join.rs @@ -125,6 +125,20 @@ impl JoinType { JoinType::Asof | JoinType::LeftAsof | JoinType::RightAsof ) } + + /// Joins that behave like filters (no null preserving side) so + /// equi-join conditions can be deduplicated safely. + pub fn is_filtering_join(&self) -> bool { + matches!( + self, + JoinType::Inner + | JoinType::InnerAny + | JoinType::LeftSemi + | JoinType::RightSemi + | JoinType::LeftAnti + | JoinType::RightAnti + ) + } } impl Display for JoinType { diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_transform_reader.rs b/src/query/storages/fuse/src/operations/read/parquet_data_transform_reader.rs index 5d984b2157c42..5bf7afb1a3f4c 100644 --- a/src/query/storages/fuse/src/operations/read/parquet_data_transform_reader.rs +++ b/src/query/storages/fuse/src/operations/read/parquet_data_transform_reader.rs @@ -62,6 +62,7 @@ pub struct ReadParquetDataTransform { table_schema: Arc, scan_id: IndexType, context: Arc, + read_settings: ReadSettings, stats: Arc, unfinished_processors_count: Arc, } @@ -80,6 +81,7 @@ impl ReadParquetDataTransform { unfinished_processors_count: Arc, ) -> Result { let func_ctx = ctx.get_function_context()?; + let read_settings = ReadSettings::from_ctx(&ctx)?; Ok(ProcessorPtr::create(AsyncTransformer::create( input, output, @@ -91,6 +93,7 @@ impl ReadParquetDataTransform { table_schema, scan_id: table_index, context: ctx, + read_settings, stats, unfinished_processors_count, }, @@ -154,7 +157,7 @@ impl AsyncTransform for ReadParquetDataTransform { fuse_part_infos.push(part.clone()); let block_reader = self.block_reader.clone(); - let settings = ReadSettings::from_ctx(&self.context)?; + let settings = self.read_settings; let index_reader = self.index_reader.clone(); let virtual_reader = self.virtual_reader.clone(); diff --git a/src/query/storages/fuse/src/pruning/expr_runtime_pruner.rs b/src/query/storages/fuse/src/pruning/expr_runtime_pruner.rs index 4dba9b1d48364..5b6fb836dda02 100644 --- a/src/query/storages/fuse/src/pruning/expr_runtime_pruner.rs +++ b/src/query/storages/fuse/src/pruning/expr_runtime_pruner.rs @@ -29,7 +29,6 @@ use databend_common_expression::Scalar; use databend_common_expression::TableSchema; use databend_common_functions::BUILTIN_FUNCTIONS; use databend_storages_common_index::statistics_to_domain; -use log::debug; use log::info; use crate::FuseBlockPartInfo; @@ -102,10 +101,6 @@ impl ExprRuntimePruner { func_ctx, &BUILTIN_FUNCTIONS, ); - debug!( - "Runtime filter after constant fold is {:?}", - new_expr.sql_display() - ); if matches!( new_expr, Expr::Constant(Constant {