Skip to content

Commit 0120a0d

Browse files
authored
refactor(query): send runtime filter packets as data blocks (#19170)
* refactor(query): send runtime filter packets as data blocks * z
1 parent 2599a5f commit 0120a0d

File tree

11 files changed

+200
-71
lines changed

11 files changed

+200
-71
lines changed

src/query/expression/src/block.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,10 @@ pub trait BlockMetaInfo: Debug + Send + Sync + Any + 'static {
368368
"The reason for not implementing clone_self is usually because the higher-level logic doesn't allow/need the associated block to be cloned."
369369
)
370370
}
371+
372+
fn override_block_schema(&self) -> Option<DataSchemaRef> {
373+
None
374+
}
371375
}
372376

373377
pub trait BlockMetaInfoDowncast: Sized + BlockMetaInfo {

src/query/pipeline/transforms/src/processors/transforms/sorts/sort_broadcast.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use async_channel::Sender;
2020
use databend_common_exception::ErrorCode;
2121
use databend_common_exception::Result;
2222
use databend_common_expression::BlockMetaInfoDowncast;
23-
use databend_common_expression::BlockMetaInfoPtr;
2423
use databend_common_expression::DataBlock;
2524
use databend_common_pipeline::core::Event;
2625
use databend_common_pipeline::core::InputPort;
@@ -70,8 +69,8 @@ pub struct SortSampleState<C: BroadcastChannel> {
7069
}
7170

7271
pub trait BroadcastChannel: Clone + Send + 'static {
73-
fn sender(&self) -> Sender<BlockMetaInfoPtr>;
74-
fn receiver(&self) -> Receiver<BlockMetaInfoPtr>;
72+
fn sender(&self) -> Sender<DataBlock>;
73+
fn receiver(&self) -> Receiver<DataBlock>;
7574
}
7675

7776
impl<C: BroadcastChannel> SortSampleState<C> {
@@ -91,16 +90,16 @@ impl<C: BroadcastChannel> SortSampleState<C> {
9190
let is_empty = meta.is_none();
9291
let meta = meta.map(|meta| meta.boxed()).unwrap_or(().boxed());
9392
sender
94-
.send(meta)
93+
.send(DataBlock::empty_with_meta(meta))
9594
.await
9695
.map_err(|_| ErrorCode::TokioError("send sort bounds failed"))?;
9796
sender.close();
9897
log::debug!(is_empty; "sample has sent");
9998

10099
let receiver = self.channel.receiver();
101100
let mut all = Vec::new();
102-
while let Ok(r) = receiver.recv().await {
103-
match SortExchangeMeta::downcast_from_err(r) {
101+
while let Ok(mut r) = receiver.recv().await {
102+
match SortExchangeMeta::downcast_from_err(r.take_meta().unwrap()) {
104103
Ok(meta) => all.push(meta),
105104
Err(r) => {
106105
debug_assert!(().boxed().equals(&r))

src/query/service/src/physical_plans/physical_hash_join.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,6 @@ impl HashJoin {
452452
let joined_output = OutputPort::create();
453453

454454
let hash_join = TransformHashJoin::create(
455-
self.get_id(),
456455
build_input.clone(),
457456
probe_input.clone(),
458457
joined_output.clone(),

src/query/service/src/pipelines/processors/transforms/broadcast.rs

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use async_channel::Sender;
1919
use databend_common_catalog::table_context::TableContext;
2020
use databend_common_exception::ErrorCode;
2121
use databend_common_exception::Result;
22-
use databend_common_expression::BlockMetaInfoPtr;
2322
use databend_common_expression::DataBlock;
2423
use databend_common_pipeline::core::InputPort;
2524
use databend_common_pipeline::core::OutputPort;
@@ -30,13 +29,13 @@ use databend_common_pipeline::sources::AsyncSource;
3029
use databend_common_pipeline::sources::AsyncSourcer;
3130

3231
pub struct BroadcastSourceProcessor {
33-
pub receiver: Receiver<BlockMetaInfoPtr>,
32+
pub receiver: Receiver<DataBlock>,
3433
}
3534

3635
impl BroadcastSourceProcessor {
3736
pub fn create(
3837
ctx: Arc<dyn TableContext>,
39-
receiver: Receiver<BlockMetaInfoPtr>,
38+
receiver: Receiver<DataBlock>,
4039
output_port: Arc<OutputPort>,
4140
) -> Result<ProcessorPtr> {
4241
AsyncSourcer::create(ctx.get_scan_progress(), output_port, Self { receiver })
@@ -50,23 +49,20 @@ impl AsyncSource for BroadcastSourceProcessor {
5049

5150
#[async_backtrace::framed]
5251
async fn generate(&mut self) -> Result<Option<DataBlock>> {
53-
let received = self.receiver.recv().await;
54-
match received {
55-
Ok(meta) => Ok(Some(DataBlock::empty_with_meta(meta))),
56-
Err(_) => {
57-
// The channel is closed, we should return None to stop generating
58-
Ok(None)
59-
}
52+
match self.receiver.recv().await {
53+
Ok(block) => Ok(Some(block)),
54+
// The channel is closed, we should return None to stop generating
55+
Err(_) => Ok(None),
6056
}
6157
}
6258
}
6359

6460
pub struct BroadcastSinkProcessor {
65-
sender: Sender<BlockMetaInfoPtr>,
61+
sender: Sender<DataBlock>,
6662
}
6763

6864
impl BroadcastSinkProcessor {
69-
pub fn create(input: Arc<InputPort>, sender: Sender<BlockMetaInfoPtr>) -> Result<ProcessorPtr> {
65+
pub fn create(input: Arc<InputPort>, sender: Sender<DataBlock>) -> Result<ProcessorPtr> {
7066
Ok(ProcessorPtr::create(AsyncSinker::create(input, Self {
7167
sender,
7268
})))
@@ -82,12 +78,9 @@ impl AsyncSink for BroadcastSinkProcessor {
8278
Ok(())
8379
}
8480

85-
async fn consume(&mut self, mut data_block: DataBlock) -> Result<bool> {
86-
let meta = data_block
87-
.take_meta()
88-
.ok_or_else(|| ErrorCode::Internal("Cannot downcast meta to BroadcastMeta"))?;
81+
async fn consume(&mut self, data_block: DataBlock) -> Result<bool> {
8982
self.sender
90-
.send(meta)
83+
.send(data_block)
9184
.await
9285
.map_err(|_| ErrorCode::Internal("BroadcastSinkProcessor send error"))?;
9386
Ok(false)

src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/global.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
use databend_common_exception::ErrorCode;
1616
use databend_common_exception::Result;
17-
use databend_common_expression::BlockMetaInfoDowncast;
1817

1918
use super::merge::merge_join_runtime_filter_packets;
2019
use super::packet::JoinRuntimeFilterPacket;
@@ -30,13 +29,13 @@ pub async fn get_global_runtime_filter_packet(
3029
let mut received = vec![];
3130

3231
sender
33-
.send(Box::new(local_packet))
32+
.send(local_packet.try_into()?)
3433
.await
3534
.map_err(|_| ErrorCode::TokioError("send runtime filter shards failed"))?;
3635
sender.close();
3736

38-
while let Ok(r) = receiver.recv().await {
39-
received.push(JoinRuntimeFilterPacket::downcast_from(r).unwrap());
37+
while let Ok(data_block) = receiver.recv().await {
38+
received.push(JoinRuntimeFilterPacket::try_from(data_block)?);
4039
}
4140
merge_join_runtime_filter_packets(received)
4241
}

src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/packet.rs

Lines changed: 150 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,20 @@ use std::collections::HashMap;
1616
use std::fmt;
1717
use std::fmt::Debug;
1818

19+
use databend_common_column::buffer::Buffer;
20+
use databend_common_exception::ErrorCode;
21+
use databend_common_exception::Result;
1922
use databend_common_expression::BlockMetaInfo;
2023
use databend_common_expression::BlockMetaInfoDowncast;
2124
use databend_common_expression::Column;
25+
use databend_common_expression::ColumnBuilder;
26+
use databend_common_expression::DataBlock;
27+
use databend_common_expression::DataSchemaRef;
2228
use databend_common_expression::Scalar;
29+
use databend_common_expression::types::ArrayColumn;
30+
use databend_common_expression::types::NumberColumn;
31+
use databend_common_expression::types::NumberColumnBuilder;
32+
use databend_common_expression::types::array::ArrayColumnBuilder;
2333

2434
use crate::pipelines::processors::transforms::RuntimeFilterDesc;
2535

@@ -84,15 +94,153 @@ impl JoinRuntimeFilterPacket {
8494
}
8595
}
8696

97+
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
98+
struct FlightRuntimeFilterPacket {
99+
pub id: usize,
100+
pub bloom: Option<usize>,
101+
pub inlist: Option<usize>,
102+
pub min_max: Option<SerializableDomain>,
103+
}
104+
105+
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
106+
struct FlightJoinRuntimeFilterPacket {
107+
#[serde(default)]
108+
pub build_rows: usize,
109+
#[serde(default)]
110+
pub packets: Option<HashMap<usize, FlightRuntimeFilterPacket>>,
111+
112+
pub schema: DataSchemaRef,
113+
}
114+
115+
impl TryInto<DataBlock> for JoinRuntimeFilterPacket {
116+
type Error = ErrorCode;
117+
118+
fn try_into(mut self) -> Result<DataBlock> {
119+
let mut entities = vec![];
120+
let mut join_flight_packets = None;
121+
122+
if let Some(packets) = self.packets.take() {
123+
let mut flight_packets = HashMap::with_capacity(packets.len());
124+
125+
for (id, packet) in packets {
126+
let mut inlist_pos = None;
127+
if let Some(in_list) = packet.inlist {
128+
let len = in_list.len() as u64;
129+
inlist_pos = Some(entities.len());
130+
entities.push(Column::Array(Box::new(ArrayColumn::new(
131+
in_list,
132+
Buffer::from(vec![0, len]),
133+
))));
134+
}
135+
136+
let mut bloom_pos = None;
137+
if let Some(bloom_filter) = packet.bloom {
138+
let len = bloom_filter.len() as u64;
139+
bloom_pos = Some(entities.len());
140+
141+
let builder = ArrayColumnBuilder {
142+
builder: ColumnBuilder::Number(NumberColumnBuilder::UInt64(bloom_filter)),
143+
offsets: vec![0, len],
144+
};
145+
entities.push(Column::Array(Box::new(builder.build())));
146+
}
147+
148+
flight_packets.insert(id, FlightRuntimeFilterPacket {
149+
id,
150+
bloom: bloom_pos,
151+
inlist: inlist_pos,
152+
min_max: packet.min_max,
153+
});
154+
}
155+
156+
join_flight_packets = Some(flight_packets);
157+
}
158+
159+
let data_block = match entities.is_empty() {
160+
true => DataBlock::empty(),
161+
false => DataBlock::new_from_columns(entities),
162+
};
163+
164+
let schema = DataSchemaRef::new(data_block.infer_schema());
165+
166+
data_block.add_meta(Some(Box::new(FlightJoinRuntimeFilterPacket {
167+
build_rows: self.build_rows,
168+
packets: join_flight_packets,
169+
schema,
170+
})))
171+
}
172+
}
173+
174+
impl TryFrom<DataBlock> for JoinRuntimeFilterPacket {
175+
type Error = ErrorCode;
176+
177+
fn try_from(mut block: DataBlock) -> Result<Self> {
178+
if let Some(meta) = block.take_meta() {
179+
let flight_join_rf = FlightJoinRuntimeFilterPacket::downcast_from(meta)
180+
.ok_or_else(|| ErrorCode::Internal("It's a bug"))?;
181+
182+
let Some(packet) = flight_join_rf.packets else {
183+
return Ok(JoinRuntimeFilterPacket {
184+
packets: None,
185+
build_rows: flight_join_rf.build_rows,
186+
});
187+
};
188+
189+
let mut flight_packets = HashMap::with_capacity(packet.len());
190+
for (id, flight_packet) in packet {
191+
let mut inlist = None;
192+
if let Some(column_idx) = flight_packet.inlist {
193+
let column = block.get_by_offset(column_idx).clone();
194+
let column = column.into_column().unwrap();
195+
let array_column = column.into_array().expect("it's a bug");
196+
inlist = Some(array_column.index(0).expect("It's a bug"));
197+
}
198+
199+
let mut bloom = None;
200+
if let Some(column_idx) = flight_packet.bloom {
201+
let column = block.get_by_offset(column_idx).clone();
202+
let column = column.into_column().unwrap();
203+
let array_column = column.into_array().expect("it's a bug");
204+
let bloom_value_column = array_column.index(0).expect("It's a bug");
205+
bloom = Some(match bloom_value_column {
206+
Column::Number(NumberColumn::UInt64(v)) => v.to_vec(),
207+
_ => unreachable!("Unexpected runtime bloom filter column type"),
208+
})
209+
}
210+
211+
flight_packets.insert(id, RuntimeFilterPacket {
212+
bloom,
213+
inlist,
214+
id: flight_packet.id,
215+
min_max: flight_packet.min_max,
216+
});
217+
}
218+
219+
return Ok(JoinRuntimeFilterPacket {
220+
packets: Some(flight_packets),
221+
build_rows: flight_join_rf.build_rows,
222+
});
223+
}
224+
225+
Err(ErrorCode::Internal(
226+
"Unexpected runtime filter packet meta type. It's a bug",
227+
))
228+
}
229+
}
230+
87231
#[typetag::serde(name = "join_runtime_filter_packet")]
88-
impl BlockMetaInfo for JoinRuntimeFilterPacket {
232+
impl BlockMetaInfo for FlightJoinRuntimeFilterPacket {
89233
fn equals(&self, info: &Box<dyn BlockMetaInfo>) -> bool {
90-
JoinRuntimeFilterPacket::downcast_ref_from(info).is_some_and(|other| self == other)
234+
FlightJoinRuntimeFilterPacket::downcast_ref_from(info).is_some_and(|other| self == other)
91235
}
92236

93237
fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
94238
Box::new(self.clone())
95239
}
240+
241+
fn override_block_schema(&self) -> Option<DataSchemaRef> {
242+
Some(self.schema.clone())
243+
}
96244
}
97245

98246
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]

0 commit comments

Comments
 (0)