diff --git a/src/query/catalog/src/runtime_filter_info.rs b/src/query/catalog/src/runtime_filter_info.rs index 5b108f96fd46f..e53621765fd55 100644 --- a/src/query/catalog/src/runtime_filter_info.rs +++ b/src/query/catalog/src/runtime_filter_info.rs @@ -60,7 +60,7 @@ impl RuntimeFilterInfo { pub struct RuntimeFilterEntry { pub id: usize, pub probe_expr: Expr, - pub bloom: Option, + pub bloom: Option>, pub inlist: Option>, pub min_max: Option>, pub stats: Arc, @@ -69,7 +69,6 @@ pub struct RuntimeFilterEntry { pub enabled: bool, } -#[derive(Clone)] pub struct RuntimeFilterBloom { pub column_name: String, pub filter: RuntimeBloomFilter, diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index 06f28ab50669b..d02a19dafa3c7 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -359,6 +359,18 @@ pub trait TableContext: Send + Sync { fn get_runtime_filter_ready(&self, table_index: usize) -> Vec>; + /// Set the pushed runtime filter statistics for a scan_id + /// Parameters: scan_id, selectivity (0.0-1.0), row count + fn set_pushed_runtime_filter_stats(&self, _scan_id: usize, _selectivity: f64, _rows: u64) { + unimplemented!() + } + + /// Get the pushed runtime filter statistics for a scan_id + /// Returns: Option<(selectivity, rows)> + fn get_pushed_runtime_filter_stats(&self, _scan_id: usize) -> Option<(f64, u64)> { + unimplemented!() + } + fn clear_runtime_filter(&self); fn assert_no_runtime_filter_state(&self) -> Result<()> { unimplemented!() diff --git a/src/query/service/src/physical_plans/physical_hash_join.rs b/src/query/service/src/physical_plans/physical_hash_join.rs index f5c1bbef6464b..8e0254a652c58 100644 --- a/src/query/service/src/physical_plans/physical_hash_join.rs +++ b/src/query/service/src/physical_plans/physical_hash_join.rs @@ -44,6 +44,7 @@ use databend_common_sql::ScalarExpr; use databend_common_sql::TypeCheck; use tokio::sync::Barrier; +use super::runtime_filter::supported_join_type_for_runtime_filter; use super::runtime_filter::PhysicalRuntimeFilters; use super::PhysicalPlanCast; use crate::physical_plans::explain::PlanStatsInfo; @@ -75,6 +76,7 @@ type JoinConditionsResult = ( Vec, usize, usize, IndexType)>>, Vec<((usize, bool), usize)>, Vec>, + Vec, // Build key ScalarExprs for equivalence class checking ); type ProjectionsResult = ( @@ -508,16 +510,29 @@ impl HashJoin { } impl PhysicalPlanBuilder { - /// Builds the physical plans for both sides of the join pub async fn build_join_sides( &mut self, s_expr: &SExpr, + join: Option<&Join>, left_required: ColumnSet, right_required: ColumnSet, ) -> Result<(PhysicalPlan, PhysicalPlan)> { let probe_side = self.build(s_expr.left_child(), left_required).await?; + + let should_track = join + .map(|j| supported_join_type_for_runtime_filter(&j.join_type)) + .unwrap_or(false); + + if should_track { + self.runtime_filter_anchors.push(Arc::new(s_expr.clone())); + } + let build_side = self.build(s_expr.right_child(), right_required).await?; + if should_track { + self.runtime_filter_anchors.pop(); + } + Ok((probe_side, build_side)) } @@ -788,6 +803,7 @@ impl PhysicalPlanBuilder { ) -> Result { let mut left_join_conditions = Vec::new(); let mut right_join_conditions = Vec::new(); + let mut right_join_conditions_scalar = Vec::new(); let mut is_null_equal = Vec::new(); let mut left_join_conditions_rt = Vec::new(); let mut probe_to_build_index = Vec::new(); @@ -890,6 +906,7 @@ impl PhysicalPlanBuilder { // Add to result collections left_join_conditions.push(left_expr.as_remote_expr()); right_join_conditions.push(right_expr.as_remote_expr()); + right_join_conditions_scalar.push(right_condition.clone()); is_null_equal.push(condition.is_null_equal); left_join_conditions_rt.push(left_expr_for_runtime_filter.map( |(expr, scan_id, table_index, column_idx)| { @@ -906,6 +923,7 @@ impl PhysicalPlanBuilder { left_join_conditions_rt, probe_to_build_index, build_table_indexes, + right_join_conditions_scalar, )) } @@ -1270,7 +1288,7 @@ impl PhysicalPlanBuilder { ) -> Result { // Step 1: Build probe and build sides let (mut probe_side, mut build_side) = self - .build_join_sides(s_expr, left_required, right_required) + .build_join_sides(s_expr, Some(join), left_required, right_required) .await?; // Step 2: Prepare column projections @@ -1292,6 +1310,7 @@ impl PhysicalPlanBuilder { left_join_conditions_rt, mut probe_to_build_index, build_table_indexes, + right_join_conditions_scalar, ) = self.process_equi_conditions( join, &probe_schema, @@ -1337,8 +1356,11 @@ impl PhysicalPlanBuilder { join, s_expr, &right_join_conditions, + &right_join_conditions_scalar, left_join_conditions_rt, build_table_indexes, + &self.runtime_filter_anchors, + &mut self.join_equivalence_classes, ) .await?; diff --git a/src/query/service/src/physical_plans/physical_plan_builder.rs b/src/query/service/src/physical_plans/physical_plan_builder.rs index 5190a683affe3..1337e063f8f5c 100644 --- a/src/query/service/src/physical_plans/physical_plan_builder.rs +++ b/src/query/service/src/physical_plans/physical_plan_builder.rs @@ -31,6 +31,7 @@ use databend_storages_common_table_meta::meta::TableSnapshot; use crate::physical_plans::explain::PlanStatsInfo; use crate::physical_plans::physical_plan::PhysicalPlan; +use crate::physical_plans::runtime_filter::JoinEquivalenceClasses; pub struct PhysicalPlanBuilder { pub metadata: MetadataRef, @@ -39,6 +40,9 @@ pub struct PhysicalPlanBuilder { pub dry_run: bool, // DataMutation info, used to build MergeInto physical plan pub mutation_build_info: Option, + pub runtime_filter_anchors: Vec>, + pub join_equivalence_classes: JoinEquivalenceClasses, + pub inited: bool, } impl PhysicalPlanBuilder { @@ -50,6 +54,9 @@ impl PhysicalPlanBuilder { func_ctx, dry_run, mutation_build_info: None, + runtime_filter_anchors: Vec::new(), + join_equivalence_classes: JoinEquivalenceClasses::default(), + inited: false, } } @@ -63,6 +70,10 @@ impl PhysicalPlanBuilder { } pub async fn build(&mut self, s_expr: &SExpr, required: ColumnSet) -> Result { + if !self.inited { + self.join_equivalence_classes = JoinEquivalenceClasses::build_from_sexpr(s_expr); + self.inited = true; + } let mut plan = self.build_physical_plan(s_expr, required).await?; plan.adjust_plan_id(&mut 0); diff --git a/src/query/service/src/physical_plans/physical_range_join.rs b/src/query/service/src/physical_plans/physical_range_join.rs index 9c9723edf6e73..358376549d0b7 100644 --- a/src/query/service/src/physical_plans/physical_range_join.rs +++ b/src/query/service/src/physical_plans/physical_range_join.rs @@ -226,7 +226,7 @@ impl PhysicalPlanBuilder { // Construct IEJoin let (right_side, left_side) = self - .build_join_sides(s_expr, left_required, right_required) + .build_join_sides(s_expr, None, left_required, right_required) .await?; let left_schema = self.prepare_probe_schema(join_type, &left_side)?; diff --git a/src/query/service/src/physical_plans/runtime_filter/builder.rs b/src/query/service/src/physical_plans/runtime_filter/builder.rs index 60cf42e3685b5..587a79d57dc1a 100644 --- a/src/query/service/src/physical_plans/runtime_filter/builder.rs +++ b/src/query/service/src/physical_plans/runtime_filter/builder.rs @@ -23,7 +23,6 @@ use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_sql::optimizer::ir::SExpr; use databend_common_sql::plans::Exchange; use databend_common_sql::plans::Join; -use databend_common_sql::plans::JoinEquiCondition; use databend_common_sql::plans::JoinType; use databend_common_sql::plans::RelOperator; use databend_common_sql::plans::ScalarExpr; @@ -35,6 +34,60 @@ use databend_common_sql::TypeCheck; use super::types::PhysicalRuntimeFilter; use super::types::PhysicalRuntimeFilters; +type ScalarExprId = usize; + +#[derive(Default)] +pub struct JoinEquivalenceClasses { + expr_to_id: HashMap, + uf: UnionFind, + next_id: ScalarExprId, +} + +impl JoinEquivalenceClasses { + pub fn build_from_sexpr(s_expr: &SExpr) -> Self { + let mut ec = Self::default(); + ec.collect_equi_conditions_recursive(s_expr); + ec + } + + fn collect_equi_conditions_recursive(&mut self, s_expr: &SExpr) { + if let RelOperator::Join(join) = s_expr.plan() { + if matches!(join.join_type, JoinType::Inner) { + for cond in &join.equi_conditions { + let left_id = self.get_or_create_id(&cond.left); + let right_id = self.get_or_create_id(&cond.right); + self.uf.union(left_id, right_id); + } + } + } + + for child in s_expr.children() { + self.collect_equi_conditions_recursive(child); + } + } + + fn get_or_create_id(&mut self, expr: &ScalarExpr) -> ScalarExprId { + if let Some(&id) = self.expr_to_id.get(expr) { + return id; + } + + let id = self.next_id; + self.next_id += 1; + self.expr_to_id.insert(expr.clone(), id); + id + } + + pub fn are_equivalent(&mut self, expr1: &ScalarExpr, expr2: &ScalarExpr) -> bool { + let id1 = self.expr_to_id.get(expr1); + let id2 = self.expr_to_id.get(expr2); + + match (id1, id2) { + (Some(&id1), Some(&id2)) => self.uf.find(id1) == self.uf.find(id2), + _ => false, + } + } +} + /// Type alias for probe keys with runtime filter information /// Contains: (RemoteExpr, scan_id, table_index, column_idx) type ProbeKeysWithRuntimeFilter = Vec, usize, usize, IndexType)>>; @@ -89,17 +142,16 @@ pub async fn build_runtime_filter( join: &Join, s_expr: &SExpr, build_keys: &[RemoteExpr], + build_keys_scalar: &[ScalarExpr], probe_keys: ProbeKeysWithRuntimeFilter, build_table_indexes: Vec>, + runtime_filter_anchors: &[Arc], + join_equivalence_classes: &mut JoinEquivalenceClasses, ) -> Result { if !ctx.get_settings().get_enable_join_runtime_filter()? { return Ok(Default::default()); } - if !supported_join_type_for_runtime_filter(&join.join_type) { - return Ok(Default::default()); - } - let build_side = s_expr.build_side_child(); let build_side_data_distribution = build_side.get_data_distribution()?; if build_side_data_distribution.as_ref().is_some_and(|e| { @@ -113,33 +165,74 @@ pub async fn build_runtime_filter( let mut filters = Vec::new(); - let probe_side = s_expr.probe_side_child(); - // Process each probe key that has runtime filter information - for (build_key, probe_key, scan_id, _table_index, column_idx, build_table_index) in build_keys + for ( + build_key, + build_key_scalar, + _probe_key, + _scan_id, + _table_index, + _column_idx, + build_table_index, + ) in build_keys .iter() + .zip(build_keys_scalar.iter()) .zip(probe_keys.into_iter()) .zip(build_table_indexes.into_iter()) - .filter_map(|((b, p), table_idx)| { + .filter_map(|(((b, b_scalar), p), table_idx)| { p.map(|(p, scan_id, table_index, column_idx)| { - (b, p, scan_id, table_index, column_idx, table_idx) + (b, b_scalar, p, scan_id, table_index, column_idx, table_idx) }) }) { - // Skip if the probe expression is neither a direct column reference nor a - // cast from not null to nullable type (e.g. CAST(col AS Nullable(T))). - match &probe_key { - RemoteExpr::ColumnRef { .. } => {} - RemoteExpr::Cast { - expr: box RemoteExpr::ColumnRef { data_type, .. }, - dest_type, - .. - } if &dest_type.remove_nullable() == data_type => {} - _ => continue, + let mut probe_targets = Vec::new(); + + for anchor in runtime_filter_anchors + .iter() + .map(|a| a.as_ref()) + .chain(supported_join_type_for_runtime_filter(&join.join_type).then_some(s_expr)) + { + // Get anchor's equi_conditions + let anchor_join = if let RelOperator::Join(anchor_join) = anchor.plan() { + anchor_join + } else { + continue; + }; + + let anchor_probe_key = anchor_join + .equi_conditions + .iter() + .find(|cond| join_equivalence_classes.are_equivalent(build_key_scalar, &cond.right)) + .map(|cond| &cond.left); + + let anchor_probe_key = match anchor_probe_key { + Some(key) => key, + None => continue, + }; + + // First, add the anchor's probe key itself as a probe target + if let Some((remote_expr, scan_id, _)) = + scalar_to_remote_expr(metadata, anchor_probe_key)? + { + if is_valid_probe_target(&remote_expr) { + probe_targets.push((remote_expr, scan_id)); + } + } + + // Then search only in the anchor's probe side subtree + let anchor_probe_side = anchor.probe_side_child(); + find_probe_targets_in_tree( + anchor_probe_side, + anchor_probe_key, + join_equivalence_classes, + metadata, + &mut probe_targets, + )?; } - let probe_targets = - find_probe_targets(metadata, probe_side, &probe_key, scan_id, column_idx)?; + if probe_targets.is_empty() { + continue; + } let build_table_rows = get_build_table_rows(ctx.clone(), metadata, build_table_index).await?; @@ -188,58 +281,68 @@ async fn get_build_table_rows( Ok(None) } -fn find_probe_targets( - metadata: &MetadataRef, - s_expr: &SExpr, - probe_key: &RemoteExpr, - probe_scan_id: usize, - probe_key_col_idx: IndexType, -) -> Result, usize)>> { - let mut uf = UnionFind::new(); - let mut column_to_remote: HashMap, usize)> = HashMap::new(); - column_to_remote.insert(probe_key_col_idx, (probe_key.clone(), probe_scan_id)); - - let equi_conditions = collect_equi_conditions(s_expr)?; - for cond in equi_conditions { - if let ( - Some((left_remote, left_scan_id, left_idx)), - Some((right_remote, right_scan_id, right_idx)), - ) = ( - scalar_to_remote_expr(metadata, &cond.left)?, - scalar_to_remote_expr(metadata, &cond.right)?, - ) { - uf.union(left_idx, right_idx); - column_to_remote.insert(left_idx, (left_remote, left_scan_id)); - column_to_remote.insert(right_idx, (right_remote, right_scan_id)); - } - } - - let equiv_class = uf.get_equivalence_class(probe_key_col_idx); - - let mut result = Vec::new(); - for idx in equiv_class { - if let Some((remote_expr, scan_id)) = column_to_remote.get(&idx) { - result.push((remote_expr.clone(), *scan_id)); - } +/// Check if a RemoteExpr is valid for probe target +/// +/// Valid probe targets are: +/// 1. Direct column reference +/// 2. Cast from not null to nullable type (e.g. CAST(col AS Nullable(T))) +fn is_valid_probe_target(remote_expr: &RemoteExpr) -> bool { + match remote_expr { + RemoteExpr::ColumnRef { .. } => true, + RemoteExpr::Cast { + expr: box RemoteExpr::ColumnRef { data_type, .. }, + dest_type, + .. + } if &dest_type.remove_nullable() == data_type => true, + _ => false, } - - Ok(result) } -fn collect_equi_conditions(s_expr: &SExpr) -> Result> { - let mut conditions = Vec::new(); - +/// Find probe targets in an SExpr tree by checking equivalence with the target probe key +fn find_probe_targets_in_tree( + s_expr: &SExpr, + target_probe_key: &ScalarExpr, + join_equivalence_classes: &mut JoinEquivalenceClasses, + metadata: &MetadataRef, + results: &mut Vec<(RemoteExpr, usize)>, +) -> Result<()> { if let RelOperator::Join(join) = s_expr.plan() { - if matches!(join.join_type, JoinType::Inner) { - conditions.extend(join.equi_conditions.clone()); + for cond in &join.equi_conditions { + // Check if left (probe) key is equivalent to target + if join_equivalence_classes.are_equivalent(target_probe_key, &cond.left) { + if let Some((remote_expr, scan_id, _)) = + scalar_to_remote_expr(metadata, &cond.left)? + { + if is_valid_probe_target(&remote_expr) { + results.push((remote_expr, scan_id)); + } + } + } + // Check if right (build) key is equivalent to target + if join_equivalence_classes.are_equivalent(target_probe_key, &cond.right) { + if let Some((remote_expr, scan_id, _)) = + scalar_to_remote_expr(metadata, &cond.right)? + { + if is_valid_probe_target(&remote_expr) { + results.push((remote_expr, scan_id)); + } + } + } } } + // Recursively traverse children for child in s_expr.children() { - conditions.extend(collect_equi_conditions(child)?); + find_probe_targets_in_tree( + child, + target_probe_key, + join_equivalence_classes, + metadata, + results, + )?; } - Ok(conditions) + Ok(()) } fn scalar_to_remote_expr( @@ -270,17 +373,12 @@ fn scalar_to_remote_expr( Ok(None) } +#[derive(Default)] struct UnionFind { parent: HashMap, } impl UnionFind { - fn new() -> Self { - Self { - parent: HashMap::new(), - } - } - fn find(&mut self, x: IndexType) -> IndexType { if !self.parent.contains_key(&x) { self.parent.insert(x, x); @@ -302,13 +400,4 @@ impl UnionFind { self.parent.insert(root_x, root_y); } } - - fn get_equivalence_class(&mut self, x: IndexType) -> Vec { - let root = self.find(x); - let all_keys: Vec = self.parent.keys().copied().collect(); - all_keys - .into_iter() - .filter(|&k| self.find(k) == root) - .collect() - } } diff --git a/src/query/service/src/physical_plans/runtime_filter/mod.rs b/src/query/service/src/physical_plans/runtime_filter/mod.rs index d28e5ff6a8968..f82b82dfe0be5 100644 --- a/src/query/service/src/physical_plans/runtime_filter/mod.rs +++ b/src/query/service/src/physical_plans/runtime_filter/mod.rs @@ -16,4 +16,6 @@ mod builder; mod types; pub use builder::build_runtime_filter; +pub use builder::supported_join_type_for_runtime_filter; +pub use builder::JoinEquivalenceClasses; pub use types::*; diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs index 90f2a8a7737fa..c76e43fe97c0a 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs @@ -21,6 +21,7 @@ use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo; use databend_common_catalog::runtime_filter_info::RuntimeFilterStats; use databend_common_catalog::sbbf::Sbbf; use databend_common_catalog::sbbf::SbbfAtomic; +use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::type_check; @@ -49,76 +50,133 @@ use crate::pipelines::processors::transforms::hash_join::util::min_max_filter; /// Each runtime filter (identified by packet.id) is built once and then applied to multiple scans. /// The probe_targets in RuntimeFilterDesc specify all (probe_key, scan_id) pairs where this filter should be applied. pub async fn build_runtime_filter_infos( + ctx: Arc, packet: JoinRuntimeFilterPacket, runtime_filter_descs: HashMap, selectivity_threshold: u64, max_threads: usize, ) -> Result> { - let total_build_rows = packet.build_rows; let Some(packets) = packet.packets else { return Ok(HashMap::new()); }; + let build_rows = packet.build_rows; let mut filters: HashMap = HashMap::new(); - // Iterate over all runtime filter packets for packet in packets.into_values() { let desc = runtime_filter_descs.get(&packet.id).unwrap(); - let enabled = should_enable_runtime_filter(desc, total_build_rows, selectivity_threshold); + let enabled = should_enable_runtime_filter(desc, build_rows, selectivity_threshold); + let selectivity = calc_selectivity(desc.build_table_rows, build_rows); + + // Collect valid targets (not skipped by selectivity check) + let valid_targets: Vec<_> = desc + .probe_targets + .iter() + .filter(|(_, scan_id)| { + !should_skip_filter(&ctx, enabled, *scan_id, selectivity, build_rows, desc.id) + }) + .collect(); + + if valid_targets.is_empty() { + continue; + } + + // Build bloom filter once (expensive) + let bloom_filter = match (enabled, &packet.bloom) { + (true, Some(bloom)) => Some(build_sbbf(bloom.clone(), max_threads, desc.id).await?), + _ => None, + }; + + for (probe_key, scan_id) in valid_targets { + if enabled { + ctx.set_pushed_runtime_filter_stats(*scan_id, selectivity, build_rows as u64); + } - // Apply this single runtime filter to all probe targets (scan_id, probe_key pairs) - // This implements the design goal: "one runtime filter built once, pushed down to multiple scans" - for (probe_key, scan_id) in &desc.probe_targets { - let entry = filters.entry(*scan_id).or_default(); + let bloom = bloom_filter + .as_ref() + .map(|f| build_bloom_filter_from_sbbf(f.clone(), probe_key)) + .transpose()?; + + let inlist = match (enabled, &packet.inlist) { + (true, Some(inlist)) => Some(build_inlist_filter(inlist.clone(), probe_key)?), + _ => None, + }; + + let min_max = match (enabled, &packet.min_max) { + (true, Some(min_max)) => Some(build_min_max_filter( + min_max.clone(), + probe_key, + &desc.build_key, + )?), + _ => None, + }; let runtime_entry = RuntimeFilterEntry { id: desc.id, probe_expr: probe_key.clone(), - bloom: if enabled { - if let Some(ref bloom) = packet.bloom { - Some( - build_bloom_filter(bloom.clone(), probe_key, max_threads, desc.id) - .await?, - ) - } else { - None - } - } else { - None - }, - inlist: if enabled { - if let Some(ref inlist) = packet.inlist { - Some(build_inlist_filter(inlist.clone(), probe_key)?) - } else { - None - } - } else { - None - }, - min_max: if enabled { - if let Some(ref min_max) = packet.min_max { - Some(build_min_max_filter( - min_max.clone(), - probe_key, - &desc.build_key, - )?) - } else { - None - } - } else { - None - }, + bloom, + inlist, + min_max, stats: Arc::new(RuntimeFilterStats::new()), - build_rows: total_build_rows, + build_rows, build_table_rows: desc.build_table_rows, enabled, }; - entry.filters.push(runtime_entry); + filters + .entry(*scan_id) + .or_default() + .filters + .push(runtime_entry); } } Ok(filters) } +fn calc_selectivity(build_table_rows: Option, total_build_rows: usize) -> f64 { + build_table_rows + .filter(|&rows| rows > 0) + .map(|rows| total_build_rows as f64 / rows as f64) + .unwrap_or(1.0) +} + +fn should_skip_filter( + ctx: &Arc, + enabled: bool, + scan_id: usize, + selectivity: f64, + total_build_rows: usize, + filter_id: usize, +) -> bool { + if !enabled { + return true; + } + + let Some((existing_selectivity, existing_rows)) = ctx.get_pushed_runtime_filter_stats(scan_id) + else { + return false; + }; + + // Skip if rows increased AND selectivity didn't improve by at least 50% + let rows_increased = total_build_rows as u64 >= existing_rows; + let selectivity_not_improved = selectivity > existing_selectivity * 0.5; + + if rows_increased && selectivity_not_improved { + log::info!( + "RUNTIME-FILTER: Skip filter {} for scan_id {} - rows increased ({} > {}) \ + without significant selectivity improvement ({:.4} vs {:.4})", + filter_id, + scan_id, + total_build_rows, + existing_rows, + selectivity, + existing_selectivity + ); + return true; + } + + false +} + fn build_inlist_filter(inlist: Column, probe_key: &Expr) -> Result> { if inlist.len() == 0 { return Ok(Expr::Constant(Constant { @@ -256,32 +314,15 @@ fn build_min_max_filter( Ok(min_max_filter) } -async fn build_bloom_filter( - bloom: Vec, - probe_key: &Expr, - max_threads: usize, - filter_id: usize, -) -> Result { - let probe_key = match probe_key { - Expr::ColumnRef(col) => col, - // Support simple cast that only changes nullability, e.g. CAST(col AS Nullable(T)) - Expr::Cast(cast) => match cast.expr.as_ref() { - Expr::ColumnRef(col) => col, - _ => unreachable!(), - }, - _ => unreachable!(), - }; - let column_name = probe_key.id.to_string(); +/// Build the Sbbf filter from bloom data (expensive operation, should be done once per packet) +async fn build_sbbf(bloom: Vec, max_threads: usize, filter_id: usize) -> Result> { let total_items = bloom.len(); if total_items < 3_000_000 { let mut filter = Sbbf::new_with_ndv_fpp(total_items as u64, 0.01) .map_err(|e| ErrorCode::Internal(e.to_string()))?; filter.insert_hash_batch(&bloom); - return Ok(RuntimeFilterBloom { - column_name, - filter: Arc::new(filter), - }); + return Ok(Arc::new(filter)); } let start = std::time::Instant::now(); @@ -295,10 +336,40 @@ async fn build_bloom_filter( start.elapsed() ); - Ok(RuntimeFilterBloom { + Ok(Arc::new(filter)) +} + +/// Build RuntimeFilterBloom from pre-built Sbbf filter (cheap operation, can be done per probe key) +fn build_bloom_filter_from_sbbf( + filter: Arc, + probe_key: &Expr, +) -> Result> { + let probe_key = match probe_key { + Expr::ColumnRef(col) => col, + // Support simple cast that only changes nullability, e.g. CAST(col AS Nullable(T)) + Expr::Cast(cast) => match cast.expr.as_ref() { + Expr::ColumnRef(col) => col, + _ => unreachable!(), + }, + _ => unreachable!(), + }; + let column_name = probe_key.id.to_string(); + + Ok(Arc::new(RuntimeFilterBloom { column_name, - filter: Arc::new(filter), - }) + filter, + })) +} + +#[allow(dead_code)] +async fn build_bloom_filter( + bloom: Vec, + probe_key: &Expr, + max_threads: usize, + filter_id: usize, +) -> Result> { + let filter = build_sbbf(bloom, max_threads, filter_id).await?; + build_bloom_filter_from_sbbf(filter, probe_key) } #[cfg(test)] diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/interface.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/interface.rs index d0f1fb003fb1e..6f47122c9c461 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/interface.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/interface.rs @@ -50,6 +50,7 @@ pub async fn build_and_push_down_runtime_filter( let max_threads = join.ctx.get_settings().get_max_threads()? as usize; let build_rows = packet.build_rows; let runtime_filter_infos = build_runtime_filter_infos( + join.ctx.clone(), packet, runtime_filter_descs, selectivity_threshold, diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/runtime_filter.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/runtime_filter.rs index 4757d0510a7d9..fe8d29ac37cea 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/runtime_filter.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/runtime_filter.rs @@ -87,6 +87,7 @@ impl RuntimeFiltersDesc { let runtime_filter_descs = self.filters_desc.iter().map(|r| (r.id, r)).collect(); let runtime_filter_infos = build_runtime_filter_infos( + self.ctx.clone(), packet, runtime_filter_descs, self.selectivity_threshold, diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 525729e5a27e8..10276400e6fef 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -1740,6 +1740,16 @@ impl TableContext for QueryContext { } } + fn set_pushed_runtime_filter_stats(&self, scan_id: usize, selectivity: f64, rows: u64) { + let mut stats = self.shared.pushed_runtime_filter_stats.write(); + stats.insert(scan_id, (selectivity, rows)); + } + + fn get_pushed_runtime_filter_stats(&self, scan_id: usize) -> Option<(f64, u64)> { + let stats = self.shared.pushed_runtime_filter_stats.read(); + stats.get(&scan_id).copied() + } + fn get_merge_into_join(&self) -> MergeIntoJoin { let merge_into_join = self.shared.merge_into_join.read(); MergeIntoJoin { @@ -1760,7 +1770,11 @@ impl TableContext for QueryContext { fn get_bloom_runtime_filter_with_id(&self, id: IndexType) -> Vec<(String, RuntimeBloomFilter)> { self.get_runtime_filters(id) .into_iter() - .filter_map(|entry| entry.bloom.map(|bloom| (bloom.column_name, bloom.filter))) + .filter_map(|entry| { + entry + .bloom + .map(|bloom| (bloom.column_name.clone(), bloom.filter.clone())) + }) .collect() } diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index eb8d331e0ce52..5ac9e419e727c 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -159,6 +159,10 @@ pub struct QueryContextShared { pub(super) runtime_filter_ready: Arc>>>>, + /// Tracks pushed down filter statistics for each scan_id + /// Key: scan_id, Value: (selectivity, rows) + pub(super) pushed_runtime_filter_stats: Arc>>, + pub(super) merge_into_join: Arc>, // Records query level data cache metrics @@ -252,6 +256,7 @@ impl QueryContextShared { query_profiles: Arc::new(RwLock::new(HashMap::new())), runtime_filters: Default::default(), runtime_filter_ready: Default::default(), + pushed_runtime_filter_stats: Default::default(), merge_into_join: Default::default(), multi_table_insert_status: Default::default(), query_queued_duration: Arc::new(RwLock::new(Duration::from_secs(0))),