Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1471,3 +1471,283 @@ Join [t2.id = t3.id, t1.id = t3.id]

Ok(())
}

// Deduplicate redundant equi-conditions on a semi join.
//
// Plan under test:
//   LEFT SEMI [t2.id = t3.id, t1.id = t3.id]
//     INNER [t1.id = t2.id] over (t1, t2)
//     t3
//
// The inner child already establishes t1.id = t2.id, so once t2.id = t3.id is
// kept on the upper LEFT SEMI, t1.id = t3.id is implied and must be removed.
#[test]
fn test_left_semi_deduplication() -> Result<()> {
    let mut builder = ExprBuilder::new();

    // One Int64 `id` column per table.
    let t1_id = builder.column(
        "t1.id",
        0,
        "id",
        DataType::Number(NumberDataType::Int64),
        "t1",
        0,
    );
    let t2_id = builder.column(
        "t2.id",
        1,
        "id",
        DataType::Number(NumberDataType::Int64),
        "t2",
        1,
    );
    let t3_id = builder.column(
        "t3.id",
        2,
        "id",
        DataType::Number(NumberDataType::Int64),
        "t3",
        2,
    );

    let t1 = builder.table_scan(0, "t1");
    let t2 = builder.table_scan(1, "t2");
    let t3 = builder.table_scan(2, "t3");

    let cond_t1_t2 = builder.join_condition(t1_id.clone(), t2_id.clone(), false);
    let cond_t2_t3 = builder.join_condition(t2_id.clone(), t3_id.clone(), false);
    let cond_t1_t3 = builder.join_condition(t1_id.clone(), t3_id.clone(), false); // redundant

    let join_t1_t2 = builder.join(t1, t2, vec![cond_t1_t2], JoinType::Inner);
    // Both conditions are moved into the join; neither is used again, so no clone is needed.
    let join_tree = builder.join(join_t1_t2, t3, vec![cond_t2_t3, cond_t1_t3], JoinType::LeftSemi);

    // NOTE(review): the patterns print scans as "Table t0..t2" — presumably the
    // table index rather than the name passed to `table_scan`; confirm.
    let before_patterns = [r#"
Join [t2.id = t3.id, t1.id = t3.id]
Join [t1.id = t2.id]
Table t0
Table t1
Table t2
"#];

    // Only the first semi-join condition survives.
    let after_patterns = [r#"
Join [t2.id = t3.id]
Join [t1.id = t2.id]
Table t0
Table t1
Table t2
"#];

    let optimized = run_optimizer(join_tree.clone())?;
    compare_trees(&join_tree, &optimized, &before_patterns, &after_patterns)?;

    Ok(())
}

// Deduplicate redundant equi-conditions on an anti join.
//
// Plan under test:
//   RIGHT ANTI [t3.id = t1.id, t3.id = t2.id]
//     t3
//     INNER [t1.id = t2.id] over (t1, t2)
//
// The inner child already establishes t1.id = t2.id, so once t3.id = t1.id is
// kept on the upper RIGHT ANTI, t3.id = t2.id is implied and must be removed.
#[test]
fn test_right_anti_deduplication() -> Result<()> {
    let mut builder = ExprBuilder::new();

    // One Int64 `id` column per table.
    let t1_id = builder.column(
        "t1.id",
        0,
        "id",
        DataType::Number(NumberDataType::Int64),
        "t1",
        0,
    );
    let t2_id = builder.column(
        "t2.id",
        1,
        "id",
        DataType::Number(NumberDataType::Int64),
        "t2",
        1,
    );
    let t3_id = builder.column(
        "t3.id",
        2,
        "id",
        DataType::Number(NumberDataType::Int64),
        "t3",
        2,
    );

    let t1 = builder.table_scan(0, "t1");
    let t2 = builder.table_scan(1, "t2");
    let t3 = builder.table_scan(2, "t3");

    let cond_t1_t2 = builder.join_condition(t1_id.clone(), t2_id.clone(), false);
    let cond_t3_t1 = builder.join_condition(t3_id.clone(), t1_id.clone(), false);
    let cond_t3_t2 = builder.join_condition(t3_id.clone(), t2_id.clone(), false); // redundant

    let join_t1_t2 = builder.join(t1, t2, vec![cond_t1_t2], JoinType::Inner);
    // Both conditions are moved into the join; neither is used again, so no clone is needed.
    let join_tree = builder.join(t3, join_t1_t2, vec![cond_t3_t1, cond_t3_t2], JoinType::RightAnti);

    // NOTE(review): the patterns print scans as "Table t0..t2" — presumably the
    // table index rather than the name passed to `table_scan`; confirm.
    let before_patterns = [r#"
Join [t3.id = t1.id, t3.id = t2.id]
Table t2
Join [t1.id = t2.id]
Table t0
Table t1
"#];

    // Only the first anti-join condition survives.
    let after_patterns = [r#"
Join [t3.id = t1.id]
Table t2
Join [t1.id = t2.id]
Table t0
Table t1
"#];

    let optimized = run_optimizer(join_tree.clone())?;
    compare_trees(&join_tree, &optimized, &before_patterns, &after_patterns)?;

    Ok(())
}

// Ensure anti join equivalence does not leak upward to remove parent join predicates.
//
// Plan under test:
//   INNER [t1.id = t3.id, t2.id = t3.id]
//     LEFT ANTI [t1.id = t2.id] over (t1, t2)
//     t3
//
// The child's t1.id = t2.id belongs to an anti join, so the parent must not use
// it to treat its two predicates as duplicates: the plan is expected to come
// out of the optimizer unchanged.
#[test]
fn test_anti_equivalence_not_leaking() -> Result<()> {
    let mut builder = ExprBuilder::new();

    // One Int64 `id` column per table.
    let t1_id = builder.column(
        "t1.id",
        0,
        "id",
        DataType::Number(NumberDataType::Int64),
        "t1",
        0,
    );
    let t2_id = builder.column(
        "t2.id",
        1,
        "id",
        DataType::Number(NumberDataType::Int64),
        "t2",
        1,
    );
    let t3_id = builder.column(
        "t3.id",
        2,
        "id",
        DataType::Number(NumberDataType::Int64),
        "t3",
        2,
    );

    let t1 = builder.table_scan(0, "t1");
    let t2 = builder.table_scan(1, "t2");
    let t3 = builder.table_scan(2, "t3");

    let cond_t1_t2 = builder.join_condition(t1_id.clone(), t2_id.clone(), false);
    let cond_t1_t3 = builder.join_condition(t1_id.clone(), t3_id.clone(), false);
    let cond_t2_t3 = builder.join_condition(t2_id.clone(), t3_id.clone(), false);

    let anti = builder.join(t1, t2, vec![cond_t1_t2], JoinType::LeftAnti);
    // Both conditions are moved into the join; neither is used again, so no clone is needed.
    let join_tree = builder.join(anti, t3, vec![cond_t1_t3, cond_t2_t3], JoinType::Inner);

    // NOTE(review): the patterns print scans as "Table t0..t2" — presumably the
    // table index rather than the name passed to `table_scan`; confirm.
    let before_patterns = [r#"
Join [t1.id = t3.id, t2.id = t3.id]
Join [t1.id = t2.id]
Table t0
Table t1
Table t2
"#];

    // Identical to `before_patterns`: nothing may be removed.
    let after_patterns = [r#"
Join [t1.id = t3.id, t2.id = t3.id]
Join [t1.id = t2.id]
Table t0
Table t1
Table t2
"#];

    let optimized = run_optimizer(join_tree.clone())?;
    compare_trees(&join_tree, &optimized, &before_patterns, &after_patterns)?;

    Ok(())
}

// Ensure semi join equivalence does not leak upward to remove parent join predicates.
//
// Plan under test:
//   INNER [t1.id = t3.id, t2.id = t3.id]
//     LEFT SEMI [t1.id = t2.id] over (t1, t2)
//     t3
//
// The child's t1.id = t2.id belongs to a semi join, so the parent must not use
// it to treat its two predicates as duplicates: the plan is expected to come
// out of the optimizer unchanged.
#[test]
fn test_semi_equivalence_not_leaking() -> Result<()> {
    let mut builder = ExprBuilder::new();

    // One Int64 `id` column per table.
    let t1_id = builder.column(
        "t1.id",
        0,
        "id",
        DataType::Number(NumberDataType::Int64),
        "t1",
        0,
    );
    let t2_id = builder.column(
        "t2.id",
        1,
        "id",
        DataType::Number(NumberDataType::Int64),
        "t2",
        1,
    );
    let t3_id = builder.column(
        "t3.id",
        2,
        "id",
        DataType::Number(NumberDataType::Int64),
        "t3",
        2,
    );

    let t1 = builder.table_scan(0, "t1");
    let t2 = builder.table_scan(1, "t2");
    let t3 = builder.table_scan(2, "t3");

    let cond_t1_t2 = builder.join_condition(t1_id.clone(), t2_id.clone(), false);
    let cond_t1_t3 = builder.join_condition(t1_id.clone(), t3_id.clone(), false);
    let cond_t2_t3 = builder.join_condition(t2_id.clone(), t3_id.clone(), false);

    let semi = builder.join(t1, t2, vec![cond_t1_t2], JoinType::LeftSemi);
    // Both conditions are moved into the join; neither is used again, so no clone is needed.
    let join_tree = builder.join(semi, t3, vec![cond_t1_t3, cond_t2_t3], JoinType::Inner);

    // NOTE(review): the patterns print scans as "Table t0..t2" — presumably the
    // table index rather than the name passed to `table_scan`; confirm.
    let before_patterns = [r#"
Join [t1.id = t3.id, t2.id = t3.id]
Join [t1.id = t2.id]
Table t0
Table t1
Table t2
"#];

    // Identical to `before_patterns`: nothing may be removed.
    let after_patterns = [r#"
Join [t1.id = t3.id, t2.id = t3.id]
Join [t1.id = t2.id]
Table t0
Table t1
Table t2
"#];

    let optimized = run_optimizer(join_tree.clone())?;
    compare_trees(&join_tree, &optimized, &before_patterns, &after_patterns)?;

    Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,25 +106,34 @@ impl DeduplicateJoinConditionOptimizer {
#[recursive::recursive]
pub fn deduplicate(&mut self, s_expr: &SExpr) -> Result<SExpr> {
match s_expr.plan.as_ref() {
// Only optimize inner joins
RelOperator::Join(join) if matches!(join.join_type, JoinType::Inner) => {
self.optimize_inner_join(s_expr, join)
// Only optimize filtering joins that don't preserve nulls
RelOperator::Join(join) if join.join_type.is_filtering_join() => {
self.optimize_filtering_join(s_expr, join)
}
// Recursively process other nodes
_ => self.deduplicate_children(s_expr),
}
}

/// Optimize inner join by removing redundant conditions
fn optimize_inner_join(&mut self, s_expr: &SExpr, join: &Join) -> Result<SExpr> {
debug_assert!(matches!(join.join_type, JoinType::Inner));
/// Optimize filtering joins (inner/semi/anti) by removing redundant equi-conditions
fn optimize_filtering_join(&mut self, s_expr: &SExpr, join: &Join) -> Result<SExpr> {
debug_assert!(join.join_type.is_filtering_join());

// Recursively optimize left and right subtrees
let left = self.deduplicate(s_expr.child(0)?)?;
let right = self.deduplicate(s_expr.child(1)?)?;

let mut join = join.clone();
let mut non_redundant_conditions = Vec::new();
// Anti / Semi joins should not contribute new equivalence to ancestor nodes.
let snapshot = if matches!(
join.join_type,
JoinType::LeftAnti | JoinType::RightAnti | JoinType::LeftSemi | JoinType::RightSemi
) {
Some(self.snapshot())
} else {
None
};

// Check each equi-join condition
for condition in &join.equi_conditions {
Expand All @@ -149,6 +158,11 @@ impl DeduplicateJoinConditionOptimizer {
join.equi_conditions = non_redundant_conditions;
}

// Restore union-find state for semi / anti joins to avoid leaking equivalence upward.
if let Some(snapshot) = snapshot {
self.restore(snapshot);
}

// Create new expression
let new_plan = Arc::new(RelOperator::Join(join));
let new_children = vec![Arc::new(left), Arc::new(right)];
Expand Down Expand Up @@ -233,6 +247,28 @@ impl DeduplicateJoinConditionOptimizer {
/// Link `idx2` to `idx1` by storing the entry `idx2 -> idx1` in `column_group`.
///
/// Any entry previously stored for `idx2` is overwritten.
fn union(&mut self, idx1: usize, idx2: usize) {
    let (member, target) = (idx2, idx1);
    self.column_group.insert(member, target);
}

/// Capture a copy of the current union-find state (`expr_to_index`,
/// `column_group` and `next_index`) so it can later be rolled back with
/// `restore` once a semi or anti join has been optimized.
fn snapshot(&self) -> UfSnapshot {
    let expr_to_index = self.expr_to_index.clone();
    let column_group = self.column_group.clone();
    let next_index = self.next_index;

    UfSnapshot {
        expr_to_index,
        column_group,
        next_index,
    }
}

fn restore(&mut self, snapshot: UfSnapshot) {
self.expr_to_index = snapshot.expr_to_index;
self.column_group = snapshot.column_group;
self.next_index = snapshot.next_index;
}
}

/// Saved copy of the optimizer's union-find state, produced by `snapshot`
/// and consumed by `restore`.
struct UfSnapshot {
/// Copy of the optimizer's `expr_to_index` map (scalar expression -> index).
expr_to_index: HashMap<ScalarExpr, usize>,
/// Copy of the optimizer's `column_group` map; `union(idx1, idx2)` stores `idx2 -> idx1` in it.
column_group: HashMap<usize, usize>,
/// Value of the optimizer's `next_index` counter at snapshot time.
next_index: usize,
}

#[async_trait::async_trait]
Expand Down
14 changes: 14 additions & 0 deletions src/query/sql/src/planner/plans/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,20 @@ impl JoinType {
JoinType::Asof | JoinType::LeftAsof | JoinType::RightAsof
)
}

/// Returns `true` for join types that behave like filters: the inner joins
/// (`Inner`, `InnerAny`) plus the semi and anti variants.
///
/// These joins have no null-preserving side, so their equi-join conditions
/// can be deduplicated safely.
pub fn is_filtering_join(&self) -> bool {
    let is_inner = matches!(self, JoinType::Inner | JoinType::InnerAny);
    let is_semi = matches!(self, JoinType::LeftSemi | JoinType::RightSemi);
    let is_anti = matches!(self, JoinType::LeftAnti | JoinType::RightAnti);

    is_inner || is_semi || is_anti
}
}

impl Display for JoinType {
Expand Down
Loading