29 commits
4efd685
chore(query): refactor the cardinality of anti-join
zhang2014 Dec 8, 2025
ff5cb93
chore(query): refactor the cardinality of anti-join
zhang2014 Dec 9, 2025
d60ce52
chore(query): refactor the cardinality of anti-join
zhang2014 Dec 9, 2025
236a74a
chore(query): refactor the cardinality of anti-join
zhang2014 Dec 9, 2025
c2ebd41
chore(query): refactor the cardinality of anti-join
zhang2014 Dec 10, 2025
1eda119
chore(query): refactor the cardinality of anti-join
zhang2014 Dec 11, 2025
8e91bcb
Merge branch 'main' into refactor/const
zhang2014 Dec 11, 2025
6511eb4
feat: improve shuffle bloom filter
SkyFan2002 Dec 15, 2025
b4cd827
Merge branch 'main' into 12-15
SkyFan2002 Dec 15, 2025
0542033
fix
SkyFan2002 Dec 15, 2025
a79b557
update
SkyFan2002 Dec 15, 2025
9377d17
Merge branch 'main' of https://github.com/datafuselabs/databend into …
zhang2014 Dec 15, 2025
03ac255
chore(query): refactor the cardinality of anti-join
zhang2014 Dec 15, 2025
f94ee0c
Merge branch 'refactor/const' of github.com:zhang2014/datafuse into r…
zhang2014 Dec 15, 2025
54a410e
chore(query): refactor the cardinality of anti-join
zhang2014 Dec 15, 2025
88c52c4
chore(query): refactor the cardinality of anti-join
zhang2014 Dec 16, 2025
3a5ce47
chore(query): refactor the cardinality of anti-join
zhang2014 Dec 16, 2025
df7f12d
Merge branch 'merge-pr-19104' into merge-pr-19076-19104
zhang2014 Dec 16, 2025
51ffa3c
Merge branch 'merge-pr-19076' into merge-pr-19076-19104
zhang2014 Dec 16, 2025
39c8d84
refactor(query): Serialize data in runtime filters using Arrow
zhang2014 Dec 17, 2025
367408c
refactor(query): Serialize data in runtime filters using Arrow
zhang2014 Dec 17, 2025
0130e22
refactor(query): Serialize data in runtime filters using Arrow
zhang2014 Dec 17, 2025
6a55a5c
refactor(query): Serialize data in runtime filters using Arrow
zhang2014 Dec 18, 2025
37e704d
Merge branch 'main' of https://github.com/datafuselabs/databend into …
zhang2014 Dec 18, 2025
b1532b7
refactor(query): Serialize data in runtime filters using Arrow
zhang2014 Dec 19, 2025
61a540a
Merge branch 'main' of https://github.com/datafuselabs/databend into …
zhang2014 Dec 22, 2025
1c410ac
refactor(query): Serialize data in runtime filters using Arrow
zhang2014 Dec 22, 2025
da7396a
refactor(query): Serialize data in runtime filters using Arrow
zhang2014 Dec 22, 2025
9c67cf9
Merge branch 'main' into merge-pr-19076-19104
zhang2014 Dec 22, 2025
8 changes: 4 additions & 4 deletions src/common/base/src/mem_allocator/jemalloc.rs
@@ -249,7 +249,7 @@ pub mod not_linux {

#[inline(always)]
unsafe fn deallocate(&self, ptr: NonNull<u8>, layout: Layout) {
-            StdAllocator.deallocate(ptr, layout)
+            unsafe { StdAllocator.deallocate(ptr, layout) }
}

unsafe fn grow(
@@ -258,7 +258,7 @@ pub mod not_linux {
old_layout: Layout,
new_layout: Layout,
) -> Result<NonNull<[u8]>, AllocError> {
-            StdAllocator.grow(ptr, old_layout, new_layout)
+            unsafe { StdAllocator.grow(ptr, old_layout, new_layout) }
}

unsafe fn grow_zeroed(
@@ -267,7 +267,7 @@ pub mod not_linux {
old_layout: Layout,
new_layout: Layout,
) -> Result<NonNull<[u8]>, AllocError> {
-            StdAllocator.grow_zeroed(ptr, old_layout, new_layout)
+            unsafe { StdAllocator.grow_zeroed(ptr, old_layout, new_layout) }
}

unsafe fn shrink(
@@ -276,7 +276,7 @@ pub mod not_linux {
old_layout: Layout,
new_layout: Layout,
) -> Result<NonNull<[u8]>, AllocError> {
-            StdAllocator.shrink(ptr, old_layout, new_layout)
+            unsafe { StdAllocator.shrink(ptr, old_layout, new_layout) }
}
}
}
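The four changes above are mechanical: each delegated call is wrapped in an explicit `unsafe` block. This is the fix for the `unsafe_op_in_unsafe_fn` lint, which warns by default in the Rust 2024 edition because the body of an `unsafe fn` no longer provides an implicit unsafe context. A minimal sketch of the lint and the fix (illustrative, not code from the PR):

```rust
#![warn(unsafe_op_in_unsafe_fn)]

// Inside an `unsafe fn`, unsafe operations now need their own block.
unsafe fn read_first(ptr: *const u8) -> u8 {
    // `*ptr` alone would trip unsafe_op_in_unsafe_fn; the explicit block
    // is the same pattern applied throughout jemalloc.rs above.
    unsafe { *ptr }
}

fn main() {
    let v = [7u8];
    // SAFETY: the pointer is valid, aligned, and points to initialized data.
    assert_eq!(unsafe { read_first(v.as_ptr()) }, 7);
}
```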
1 change: 1 addition & 0 deletions src/common/exception/src/exception_backtrace.rs
@@ -77,6 +77,7 @@ pub struct ResolvedStackFrame {
pub column: Option<u32>,
}

+#[cfg(target_os = "linux")]
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
pub struct PhysicalAddr {
pub physical_addr: usize,
50 changes: 48 additions & 2 deletions src/query/catalog/src/sbbf.rs
@@ -239,7 +239,7 @@ impl Sbbf {

/// Create a new [Sbbf] with given number of bytes, the exact number of bytes will be adjusted
/// to the next power of two bounded by [BITSET_MIN_LENGTH] and [BITSET_MAX_LENGTH].
-    pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
+    pub fn new_with_num_of_bytes(num_bytes: usize) -> Self {
let num_bytes = optimal_num_of_bytes(num_bytes);
assert_eq!(num_bytes % size_of::<Block>(), 0);
let num_blocks = num_bytes / size_of::<Block>();
@@ -307,6 +307,52 @@ impl Sbbf {
    pub fn estimated_memory_size(&self) -> usize {
        self.0.capacity() * std::mem::size_of::<Block>()
    }
+
+    /// Serialize the bloom filter into a little-endian byte array.
+    /// The layout is a contiguous sequence of blocks, each block consisting
+    /// of 8 u32 values in little-endian order.
+    pub fn to_bytes(&self) -> Vec<u8> {
+        let mut bytes = Vec::with_capacity(self.0.len() * size_of::<Block>());
+        for block in &self.0 {
+            for value in block.0 {
+                bytes.extend_from_slice(&value.to_le_bytes());
+            }
+        }
+        bytes
+    }
+
+    /// Deserialize a bloom filter from bytes produced by `to_bytes`.
+    pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
+        if !bytes.len().is_multiple_of(size_of::<Block>()) {
+            return Err(format!(
+                "Invalid bloom filter bytes length {}, expected multiple of {}",
+                bytes.len(),
+                size_of::<Block>()
+            ));
+        }
+
+        let num_blocks = bytes.len() / size_of::<Block>();
+        if num_blocks == 0 {
+            return Ok(Sbbf(Vec::new()));
+        }
+
+        let mut blocks = Vec::with_capacity(num_blocks);
+        let mut offset = 0;
+        for _ in 0..num_blocks {
+            let mut arr = [0u32; 8];
+            for value in &mut arr {
+                let end = offset + size_of::<u32>();
+                let chunk = bytes
+                    .get(offset..end)
+                    .ok_or_else(|| "Invalid bloom filter bytes".to_string())?;
+                *value = u32::from_le_bytes(chunk.try_into().unwrap());
+                offset = end;
+            }
+            blocks.push(Block(arr));
+        }
+
+        Ok(Sbbf(blocks))
+    }
}

impl SbbfAtomic {
@@ -320,7 +366,7 @@ impl SbbfAtomic {
Ok(Self::new_with_num_of_bytes(num_bits / 8))
}

-    pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
+    pub fn new_with_num_of_bytes(num_bytes: usize) -> Self {
let num_bytes = optimal_num_of_bytes(num_bytes);
assert_eq!(size_of::<BlockAtomic>(), size_of::<Block>());
assert_eq!(num_bytes % size_of::<BlockAtomic>(), 0);
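The new `to_bytes`/`from_bytes` pair is what lets a split-block bloom filter cross the network as a plain byte buffer. A self-contained sketch of the same layout, using a stand-in `Block` type rather than the crate's private one, showing the round trip is lossless:

```rust
// Stand-in for the crate's private Block: 8 u32 words, serialized little-endian.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Block([u32; 8]);

const BLOCK_BYTES: usize = 32; // 8 words x 4 bytes

fn to_bytes(blocks: &[Block]) -> Vec<u8> {
    let mut bytes = Vec::with_capacity(blocks.len() * BLOCK_BYTES);
    for block in blocks {
        for value in block.0 {
            bytes.extend_from_slice(&value.to_le_bytes());
        }
    }
    bytes
}

fn from_bytes(bytes: &[u8]) -> Result<Vec<Block>, String> {
    if bytes.len() % BLOCK_BYTES != 0 {
        return Err(format!("invalid length {}", bytes.len()));
    }
    Ok(bytes
        .chunks_exact(BLOCK_BYTES)
        .map(|chunk| {
            let mut arr = [0u32; 8];
            for (value, word) in arr.iter_mut().zip(chunk.chunks_exact(4)) {
                *value = u32::from_le_bytes(word.try_into().unwrap());
            }
            Block(arr)
        })
        .collect())
}

fn main() {
    let blocks = vec![Block([1, 2, 3, 4, 5, 6, 7, 8]), Block([u32::MAX; 8])];
    assert_eq!(from_bytes(&to_bytes(&blocks)).unwrap(), blocks);
}
```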
6 changes: 6 additions & 0 deletions src/query/expression/src/block.rs
@@ -368,6 +368,12 @@ pub trait BlockMetaInfo: Debug + Send + Sync + Any + 'static {
            "The reason for not implementing clone_self is usually because the higher-level logic doesn't allow/need the associated block to be cloned."
        )
    }
+
+    /// Overrides the global schema for a specific block, attaching a custom schema that will be used
+    /// exclusively for this block instead of the global default.
+    fn override_block_schema(&self) -> Option<DataSchemaRef> {
+        None
+    }
}

pub trait BlockMetaInfoDowncast: Sized + BlockMetaInfo {
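`override_block_schema` is a defaulted hook, so existing `BlockMetaInfo` implementors are unaffected; a meta that wants a per-block schema simply returns one. A hypothetical implementor, sketched against the trait as shown (the type is illustrative and the trait's other defaulted methods are elided):

```rust
// Hypothetical meta carrying its own schema; assumes BlockMetaInfo and
// DataSchemaRef from databend_common_expression.
#[derive(Debug)]
struct SchemaOverrideMeta {
    schema: DataSchemaRef,
}

impl BlockMetaInfo for SchemaOverrideMeta {
    fn override_block_schema(&self) -> Option<DataSchemaRef> {
        // Blocks tagged with this meta are interpreted with `self.schema`
        // instead of the pipeline's global schema.
        Some(self.schema.clone())
    }
}
```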
@@ -20,7 +20,6 @@ use async_channel::Sender;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::BlockMetaInfoDowncast;
-use databend_common_expression::BlockMetaInfoPtr;
use databend_common_expression::DataBlock;
use databend_common_pipeline::core::Event;
use databend_common_pipeline::core::InputPort;
@@ -70,8 +69,8 @@ pub struct SortSampleState<C: BroadcastChannel> {
}

pub trait BroadcastChannel: Clone + Send + 'static {
-    fn sender(&self) -> Sender<BlockMetaInfoPtr>;
-    fn receiver(&self) -> Receiver<BlockMetaInfoPtr>;
+    fn sender(&self) -> Sender<DataBlock>;
+    fn receiver(&self) -> Receiver<DataBlock>;
}

impl<C: BroadcastChannel> SortSampleState<C> {
@@ -91,16 +90,16 @@ impl<C: BroadcastChannel> SortSampleState<C> {
let is_empty = meta.is_none();
let meta = meta.map(|meta| meta.boxed()).unwrap_or(().boxed());
sender
-            .send(meta)
+            .send(DataBlock::empty_with_meta(meta))
.await
.map_err(|_| ErrorCode::TokioError("send sort bounds failed"))?;
sender.close();
log::debug!(is_empty; "sample has sent");

let receiver = self.channel.receiver();
let mut all = Vec::new();
-        while let Ok(r) = receiver.recv().await {
-            match SortExchangeMeta::downcast_from_err(r) {
+        while let Ok(mut r) = receiver.recv().await {
+            match SortExchangeMeta::downcast_from_err(r.take_meta().unwrap()) {
Ok(meta) => all.push(meta),
Err(r) => {
debug_assert!(().boxed().equals(&r))
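The channel type changes from `BlockMetaInfoPtr` to whole `DataBlock`s, so the sort-bound samples travel through the same block plumbing as ordinary data and the exchange layer can reuse its existing block serialization. A fragment sketching the wrap/unwrap pattern the diff adopts, assuming the `databend_common_expression` API shown above:

```rust
// Sender side: box the meta into an otherwise empty block.
let block = DataBlock::empty_with_meta(meta.boxed());

// ... the block crosses the broadcast channel as a DataBlock ...

// Receiver side: take the meta back off the block before downcasting it.
let mut block = block;
let meta = block.take_meta().expect("broadcast blocks always carry meta");
```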
40 changes: 36 additions & 4 deletions src/query/service/src/physical_plans/runtime_filter/builder.rs
@@ -17,13 +17,15 @@ use std::sync::Arc;

use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
+use databend_common_expression::Expr;
use databend_common_expression::RemoteExpr;
use databend_common_expression::types::DataType;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_sql::ColumnEntry;
use databend_common_sql::IndexType;
use databend_common_sql::MetadataRef;
use databend_common_sql::TypeCheck;
+use databend_common_sql::optimizer::ir::ColumnStatSet;
use databend_common_sql::optimizer::ir::SExpr;
use databend_common_sql::plans::Exchange;
use databend_common_sql::plans::Join;
@@ -113,6 +115,11 @@ pub async fn build_runtime_filter(

let mut filters = Vec::new();

+    // Derive statistics for the build side to estimate NDV of join keys.
+    let build_rel_expr = databend_common_sql::optimizer::ir::RelExpr::with_s_expr(build_side);
+    let build_stat_info = build_rel_expr.derive_cardinality()?;
+    let build_column_stats = &build_stat_info.statistics.column_stats;
+
let probe_side = s_expr.probe_side_child();

// Process each probe key that has runtime filter information
@@ -144,10 +151,17 @@
let build_table_rows =
get_build_table_rows(ctx.clone(), metadata, build_table_index).await?;

-        let data_type = build_key
-            .as_expr(&BUILTIN_FUNCTIONS)
-            .data_type()
-            .remove_nullable();
+        let build_key_expr = build_key.as_expr(&BUILTIN_FUNCTIONS);
+
+        // Estimate NDV for the build side join key using optimizer statistics.
+        // Handles all RemoteExpr variants by looking at the column references inside
+        // the expression. If the expression is constant, NDV is 1. If it contains
+        // exactly one column reference, reuse that column's NDV. Otherwise, fall
+        // back to the overall build-side cardinality.
+        let build_key_ndv = estimate_build_key_ndv(&build_key_expr, build_column_stats)
+            .unwrap_or_else(|| build_stat_info.cardinality.ceil() as u64);
+
+        let data_type = build_key_expr.data_type().remove_nullable();
let id = metadata.write().next_runtime_filter_id();

let enable_bloom_runtime_filter = is_type_supported_for_bloom_filter(&data_type);
@@ -159,6 +173,7 @@
id,
build_key: build_key.clone(),
probe_targets,
+            build_key_ndv,
build_table_rows,
enable_bloom_runtime_filter,
enable_inlist_runtime_filter: true,
@@ -170,6 +185,23 @@
Ok(PhysicalRuntimeFilters { filters })
}

+fn estimate_build_key_ndv(
+    build_key: &Expr<IndexType>,
+    build_column_stats: &ColumnStatSet,
+) -> Option<u64> {
+    let mut column_refs = build_key.column_refs();
+    if column_refs.is_empty() {
+        return Some(1);
+    }
+
+    if column_refs.len() == 1 {
+        let (id, _) = column_refs.drain().next().unwrap();
+        build_column_stats.get(&id).map(|s| s.ndv.ceil() as u64)
+    } else {
+        None
+    }
+}
+
async fn get_build_table_rows(
ctx: Arc<dyn TableContext>,
metadata: &MetadataRef,
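`estimate_build_key_ndv` covers the three shapes a build key can take: no column references (a constant, so NDV is 1), exactly one reference (reuse that column's NDV from the optimizer statistics), and anything else (`None`, which makes the caller fall back to the build side's overall cardinality). A standalone sketch of the same decision table, with the expression reduced to its set of referenced column ids:

```rust
use std::collections::{HashMap, HashSet};

// Same three-way decision as estimate_build_key_ndv above, over plain maps.
fn estimate_ndv(column_refs: &HashSet<usize>, ndv_by_column: &HashMap<usize, f64>) -> Option<u64> {
    match column_refs.len() {
        // Constant expression: a single distinct value.
        0 => Some(1),
        // Exactly one column: reuse its NDV if statistics exist.
        1 => {
            let id = column_refs.iter().next().unwrap();
            ndv_by_column.get(id).map(|ndv| ndv.ceil() as u64)
        }
        // Multi-column expression: no estimate; the caller falls back
        // to the build side's cardinality.
        _ => None,
    }
}

fn main() {
    let stats = HashMap::from([(3usize, 1024.0)]);
    assert_eq!(estimate_ndv(&HashSet::new(), &stats), Some(1));
    assert_eq!(estimate_ndv(&HashSet::from([3]), &stats), Some(1024));
    assert_eq!(estimate_ndv(&HashSet::from([3, 4]), &stats), None);
}
```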
3 changes: 3 additions & 0 deletions src/query/service/src/physical_plans/runtime_filter/types.rs
@@ -42,6 +42,9 @@ pub struct PhysicalRuntimeFilter {
    /// All probe targets in this list are in the same equivalence class
    pub probe_targets: Vec<(RemoteExpr<String>, usize)>,
+
+    /// Estimated NDV of the build side join key, derived from optimizer statistics.
+    pub build_key_ndv: u64,

pub build_table_rows: Option<u64>,

/// Enable bloom filter for this runtime filter
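The diff records the estimate but does not show its consumer; presumably `build_key_ndv` sizes the bloom filter built for the runtime filter. For reference, the textbook sizing for `n` distinct keys at target false-positive rate `p` is `m = -n * ln(p) / (ln 2)^2` bits. A sketch under that assumption, not code from the PR:

```rust
// Classic bloom filter sizing from an NDV estimate (illustrative only).
fn optimal_num_bits(ndv: u64, false_positive_rate: f64) -> usize {
    let n = ndv.max(1) as f64;
    let bits = -n * false_positive_rate.ln() / (2f64.ln() * 2f64.ln());
    bits.ceil() as usize
}

fn main() {
    // Roughly 9.6 bits per distinct key at a 1% false-positive rate.
    println!("{}", optimal_num_bits(1_000_000, 0.01));
}
```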
27 changes: 10 additions & 17 deletions src/query/service/src/pipelines/processors/transforms/broadcast.rs
@@ -19,7 +19,6 @@ use async_channel::Sender;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
-use databend_common_expression::BlockMetaInfoPtr;
use databend_common_expression::DataBlock;
use databend_common_pipeline::core::InputPort;
use databend_common_pipeline::core::OutputPort;
@@ -30,13 +29,13 @@ use databend_common_pipeline::sources::AsyncSource;
use databend_common_pipeline::sources::AsyncSourcer;

pub struct BroadcastSourceProcessor {
-    pub receiver: Receiver<BlockMetaInfoPtr>,
+    pub receiver: Receiver<DataBlock>,
}

impl BroadcastSourceProcessor {
pub fn create(
ctx: Arc<dyn TableContext>,
-        receiver: Receiver<BlockMetaInfoPtr>,
+        receiver: Receiver<DataBlock>,
output_port: Arc<OutputPort>,
) -> Result<ProcessorPtr> {
AsyncSourcer::create(ctx.get_scan_progress(), output_port, Self { receiver })
@@ -50,23 +49,20 @@

#[async_backtrace::framed]
async fn generate(&mut self) -> Result<Option<DataBlock>> {
-        let received = self.receiver.recv().await;
-        match received {
-            Ok(meta) => Ok(Some(DataBlock::empty_with_meta(meta))),
-            Err(_) => {
-                // The channel is closed, we should return None to stop generating
-                Ok(None)
-            }
+        match self.receiver.recv().await {
+            Ok(block) => Ok(Some(block)),
+            // The channel is closed, we should return None to stop generating
+            Err(_) => Ok(None),
}
}
}

pub struct BroadcastSinkProcessor {
-    sender: Sender<BlockMetaInfoPtr>,
+    sender: Sender<DataBlock>,
}

impl BroadcastSinkProcessor {
-    pub fn create(input: Arc<InputPort>, sender: Sender<BlockMetaInfoPtr>) -> Result<ProcessorPtr> {
+    pub fn create(input: Arc<InputPort>, sender: Sender<DataBlock>) -> Result<ProcessorPtr> {
Ok(ProcessorPtr::create(AsyncSinker::create(input, Self {
sender,
})))
@@ -82,12 +78,9 @@
Ok(())
}

-    async fn consume(&mut self, mut data_block: DataBlock) -> Result<bool> {
-        let meta = data_block
-            .take_meta()
-            .ok_or_else(|| ErrorCode::Internal("Cannot downcast meta to BroadcastMeta"))?;
+    async fn consume(&mut self, data_block: DataBlock) -> Result<bool> {
self.sender
-            .send(meta)
+            .send(data_block)
.await
.map_err(|_| ErrorCode::Internal("BroadcastSinkProcessor send error"))?;
Ok(false)
@@ -70,6 +70,7 @@ pub struct RuntimeFilterDesc {
pub id: usize,
pub build_key: Expr,
pub probe_targets: Vec<(Expr<String>, usize)>,
+    pub build_key_ndv: u64,
pub build_table_rows: Option<u64>,
pub enable_bloom_runtime_filter: bool,
pub enable_inlist_runtime_filter: bool,
@@ -98,6 +99,7 @@ impl From<&PhysicalRuntimeFilter> for RuntimeFilterDesc {
.iter()
.map(|(probe_key, scan_id)| (probe_key.as_expr(&BUILTIN_FUNCTIONS), *scan_id))
.collect(),
+            build_key_ndv: runtime_filter.build_key_ndv,
build_table_rows: runtime_filter.build_table_rows,
enable_bloom_runtime_filter: runtime_filter.enable_bloom_runtime_filter,
enable_inlist_runtime_filter: runtime_filter.enable_inlist_runtime_filter,
Expand Down