diff --git a/src/common/base/src/mem_allocator/jemalloc.rs b/src/common/base/src/mem_allocator/jemalloc.rs
index cffdc5dcd1c02..7a6b15ba57768 100644
--- a/src/common/base/src/mem_allocator/jemalloc.rs
+++ b/src/common/base/src/mem_allocator/jemalloc.rs
@@ -249,7 +249,7 @@ pub mod not_linux {
     #[inline(always)]
     unsafe fn deallocate(&self, ptr: NonNull<u8>, layout: Layout) {
-        StdAllocator.deallocate(ptr, layout)
+        unsafe { StdAllocator.deallocate(ptr, layout) }
     }
 
     unsafe fn grow(
@@ -258,7 +258,7 @@
         old_layout: Layout,
         new_layout: Layout,
     ) -> Result<NonNull<[u8]>, AllocError> {
-        StdAllocator.grow(ptr, old_layout, new_layout)
+        unsafe { StdAllocator.grow(ptr, old_layout, new_layout) }
     }
 
     unsafe fn grow_zeroed(
@@ -267,7 +267,7 @@
         old_layout: Layout,
         new_layout: Layout,
     ) -> Result<NonNull<[u8]>, AllocError> {
-        StdAllocator.grow_zeroed(ptr, old_layout, new_layout)
+        unsafe { StdAllocator.grow_zeroed(ptr, old_layout, new_layout) }
     }
 
     unsafe fn shrink(
@@ -276,7 +276,7 @@
         old_layout: Layout,
         new_layout: Layout,
     ) -> Result<NonNull<[u8]>, AllocError> {
-        StdAllocator.shrink(ptr, old_layout, new_layout)
+        unsafe { StdAllocator.shrink(ptr, old_layout, new_layout) }
     }
 }
 }
diff --git a/src/common/exception/src/exception_backtrace.rs b/src/common/exception/src/exception_backtrace.rs
index a1f9569fb2774..7835913faf2db 100644
--- a/src/common/exception/src/exception_backtrace.rs
+++ b/src/common/exception/src/exception_backtrace.rs
@@ -77,6 +77,7 @@ pub struct ResolvedStackFrame {
     pub column: Option<u32>,
 }
 
+#[cfg(target_os = "linux")]
 #[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
 pub struct PhysicalAddr {
     pub physical_addr: usize,
diff --git a/src/query/catalog/src/sbbf.rs b/src/query/catalog/src/sbbf.rs
index 3ae85a46d7619..2eb1c5f184303 100644
--- a/src/query/catalog/src/sbbf.rs
+++ b/src/query/catalog/src/sbbf.rs
@@ -239,7 +239,7 @@ impl Sbbf {
 
     /// Create a new [Sbbf] with given number of bytes, the exact number of bytes will be adjusted
     /// to the next power of two bounded by [BITSET_MIN_LENGTH] and [BITSET_MAX_LENGTH].
-    pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
+    pub fn new_with_num_of_bytes(num_bytes: usize) -> Self {
         let num_bytes = optimal_num_of_bytes(num_bytes);
         assert_eq!(num_bytes % size_of::<Block>(), 0);
         let num_blocks = num_bytes / size_of::<Block>();
@@ -307,6 +307,52 @@ impl Sbbf {
     pub fn estimated_memory_size(&self) -> usize {
         self.0.capacity() * std::mem::size_of::<Block>()
     }
+
+    /// Serialize the bloom filter into a little-endian byte array.
+    /// The layout is a contiguous sequence of blocks, each block consisting
+    /// of 8 u32 values in little-endian order.
+    pub fn to_bytes(&self) -> Vec<u8> {
+        let mut bytes = Vec::with_capacity(self.0.len() * size_of::<Block>());
+        for block in &self.0 {
+            for value in block.0 {
+                bytes.extend_from_slice(&value.to_le_bytes());
+            }
+        }
+        bytes
+    }
+
+    /// Deserialize a bloom filter from bytes produced by `to_bytes`.
+    pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
+        if !bytes.len().is_multiple_of(size_of::<Block>()) {
+            return Err(format!(
+                "Invalid bloom filter bytes length {}, expected multiple of {}",
+                bytes.len(),
+                size_of::<Block>()
+            ));
+        }
+
+        let num_blocks = bytes.len() / size_of::<Block>();
+        if num_blocks == 0 {
+            return Ok(Sbbf(Vec::new()));
+        }
+
+        let mut blocks = Vec::with_capacity(num_blocks);
+        let mut offset = 0;
+        for _ in 0..num_blocks {
+            let mut arr = [0u32; 8];
+            for value in &mut arr {
+                let end = offset + size_of::<u32>();
+                let chunk = bytes
+                    .get(offset..end)
+                    .ok_or_else(|| "Invalid bloom filter bytes".to_string())?;
+                *value = u32::from_le_bytes(chunk.try_into().unwrap());
+                offset = end;
+            }
+            blocks.push(Block(arr));
+        }
+
+        Ok(Sbbf(blocks))
+    }
 }
 
 impl SbbfAtomic {
@@ -320,7 +366,7 @@ impl SbbfAtomic {
         Ok(Self::new_with_num_of_bytes(num_bits / 8))
     }
 
-    pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
+    pub fn new_with_num_of_bytes(num_bytes: usize) -> Self {
         let num_bytes = optimal_num_of_bytes(num_bytes);
         assert_eq!(size_of::<AtomicBlock>(), size_of::<Block>());
         assert_eq!(num_bytes % size_of::<Block>(), 0);
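A quick round-trip sketch of the byte layout above (illustrative only; `Sbbf`, `Block`, and `insert_hash_batch` are the definitions from this file):

```rust
// Each Block is 8 u32 values, so the serialized length is a multiple of 32 bytes.
let mut filter = Sbbf::new_with_num_of_bytes(1024);
filter.insert_hash_batch(&[0xdead_beef_u64, 0xcafe_babe_u64]);

let bytes = filter.to_bytes();
assert_eq!(bytes.len() % 32, 0);

// from_bytes is the exact inverse of to_bytes.
let restored = Sbbf::from_bytes(&bytes).expect("valid length");
assert_eq!(restored.to_bytes(), bytes);
```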
diff --git a/src/query/expression/src/block.rs b/src/query/expression/src/block.rs
index bd0fdd7be4cfa..a456617d92d00 100644
--- a/src/query/expression/src/block.rs
+++ b/src/query/expression/src/block.rs
@@ -368,6 +368,12 @@ pub trait BlockMetaInfo: Debug + Send + Sync + Any + 'static {
             "The reason for not implementing clone_self is usually because the higher-level logic doesn't allow/need the associated block to be cloned."
         )
     }
+
+    /// Overrides the global schema for a specific block, attaching a custom schema that will be used
+    /// exclusively for this block instead of the global default.
+    fn override_block_schema(&self) -> Option<DataSchemaRef> {
+        None
+    }
 }
 
 pub trait BlockMetaInfoDowncast: Sized + BlockMetaInfo {
diff --git a/src/query/pipeline/transforms/src/processors/transforms/sorts/sort_broadcast.rs b/src/query/pipeline/transforms/src/processors/transforms/sorts/sort_broadcast.rs
index 76dd17839b755..8661c9e3b4f96 100644
--- a/src/query/pipeline/transforms/src/processors/transforms/sorts/sort_broadcast.rs
+++ b/src/query/pipeline/transforms/src/processors/transforms/sorts/sort_broadcast.rs
@@ -20,7 +20,6 @@ use async_channel::Sender;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::BlockMetaInfoDowncast;
-use databend_common_expression::BlockMetaInfoPtr;
 use databend_common_expression::DataBlock;
 use databend_common_pipeline::core::Event;
 use databend_common_pipeline::core::InputPort;
@@ -70,8 +69,8 @@ pub struct SortSampleState {
 }
 
 pub trait BroadcastChannel: Clone + Send + 'static {
-    fn sender(&self) -> Sender<BlockMetaInfoPtr>;
-    fn receiver(&self) -> Receiver<BlockMetaInfoPtr>;
+    fn sender(&self) -> Sender<DataBlock>;
+    fn receiver(&self) -> Receiver<DataBlock>;
 }
 
 impl SortSampleState {
@@ -91,7 +90,7 @@ impl SortSampleState {
         let is_empty = meta.is_none();
         let meta = meta.map(|meta| meta.boxed()).unwrap_or(().boxed());
         sender
-            .send(meta)
+            .send(DataBlock::empty_with_meta(meta))
             .await
             .map_err(|_| ErrorCode::TokioError("send sort bounds failed"))?;
         sender.close();
@@ -99,8 +98,8 @@ impl SortSampleState {
 
         let receiver = self.channel.receiver();
         let mut all = Vec::new();
-        while let Ok(r) = receiver.recv().await {
-            match SortExchangeMeta::downcast_from_err(r) {
+        while let Ok(mut r) = receiver.recv().await {
+            match SortExchangeMeta::downcast_from_err(r.take_meta().unwrap()) {
                 Ok(meta) => all.push(meta),
                 Err(r) => {
                     debug_assert!(().boxed().equals(&r))
diff --git a/src/query/service/src/physical_plans/runtime_filter/builder.rs b/src/query/service/src/physical_plans/runtime_filter/builder.rs
index 2833f20caa018..c97e2b11c0184 100644
--- a/src/query/service/src/physical_plans/runtime_filter/builder.rs
+++ b/src/query/service/src/physical_plans/runtime_filter/builder.rs
@@ -17,6 +17,7 @@ use std::sync::Arc;
 
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
+use databend_common_expression::Expr;
 use databend_common_expression::RemoteExpr;
 use databend_common_expression::types::DataType;
 use databend_common_functions::BUILTIN_FUNCTIONS;
@@ -24,6 +25,7 @@ use databend_common_sql::ColumnEntry;
 use databend_common_sql::IndexType;
 use databend_common_sql::MetadataRef;
 use databend_common_sql::TypeCheck;
+use databend_common_sql::optimizer::ir::ColumnStatSet;
 use databend_common_sql::optimizer::ir::SExpr;
 use databend_common_sql::plans::Exchange;
 use databend_common_sql::plans::Join;
@@ -113,6 +115,11 @@ pub async fn build_runtime_filter(
 
     let mut filters = Vec::new();
 
+    // Derive statistics for the build side to estimate NDV of join keys.
+    let build_rel_expr = databend_common_sql::optimizer::ir::RelExpr::with_s_expr(build_side);
+    let build_stat_info = build_rel_expr.derive_cardinality()?;
+    let build_column_stats = &build_stat_info.statistics.column_stats;
+
     let probe_side = s_expr.probe_side_child();
 
     // Process each probe key that has runtime filter information
@@ -144,10 +151,20 @@ pub async fn build_runtime_filter(
         let build_table_rows =
             get_build_table_rows(ctx.clone(), metadata, build_table_index).await?;
 
-        let data_type = build_key
-            .as_expr(&BUILTIN_FUNCTIONS)
-            .data_type()
-            .remove_nullable();
+        let build_key_expr = build_key.as_expr(&BUILTIN_FUNCTIONS);
+
+        // Estimate NDV for the build side join key using optimizer statistics.
+        // Handles all RemoteExpr variants by looking at the column references inside
+        // the expression. If the expression is constant, NDV is 1. If it contains
+        // exactly one column reference, reuse that column's NDV. Otherwise, fall
+        // back to the overall build-side cardinality.
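+        // Illustrative examples (not from the planner): `CAST(t.id AS BIGINT)`
+        // has one column reference, so `t.id`'s NDV is reused; `t.a + t.b` has
+        // two, so we fall back to the build-side cardinality below.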
+        let build_key_ndv = estimate_build_key_ndv(&build_key_expr, build_column_stats)
+            .unwrap_or_else(|| build_stat_info.cardinality.ceil() as u64);
+
+        let data_type = build_key_expr.data_type().remove_nullable();
 
         let id = metadata.write().next_runtime_filter_id();
         let enable_bloom_runtime_filter = is_type_supported_for_bloom_filter(&data_type);
@@ -159,6 +176,7 @@ pub async fn build_runtime_filter(
             id,
             build_key: build_key.clone(),
             probe_targets,
+            build_key_ndv,
             build_table_rows,
             enable_bloom_runtime_filter,
             enable_inlist_runtime_filter: true,
@@ -170,6 +188,23 @@ pub async fn build_runtime_filter(
     Ok(PhysicalRuntimeFilters { filters })
 }
 
+fn estimate_build_key_ndv(
+    build_key: &Expr,
+    build_column_stats: &ColumnStatSet,
+) -> Option<u64> {
+    let mut column_refs = build_key.column_refs();
+    if column_refs.is_empty() {
+        return Some(1);
+    }
+
+    if column_refs.len() == 1 {
+        let (id, _) = column_refs.drain().next().unwrap();
+        build_column_stats.get(&id).map(|s| s.ndv.ceil() as u64)
+    } else {
+        None
+    }
+}
+
 async fn get_build_table_rows(
     ctx: Arc<dyn TableContext>,
     metadata: &MetadataRef,
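The helper's contract, restated as a hedged sketch (the names `build_key_expr`, `build_column_stats`, and `build_stat_info` are from the hunk above; `ColumnStatSet` maps a column index to a `ColumnStat` with an `ndv: f64` field):

```rust
// Decision table implemented by estimate_build_key_ndv:
//   no column refs (constant)  -> Some(1)
//   exactly one column ref     -> Some(stats[col].ndv.ceil() as u64),
//                                 or None when the optimizer has no stats
//   two or more column refs    -> None
// Callers then substitute the whole build-side cardinality:
let build_key_ndv = estimate_build_key_ndv(&build_key_expr, build_column_stats)
    .unwrap_or_else(|| build_stat_info.cardinality.ceil() as u64);
```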
diff --git a/src/query/service/src/physical_plans/runtime_filter/types.rs b/src/query/service/src/physical_plans/runtime_filter/types.rs
index 11a7a9992f5c6..cf1f62e93ffe6 100644
--- a/src/query/service/src/physical_plans/runtime_filter/types.rs
+++ b/src/query/service/src/physical_plans/runtime_filter/types.rs
@@ -42,6 +42,9 @@ pub struct PhysicalRuntimeFilter {
     /// All probe targets in this list are in the same equivalence class
     pub probe_targets: Vec<(RemoteExpr<String>, usize)>,
 
+    /// Estimated NDV of the build side join key, derived from optimizer statistics.
+    pub build_key_ndv: u64,
+
     pub build_table_rows: Option<u64>,
 
     /// Enable bloom filter for this runtime filter
diff --git a/src/query/service/src/pipelines/processors/transforms/broadcast.rs b/src/query/service/src/pipelines/processors/transforms/broadcast.rs
index d68ce62653b4f..985b354cbfbc5 100644
--- a/src/query/service/src/pipelines/processors/transforms/broadcast.rs
+++ b/src/query/service/src/pipelines/processors/transforms/broadcast.rs
@@ -19,7 +19,6 @@ use async_channel::Sender;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
-use databend_common_expression::BlockMetaInfoPtr;
 use databend_common_expression::DataBlock;
 use databend_common_pipeline::core::InputPort;
 use databend_common_pipeline::core::OutputPort;
@@ -30,13 +29,13 @@ use databend_common_pipeline::sources::AsyncSource;
 use databend_common_pipeline::sources::AsyncSourcer;
 
 pub struct BroadcastSourceProcessor {
-    pub receiver: Receiver<BlockMetaInfoPtr>,
+    pub receiver: Receiver<DataBlock>,
 }
 
 impl BroadcastSourceProcessor {
     pub fn create(
         ctx: Arc<dyn TableContext>,
-        receiver: Receiver<BlockMetaInfoPtr>,
+        receiver: Receiver<DataBlock>,
         output_port: Arc<OutputPort>,
     ) -> Result<ProcessorPtr> {
         AsyncSourcer::create(ctx.get_scan_progress(), output_port, Self { receiver })
@@ -50,23 +49,20 @@ impl AsyncSource for BroadcastSourceProcessor {
 
     #[async_backtrace::framed]
     async fn generate(&mut self) -> Result<Option<DataBlock>> {
-        let received = self.receiver.recv().await;
-        match received {
-            Ok(meta) => Ok(Some(DataBlock::empty_with_meta(meta))),
-            Err(_) => {
-                // The channel is closed, we should return None to stop generating
-                Ok(None)
-            }
+        match self.receiver.recv().await {
+            Ok(block) => Ok(Some(block)),
+            // The channel is closed, we should return None to stop generating
+            Err(_) => Ok(None),
         }
     }
 }
 
 pub struct BroadcastSinkProcessor {
-    sender: Sender<BlockMetaInfoPtr>,
+    sender: Sender<DataBlock>,
 }
 
 impl BroadcastSinkProcessor {
-    pub fn create(input: Arc<InputPort>, sender: Sender<BlockMetaInfoPtr>) -> Result<ProcessorPtr> {
+    pub fn create(input: Arc<InputPort>, sender: Sender<DataBlock>) -> Result<ProcessorPtr> {
         Ok(ProcessorPtr::create(AsyncSinker::create(input, Self {
             sender,
         })))
@@ -82,12 +78,9 @@ impl AsyncSink for BroadcastSinkProcessor {
         Ok(())
     }
 
-    async fn consume(&mut self, mut data_block: DataBlock) -> Result<bool> {
-        let meta = data_block
-            .take_meta()
-            .ok_or_else(|| ErrorCode::Internal("Cannot downcast meta to BroadcastMeta"))?;
+    async fn consume(&mut self, data_block: DataBlock) -> Result<bool> {
         self.sender
-            .send(meta)
+            .send(data_block)
             .await
             .map_err(|_| ErrorCode::Internal("BroadcastSinkProcessor send error"))?;
         Ok(false)
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs
index 1bd24c4750501..32845df3a9d0b 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs
@@ -70,6 +70,7 @@ pub struct RuntimeFilterDesc {
     pub id: usize,
     pub build_key: Expr,
     pub probe_targets: Vec<(Expr<String>, usize)>,
+    pub build_key_ndv: u64,
     pub build_table_rows: Option<u64>,
     pub enable_bloom_runtime_filter: bool,
     pub enable_inlist_runtime_filter: bool,
@@ -98,6 +99,7 @@ impl From<&PhysicalRuntimeFilter> for RuntimeFilterDesc {
                 .iter()
                 .map(|(probe_key, scan_id)| (probe_key.as_expr(&BUILTIN_FUNCTIONS), *scan_id))
                 .collect(),
+            build_key_ndv: runtime_filter.build_key_ndv,
             build_table_rows: runtime_filter.build_table_rows,
             enable_bloom_runtime_filter: runtime_filter.enable_bloom_runtime_filter,
             enable_inlist_runtime_filter: runtime_filter.enable_inlist_runtime_filter,
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/builder.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/builder.rs
index 90e7011adbda0..fec0479ac7b9b 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/builder.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/builder.rs
@@ -12,195 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashMap;
-use std::time::Instant;
-
-use databend_common_base::runtime::profile::Profile;
-use databend_common_base::runtime::profile::ProfileStatisticsName;
-use databend_common_exception::Result;
-use databend_common_expression::Column;
-use databend_common_expression::DataBlock;
-use databend_common_expression::Evaluator;
-use databend_common_expression::Expr;
-use databend_common_expression::FunctionContext;
-use databend_common_expression::RawExpr;
-use databend_common_expression::Scalar;
-use databend_common_expression::type_check;
-use databend_common_expression::types::DataType;
-use databend_common_functions::BUILTIN_FUNCTIONS;
-
-use super::packet::JoinRuntimeFilterPacket;
-use super::packet::RuntimeFilterPacket;
-use super::packet::SerializableDomain;
 use crate::pipelines::processors::transforms::hash_join::desc::RuntimeFilterDesc;
-use crate::pipelines::processors::transforms::hash_join::util::hash_by_method_for_bloom;
-
-struct JoinRuntimeFilterPacketBuilder<'a> {
-    build_key_column: Column,
-    func_ctx: &'a FunctionContext,
-    inlist_threshold: usize,
-    bloom_threshold: usize,
-    min_max_threshold: usize,
-    selectivity_threshold: u64,
-}
-
-impl<'a> JoinRuntimeFilterPacketBuilder<'a> {
-    fn new(
-        data_blocks: &'a [DataBlock],
-        func_ctx: &'a FunctionContext,
-        build_key: &Expr,
-        inlist_threshold: usize,
-        bloom_threshold: usize,
-        min_max_threshold: usize,
-        selectivity_threshold: u64,
-    ) -> Result<Self> {
-        let build_key_column = Self::eval_build_key_column(data_blocks, func_ctx, build_key)?;
-        Ok(Self {
-            func_ctx,
-            build_key_column,
-            inlist_threshold,
-            bloom_threshold,
-            min_max_threshold,
-            selectivity_threshold,
-        })
-    }
-
-    fn build(&self, desc: &RuntimeFilterDesc) -> Result<RuntimeFilterPacket> {
-        if !should_enable_runtime_filter(
-            desc,
-            self.build_key_column.len(),
-            self.selectivity_threshold,
-        ) {
-            return Ok(RuntimeFilterPacket {
-                id: desc.id,
-                inlist: None,
-                min_max: None,
-                bloom: None,
-            });
-        }
-        let start = Instant::now();
-
-        let min_max_start = Instant::now();
-        let min_max = self
-            .enable_min_max(desc)
-            .then(|| self.build_min_max())
-            .transpose()?;
-        let min_max_time = min_max_start.elapsed();
-
-        let inlist_start = Instant::now();
-        let inlist = self
-            .enable_inlist(desc)
-            .then(|| self.build_inlist())
-            .transpose()?;
-        let inlist_time = inlist_start.elapsed();
-
-        let bloom_start = Instant::now();
-        let bloom = self
-            .enable_bloom(desc)
-            .then(|| self.build_bloom(desc))
-            .transpose()?;
-        let bloom_time = bloom_start.elapsed();
-
-        let total_time = start.elapsed();
-
-        Profile::record_usize_profile(
-            ProfileStatisticsName::RuntimeFilterBuildTime,
-            total_time.as_nanos() as usize,
-        );
-
-        log::info!(
-            "RUNTIME-FILTER: Built filter {} - total: {:?}, min_max: {:?}, inlist: {:?}, bloom: {:?}, rows: {}",
-            desc.id,
-            total_time,
-            min_max_time,
-            inlist_time,
-            bloom_time,
-            self.build_key_column.len()
-        );
-
-        Ok(RuntimeFilterPacket {
-            id: desc.id,
-            min_max,
-            inlist,
-            bloom,
-        })
-    }
-
-    fn enable_min_max(&self, desc: &RuntimeFilterDesc) -> bool {
-        desc.enable_min_max_runtime_filter && self.build_key_column.len() < self.min_max_threshold
-    }
-
-    fn enable_inlist(&self, desc: &RuntimeFilterDesc) -> bool {
-        desc.enable_inlist_runtime_filter && self.build_key_column.len() < self.inlist_threshold
-    }
-
-    fn enable_bloom(&self, desc: &RuntimeFilterDesc) -> bool {
-        if !desc.enable_bloom_runtime_filter {
-            return false;
-        }
-
-        if self.build_key_column.len() >= self.bloom_threshold {
-            return false;
-        }
-
-        true
-    }
-
-    fn build_min_max(&self) -> Result<SerializableDomain> {
-        let domain = self.build_key_column.remove_nullable().domain();
-        let (min, max) = domain.to_minmax();
-        Ok(SerializableDomain { min, max })
-    }
-
-    fn build_inlist(&self) -> Result<Column> {
-        self.dedup_column(&self.build_key_column)
-    }
-
-    fn build_bloom(&self, desc: &RuntimeFilterDesc) -> Result<Vec<u64>> {
-        let data_type = desc.build_key.data_type();
-        let num_rows = self.build_key_column.len();
-        let method = DataBlock::choose_hash_method_with_types(&[data_type.clone()])?;
-        let mut hashes = Vec::with_capacity(num_rows);
-        let key_columns = &[self.build_key_column.clone().into()];
-        hash_by_method_for_bloom(&method, key_columns.into(), num_rows, &mut hashes)?;
-        Ok(hashes)
-    }
-
-    fn eval_build_key_column(
-        data_blocks: &[DataBlock],
-        func_ctx: &FunctionContext,
-        build_key: &Expr,
-    ) -> Result<Column> {
-        let mut columns = Vec::with_capacity(data_blocks.len());
-        for block in data_blocks.iter() {
-            let evaluator = Evaluator::new(block, func_ctx, &BUILTIN_FUNCTIONS);
-            let column = evaluator
-                .run(build_key)?
-                .convert_to_full_column(build_key.data_type(), block.num_rows());
-            columns.push(column);
-        }
-        Column::concat_columns(columns.into_iter())
-    }
-
-    fn dedup_column(&self, column: &Column) -> Result<Column> {
-        let array = RawExpr::Constant {
-            span: None,
-            scalar: Scalar::Array(column.clone()),
-            data_type: Some(DataType::Array(Box::new(column.data_type()))),
-        };
-        let distinct_list = RawExpr::FunctionCall {
-            span: None,
-            name: "array_distinct".to_string(),
-            params: vec![],
-            args: vec![array],
-        };
-
-        let empty_key_block = DataBlock::empty();
-        let evaluator = Evaluator::new(&empty_key_block, self.func_ctx, &BUILTIN_FUNCTIONS);
-        let value = evaluator.run(&type_check::check(&distinct_list, &BUILTIN_FUNCTIONS)?)?;
-        let array = value.into_scalar().unwrap().into_array().unwrap();
-        Ok(array)
-    }
-}
 
 pub(super) fn should_enable_runtime_filter(
     desc: &RuntimeFilterDesc,
@@ -243,48 +55,3 @@
         false
     }
 }
-
-pub fn build_runtime_filter_packet(
-    build_chunks: &[DataBlock],
-    build_num_rows: usize,
-    runtime_filter_desc: &[RuntimeFilterDesc],
-    func_ctx: &FunctionContext,
-    inlist_threshold: usize,
-    bloom_threshold: usize,
-    min_max_threshold: usize,
-    selectivity_threshold: u64,
-    is_spill_happened: bool,
-) -> Result<JoinRuntimeFilterPacket> {
-    if is_spill_happened {
-        return Ok(JoinRuntimeFilterPacket::disable_all(
-            runtime_filter_desc,
-            build_num_rows,
-        ));
-    }
-    if build_num_rows == 0 {
-        return Ok(JoinRuntimeFilterPacket {
-            packets: None,
-            build_rows: build_num_rows,
-        });
-    }
-    let mut runtime_filters = HashMap::new();
-    for rf in runtime_filter_desc {
-        runtime_filters.insert(
-            rf.id,
-            JoinRuntimeFilterPacketBuilder::new(
-                build_chunks,
-                func_ctx,
-                &rf.build_key,
-                inlist_threshold,
-                bloom_threshold,
-                min_max_threshold,
-                selectivity_threshold,
-            )?
-            .build(rf)?,
-        );
-    }
-    Ok(JoinRuntimeFilterPacket {
-        packets: Some(runtime_filters),
-        build_rows: build_num_rows,
-    })
-}
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs
index e2dd6d45afea9..245fac5d2a5d1 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs
@@ -36,6 +36,8 @@ use databend_common_expression::types::NumberScalar;
 use databend_common_functions::BUILTIN_FUNCTIONS;
 
 use super::builder::should_enable_runtime_filter;
+use super::packet::BloomPayload;
 use super::packet::JoinRuntimeFilterPacket;
+use super::packet::SerializableBloomFilter;
 use super::packet::SerializableDomain;
 use crate::pipelines::processors::transforms::hash_join::desc::RuntimeFilterDesc;
@@ -119,6 +121,52 @@ pub async fn build_runtime_filter_infos(
     Ok(filters)
 }
 
+/// Convert bloom payloads that carry raw hash lists into serialized bloom filters.
+/// This is mainly used in distributed hash shuffle joins before global merging,
+/// so that only compact bloom filters are transmitted between nodes.
+pub fn convert_packet_bloom_hashes_to_filter(
+    packet: &mut JoinRuntimeFilterPacket,
+    runtime_filter_descs: &HashMap<usize, &RuntimeFilterDesc>,
+    max_threads: usize,
+) -> Result<()> {
+    let Some(ref mut map) = packet.packets else {
+        return Ok(());
+    };
+
+    for (id, rf) in map.iter_mut() {
+        let desc = match runtime_filter_descs.get(id) {
+            Some(d) => *d,
+            None => continue,
+        };
+
+        // If we don't have global build table statistics, we cannot guarantee
+        // consistent bloom filter size across nodes. Disable bloom for this
+        // runtime filter in distributed mode.
+        if desc.build_table_rows.is_none() {
+            rf.bloom = None;
+            continue;
+        }
+
+        if let Some(bloom) = rf.bloom.take() {
+            rf.bloom = Some(match bloom {
+                BloomPayload::Hashes(hashes) => {
+                    // If there are no hashes, keep bloom disabled.
+                    if hashes.is_empty() {
+                        continue;
+                    }
+
+                    let filter = build_sbbf_from_hashes(hashes, max_threads, rf.id, true)?;
+                    BloomPayload::Filter(SerializableBloomFilter {
+                        data: filter.to_bytes(),
+                    })
+                }
+                BloomPayload::Filter(filter) => BloomPayload::Filter(filter),
+            });
+        }
+    }
+    Ok(())
+}
+
 fn build_inlist_filter(inlist: Column, probe_key: &Expr<String>) -> Result<Expr<String>> {
     if inlist.len() == 0 {
         return Ok(Expr::Constant(Constant {
@@ -256,38 +304,36 @@ fn build_min_max_filter(
     Ok(min_max_filter)
 }
 
-async fn build_bloom_filter(
+const FIXED_SBBF_BYTES: usize = 64 * 1024 * 1024;
+
+fn build_sbbf_from_hashes(
     bloom: Vec<u64>,
-    probe_key: &Expr<String>,
     max_threads: usize,
     filter_id: usize,
-) -> Result<RuntimeFilterBloom> {
-    let probe_key = match probe_key {
-        Expr::ColumnRef(col) => col,
-        // Support simple cast that only changes nullability, e.g. CAST(col AS Nullable(T))
-        Expr::Cast(cast) => match cast.expr.as_ref() {
-            Expr::ColumnRef(col) => col,
-            _ => unreachable!(),
-        },
-        _ => unreachable!(),
-    };
-    let column_name = probe_key.id.to_string();
+    fixed_size: bool,
+) -> Result<Sbbf> {
     let total_items = bloom.len();
     if total_items < 3_000_000 {
-        let mut filter = Sbbf::new_with_ndv_fpp(total_items as u64, 0.01)
-            .map_err(|e| ErrorCode::Internal(e.to_string()))?;
+        let mut filter = if fixed_size {
+            Sbbf::new_with_num_of_bytes(FIXED_SBBF_BYTES)
+        } else {
+            Sbbf::new_with_ndv_fpp(total_items as u64, 0.01)
+                .map_err(|e| ErrorCode::Internal(e.to_string()))?
+        };
         filter.insert_hash_batch(&bloom);
-        return Ok(RuntimeFilterBloom {
-            column_name,
-            filter: Arc::new(filter),
-        });
+        return Ok(filter);
     }
 
     let start = std::time::Instant::now();
-    let builder = SbbfAtomic::new_with_ndv_fpp(total_items as u64, 0.01)
-        .map_err(|e| ErrorCode::Internal(e.to_string()))?
-        .insert_hash_batch_parallel(bloom, max_threads);
+    let builder = if fixed_size {
+        SbbfAtomic::new_with_num_of_bytes(FIXED_SBBF_BYTES)
+            .insert_hash_batch_parallel(bloom, max_threads)
+    } else {
+        SbbfAtomic::new_with_ndv_fpp(total_items as u64, 0.01)
+            .map_err(|e| ErrorCode::Internal(e.to_string()))?
+            .insert_hash_batch_parallel(bloom, max_threads)
+    };
     let filter = builder.finish();
     log::info!(
         "filter_id: {}, build_time: {:?}",
@@ -295,10 +341,42 @@ fn build_sbbf_from_hashes(
        start.elapsed()
    );
 
-    Ok(RuntimeFilterBloom {
-        column_name,
-        filter: Arc::new(filter),
-    })
+    Ok(filter)
+}
+
+async fn build_bloom_filter(
+    bloom: BloomPayload,
+    probe_key: &Expr<String>,
+    max_threads: usize,
+    filter_id: usize,
+) -> Result<RuntimeFilterBloom> {
+    let probe_key = match probe_key {
+        Expr::ColumnRef(col) => col,
+        // Support simple cast that only changes nullability, e.g. CAST(col AS Nullable(T))
+        Expr::Cast(cast) => match cast.expr.as_ref() {
+            Expr::ColumnRef(col) => col,
+            _ => unreachable!(),
+        },
+        _ => unreachable!(),
+    };
+    let column_name = probe_key.id.to_string();
+    match bloom {
+        BloomPayload::Hashes(hashes) => {
+            let filter = build_sbbf_from_hashes(hashes, max_threads, filter_id, false)?;
+            Ok(RuntimeFilterBloom {
+                column_name,
+                filter: Arc::new(filter),
+            })
+        }
+        BloomPayload::Filter(serialized) => {
+            let filter = Sbbf::from_bytes(&serialized.data)
+                .map_err(|e| ErrorCode::Internal(e.to_string()))?;
+            Ok(RuntimeFilterBloom {
+                column_name,
+                filter: Arc::new(filter),
+            })
+        }
+    }
 }
 
 #[cfg(test)]
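Why pin the size to 64 MiB: the global merge in `merge.rs` unions filters coming from different nodes, and a block-wise union only works when both filters have identical shape, so every node must build the same size regardless of its local row count. A toy sketch of that invariant (assumed behavior; `Sbbf::union` itself is not shown in this diff):

```rust
// Element-wise OR of two equally shaped bloom bitsets. FIXED_SBBF_BYTES
// guarantees every node builds the same shape, so the merge never resizes.
fn union_bitsets(base: &mut [u32], other: &[u32]) {
    assert_eq!(base.len(), other.len(), "bloom filters must match in size");
    for (b, o) in base.iter_mut().zip(other) {
        *b |= o;
    }
}
```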
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/global.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/global.rs
index 581cbd28f0784..b463c634433a6 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/global.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/global.rs
@@ -12,9 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::time::Instant;
+
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
-use databend_common_expression::BlockMetaInfoDowncast;
 
 use super::merge::merge_join_runtime_filter_packets;
 use super::packet::JoinRuntimeFilterPacket;
@@ -29,14 +30,22 @@ pub async fn get_global_runtime_filter_packet(
     let receiver = ctx.broadcast_sink_receiver(broadcast_id);
 
     let mut received = vec![];
+    let instant = Instant::now();
+
     sender
-        .send(Box::new(local_packet))
+        .send(local_packet.try_into()?)
         .await
         .map_err(|_| ErrorCode::TokioError("send runtime filter shards failed"))?;
     sender.close();
 
-    while let Ok(r) = receiver.recv().await {
-        received.push(JoinRuntimeFilterPacket::downcast_from(r).unwrap());
+    while let Ok(data_block) = receiver.recv().await {
+        received.push(JoinRuntimeFilterPacket::try_from(data_block)?);
     }
+
+    log::info!(
+        "RUNTIME-FILTER: broadcast runtime filter elapsed: {:?}",
+        instant.elapsed()
+    );
+
     merge_join_runtime_filter_packets(received)
 }
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/interface.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/interface.rs
index 19b29bd197de6..1f3421d100966 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/interface.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/interface.rs
@@ -18,6 +18,7 @@ use databend_common_exception::Result;
 use databend_common_storages_fuse::TableContext;
 
 use super::convert::build_runtime_filter_infos;
+use super::convert::convert_packet_bloom_hashes_to_filter;
 use super::global::get_global_runtime_filter_packet;
 use crate::pipelines::processors::HashJoinBuildState;
 use crate::pipelines::processors::transforms::JoinRuntimeFilterPacket;
@@ -28,7 +29,22 @@ pub async fn build_and_push_down_runtime_filter(
 ) -> Result<()> {
     let overall_start = Instant::now();
 
+    let max_threads = join.ctx.get_settings().get_max_threads()? as usize;
+    let runtime_filter_descs: std::collections::HashMap<
+        usize,
+        &crate::pipelines::processors::transforms::hash_join::desc::RuntimeFilterDesc,
+    > = join
+        .runtime_filter_desc()
+        .iter()
+        .map(|r| (r.id, r))
+        .collect();
+
     if let Some(broadcast_id) = join.broadcast_id {
+        // For distributed hash shuffle joins, convert bloom hashes into compact
+        // bloom filters before performing global merge so that only filters are
+        // transmitted between nodes.
+        convert_packet_bloom_hashes_to_filter(&mut packet, &runtime_filter_descs, max_threads)?;
+
         let merge_start = Instant::now();
         packet = get_global_runtime_filter_packet(broadcast_id, packet, &join.ctx).await?;
         let merge_time = merge_start.elapsed();
@@ -38,16 +54,10 @@ pub async fn build_and_push_down_runtime_filter(
         );
     }
 
-    let runtime_filter_descs = join
-        .runtime_filter_desc()
-        .iter()
-        .map(|r| (r.id, r))
-        .collect();
     let selectivity_threshold = join
         .ctx
         .get_settings()
         .get_join_runtime_filter_selectivity_threshold()?;
-    let max_threads = join.ctx.get_settings().get_max_threads()? as usize;
     let build_rows = packet.build_rows;
     let runtime_filter_infos = build_runtime_filter_infos(
         packet,
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/local_builder.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/local_builder.rs
index ab91a06cc8b6b..d2617853eccf3 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/local_builder.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/local_builder.rs
@@ -27,6 +27,7 @@ use databend_common_expression::types::DataType;
 use databend_common_functions::BUILTIN_FUNCTIONS;
 
 use crate::pipelines::processors::transforms::hash_join::desc::RuntimeFilterDesc;
+use crate::pipelines::processors::transforms::hash_join::runtime_filter::packet::BloomPayload;
 use crate::pipelines::processors::transforms::hash_join::runtime_filter::packet::JoinRuntimeFilterPacket;
 use crate::pipelines::processors::transforms::hash_join::runtime_filter::packet::RuntimeFilterPacket;
 use crate::pipelines::processors::transforms::hash_join::runtime_filter::packet::SerializableDomain;
@@ -153,7 +154,7 @@ impl SingleFilterBuilder {
             None
         };
 
-        let bloom = self.bloom_hashes.take();
+        let bloom = self.bloom_hashes.take().map(BloomPayload::Hashes);
 
         Ok(RuntimeFilterPacket {
             id: self.id,
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/merge.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/merge.rs
index 0f6d40a04da1a..e31dee482fc12 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/merge.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/merge.rs
@@ -13,17 +13,22 @@
 // limitations under the License.
 
 use std::collections::HashMap;
+use std::time::Instant;
 
+use databend_common_catalog::sbbf::Sbbf;
 use databend_common_exception::Result;
 use databend_common_expression::Column;
 
+use super::packet::BloomPayload;
 use super::packet::JoinRuntimeFilterPacket;
 use super::packet::RuntimeFilterPacket;
+use super::packet::SerializableBloomFilter;
 use super::packet::SerializableDomain;
 
 pub fn merge_join_runtime_filter_packets(
     packets: Vec<JoinRuntimeFilterPacket>,
 ) -> Result<JoinRuntimeFilterPacket> {
+    let instant = Instant::now();
     log::info!(
         "RUNTIME-FILTER: merge_join_runtime_filter_packets input: {:?}",
         packets
@@ -53,8 +58,9 @@ pub fn merge_join_runtime_filter_packets(
     }
 
     log::info!(
-        "RUNTIME-FILTER: merge_join_runtime_filter_packets output: {:?}",
-        result
+        "RUNTIME-FILTER: merge_join_runtime_filter_packets output: {:?}, elapsed: {:?}",
+        result,
+        instant.elapsed()
     );
     Ok(JoinRuntimeFilterPacket {
         packets: Some(result),
@@ -121,23 +127,84 @@ fn merge_min_max(
     Some(SerializableDomain { min, max })
 }
 
-fn merge_bloom(packets: &[HashMap<usize, RuntimeFilterPacket>], rf_id: usize) -> Option<Vec<u64>> {
+fn merge_bloom(
+    packets: &[HashMap<usize, RuntimeFilterPacket>],
+    rf_id: usize,
+) -> Option<BloomPayload> {
     if packets
         .iter()
         .any(|packet| packet.get(&rf_id).unwrap().bloom.is_none())
     {
         return None;
     }
-    let mut bloom = packets[0]
-        .get(&rf_id)
-        .unwrap()
-        .bloom
-        .as_ref()
-        .unwrap()
-        .clone();
-    for packet in packets.iter().skip(1) {
-        let other = packet.get(&rf_id).unwrap().bloom.as_ref().unwrap();
-        bloom.extend_from_slice(other);
+
+    let first = packets[0].get(&rf_id).unwrap().bloom.as_ref().unwrap();
+    match first {
+        BloomPayload::Hashes(first_hashes) => {
+            // Local merge path: concatenate hashes.
+            let mut merged = first_hashes.clone();
+
+            for packet in packets.iter().skip(1) {
+                let other = packet.get(&rf_id).unwrap().bloom.as_ref().unwrap();
+                match other {
+                    BloomPayload::Hashes(hashes) => merged.extend_from_slice(hashes),
+                    BloomPayload::Filter(_) => {
+                        // Mixed variants are not expected today. Fall back to disabling bloom.
+                        log::warn!(
+                            "RUNTIME-FILTER: mixed bloom payload variants detected for id {}, disabling bloom merge",
+                            rf_id
+                        );
+                        return None;
+                    }
+                }
+            }
+            Some(BloomPayload::Hashes(merged))
+        }
+        BloomPayload::Filter(first_filter) => {
+            // Global merge path: union the serialized bloom filters.
+            let mut base = match Sbbf::from_bytes(&first_filter.data) {
+                Ok(bf) => bf,
+                Err(e) => {
+                    log::warn!(
+                        "RUNTIME-FILTER: failed to deserialize bloom filter for id {}: {}",
+                        rf_id,
+                        e
+                    );
+                    return None;
+                }
+            };
+
+            for packet in packets.iter().skip(1) {
+                let other = packet.get(&rf_id).unwrap().bloom.as_ref().unwrap();
+                let bytes = match other {
+                    BloomPayload::Filter(f) => &f.data,
+                    BloomPayload::Hashes(_) => {
+                        log::warn!(
+                            "RUNTIME-FILTER: mixed bloom payload variants detected for id {}, disabling bloom merge",
+                            rf_id
+                        );
+                        return None;
+                    }
+                };
+
+                let other_bf = match Sbbf::from_bytes(bytes) {
+                    Ok(bf) => bf,
+                    Err(e) => {
+                        log::warn!(
+                            "RUNTIME-FILTER: failed to deserialize bloom filter for id {}: {}",
+                            rf_id,
+                            e
+                        );
+                        return None;
+                    }
+                };
+
+                base.union(&other_bf);
+            }
+
+            Some(BloomPayload::Filter(SerializableBloomFilter {
+                data: base.to_bytes(),
+            }))
+        }
     }
-    Some(bloom)
 }
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/mod.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/mod.rs
index 618512a3f5f79..603531ceff640 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/mod.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/mod.rs
@@ -20,10 +20,11 @@ mod local_builder;
 mod merge;
 mod packet;
 
-pub use builder::build_runtime_filter_packet;
 pub use convert::build_runtime_filter_infos;
+pub use convert::convert_packet_bloom_hashes_to_filter;
 pub use global::get_global_runtime_filter_packet;
 pub use interface::build_and_push_down_runtime_filter;
 pub use local_builder::RuntimeFilterLocalBuilder;
 pub use merge::merge_join_runtime_filter_packets;
 pub use packet::JoinRuntimeFilterPacket;
+pub use packet::SerializableBloomFilter;
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/packet.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/packet.rs
index 9e30ac18b5bb1..b968b4678bd50 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/packet.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/packet.rs
@@ -16,13 +16,35 @@ use std::collections::HashMap;
 use std::fmt;
 use std::fmt::Debug;
 
+use databend_common_column::buffer::Buffer;
+use databend_common_exception::ErrorCode;
+use databend_common_exception::Result;
 use databend_common_expression::BlockMetaInfo;
 use databend_common_expression::BlockMetaInfoDowncast;
 use databend_common_expression::Column;
+use databend_common_expression::ColumnBuilder;
+use databend_common_expression::DataBlock;
+use databend_common_expression::DataSchemaRef;
 use databend_common_expression::Scalar;
+use databend_common_expression::types::ArrayColumn;
+use databend_common_expression::types::NumberColumn;
+use databend_common_expression::types::NumberColumnBuilder;
+use databend_common_expression::types::array::ArrayColumnBuilder;
 
 use crate::pipelines::processors::transforms::RuntimeFilterDesc;
 
+/// Bloom filter payload used in runtime filter packet.
+#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
+pub enum BloomPayload {
+    Hashes(Vec<u64>),
+    Filter(SerializableBloomFilter),
+}
+
+#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
+pub struct SerializableBloomFilter {
+    pub data: Vec<u8>,
+}
+
 /// Represents a runtime filter that can be transmitted and merged.
 ///
 /// # Fields
@@ -30,13 +52,13 @@ use crate::pipelines::processors::transforms::RuntimeFilterDesc;
 /// * `id` - Unique identifier for each runtime filter, corresponds one-to-one with `(build key, probe key)` pair
 /// * `inlist` - Deduplicated list of build key column
 /// * `min_max` - The min and max values of the build column
-/// * `bloom` - The deduplicated hashes of the build column
+/// * `bloom` - Bloom filter payload for the build column
 #[derive(serde::Serialize, serde::Deserialize, Clone, Default, PartialEq)]
 pub struct RuntimeFilterPacket {
     pub id: usize,
     pub inlist: Option<Column>,
     pub min_max: Option<SerializableDomain>,
-    pub bloom: Option<Vec<u64>>,
+    pub bloom: Option<BloomPayload>,
 }
 
 impl Debug for RuntimeFilterPacket {
@@ -58,11 +80,9 @@ impl Debug for RuntimeFilterPacket {
 ///
 /// * `packets` - A map of runtime filter packets, keyed by their unique identifier `RuntimeFilterPacket::id`. When `packets` is `None`, it means that `build_num_rows` is zero.
 /// * `build_rows` - Total number of rows used when building the runtime filters.
-#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
+#[derive(Clone, Debug, Default, PartialEq)]
 pub struct JoinRuntimeFilterPacket {
-    #[serde(default)]
     pub packets: Option<HashMap<usize, RuntimeFilterPacket>>,
-    #[serde(default)]
     pub build_rows: usize,
 }
 
@@ -84,15 +104,170 @@ impl JoinRuntimeFilterPacket {
     }
 }
 
+#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
+struct FlightRuntimeFilterPacket {
+    pub id: usize,
+    pub bloom: Option<usize>,
+    pub inlist: Option<usize>,
+    pub min_max: Option<SerializableDomain>,
+}
+
+#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
+struct FlightJoinRuntimeFilterPacket {
+    #[serde(default)]
+    pub build_rows: usize,
+    #[serde(default)]
+    pub packets: Option<HashMap<usize, FlightRuntimeFilterPacket>>,
+
+    pub schema: DataSchemaRef,
+}
+
+impl TryInto<DataBlock> for JoinRuntimeFilterPacket {
+    type Error = ErrorCode;
+
+    fn try_into(mut self) -> Result<DataBlock> {
+        let mut entities = vec![];
+        let mut join_flight_packets = None;
+
+        if let Some(packets) = self.packets.take() {
+            let mut flight_packets = HashMap::with_capacity(packets.len());
+
+            for (id, packet) in packets {
+                let mut inlist_pos = None;
+                if let Some(in_list) = packet.inlist {
+                    let len = in_list.len() as u64;
+                    inlist_pos = Some(entities.len());
+                    entities.push(Column::Array(Box::new(ArrayColumn::new(
+                        in_list,
+                        Buffer::from(vec![0, len]),
+                    ))));
+                }
+
+                let mut bloom_pos = None;
+                if let Some(bloom_filter) = packet.bloom {
+                    match bloom_filter {
+                        BloomPayload::Hashes(v) => {
+                            let len = v.len() as u64;
+                            bloom_pos = Some(entities.len());
+
+                            let builder = ArrayColumnBuilder {
+                                builder: ColumnBuilder::Number(NumberColumnBuilder::UInt64(v)),
+                                offsets: vec![0, len],
+                            };
+                            entities.push(Column::Array(Box::new(builder.build())));
+                        }
+                        BloomPayload::Filter(v) => {
+                            let len = v.data.len() as u64;
+                            bloom_pos = Some(entities.len());
+
+                            let builder = ArrayColumnBuilder {
+                                builder: ColumnBuilder::Number(NumberColumnBuilder::UInt8(v.data)),
+                                offsets: vec![0, len],
+                            };
+                            entities.push(Column::Array(Box::new(builder.build())));
+                        }
+                    }
+                }
+
+                flight_packets.insert(id, FlightRuntimeFilterPacket {
+                    id,
+                    bloom: bloom_pos,
+                    inlist: inlist_pos,
+                    min_max: packet.min_max,
+                });
+            }
+
+            join_flight_packets = Some(flight_packets);
+        }
+
+        let data_block = match entities.is_empty() {
+            true => DataBlock::empty(),
+            false => DataBlock::new_from_columns(entities),
+        };
+
+        let schema = DataSchemaRef::new(data_block.infer_schema());
+
+        data_block.add_meta(Some(Box::new(FlightJoinRuntimeFilterPacket {
+            build_rows: self.build_rows,
+            packets: join_flight_packets,
+            schema,
+        })))
+    }
+}
+
+impl TryFrom<DataBlock> for JoinRuntimeFilterPacket {
+    type Error = ErrorCode;
+
+    fn try_from(mut block: DataBlock) -> Result<Self> {
+        if let Some(meta) = block.take_meta() {
+            let flight_join_rf = FlightJoinRuntimeFilterPacket::downcast_from(meta)
+                .ok_or_else(|| ErrorCode::Internal("It's a bug"))?;
+
+            let Some(packet) = flight_join_rf.packets else {
+                return Ok(JoinRuntimeFilterPacket {
+                    packets: None,
+                    build_rows: flight_join_rf.build_rows,
+                });
+            };
+
+            let mut flight_packets = HashMap::with_capacity(packet.len());
+            for (id, flight_packet) in packet {
+                let mut inlist = None;
+                if let Some(column_idx) = flight_packet.inlist {
+                    let column = block.get_by_offset(column_idx).clone();
+                    let column = column.into_column().unwrap();
+                    let array_column = column.into_array().expect("it's a bug");
+                    inlist = Some(array_column.index(0).expect("It's a bug"));
+                }
+
+                let mut bloom = None;
+                if let Some(column_idx) = flight_packet.bloom {
+                    let column = block.get_by_offset(column_idx).clone();
+                    let column = column.into_column().unwrap();
+                    let array_column = column.into_array().expect("it's a bug");
+                    let bloom_value_column = array_column.index(0).expect("It's a bug");
+                    bloom = Some(match bloom_value_column {
+                        Column::Number(NumberColumn::UInt8(v)) => {
+                            BloomPayload::Filter(SerializableBloomFilter { data: v.to_vec() })
+                        }
+                        Column::Number(NumberColumn::UInt64(v)) => BloomPayload::Hashes(v.to_vec()),
+                        _ => unreachable!("Unexpected runtime bloom filter column type"),
+                    })
+                }
+
+                flight_packets.insert(id, RuntimeFilterPacket {
+                    bloom,
+                    inlist,
+                    id: flight_packet.id,
+                    min_max: flight_packet.min_max,
+                });
+            }
+
+            return Ok(JoinRuntimeFilterPacket {
+                packets: Some(flight_packets),
+                build_rows: flight_join_rf.build_rows,
+            });
+        }
+
+        Err(ErrorCode::Internal(
+            "Unexpected runtime filter packet meta type. It's a bug",
+        ))
+    }
+}
+
 #[typetag::serde(name = "join_runtime_filter_packet")]
-impl BlockMetaInfo for JoinRuntimeFilterPacket {
+impl BlockMetaInfo for FlightJoinRuntimeFilterPacket {
     fn equals(&self, info: &Box<dyn BlockMetaInfo>) -> bool {
-        JoinRuntimeFilterPacket::downcast_ref_from(info).is_some_and(|other| self == other)
+        FlightJoinRuntimeFilterPacket::downcast_ref_from(info).is_some_and(|other| self == other)
     }
 
     fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
         Box::new(self.clone())
     }
+
+    fn override_block_schema(&self) -> Option<DataSchemaRef> {
+        Some(self.schema.clone())
+    }
 }
 
 #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
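The two conversions above are intended to be inverses, so a packet should survive the flight encoding unchanged. A hedged sketch of that invariant (assumes a `packet: JoinRuntimeFilterPacket` in scope; names as defined in this file):

```rust
let block: DataBlock = packet.clone().try_into().unwrap();
let decoded = JoinRuntimeFilterPacket::try_from(block).unwrap();
assert_eq!(packet, decoded); // PartialEq is derived on both packet types
```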
diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/runtime_filter.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/runtime_filter.rs
index 37b522b543431..fd29bf30d20c6 100644
--- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/runtime_filter.rs
+++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/runtime_filter.rs
@@ -24,6 +24,7 @@ use crate::physical_plans::HashJoin;
 use crate::pipelines::processors::transforms::JoinRuntimeFilterPacket;
 use crate::pipelines::processors::transforms::RuntimeFilterDesc;
 use crate::pipelines::processors::transforms::build_runtime_filter_infos;
+use crate::pipelines::processors::transforms::convert_packet_bloom_hashes_to_filter;
 use crate::pipelines::processors::transforms::get_global_runtime_filter_packet;
 use crate::sessions::QueryContext;
 
@@ -81,11 +82,19 @@ impl RuntimeFiltersDesc {
     }
 
     pub async fn globalization(&self, mut packet: JoinRuntimeFilterPacket) -> Result<()> {
+        let runtime_filter_descs: std::collections::HashMap<usize, &RuntimeFilterDesc> =
+            self.filters_desc.iter().map(|r| (r.id, r)).collect();
+
         if let Some(broadcast_id) = self.broadcast_id {
+            let max_threads = self.ctx.get_settings().get_max_threads()? as usize;
+            // For distributed hash shuffle joins, convert bloom hashes into compact
+            // bloom filters before performing global merge so that only filters are
+            // transmitted between nodes.
+            convert_packet_bloom_hashes_to_filter(&mut packet, &runtime_filter_descs, max_threads)?;
+
             packet = get_global_runtime_filter_packet(broadcast_id, packet, &self.ctx).await?;
         }
 
-        let runtime_filter_descs = self.filters_desc.iter().map(|r| (r.id, r)).collect();
         let runtime_filter_infos = build_runtime_filter_infos(
             packet,
             runtime_filter_descs,
diff --git a/src/query/service/src/pipelines/processors/transforms/sort/sort_builder.rs b/src/query/service/src/pipelines/processors/transforms/sort/sort_builder.rs
index e22e91901bc33..66780ee813ab5 100644
--- a/src/query/service/src/pipelines/processors/transforms/sort/sort_builder.rs
+++ b/src/query/service/src/pipelines/processors/transforms/sort/sort_builder.rs
@@ -15,7 +15,7 @@
 use std::sync::Arc;
 
 use databend_common_exception::Result;
-use databend_common_expression::BlockMetaInfoPtr;
+use databend_common_expression::DataBlock;
 use databend_common_expression::DataSchemaRef;
 use databend_common_expression::SortColumnDescription;
 use databend_common_pipeline::core::InputPort;
@@ -453,11 +453,11 @@ struct ContextChannel {
 }
 
 impl BroadcastChannel for ContextChannel {
-    fn sender(&self) -> async_channel::Sender<BlockMetaInfoPtr> {
+    fn sender(&self) -> async_channel::Sender<DataBlock> {
         self.ctx.broadcast_source_sender(self.id)
     }
 
-    fn receiver(&self) -> async_channel::Receiver<BlockMetaInfoPtr> {
+    fn receiver(&self) -> async_channel::Receiver<DataBlock> {
         self.ctx.broadcast_sink_receiver(self.id)
     }
 }
diff --git a/src/query/service/src/servers/flight/v1/exchange/serde/exchange_deserializer.rs b/src/query/service/src/servers/flight/v1/exchange/serde/exchange_deserializer.rs
index a4882273e3039..1e7df7399bf79 100644
--- a/src/query/service/src/servers/flight/v1/exchange/serde/exchange_deserializer.rs
+++ b/src/query/service/src/servers/flight/v1/exchange/serde/exchange_deserializer.rs
@@ -76,8 +76,17 @@ impl TransformExchangeDeserializer {
             return Ok(DataBlock::new_with_meta(vec![], 0, meta));
         }
 
-        let data_block =
-            deserialize_block(dict, fragment_data, &self.schema, self.arrow_schema.clone())?;
+        let mut schema = self.schema.clone();
+        let mut arrow_schema = self.arrow_schema.clone();
+
+        if let Some(metadata) = &meta {
+            if let Some(dynamic_schema) = metadata.override_block_schema() {
+                arrow_schema = Arc::new(ArrowSchema::from(dynamic_schema.as_ref()));
+                schema = dynamic_schema;
+            }
+        }
+
+        let data_block = deserialize_block(dict, fragment_data, &schema, arrow_schema)?;
         if data_block.num_columns() == 0 {
             return Ok(DataBlock::new_with_meta(vec![], row_count as usize, meta));
         }
diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs
index aab5b283837a5..3d57b985a534b 100644
--- a/src/query/service/src/sessions/query_ctx.rs
+++ b/src/query/service/src/sessions/query_ctx.rs
@@ -76,7 +76,6 @@ use databend_common_catalog::table_context::StageAttachment;
 use databend_common_config::GlobalConfig;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
-use databend_common_expression::BlockMetaInfoPtr;
 use databend_common_expression::BlockThresholds;
 use databend_common_expression::DataBlock;
 use databend_common_expression::Expr;
@@ -322,14 +321,14 @@ impl QueryContext {
         self.shared.attach_table(catalog, database, name, table)
     }
 
-    pub fn broadcast_source_receiver(&self, broadcast_id: u32) -> Receiver<BlockMetaInfoPtr> {
+    pub fn broadcast_source_receiver(&self, broadcast_id: u32) -> Receiver<DataBlock> {
         self.shared.broadcast_source_receiver(broadcast_id)
     }
 
     /// Get a sender to broadcast data
     ///
     /// Note: The channel must be closed by calling close() after data transmission is completed
-    pub fn broadcast_source_sender(&self, broadcast_id: u32) -> Sender<BlockMetaInfoPtr> {
+    pub fn broadcast_source_sender(&self, broadcast_id: u32) -> Sender<DataBlock> {
         self.shared.broadcast_source_sender(broadcast_id)
     }
 
@@ -337,11 +336,11 @@ impl QueryContext {
     ///
     /// Note: receive() can be called repeatedly until an Error is returned, indicating
     /// that the upstream channel has been closed
-    pub fn broadcast_sink_receiver(&self, broadcast_id: u32) -> Receiver<BlockMetaInfoPtr> {
+    pub fn broadcast_sink_receiver(&self, broadcast_id: u32) -> Receiver<DataBlock> {
         self.shared.broadcast_sink_receiver(broadcast_id)
     }
 
-    pub fn broadcast_sink_sender(&self, broadcast_id: u32) -> Sender<BlockMetaInfoPtr> {
+    pub fn broadcast_sink_sender(&self, broadcast_id: u32) -> Sender<DataBlock> {
         self.shared.broadcast_sink_sender(broadcast_id)
     }
 
diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs
index c0b372b1c8e44..5a3e614cbcc56 100644
--- a/src/query/service/src/sessions/query_ctx_shared.rs
+++ b/src/query/service/src/sessions/query_ctx_shared.rs
@@ -48,7 +48,6 @@ use databend_common_catalog::table_context::StageAttachment;
 use databend_common_config::GlobalConfig;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
-use databend_common_expression::BlockMetaInfoPtr;
 use databend_common_expression::DataBlock;
 use databend_common_meta_app::principal::OnErrorMode;
 use databend_common_meta_app::principal::RoleInfo;
@@ -192,10 +191,10 @@ pub struct QueryContextShared {
 
 #[derive(Default)]
 pub struct BroadcastChannel {
-    pub source_sender: Option<Sender<BlockMetaInfoPtr>>,
-    pub source_receiver: Option<Receiver<BlockMetaInfoPtr>>,
-    pub sink_sender: Option<Sender<BlockMetaInfoPtr>>,
-    pub sink_receiver: Option<Receiver<BlockMetaInfoPtr>>,
+    pub source_sender: Option<Sender<DataBlock>>,
+    pub source_receiver: Option<Receiver<DataBlock>>,
+    pub sink_sender: Option<Sender<DataBlock>>,
+    pub sink_receiver: Option<Receiver<DataBlock>>,
 }
 
 impl QueryContextShared {
@@ -273,7 +272,7 @@ impl QueryContextShared {
         }))
     }
 
-    pub fn broadcast_source_receiver(&self, broadcast_id: u32) -> Receiver<BlockMetaInfoPtr> {
+    pub fn broadcast_source_receiver(&self, broadcast_id: u32) -> Receiver<DataBlock> {
         let mut broadcast_channels = self.broadcast_channels.lock();
         let entry = broadcast_channels.entry(broadcast_id).or_default();
         match entry.source_receiver.take() {
@@ -285,7 +284,7 @@
             }
         }
     }
-    pub fn broadcast_source_sender(&self, broadcast_id: u32) -> Sender<BlockMetaInfoPtr> {
+    pub fn broadcast_source_sender(&self, broadcast_id: u32) -> Sender<DataBlock> {
         let mut broadcast_channels = self.broadcast_channels.lock();
         let entry = broadcast_channels.entry(broadcast_id).or_default();
         match entry.source_sender.take() {
@@ -298,7 +297,7 @@
         }
     }
 
-    pub fn broadcast_sink_receiver(&self, broadcast_id: u32) -> Receiver<BlockMetaInfoPtr> {
+    pub fn broadcast_sink_receiver(&self, broadcast_id: u32) -> Receiver<DataBlock> {
         let mut broadcast_channels = self.broadcast_channels.lock();
         let entry = broadcast_channels.entry(broadcast_id).or_default();
         match entry.sink_receiver.take() {
@@ -310,7 +309,7 @@
             }
         }
     }
-    pub fn broadcast_sink_sender(&self, broadcast_id: u32) -> Sender<BlockMetaInfoPtr> {
+    pub fn broadcast_sink_sender(&self, broadcast_id: u32) -> Sender<DataBlock> {
         let mut broadcast_channels = self.broadcast_channels.lock();
         let entry = broadcast_channels.entry(broadcast_id).or_default();
         match entry.sink_sender.take() {
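Usage sketch of the reworked `DataBlock`-based broadcast channel (mirrors the pattern in `sort_broadcast.rs` above; `meta` stands for any `BlockMetaInfo` value):

```rust
// Producer side: wrap the meta in an empty block, send it, close the channel.
let sender = ctx.broadcast_source_sender(broadcast_id);
sender
    .send(DataBlock::empty_with_meta(meta.boxed()))
    .await
    .map_err(|_| ErrorCode::TokioError("send failed"))?;
sender.close();

// Consumer side: recv() until the channel closes, then unwrap the meta.
let receiver = ctx.broadcast_sink_receiver(broadcast_id);
while let Ok(mut block) = receiver.recv().await {
    let meta = block.take_meta().unwrap();
    // ...downcast `meta` to the concrete type, as the sort/runtime-filter code does
}
```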
diff --git a/src/query/sql/src/planner/optimizer/ir/stats/column_stat.rs b/src/query/sql/src/planner/optimizer/ir/stats/column_stat.rs
index 8fa854bd56864..5f3fb12db474c 100644
--- a/src/query/sql/src/planner/optimizer/ir/stats/column_stat.rs
+++ b/src/query/sql/src/planner/optimizer/ir/stats/column_stat.rs
@@ -36,6 +36,9 @@ pub struct ColumnStat {
     /// Count of null values
     pub null_count: u64,
 
+    pub num_rows: u64,
+    pub origin_ndv: f64,
+
     /// Histogram of column
     pub histogram: Option<Histogram>,
 }
diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs
index 2b9b8da721b62..c46fffecedf80 100644
--- a/src/query/sql/src/planner/optimizer/optimizer.rs
+++ b/src/query/sql/src/planner/optimizer/optimizer.rs
@@ -269,6 +269,10 @@ pub async fn optimize_query(opt_ctx: Arc<OptimizerContext>, s_expr: SExpr) -> Re
         .add(SingleToInnerOptimizer::new())
         // Deduplicate join conditions.
         .add(DeduplicateJoinConditionOptimizer::new())
+        .add(RecursiveRuleOptimizer::new(
+            opt_ctx.clone(),
+            [RuleID::PushDownAntiJoin].as_slice(),
+        ))
         // Apply join commutativity to further optimize join ordering
         .add_if(
             opt_ctx.get_enable_join_reorder(),
diff --git a/src/query/sql/src/planner/optimizer/optimizers/rule/factory.rs b/src/query/sql/src/planner/optimizer/optimizers/rule/factory.rs
index d1b700f98fc18..bd1b045fa573b 100644
--- a/src/query/sql/src/planner/optimizer/optimizers/rule/factory.rs
+++ b/src/query/sql/src/planner/optimizer/optimizers/rule/factory.rs
@@ -60,6 +60,7 @@ use crate::optimizer::optimizers::rule::RulePushDownRankLimitAggregate;
 use crate::optimizer::optimizers::rule::RulePushDownSortEvalScalar;
 use crate::optimizer::optimizers::rule::RulePushDownSortFilterScan;
 use crate::optimizer::optimizers::rule::RulePushDownSortScan;
+use crate::optimizer::optimizers::rule::RulePushdownAntiJoin;
 use crate::optimizer::optimizers::rule::RuleSemiToInnerJoin;
 use crate::optimizer::optimizers::rule::RuleSplitAggregate;
 use crate::optimizer::optimizers::rule::RuleTryApplyAggIndex;
@@ -130,6 +131,7 @@ impl RuleFactory {
             RuleID::MergeFilterIntoMutation => {
                 Ok(Box::new(RuleMergeFilterIntoMutation::new(metadata)))
             }
+            RuleID::PushDownAntiJoin => Ok(Box::new(RulePushdownAntiJoin::new())),
         }
     }
 }
diff --git a/src/query/sql/src/planner/optimizer/optimizers/rule/filter_rules/rule_push_down_filter_join.rs b/src/query/sql/src/planner/optimizer/optimizers/rule/filter_rules/rule_push_down_filter_join.rs
index 986d2e157e23e..2ca52d47c5b25 100644
--- a/src/query/sql/src/planner/optimizer/optimizers/rule/filter_rules/rule_push_down_filter_join.rs
+++ b/src/query/sql/src/planner/optimizer/optimizers/rule/filter_rules/rule_push_down_filter_join.rs
@@ -242,6 +242,7 @@ fn try_push_down_filter_join(s_expr: &SExpr, metadata: MetadataRef) -> Result<(b
             right_push_down = vec![];
         }
     }
+
     let join_prop = JoinProperty::new(&left_prop.output_columns, &right_prop.output_columns);
     let mut infer_filter = InferFilterOptimizer::new(Some(join_prop));
     push_down_predicates = infer_filter.optimize(push_down_predicates)?;
diff --git a/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/mod.rs b/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/mod.rs
index fba55fa30df63..2adad858b586a 100644
--- a/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/mod.rs
+++ b/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/mod.rs
@@ -16,6 +16,7 @@ mod push_down_filter_join;
 mod rule_commute_join;
 mod rule_commute_join_base_table;
 mod rule_left_exchange_join;
+mod rule_push_down_anti_join;
 mod rule_semi_to_inner_join;
 mod util;
 
@@ -23,5 +24,6 @@ pub use push_down_filter_join::*;
 pub use rule_commute_join::RuleCommuteJoin;
 pub use rule_commute_join_base_table::RuleCommuteJoinBaseTable;
 pub use rule_left_exchange_join::RuleLeftExchangeJoin;
+pub use rule_push_down_anti_join::RulePushdownAntiJoin;
 pub use rule_semi_to_inner_join::RuleSemiToInnerJoin;
 pub use util::get_join_predicates;
diff --git a/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/rule_push_down_anti_join.rs b/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/rule_push_down_anti_join.rs
new file mode 100644
index 0000000000000..72fd3aed40908
--- /dev/null
+++ b/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/rule_push_down_anti_join.rs
@@ -0,0 +1,184 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use databend_common_exception::Result;
+
+use crate::ColumnSet;
+use crate::binder::JoinPredicate;
+use crate::optimizer::ir::Matcher;
+use crate::optimizer::ir::RelExpr;
+use crate::optimizer::ir::SExpr;
+use crate::optimizer::optimizers::rule::Rule;
+use crate::optimizer::optimizers::rule::RuleID;
+use crate::optimizer::optimizers::rule::TransformResult;
+use crate::plans::Join;
+use crate::plans::JoinType;
+use crate::plans::RelOp;
+use crate::plans::RelOperator;
+
+/// Push a `LeftSemi`/`LeftAnti` join closer to the base table that participates
+/// in the predicate so that fewer rows stay in the join tree.
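+///
+/// Schematically (mirroring `try_push_down` below): when every equi condition
+/// of the semi/anti join only references the inner join's left child,
+///
+///   LeftAnti(Inner(A, B), C)  =>  Inner(LeftAnti(A, C), B)
+///
+/// and symmetrically when the conditions only reference the right child.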
+            let inner_join_rel_expr = RelExpr::with_s_expr(&inner_join);
+            let inner_join_left_prop = inner_join_rel_expr.derive_relational_prop_child(0)?;
+            let inner_join_right_prop = inner_join_rel_expr.derive_relational_prop_child(1)?;
+
+            let equi_conditions = join
+                .equi_conditions
+                .iter()
+                .map(|condition| {
+                    JoinPredicate::new(
+                        &condition.left,
+                        &inner_join_left_prop,
+                        &inner_join_right_prop,
+                    )
+                })
+                .collect::<Vec<_>>();
+
+            if equi_conditions.iter().all(left_predicate) {
+                let right_prop = right_rel_expr.derive_relational_prop()?;
+                let mut union_output_columns = ColumnSet::new();
+                union_output_columns.extend(right_prop.output_columns.clone());
+                union_output_columns.extend(inner_join_left_prop.output_columns.clone());
+
+                if join
+                    .non_equi_conditions
+                    .iter()
+                    .all(|x| x.used_columns().is_subset(&union_output_columns))
+                {
+                    let new_inner_join = inner_join.replace_children([
+                        Arc::new(SExpr::create_binary(
+                            RelOperator::Join(join.clone()),
+                            inner_join.child(0)?.clone(),
+                            right.clone(),
+                        )),
+                        Arc::new(inner_join.child(1)?.clone()),
+                    ]);
+
+                    return replace_inner_join(left, new_inner_join);
+                }
+            } else if equi_conditions.iter().all(right_predicate) {
+                let right_prop = right_rel_expr.derive_relational_prop()?;
+                let mut union_output_columns = ColumnSet::new();
+                union_output_columns.extend(right_prop.output_columns.clone());
+                union_output_columns.extend(inner_join_right_prop.output_columns.clone());
+
+                if join
+                    .non_equi_conditions
+                    .iter()
+                    .all(|x| x.used_columns().is_subset(&union_output_columns))
+                {
+                    let new_inner_join = inner_join.replace_children([
+                        Arc::new(inner_join.child(0)?.clone()),
+                        Arc::new(SExpr::create_binary(
+                            RelOperator::Join(join.clone()),
+                            inner_join.child(1)?.clone(),
+                            right.clone(),
+                        )),
+                    ]);
+
+                    return replace_inner_join(left, new_inner_join);
+                }
+            }
+        }
+
+        Ok(None)
+    }
+}
+
+impl Rule for RulePushdownAntiJoin {
+    fn id(&self) -> RuleID {
+        self.id
+    }
+
+    fn apply(&self, s_expr: &SExpr, state: &mut TransformResult) -> Result<()> {
+        let join: Join = s_expr.plan().clone().try_into()?;
+
+        if matches!(join.join_type, JoinType::LeftAnti | JoinType::LeftSemi) {
+            if let Some(mut result) =
+                self.try_push_down(s_expr.child(0)?, s_expr.child(1)?, join)?
+            {
+                result.set_applied_rule(&self.id);
+                state.add_result(result);
+            }
+        }
+
+        Ok(())
+    }
+
+    fn matchers(&self) -> &[Matcher] {
+        &self.matchers
+    }
+}
+
+impl Default for RulePushdownAntiJoin {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+fn replace_inner_join(expr: &SExpr, new_join: SExpr) -> Result<Option<SExpr>> {
+    match expr.plan() {
+        RelOperator::Join(join) if join.join_type == JoinType::Inner => Ok(Some(new_join)),
+        RelOperator::Filter(_) => match replace_inner_join(expr.child(0)?, new_join)? {
+            None => Ok(None),
+            Some(new_child) => Ok(Some(expr.replace_children([Arc::new(new_child)]))),
+        },
+        _ => Ok(None),
+    }
+}
+
+fn extract_inner_join(expr: &SExpr) -> Result<Option<SExpr>> {
+    match expr.plan() {
+        RelOperator::Join(join) if join.join_type == JoinType::Inner => Ok(Some(expr.clone())),
+        RelOperator::Filter(_) => extract_inner_join(expr.child(0)?),
+        _ => Ok(None),
+    }
+}
+
+fn left_predicate(tuple: &JoinPredicate) -> bool {
+    matches!(&tuple, JoinPredicate::Left(_))
+}
+
+fn right_predicate(tuple: &JoinPredicate) -> bool {
+    matches!(&tuple, JoinPredicate::Right(_))
+}
diff --git a/src/query/sql/src/planner/optimizer/optimizers/rule/rule.rs b/src/query/sql/src/planner/optimizer/optimizers/rule/rule.rs
index 9eece3b4e9a92..c200dfd9651d4 100644
--- a/src/query/sql/src/planner/optimizer/optimizers/rule/rule.rs
+++ b/src/query/sql/src/planner/optimizer/optimizers/rule/rule.rs
@@ -122,6 +122,7 @@ pub enum RuleID {
     PushDownSortFilterScan,
     PushDownLimitFilterScan,
     SemiToInnerJoin,
+    PushDownAntiJoin,
     EliminateEvalScalar,
     EliminateFilter,
     EliminateSort,
@@ -194,6 +195,7 @@ impl Display for RuleID {
             RuleID::EliminateUnion => write!(f, "EliminateUnion"),
             RuleID::MergeFilterIntoMutation => write!(f, "MergeFilterIntoMutation"),
+            RuleID::PushDownAntiJoin => write!(f, "PushDownAntiJoin"),
         }
     }
 }
diff --git a/src/query/sql/src/planner/plans/constant_table_scan.rs b/src/query/sql/src/planner/plans/constant_table_scan.rs
index e3b29ca1c530d..4d40e5ea924c6 100644
--- a/src/query/sql/src/planner/plans/constant_table_scan.rs
+++ b/src/query/sql/src/planner/plans/constant_table_scan.rs
@@ -225,6 +225,8 @@ impl Operator for ConstantTableScan {
                 max,
                 ndv: ndv as f64,
                 null_count,
+                num_rows: self.num_rows as u64,
+                origin_ndv: ndv as f64,
                 histogram,
             };
             column_stats.insert(*index, column_stat);
diff --git a/src/query/sql/src/planner/plans/join.rs b/src/query/sql/src/planner/plans/join.rs
index 1d26198edb9b6..46c2995dfa3b1 100644
--- a/src/query/sql/src/planner/plans/join.rs
+++ b/src/query/sql/src/planner/plans/join.rs
@@ -517,10 +517,43 @@ impl Join {
                 + f64::max(right_cardinality, inner_join_cardinality)
                 - inner_join_cardinality
             }
-            JoinType::LeftSemi => f64::min(left_cardinality, inner_join_cardinality),
-            JoinType::RightSemi => f64::min(right_cardinality, inner_join_cardinality),
-            JoinType::LeftSingle | JoinType::RightMark | JoinType::LeftAnti => left_cardinality,
-            JoinType::RightSingle | JoinType::LeftMark | JoinType::RightAnti => right_cardinality,
+            JoinType::LeftSemi => {
+                let left_exprs = self
+                    .equi_conditions
+                    .iter()
+                    .map(|x| &x.left)
+                    .collect::<Vec<_>>();
+
+                self.semi_cardinality(left_cardinality, &left_statistics, &left_exprs)
+            }
+            JoinType::RightSemi => {
+                let right_exprs = self
+                    .equi_conditions
+                    .iter()
+                    .map(|x| &x.right)
+                    .collect::<Vec<_>>();
+
+                self.semi_cardinality(right_cardinality, &right_statistics, &right_exprs)
+            }
+            JoinType::LeftAnti => {
+                let left_exprs = self
+                    .equi_conditions
+                    .iter()
+                    .map(|x| &x.left)
+                    .collect::<Vec<_>>();
+
+                self.anti_cardinality(left_cardinality, &left_statistics, &left_exprs)
+            }
+            JoinType::RightAnti => {
+                let right_exprs = self
+                    .equi_conditions
+                    .iter()
+                    .map(|x| &x.right)
+                    .collect::<Vec<_>>();
+
+                self.anti_cardinality(right_cardinality, &right_statistics, &right_exprs)
+            }
+            JoinType::LeftSingle | JoinType::RightMark => left_cardinality,
+            JoinType::RightSingle | JoinType::LeftMark => right_cardinality,
         };
         // Derive column statistics
         let column_stats = if cardinality == 0.0 {
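The estimators added in the next hunk scale the input cardinality by the fraction of distinct key values that survive, `ndv / origin_ndv`, which is what the new `ColumnStat` fields enable. A hedged, self-contained mirror of the formulas for a single-column join key (plain functions, not the actual `Join` methods):

```rust
// Stand-ins for Join::semi_cardinality / Join::anti_cardinality with one key.
fn semi(input_rows: f64, ndv: f64, origin_ndv: f64) -> f64 {
    // Rows whose key still has a match candidate after earlier filtering.
    let column_cardinality = input_rows * ndv / origin_ndv;
    1_f64.max(column_cardinality.min(input_rows))
}

fn anti(input_rows: f64, ndv: f64, origin_ndv: f64) -> f64 {
    // Complement of the semi estimate, floored at 30% of the input.
    let semi_rows = input_rows * ndv / origin_ndv;
    let column_cardinality = (input_rows - semi_rows).max(input_rows * 0.3);
    1_f64.max(column_cardinality.min(input_rows))
}

fn main() {
    // With 10 input rows and 9 of 10 original distinct values remaining:
    assert_eq!(semi(10.0, 9.0, 10.0), 9.0);
    assert_eq!(anti(10.0, 9.0, 10.0), 3.0); // max(10 - 9, 10 * 0.3) = 3
}
```

The real methods fold this over every single-column equi-condition key, keep the minimum estimate across keys, and skip keys that reference more than one column.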
@@ -568,6 +601,53 @@ impl Join {
             .iter()
             .any(|expr| expr.has_subquery())
     }
+
+    fn anti_cardinality(
+        &self,
+        cardinality: f64,
+        statistics: &Statistics,
+        exprs: &[&ScalarExpr],
+    ) -> f64 {
+        let mut anti_cardinality = cardinality;
+        for expr in exprs {
+            let mut used_columns = expr.used_columns();
+
+            let (Some(column), None) = (used_columns.pop_first(), used_columns.pop_first()) else {
+                continue;
+            };
+
+            if let Some(column_stat) = statistics.column_stats.get(&column) {
+                let semi_cardinality = cardinality * column_stat.ndv / column_stat.origin_ndv;
+                let column_cardinality = (cardinality - semi_cardinality).max(cardinality * 0.3);
+                anti_cardinality = 1_f64.max(column_cardinality.min(anti_cardinality));
+            }
+        }
+
+        anti_cardinality
+    }
+
+    fn semi_cardinality(
+        &self,
+        cardinality: f64,
+        statistics: &Statistics,
+        exprs: &[&ScalarExpr],
+    ) -> f64 {
+        let mut semi_cardinality = cardinality;
+        for expr in exprs {
+            let mut used_columns = expr.used_columns();
+
+            let (Some(column), None) = (used_columns.pop_first(), used_columns.pop_first()) else {
+                continue;
+            };
+
+            if let Some(column_stat) = statistics.column_stats.get(&column) {
+                let column_cardinality = cardinality * column_stat.ndv / column_stat.origin_ndv;
+                semi_cardinality = 1_f64.max(column_cardinality.min(semi_cardinality));
+            }
+        }
+
+        semi_cardinality
+    }
 }
 
 impl Operator for Join {
diff --git a/src/query/sql/src/planner/plans/scan.rs b/src/query/sql/src/planner/plans/scan.rs
index 123f174e56d55..8d60bff5ec0a8 100644
--- a/src/query/sql/src/planner/plans/scan.rs
+++ b/src/query/sql/src/planner/plans/scan.rs
@@ -259,6 +259,7 @@ impl Operator for Scan {
             .unwrap_or(0);
 
         let mut column_stats: ColumnStatSet = Default::default();
+
         for (k, v) in &self.statistics.column_stats {
             // No need to cal histogram for unused columns
             if !used_columns.contains(k) {
@@ -311,11 +312,14 @@ impl Operator for Scan {
                     None
                 }
             };
+
             let column_stat = ColumnStat {
                 min,
                 max,
                 ndv: ndv as f64,
                 null_count: col_stat.null_count,
+                origin_ndv: ndv as f64,
+                num_rows,
                 histogram,
             };
             column_stats.insert(*k as IndexType, column_stat);
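The sqllogictest expectation changes that follow drop straight out of these formulas. A hedged sanity check, with assumed column statistics since the actual table stats are not part of this patch:

```rust
// One-key mirror of Join::semi_cardinality (see the sketch above).
fn semi(input_rows: f64, ndv: f64, origin_ndv: f64) -> f64 {
    1_f64.max((input_rows * ndv / origin_ndv).min(input_rows))
}

fn main() {
    // join_reorder/chain.test: 10.00 -> 9.00 fits ndv = 9, origin_ndv = 10.
    assert_eq!(semi(10.0, 9.0, 10.0), 9.0);
    // delete.test / update.test: 2.00 -> 1.50 fits a surviving-NDV ratio of 3/4.
    assert_eq!(semi(2.0, 3.0, 4.0), 1.5);
    // push_down_filter_join_semi_anti.test: 0.00 -> 1.00 is the new 1-row floor.
    assert_eq!(semi(0.0, 1.0, 1.0), 1.0);
}
```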
diff --git a/tests/sqllogictests/suites/mode/cluster/filter_nulls.test b/tests/sqllogictests/suites/mode/cluster/filter_nulls.test
index 29a426f6c1e31..52d6a6a898130 100644
--- a/tests/sqllogictests/suites/mode/cluster/filter_nulls.test
+++ b/tests/sqllogictests/suites/mode/cluster/filter_nulls.test
@@ -177,7 +177,7 @@ Exchange
 ├── probe keys: [table1.value (#0)]
 ├── keys is null equal: [false]
 ├── filters: []
-├── estimated rows: 250.00
+├── estimated rows: 225.00
 ├── Exchange(Build)
 │   ├── output columns: [table2.value (#1)]
 │   ├── exchange type: Broadcast
@@ -227,7 +227,7 @@ Exchange
 ├── filters: []
 ├── build join filters(distributed):
 │   └── filter id:0, build key:table2.value (#1), probe targets:[table1.value (#0)@scan0], filter type:bloom,inlist,min_max
-├── estimated rows: 250.00
+├── estimated rows: 234.13
 ├── Exchange(Build)
 │   ├── output columns: [table2.value (#1)]
 │   ├── exchange type: Hash(table2.value (#1))
diff --git a/tests/sqllogictests/suites/mode/standalone/explain/delete.test b/tests/sqllogictests/suites/mode/standalone/explain/delete.test
index a327075f37020..c2acb6f064ea2 100644
--- a/tests/sqllogictests/suites/mode/standalone/explain/delete.test
+++ b/tests/sqllogictests/suites/mode/standalone/explain/delete.test
@@ -97,7 +97,7 @@ CommitSink
 ├── probe keys: [t1.a (#0)]
 ├── keys is null equal: [false]
 ├── filters: []
-├── estimated rows: 2.00
+├── estimated rows: 1.50
 ├── TableScan(Build)
 │   ├── table: default.default.t2
 │   ├── scan id: 1
diff --git a/tests/sqllogictests/suites/mode/standalone/explain/join_reorder/chain.test b/tests/sqllogictests/suites/mode/standalone/explain/join_reorder/chain.test
index a15373c28d91b..91937bdf83f91 100644
--- a/tests/sqllogictests/suites/mode/standalone/explain/join_reorder/chain.test
+++ b/tests/sqllogictests/suites/mode/standalone/explain/join_reorder/chain.test
@@ -635,7 +635,7 @@ HashJoin
 ├── probe keys: [t1.a (#1)]
 ├── keys is null equal: [false]
 ├── filters: []
-├── estimated rows: 10.00
+├── estimated rows: 9.00
 ├── TableScan(Build)
 │   ├── table: default.join_reorder.t
 │   ├── scan id: 0
diff --git a/tests/sqllogictests/suites/mode/standalone/explain/subquery.test b/tests/sqllogictests/suites/mode/standalone/explain/subquery.test
index 43799dc3962bb..7128fa4391a4c 100644
--- a/tests/sqllogictests/suites/mode/standalone/explain/subquery.test
+++ b/tests/sqllogictests/suites/mode/standalone/explain/subquery.test
@@ -339,7 +339,7 @@ HashJoin
 ├── probe keys: [t.number (#0)]
 ├── keys is null equal: [false]
 ├── filters: []
-├── estimated rows: 0.25
+├── estimated rows: 0.50
 ├── Filter(Build)
 │   ├── output columns: [numbers.number (#1)]
 │   ├── filters: [numbers.number (#1) < 10, numbers.number (#1) = 0]
diff --git a/tests/sqllogictests/suites/mode/standalone/explain/update.test b/tests/sqllogictests/suites/mode/standalone/explain/update.test
index 5bd0cb54e4bdb..43eae21639766 100644
--- a/tests/sqllogictests/suites/mode/standalone/explain/update.test
+++ b/tests/sqllogictests/suites/mode/standalone/explain/update.test
@@ -103,7 +103,7 @@ CommitSink
 ├── probe keys: [t1.a (#0)]
 ├── keys is null equal: [false]
 ├── filters: []
-├── estimated rows: 2.00
+├── estimated rows: 1.50
 ├── TableScan(Build)
 │   ├── table: default.default.t2
 │   ├── scan id: 1
@@ -135,7 +135,7 @@ query T
 explain analyze partial update t1 set a = a + 1 where a in (select a from t2) and b > 2;
 ----
 HashJoin: LEFT SEMI
-├── estimated rows: 2.00
+├── estimated rows: 1.50
 ├── output rows: 2
 ├── TableScan
 │   ├── table: default.default.t2
diff --git a/tests/sqllogictests/suites/mode/standalone/explain_native/join_reorder/chain.test b/tests/sqllogictests/suites/mode/standalone/explain_native/join_reorder/chain.test
index efa878f4e2d65..003ad890a1d19 100644
--- a/tests/sqllogictests/suites/mode/standalone/explain_native/join_reorder/chain.test
+++ b/tests/sqllogictests/suites/mode/standalone/explain_native/join_reorder/chain.test
@@ -573,7 +573,7 @@ HashJoin
 ├── probe keys: [t1.a (#1)]
 ├── keys is null equal: [false]
 ├── filters: []
-├── estimated rows: 10.00
+├── estimated rows: 9.00
 ├── TableScan(Build)
 │   ├── table: default.join_reorder.t
 │   ├── scan id: 0
diff --git a/tests/sqllogictests/suites/mode/standalone/explain_native/push_down_filter/push_down_filter_join/push_down_filter_join_semi_anti.test b/tests/sqllogictests/suites/mode/standalone/explain_native/push_down_filter/push_down_filter_join/push_down_filter_join_semi_anti.test
index af7e524cbbdb4..76602a501fdd8 100644
--- a/tests/sqllogictests/suites/mode/standalone/explain_native/push_down_filter/push_down_filter_join/push_down_filter_join_semi_anti.test
+++ b/tests/sqllogictests/suites/mode/standalone/explain_native/push_down_filter/push_down_filter_join/push_down_filter_join_semi_anti.test
@@ -28,7 +28,7 @@ HashJoin
 ├── probe keys: [t1.a (#0)]
 ├── keys is null equal: [false]
 ├── filters: []
-├── estimated rows: 0.00
+├── estimated rows: 1.00
 ├── TableScan(Build)
 │   ├── table: default.default.t2
 │   ├── scan id: 1
@@ -63,7 +63,7 @@ HashJoin
 ├── probe keys: [t1.a (#0)]
 ├── keys is null equal: [false]
 ├── filters: []
-├── estimated rows: 0.00
+├── estimated rows: 1.00
 ├── TableScan(Build)
 │   ├── table: default.default.t2
 │   ├── scan id: 1
diff --git a/tests/sqllogictests/suites/mode/standalone/explain_native/subquery.test b/tests/sqllogictests/suites/mode/standalone/explain_native/subquery.test
index 19eea51fc861e..4cc5e0b14a2d0 100644
--- a/tests/sqllogictests/suites/mode/standalone/explain_native/subquery.test
+++ b/tests/sqllogictests/suites/mode/standalone/explain_native/subquery.test
@@ -314,7 +314,7 @@ HashJoin
 ├── probe keys: [t.number (#0)]
 ├── keys is null equal: [false]
 ├── filters: []
-├── estimated rows: 0.25
+├── estimated rows: 0.50
 ├── Filter(Build)
 │   ├── output columns: [numbers.number (#1)]
 │   ├── filters: [numbers.number (#1) < 10, numbers.number (#1) = 0]
diff --git a/tests/sqllogictests/suites/tpch/join_order.test b/tests/sqllogictests/suites/tpch/join_order.test
index 9113f51ca95db..e1993c198c261 100644
--- a/tests/sqllogictests/suites/tpch/join_order.test
+++ b/tests/sqllogictests/suites/tpch/join_order.test
@@ -976,23 +976,23 @@ where
 order by
     s_name;
 ----
-HashJoin: RIGHT SEMI
+HashJoin: INNER
 ├── Build
-│   └── HashJoin: INNER
-│       ├── Build
-│       │   └── Scan: default.tpch_test.nation (#1) (read rows: 25)
-│       └── Probe
-│           └── Scan: default.tpch_test.supplier (#0) (read rows: 10000)
+│   └── Scan: default.tpch_test.nation (#1) (read rows: 25)
 └── Probe
-    └── HashJoin: INNER
+    └── HashJoin: RIGHT SEMI
         ├── Build
-        │   └── HashJoin: LEFT SEMI
-        │       ├── Build
-        │       │   └── Scan: default.tpch_test.part (#3) (read rows: 200000)
-        │       └── Probe
-        │           └── Scan: default.tpch_test.partsupp (#2) (read rows: 800000)
+        │   └── Scan: default.tpch_test.supplier (#0) (read rows: 10000)
         └── Probe
-            └── Scan: default.tpch_test.lineitem (#4) (read rows: 6001215)
+            └── HashJoin: INNER
+                ├── Build
+                │   └── HashJoin: LEFT SEMI
+                │       ├── Build
+                │       │   └── Scan: default.tpch_test.part (#3) (read rows: 200000)
+                │       └── Probe
+                │           └── Scan: default.tpch_test.partsupp (#2) (read rows: 800000)
+                └── Probe
+                    └── Scan: default.tpch_test.lineitem (#4) (read rows: 6001215)
 
 # Q21
 query I
@@ -1036,27 +1036,27 @@ order by
     numwait desc,
     s_name;
 ----
-HashJoin: RIGHT ANTI
+HashJoin: INNER
 ├── Build
-│   └── HashJoin: RIGHT SEMI
+│   └── HashJoin: INNER
 │       ├── Build
 │       │   └── HashJoin: INNER
 │       │       ├── Build
-│       │       │   └── HashJoin: INNER
-│       │       │       ├── Build
-│       │       │       │   └── HashJoin: INNER
-│       │       │       │       ├── Build
-│       │       │       │       │   └── Scan: default.tpch_test.nation (#3) (read rows: 25)
-│       │       │       │       └── Probe
-│       │       │       │           └── Scan: default.tpch_test.supplier (#0) (read rows: 10000)
-│       │       │       └── Probe
-│       │       │           └── Scan: default.tpch_test.lineitem (#1) (read rows: 6001215)
+│       │       │   └── Scan: default.tpch_test.nation (#3) (read rows: 25)
 │       │       └── Probe
-│       │           └── Scan: default.tpch_test.orders (#2) (read rows: 1500000)
+│       │           └── Scan: default.tpch_test.supplier (#0) (read rows: 10000)
 │       └── Probe
-│           └── Scan: default.tpch_test.lineitem (#4) (read rows: 6001215)
+│           └── HashJoin: RIGHT ANTI
+│               ├── Build
+│               │   └── HashJoin: RIGHT SEMI
+│               │       ├── Build
+│               │       │   └── Scan: default.tpch_test.lineitem (#1) (read rows: 6001215)
+│               │       └── Probe
+│               │           └── Scan: default.tpch_test.lineitem (#4) (read rows: 6001215)
+│               └── Probe
+│                   └── Scan: default.tpch_test.lineitem (#5) (read rows: 6001215)
 └── Probe
-    └── Scan: default.tpch_test.lineitem (#5) (read rows: 6001215)
+    └── Scan: default.tpch_test.orders (#2) (read rows: 1500000)
 
 # Q22
 query I
@@ -1098,12 +1098,12 @@ group by
 order by
     cntrycode;
 ----
-HashJoin: RIGHT ANTI
+HashJoin: INNER
 ├── Build
-│   └── HashJoin: INNER
-│       ├── Build
-│       │   └── Scan: default.tpch_test.customer (#1) (read rows: 150000)
-│       └── Probe
-│           └── Scan: default.tpch_test.customer (#0) (read rows: 150000)
+│   └── Scan: default.tpch_test.customer (#1) (read rows: 150000)
 └── Probe
-    └── Scan: default.tpch_test.orders (#2) (read rows: 1500000)
+    └── HashJoin: RIGHT ANTI
+        ├── Build
+        │   └── Scan: default.tpch_test.customer (#0) (read rows: 150000)
+        └── Probe
+            └── Scan: default.tpch_test.orders (#2) (read rows: 1500000)
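The reshaped TPC-H plans above stay semantically equivalent because the anti/semi join keys come from only one side of the inner join they are pushed through. A small self-contained check on toy data (tables, keys, and payloads here are hypothetical):

```rust
// LeftAnti(Inner(A, B), C) == Inner(LeftAnti(A, C), B) when the anti-join key
// is drawn only from A. Rows are (key, payload) pairs joined on `key`.
fn inner(a: &[(i32, i32)], b: &[i32]) -> Vec<(i32, i32)> {
    a.iter().filter(|(k, _)| b.contains(k)).copied().collect()
}

fn left_anti(a: &[(i32, i32)], c: &[i32]) -> Vec<(i32, i32)> {
    a.iter().filter(|(k, _)| !c.contains(k)).copied().collect()
}

fn main() {
    let a = [(1, 10), (2, 20), (3, 30)];
    let b = [1, 2]; // inner-join keys
    let c = [2]; // anti-join keys, drawn from A's column only
    let before = left_anti(&inner(&a, &b), &c);
    let after = inner(&left_anti(&a, &c), &b);
    assert_eq!(before, after); // both: [(1, 10)]
}
```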