From 1fa66ab01914ba75e6a7e0bf1d7e3c1cc8ec5c7d Mon Sep 17 00:00:00 2001
From: zhang2014
Date: Fri, 26 Dec 2025 18:56:31 +0800
Subject: [PATCH 1/2] refactor(query): send runtime filter packets as data blocks

---
 src/query/expression/src/block.rs             |   4 +
 .../transforms/sorts/sort_broadcast.rs        |  11 +-
 .../processors/transforms/broadcast.rs        |  27 ++--
 .../hash_join/runtime_filter/global.rs        |   7 +-
 .../hash_join/runtime_filter/packet.rs        | 152 +++++++++++++++++-
 .../transforms/sort/sort_builder.rs           |   6 +-
 .../exchange/serde/exchange_deserializer.rs   |  13 +-
 src/query/service/src/sessions/query_ctx.rs   |   9 +-
 .../service/src/sessions/query_ctx_shared.rs  |  17 +-
 9 files changed, 198 insertions(+), 48 deletions(-)

diff --git a/src/query/expression/src/block.rs b/src/query/expression/src/block.rs
index bd0fdd7be4cfa..ca32b8e9ac73b 100644
--- a/src/query/expression/src/block.rs
+++ b/src/query/expression/src/block.rs
@@ -368,6 +368,10 @@ pub trait BlockMetaInfo: Debug + Send + Sync + Any + 'static {
             "The reason for not implementing clone_self is usually because the higher-level logic doesn't allow/need the associated block to be cloned."
         )
     }
+
+    fn override_block_schema(&self) -> Option<DataSchemaRef> {
+        None
+    }
 }
 
 pub trait BlockMetaInfoDowncast: Sized + BlockMetaInfo {
diff --git a/src/query/pipeline/transforms/src/processors/transforms/sorts/sort_broadcast.rs b/src/query/pipeline/transforms/src/processors/transforms/sorts/sort_broadcast.rs
index 76dd17839b755..8661c9e3b4f96 100644
--- a/src/query/pipeline/transforms/src/processors/transforms/sorts/sort_broadcast.rs
+++ b/src/query/pipeline/transforms/src/processors/transforms/sorts/sort_broadcast.rs
@@ -20,7 +20,6 @@ use async_channel::Sender;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::BlockMetaInfoDowncast;
-use databend_common_expression::BlockMetaInfoPtr;
 use databend_common_expression::DataBlock;
 use databend_common_pipeline::core::Event;
 use databend_common_pipeline::core::InputPort;
@@ -70,8 +69,8 @@ pub struct SortSampleState {
 }
 
 pub trait BroadcastChannel: Clone + Send + 'static {
-    fn sender(&self) -> Sender<BlockMetaInfoPtr>;
-    fn receiver(&self) -> Receiver<BlockMetaInfoPtr>;
+    fn sender(&self) -> Sender<DataBlock>;
+    fn receiver(&self) -> Receiver<DataBlock>;
 }
 
 impl SortSampleState {
@@ -91,7 +90,7 @@ impl SortSampleState {
         let is_empty = meta.is_none();
         let meta = meta.map(|meta| meta.boxed()).unwrap_or(().boxed());
         sender
-            .send(meta)
+            .send(DataBlock::empty_with_meta(meta))
             .await
             .map_err(|_| ErrorCode::TokioError("send sort bounds failed"))?;
         sender.close();
@@ -99,8 +98,8 @@ impl SortSampleState {
         let receiver = self.channel.receiver();
         let mut all = Vec::new();
-        while let Ok(r) = receiver.recv().await {
-            match SortExchangeMeta::downcast_from_err(r) {
+        while let Ok(mut r) = receiver.recv().await {
+            match SortExchangeMeta::downcast_from_err(r.take_meta().unwrap()) {
                 Ok(meta) => all.push(meta),
                 Err(r) => {
                     debug_assert!(().boxed().equals(&r))
                 }
diff --git a/src/query/service/src/pipelines/processors/transforms/broadcast.rs b/src/query/service/src/pipelines/processors/transforms/broadcast.rs
index d68ce62653b4f..985b354cbfbc5 100644
--- a/src/query/service/src/pipelines/processors/transforms/broadcast.rs
+++ b/src/query/service/src/pipelines/processors/transforms/broadcast.rs
@@ -19,7 +19,6 @@ use async_channel::Sender;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
-use databend_common_expression::BlockMetaInfoPtr;
 use databend_common_expression::DataBlock;
 use databend_common_pipeline::core::InputPort;
 use databend_common_pipeline::core::OutputPort;
@@ -30,13 +29,13 @@ use databend_common_pipeline::sources::AsyncSource;
 use databend_common_pipeline::sources::AsyncSourcer;
 
 pub struct BroadcastSourceProcessor {
-    pub receiver: Receiver<BlockMetaInfoPtr>,
+    pub receiver: Receiver<DataBlock>,
 }
 
 impl BroadcastSourceProcessor {
     pub fn create(
         ctx: Arc<dyn TableContext>,
-        receiver: Receiver<BlockMetaInfoPtr>,
+        receiver: Receiver<DataBlock>,
         output_port: Arc<OutputPort>,
     ) -> Result<ProcessorPtr> {
         AsyncSourcer::create(ctx.get_scan_progress(), output_port, Self { receiver })
@@ -50,23 +49,20 @@ impl AsyncSource for BroadcastSourceProcessor {
 
     #[async_backtrace::framed]
     async fn generate(&mut self) -> Result<Option<DataBlock>> {
-        let received = self.receiver.recv().await;
-        match received {
-            Ok(meta) => Ok(Some(DataBlock::empty_with_meta(meta))),
-            Err(_) => {
-                // The channel is closed, we should return None to stop generating
-                Ok(None)
-            }
+        match self.receiver.recv().await {
+            Ok(block) => Ok(Some(block)),
+            // The channel is closed, we should return None to stop generating
+            Err(_) => Ok(None),
         }
     }
 }
 
 pub struct BroadcastSinkProcessor {
-    sender: Sender<BlockMetaInfoPtr>,
+    sender: Sender<DataBlock>,
 }
 
 impl BroadcastSinkProcessor {
-    pub fn create(input: Arc<InputPort>, sender: Sender<BlockMetaInfoPtr>) -> Result<ProcessorPtr> {
+    pub fn create(input: Arc<InputPort>, sender: Sender<DataBlock>) -> Result<ProcessorPtr> {
         Ok(ProcessorPtr::create(AsyncSinker::create(input, Self {
             sender,
         })))
@@ -82,12 +78,9 @@ impl AsyncSink for BroadcastSinkProcessor {
         Ok(())
     }
 
-    async fn consume(&mut self, mut data_block: DataBlock) -> Result<bool> {
-        let meta = data_block
-            .take_meta()
-            .ok_or_else(|| ErrorCode::Internal("Cannot downcast meta to BroadcastMeta"))?;
+    async fn consume(&mut self, data_block: DataBlock) -> Result<bool> {
         self.sender
-            .send(meta)
+            .send(data_block)
             .await
             .map_err(|_| ErrorCode::Internal("BroadcastSinkProcessor send error"))?;
         Ok(false)
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/global.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/global.rs
index 581cbd28f0784..fd7504930164b 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/global.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/global.rs
@@ -14,7 +14,6 @@
 
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
-use databend_common_expression::BlockMetaInfoDowncast;
 
 use super::merge::merge_join_runtime_filter_packets;
 use super::packet::JoinRuntimeFilterPacket;
@@ -30,13 +29,13 @@ pub async fn get_global_runtime_filter_packet(
 
     let mut received = vec![];
     sender
-        .send(Box::new(local_packet))
+        .send(local_packet.try_into()?)
         .await
         .map_err(|_| ErrorCode::TokioError("send runtime filter shards failed"))?;
     sender.close();
 
-    while let Ok(r) = receiver.recv().await {
-        received.push(JoinRuntimeFilterPacket::downcast_from(r).unwrap());
+    while let Ok(data_block) = receiver.recv().await {
+        received.push(JoinRuntimeFilterPacket::try_from(data_block)?);
     }
     merge_join_runtime_filter_packets(received)
 }
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/packet.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/packet.rs
index 9e30ac18b5bb1..3f9f94c09b399 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/packet.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/packet.rs
@@ -16,10 +16,20 @@ use std::collections::HashMap;
 use std::fmt;
 use std::fmt::Debug;
 
+use databend_common_column::buffer::Buffer;
+use databend_common_exception::ErrorCode;
+use databend_common_exception::Result;
 use databend_common_expression::BlockMetaInfo;
 use databend_common_expression::BlockMetaInfoDowncast;
 use databend_common_expression::Column;
+use databend_common_expression::ColumnBuilder;
+use databend_common_expression::DataBlock;
+use databend_common_expression::DataSchemaRef;
 use databend_common_expression::Scalar;
+use databend_common_expression::types::ArrayColumn;
+use databend_common_expression::types::NumberColumn;
+use databend_common_expression::types::NumberColumnBuilder;
+use databend_common_expression::types::array::ArrayColumnBuilder;
 
 use crate::pipelines::processors::transforms::RuntimeFilterDesc;
@@ -84,15 +94,153 @@ impl JoinRuntimeFilterPacket {
     }
 }
 
+#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
+struct FlightRuntimeFilterPacket {
+    pub id: usize,
+    pub bloom: Option<usize>,
+    pub inlist: Option<usize>,
+    pub min_max: Option<SerializableDomain>,
+}
+
+#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
+struct FlightJoinRuntimeFilterPacket {
+    #[serde(default)]
+    pub build_rows: usize,
+    #[serde(default)]
+    pub packets: Option<HashMap<usize, FlightRuntimeFilterPacket>>,
+
+    pub schema: DataSchemaRef,
+}
+
+impl TryInto<DataBlock> for JoinRuntimeFilterPacket {
+    type Error = ErrorCode;
+
+    fn try_into(mut self) -> Result<DataBlock> {
+        let mut entities = vec![];
+        let mut join_flight_packets = None;
+
+        if let Some(packets) = self.packets.take() {
+            let mut flight_packets = HashMap::with_capacity(packets.len());
+
+            for (id, packet) in packets {
+                let mut inlist_pos = None;
+                if let Some(in_list) = packet.inlist {
+                    let len = in_list.len() as u64;
+                    inlist_pos = Some(entities.len());
+                    entities.push(Column::Array(Box::new(ArrayColumn::new(
+                        in_list,
+                        Buffer::from(vec![0, len]),
+                    ))));
+                }
+
+                let mut bloom_pos = None;
+                if let Some(bloom_filter) = packet.bloom {
+                    let len = bloom_filter.len() as u64;
+                    bloom_pos = Some(entities.len());
+
+                    let builder = ArrayColumnBuilder {
+                        builder: ColumnBuilder::Number(NumberColumnBuilder::UInt64(bloom_filter)),
+                        offsets: vec![0, len],
+                    };
+                    entities.push(Column::Array(Box::new(builder.build())));
+                }
+
+                flight_packets.insert(id, FlightRuntimeFilterPacket {
+                    id,
+                    bloom: bloom_pos,
+                    inlist: inlist_pos,
+                    min_max: packet.min_max,
+                });
+            }
+
+            join_flight_packets = Some(flight_packets);
+        }
+
+        let data_block = match entities.is_empty() {
+            true => DataBlock::empty(),
+            false => DataBlock::new_from_columns(entities),
+        };
+
+        let schema = DataSchemaRef::new(data_block.infer_schema());
+
+        data_block.add_meta(Some(Box::new(FlightJoinRuntimeFilterPacket {
+            build_rows: self.build_rows,
+            packets: join_flight_packets,
+            schema,
+        })))
+    }
+}
+
+impl TryFrom<DataBlock> for JoinRuntimeFilterPacket {
+    type Error = ErrorCode;
+
+    fn try_from(mut block: DataBlock) -> Result<Self> {
+        if let Some(meta) = block.take_meta() {
+            let flight_join_rf = FlightJoinRuntimeFilterPacket::downcast_from(meta)
+                .ok_or_else(|| ErrorCode::Internal("It's a bug"))?;
+
+            let Some(packet) = flight_join_rf.packets else {
+                return Ok(JoinRuntimeFilterPacket {
+                    packets: None,
+                    build_rows: flight_join_rf.build_rows,
+                });
+            };
+
+            let mut flight_packets = HashMap::with_capacity(packet.len());
+            for (id, flight_packet) in packet {
+                let mut inlist = None;
+                if let Some(column_idx) = flight_packet.inlist {
+                    let column = block.get_by_offset(column_idx).clone();
+                    let column = column.into_column().unwrap();
+                    let array_column = column.into_array().expect("it's a bug");
+                    inlist = Some(array_column.index(0).expect("It's a bug"));
+                }
+
+                let mut bloom = None;
+                if let Some(column_idx) = flight_packet.bloom {
+                    let column = block.get_by_offset(column_idx).clone();
+                    let column = column.into_column().unwrap();
+                    let array_column = column.into_array().expect("it's a bug");
+                    let bloom_value_column = array_column.index(0).expect("It's a bug");
+                    bloom = Some(match bloom_value_column {
+                        Column::Number(NumberColumn::UInt64(v)) => v.to_vec(),
+                        _ => unreachable!("Unexpected runtime bloom filter column type"),
+                    })
+                }
+
+                flight_packets.insert(id, RuntimeFilterPacket {
+                    bloom,
+                    inlist,
+                    id: flight_packet.id,
+                    min_max: flight_packet.min_max,
+                });
+            }
+
+            return Ok(JoinRuntimeFilterPacket {
+                packets: Some(flight_packets),
+                build_rows: flight_join_rf.build_rows,
+            });
+        }
+
+        Err(ErrorCode::Internal(
+            "Unexpected runtime filter packet meta type. It's a bug",
+        ))
+    }
+}
+
 #[typetag::serde(name = "join_runtime_filter_packet")]
-impl BlockMetaInfo for JoinRuntimeFilterPacket {
+impl BlockMetaInfo for FlightJoinRuntimeFilterPacket {
     fn equals(&self, info: &Box<dyn BlockMetaInfo>) -> bool {
-        JoinRuntimeFilterPacket::downcast_ref_from(info).is_some_and(|other| self == other)
+        FlightJoinRuntimeFilterPacket::downcast_ref_from(info).is_some_and(|other| self == other)
     }
 
     fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
         Box::new(self.clone())
     }
+
+    fn override_block_schema(&self) -> Option<DataSchemaRef> {
+        Some(self.schema.clone())
+    }
 }
 
 #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
diff --git a/src/query/service/src/pipelines/processors/transforms/sort/sort_builder.rs b/src/query/service/src/pipelines/processors/transforms/sort/sort_builder.rs
index e22e91901bc33..66780ee813ab5 100644
--- a/src/query/service/src/pipelines/processors/transforms/sort/sort_builder.rs
+++ b/src/query/service/src/pipelines/processors/transforms/sort/sort_builder.rs
@@ -15,7 +15,7 @@
 use std::sync::Arc;
 
 use databend_common_exception::Result;
-use databend_common_expression::BlockMetaInfoPtr;
+use databend_common_expression::DataBlock;
 use databend_common_expression::DataSchemaRef;
 use databend_common_expression::SortColumnDescription;
 use databend_common_pipeline::core::InputPort;
@@ -453,11 +453,11 @@ struct ContextChannel {
 }
 
 impl BroadcastChannel for ContextChannel {
-    fn sender(&self) -> async_channel::Sender<BlockMetaInfoPtr> {
+    fn sender(&self) -> async_channel::Sender<DataBlock> {
         self.ctx.broadcast_source_sender(self.id)
     }
 
-    fn receiver(&self) -> async_channel::Receiver<BlockMetaInfoPtr> {
+    fn receiver(&self) -> async_channel::Receiver<DataBlock> {
         self.ctx.broadcast_sink_receiver(self.id)
     }
 }
diff --git a/src/query/service/src/servers/flight/v1/exchange/serde/exchange_deserializer.rs b/src/query/service/src/servers/flight/v1/exchange/serde/exchange_deserializer.rs
index a4882273e3039..1e7df7399bf79 100644
--- a/src/query/service/src/servers/flight/v1/exchange/serde/exchange_deserializer.rs
+++ b/src/query/service/src/servers/flight/v1/exchange/serde/exchange_deserializer.rs
@@ -76,8 +76,17 @@ impl TransformExchangeDeserializer {
             return Ok(DataBlock::new_with_meta(vec![], 0, meta));
         }
 
-        let data_block =
-            deserialize_block(dict, fragment_data, &self.schema, self.arrow_schema.clone())?;
+        let mut schema = self.schema.clone();
+        let mut arrow_schema = self.arrow_schema.clone();
+
+        if let Some(metadata) = &meta {
+            if let Some(dynamic_schema) = metadata.override_block_schema() {
+                arrow_schema = Arc::new(ArrowSchema::from(dynamic_schema.as_ref()));
+                schema = dynamic_schema;
+            }
+        }
+
+        let data_block = deserialize_block(dict, fragment_data, &schema, arrow_schema)?;
         if data_block.num_columns() == 0 {
             return Ok(DataBlock::new_with_meta(vec![], row_count as usize, meta));
         }
diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs
index aab5b283837a5..3d57b985a534b 100644
--- a/src/query/service/src/sessions/query_ctx.rs
+++ b/src/query/service/src/sessions/query_ctx.rs
@@ -76,7 +76,6 @@ use databend_common_catalog::table_context::StageAttachment;
 use databend_common_config::GlobalConfig;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
-use databend_common_expression::BlockMetaInfoPtr;
 use databend_common_expression::BlockThresholds;
 use databend_common_expression::DataBlock;
 use databend_common_expression::Expr;
@@ -322,14 +321,14 @@ impl QueryContext {
         self.shared.attach_table(catalog, database, name, table)
     }
 
-    pub fn broadcast_source_receiver(&self, broadcast_id: u32) -> Receiver<BlockMetaInfoPtr> {
+    pub fn broadcast_source_receiver(&self, broadcast_id: u32) -> Receiver<DataBlock> {
         self.shared.broadcast_source_receiver(broadcast_id)
     }
 
     /// Get a sender to broadcast data
     ///
     /// Note: The channel must be closed by calling close() after data transmission is completed
-    pub fn broadcast_source_sender(&self, broadcast_id: u32) -> Sender<BlockMetaInfoPtr> {
+    pub fn broadcast_source_sender(&self, broadcast_id: u32) -> Sender<DataBlock> {
         self.shared.broadcast_source_sender(broadcast_id)
     }
 
@@ -337,11 +336,11 @@ impl QueryContext {
     ///
     /// Note: receive() can be called repeatedly until an Error is returned, indicating
     /// that the upstream channel has been closed
-    pub fn broadcast_sink_receiver(&self, broadcast_id: u32) -> Receiver<BlockMetaInfoPtr> {
+    pub fn broadcast_sink_receiver(&self, broadcast_id: u32) -> Receiver<DataBlock> {
         self.shared.broadcast_sink_receiver(broadcast_id)
     }
 
-    pub fn broadcast_sink_sender(&self, broadcast_id: u32) -> Sender<BlockMetaInfoPtr> {
+    pub fn broadcast_sink_sender(&self, broadcast_id: u32) -> Sender<DataBlock> {
         self.shared.broadcast_sink_sender(broadcast_id)
     }
diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs
index c0b372b1c8e44..5a3e614cbcc56 100644
--- a/src/query/service/src/sessions/query_ctx_shared.rs
+++ b/src/query/service/src/sessions/query_ctx_shared.rs
@@ -48,7 +48,6 @@ use databend_common_catalog::table_context::StageAttachment;
 use databend_common_config::GlobalConfig;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
-use databend_common_expression::BlockMetaInfoPtr;
 use databend_common_expression::DataBlock;
 use databend_common_meta_app::principal::OnErrorMode;
 use databend_common_meta_app::principal::RoleInfo;
@@ -192,10 +191,10 @@ pub struct QueryContextShared {
 
 #[derive(Default)]
 pub struct BroadcastChannel {
-    pub source_sender: Option<Sender<BlockMetaInfoPtr>>,
-    pub source_receiver: Option<Receiver<BlockMetaInfoPtr>>,
-    pub sink_sender: Option<Sender<BlockMetaInfoPtr>>,
-    pub sink_receiver: Option<Receiver<BlockMetaInfoPtr>>,
+    pub source_sender: Option<Sender<DataBlock>>,
+    pub source_receiver: Option<Receiver<DataBlock>>,
+    pub sink_sender: Option<Sender<DataBlock>>,
+    pub sink_receiver: Option<Receiver<DataBlock>>,
 }
 
 impl QueryContextShared {
@@ -273,7 +272,7 @@ impl QueryContextShared {
         }))
     }
 
-    pub fn broadcast_source_receiver(&self, broadcast_id: u32) -> Receiver<BlockMetaInfoPtr> {
+    pub fn broadcast_source_receiver(&self, broadcast_id: u32) -> Receiver<DataBlock> {
         let mut broadcast_channels = self.broadcast_channels.lock();
         let entry = broadcast_channels.entry(broadcast_id).or_default();
         match entry.source_receiver.take() {
@@ -285,7 +284,7 @@ impl QueryContextShared {
             }
         }
     }
-    pub fn broadcast_source_sender(&self, broadcast_id: u32) -> Sender<BlockMetaInfoPtr> {
+    pub fn broadcast_source_sender(&self, broadcast_id: u32) -> Sender<DataBlock> {
         let mut broadcast_channels = self.broadcast_channels.lock();
         let entry = broadcast_channels.entry(broadcast_id).or_default();
         match entry.source_sender.take() {
@@ -298,7 +297,7 @@ impl QueryContextShared {
         }
     }
 
-    pub fn broadcast_sink_receiver(&self, broadcast_id: u32) -> Receiver<BlockMetaInfoPtr> {
+    pub fn broadcast_sink_receiver(&self, broadcast_id: u32) -> Receiver<DataBlock> {
         let mut broadcast_channels = self.broadcast_channels.lock();
         let entry = broadcast_channels.entry(broadcast_id).or_default();
         match entry.sink_receiver.take() {
@@ -310,7 +309,7 @@ impl QueryContextShared {
             }
         }
     }
-    pub fn broadcast_sink_sender(&self, broadcast_id: u32) -> Sender<BlockMetaInfoPtr> {
+    pub fn broadcast_sink_sender(&self, broadcast_id: u32) -> Sender<DataBlock> {
         let mut broadcast_channels = self.broadcast_channels.lock();
         let entry = broadcast_channels.entry(broadcast_id).or_default();
         match entry.sink_sender.take() {

From 0cd62f157754951090232825580f066cab824254 Mon Sep 17 00:00:00 2001
From: zhang2014
Date: Sat, 27 Dec 2025 15:55:32 +0800
Subject: [PATCH 2/2] refactor(query): remove probe timing log from TransformHashJoin

---
 .../src/physical_plans/physical_hash_join.rs  |  1 -
 .../new_hash_join/transform_hash_join.rs      | 24 ++-----------------
 2 files changed, 2 insertions(+), 23 deletions(-)

diff --git a/src/query/service/src/physical_plans/physical_hash_join.rs b/src/query/service/src/physical_plans/physical_hash_join.rs
index 4525eef3ca374..9a2e5802f105f 100644
--- a/src/query/service/src/physical_plans/physical_hash_join.rs
+++ b/src/query/service/src/physical_plans/physical_hash_join.rs
@@ -452,7 +452,6 @@ impl HashJoin {
         let joined_output = OutputPort::create();
 
         let hash_join = TransformHashJoin::create(
-            self.get_id(),
            build_input.clone(),
            probe_input.clone(),
            joined_output.clone(),
diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs
index 433173717df9a..02684f029594c 100644
--- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs
+++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs
@@ -16,7 +16,6 @@ use std::any::Any;
 use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::sync::Arc;
-use std::time::Instant;
 
 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
@@ -38,21 +37,17 @@ pub struct TransformHashJoin {
     probe_port: Arc<InputPort>,
     joined_port: Arc<OutputPort>,
 
-    plan_id: u32,
     stage: Stage,
     join: Box<dyn Join>,
     joined_data: Option<DataBlock>,
     stage_sync_barrier: Arc<Barrier>,
     projection: ColumnSet,
     rf_desc: Arc<RuntimeFiltersDesc>,
-    instant: Instant,
-    probe_rows: usize,
     runtime_filter_builder: Option,
 }
 
 impl TransformHashJoin {
     pub fn create(
-        plan_id: u32,
         build_port: Arc<InputPort>,
         probe_port: Arc<InputPort>,
         joined_port: Arc<OutputPort>,
@@ -70,7 +65,6 @@ impl TransformHashJoin {
         )?;
 
         Ok(ProcessorPtr::create(Box::new(TransformHashJoin {
-            plan_id,
             build_port,
             probe_port,
             joined_port,
@@ -84,8 +78,6 @@ impl TransformHashJoin {
                 finished: false,
                 build_data: None,
             }),
-            probe_rows: 0,
-            instant: Instant::now(),
         })))
     }
 }
@@ -164,7 +156,6 @@ impl Processor for TransformHashJoin {
             }
             Stage::Probe(state) => {
                 if let Some(probe_data) = state.input_data.take() {
-                    self.probe_rows += probe_data.num_rows();
                     let stream = self.join.probe_block(probe_data)?;
                     // This is safe because both join and stream are properties of the struct.
                     state.stream = Some(unsafe { std::mem::transmute(stream) });
@@ -225,19 +216,8 @@ impl Processor for TransformHashJoin {
                 Stage::BuildFinal(BuildFinalState::new())
             }
-            Stage::BuildFinal(_) => {
-                self.instant = Instant::now();
-                Stage::Probe(ProbeState::new())
-            }
-            Stage::Probe(_) => {
-                log::info!(
-                    "[NewHashJoin] {} HashJoin Probe completed: {} rows in {:?}",
-                    self.plan_id,
-                    self.probe_rows,
-                    self.instant.elapsed()
-                );
-                Stage::ProbeFinal(ProbeFinalState::new())
-            }
+            Stage::BuildFinal(_) => Stage::Probe(ProbeState::new()),
+            Stage::Probe(_) => Stage::ProbeFinal(ProbeFinalState::new()),
             Stage::ProbeFinal(state) => match state.finished {
                 true => Stage::Finished,
                 false => Stage::ProbeFinal(ProbeFinalState {
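
Note on the wire format (not part of the patch itself): the heart of PATCH 1 is that a
JoinRuntimeFilterPacket no longer travels as an opaque boxed BlockMetaInfo. TryInto<DataBlock>
moves the heavy inlist/bloom payloads into ordinary block columns and records, per filter id,
which column holds which payload in a FlightJoinRuntimeFilterPacket meta, together with the
block schema that override_block_schema exposes so the exchange deserializer can decode a
block whose schema differs from the fragment's static one. TryFrom<DataBlock> inverts the
mapping. The toy Rust program below is a minimal, self-contained sketch of that
"payloads become columns, meta records positions" pattern; Packet, Meta, Block, encode and
decode are illustrative stand-ins, not databend APIs.

    // Toy model of the packet <-> block round-trip used by this patch series.
    #[derive(Debug, PartialEq)]
    struct Packet {
        id: usize,
        bloom: Option<Vec<u64>>, // payload that should travel as column data
    }

    struct Meta {
        id: usize,
        bloom_pos: Option<usize>, // which column carries the bloom payload
    }

    struct Block {
        columns: Vec<Vec<u64>>, // stand-in for the block's "entities"
        meta: Meta,
    }

    fn encode(packet: Packet) -> Block {
        let mut columns = Vec::new();
        // Move the payload out of the packet and into a column, remembering
        // its position so the receiving side can find it again.
        let bloom_pos = packet.bloom.map(|bloom| {
            columns.push(bloom);
            columns.len() - 1
        });
        Block {
            columns,
            meta: Meta { id: packet.id, bloom_pos },
        }
    }

    fn decode(block: Block) -> Packet {
        // Rebuild the packet by following the positions recorded in the meta.
        let bloom = block.meta.bloom_pos.map(|pos| block.columns[pos].clone());
        Packet { id: block.meta.id, bloom }
    }

    fn main() {
        let expected = Packet { id: 7, bloom: Some(vec![1, 2, 3]) };
        let block = encode(Packet { id: 7, bloom: Some(vec![1, 2, 3]) });
        assert_eq!(decode(block), expected);
        println!("round-trip ok");
    }

Because the payloads ride in columns of a real DataBlock, they benefit from the regular
Arrow/Flight serialization path instead of a bespoke meta encoding, which is why the
deserializer only needs the per-block schema override rather than a new packet protocol.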