Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/query/catalog/src/runtime_filter_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl RuntimeFilterInfo {
pub struct RuntimeFilterEntry {
pub id: usize,
pub probe_expr: Expr<String>,
pub bloom: Option<RuntimeFilterBloom>,
pub bloom: Option<Arc<RuntimeFilterBloom>>,
pub inlist: Option<Expr<String>>,
pub min_max: Option<Expr<String>>,
pub stats: Arc<RuntimeFilterStats>,
Expand All @@ -69,7 +69,6 @@ pub struct RuntimeFilterEntry {
pub enabled: bool,
}

#[derive(Clone)]
pub struct RuntimeFilterBloom {
pub column_name: String,
pub filter: RuntimeBloomFilter,
Expand Down
12 changes: 12 additions & 0 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,18 @@ pub trait TableContext: Send + Sync {

fn get_runtime_filter_ready(&self, table_index: usize) -> Vec<Arc<RuntimeFilterReady>>;

/// Set the pushed runtime filter statistics for a scan_id
/// Parameters: scan_id, selectivity (0.0-1.0), row count
fn set_pushed_runtime_filter_stats(&self, _scan_id: usize, _selectivity: f64, _rows: u64) {
unimplemented!()
}

/// Get the pushed runtime filter statistics for a scan_id
/// Returns: Option<(selectivity, rows)>
fn get_pushed_runtime_filter_stats(&self, _scan_id: usize) -> Option<(f64, u64)> {
unimplemented!()
}

fn clear_runtime_filter(&self);
fn assert_no_runtime_filter_state(&self) -> Result<()> {
unimplemented!()
Expand Down
26 changes: 24 additions & 2 deletions src/query/service/src/physical_plans/physical_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use databend_common_sql::ScalarExpr;
use databend_common_sql::TypeCheck;
use tokio::sync::Barrier;

use super::runtime_filter::supported_join_type_for_runtime_filter;
use super::runtime_filter::PhysicalRuntimeFilters;
use super::PhysicalPlanCast;
use crate::physical_plans::explain::PlanStatsInfo;
Expand Down Expand Up @@ -75,6 +76,7 @@ type JoinConditionsResult = (
Vec<Option<(RemoteExpr<String>, usize, usize, IndexType)>>,
Vec<((usize, bool), usize)>,
Vec<Option<IndexType>>,
Vec<ScalarExpr>, // Build key ScalarExprs for equivalence class checking
);

type ProjectionsResult = (
Expand Down Expand Up @@ -508,16 +510,29 @@ impl HashJoin {
}

impl PhysicalPlanBuilder {
/// Builds the physical plans for both sides of the join
pub async fn build_join_sides(
&mut self,
s_expr: &SExpr,
join: Option<&Join>,
left_required: ColumnSet,
right_required: ColumnSet,
) -> Result<(PhysicalPlan, PhysicalPlan)> {
let probe_side = self.build(s_expr.left_child(), left_required).await?;

let should_track = join
.map(|j| supported_join_type_for_runtime_filter(&j.join_type))
.unwrap_or(false);

if should_track {
self.runtime_filter_anchors.push(Arc::new(s_expr.clone()));
}

let build_side = self.build(s_expr.right_child(), right_required).await?;

if should_track {
self.runtime_filter_anchors.pop();
}

Ok((probe_side, build_side))
}

Expand Down Expand Up @@ -788,6 +803,7 @@ impl PhysicalPlanBuilder {
) -> Result<JoinConditionsResult> {
let mut left_join_conditions = Vec::new();
let mut right_join_conditions = Vec::new();
let mut right_join_conditions_scalar = Vec::new();
let mut is_null_equal = Vec::new();
let mut left_join_conditions_rt = Vec::new();
let mut probe_to_build_index = Vec::new();
Expand Down Expand Up @@ -890,6 +906,7 @@ impl PhysicalPlanBuilder {
// Add to result collections
left_join_conditions.push(left_expr.as_remote_expr());
right_join_conditions.push(right_expr.as_remote_expr());
right_join_conditions_scalar.push(right_condition.clone());
is_null_equal.push(condition.is_null_equal);
left_join_conditions_rt.push(left_expr_for_runtime_filter.map(
|(expr, scan_id, table_index, column_idx)| {
Expand All @@ -906,6 +923,7 @@ impl PhysicalPlanBuilder {
left_join_conditions_rt,
probe_to_build_index,
build_table_indexes,
right_join_conditions_scalar,
))
}

Expand Down Expand Up @@ -1270,7 +1288,7 @@ impl PhysicalPlanBuilder {
) -> Result<PhysicalPlan> {
// Step 1: Build probe and build sides
let (mut probe_side, mut build_side) = self
.build_join_sides(s_expr, left_required, right_required)
.build_join_sides(s_expr, Some(join), left_required, right_required)
.await?;

// Step 2: Prepare column projections
Expand All @@ -1292,6 +1310,7 @@ impl PhysicalPlanBuilder {
left_join_conditions_rt,
mut probe_to_build_index,
build_table_indexes,
right_join_conditions_scalar,
) = self.process_equi_conditions(
join,
&probe_schema,
Expand Down Expand Up @@ -1337,8 +1356,11 @@ impl PhysicalPlanBuilder {
join,
s_expr,
&right_join_conditions,
&right_join_conditions_scalar,
left_join_conditions_rt,
build_table_indexes,
&self.runtime_filter_anchors,
&mut self.join_equivalence_classes,
)
.await?;

Expand Down
11 changes: 11 additions & 0 deletions src/query/service/src/physical_plans/physical_plan_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use databend_storages_common_table_meta::meta::TableSnapshot;

use crate::physical_plans::explain::PlanStatsInfo;
use crate::physical_plans::physical_plan::PhysicalPlan;
use crate::physical_plans::runtime_filter::JoinEquivalenceClasses;

pub struct PhysicalPlanBuilder {
pub metadata: MetadataRef,
Expand All @@ -39,6 +40,9 @@ pub struct PhysicalPlanBuilder {
pub dry_run: bool,
// DataMutation info, used to build MergeInto physical plan
pub mutation_build_info: Option<MutationBuildInfo>,
pub runtime_filter_anchors: Vec<Arc<SExpr>>,
pub join_equivalence_classes: JoinEquivalenceClasses,
pub inited: bool,
}

impl PhysicalPlanBuilder {
Expand All @@ -50,6 +54,9 @@ impl PhysicalPlanBuilder {
func_ctx,
dry_run,
mutation_build_info: None,
runtime_filter_anchors: Vec::new(),
join_equivalence_classes: JoinEquivalenceClasses::default(),
inited: false,
}
}

Expand All @@ -63,6 +70,10 @@ impl PhysicalPlanBuilder {
}

pub async fn build(&mut self, s_expr: &SExpr, required: ColumnSet) -> Result<PhysicalPlan> {
if !self.inited {
self.join_equivalence_classes = JoinEquivalenceClasses::build_from_sexpr(s_expr);
self.inited = true;
}
let mut plan = self.build_physical_plan(s_expr, required).await?;
plan.adjust_plan_id(&mut 0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ impl PhysicalPlanBuilder {

// Construct IEJoin
let (right_side, left_side) = self
.build_join_sides(s_expr, left_required, right_required)
.build_join_sides(s_expr, None, left_required, right_required)
.await?;

let left_schema = self.prepare_probe_schema(join_type, &left_side)?;
Expand Down
Loading
Loading