Skip to content

Commit 759ebba

Browse files
authored
chore(query): add probe rows log for hash join (#19165)
chore(query): add probe rows for hash join
1 parent 31c24ee commit 759ebba

File tree

2 files changed

+23
-2
lines changed

2 files changed

+23
-2
lines changed

src/query/service/src/physical_plans/physical_hash_join.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,7 @@ impl HashJoin {
452452
let joined_output = OutputPort::create();
453453

454454
let hash_join = TransformHashJoin::create(
455+
self.get_id(),
455456
build_input.clone(),
456457
probe_input.clone(),
457458
joined_output.clone(),

src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::any::Any;
1616
use std::fmt::Debug;
1717
use std::fmt::Formatter;
1818
use std::sync::Arc;
19+
use std::time::Instant;
1920

2021
use databend_common_exception::Result;
2122
use databend_common_expression::DataBlock;
@@ -37,17 +38,21 @@ pub struct TransformHashJoin {
3738
probe_port: Arc<InputPort>,
3839
joined_port: Arc<OutputPort>,
3940

41+
plan_id: u32,
4042
stage: Stage,
4143
join: Box<dyn Join>,
4244
joined_data: Option<DataBlock>,
4345
stage_sync_barrier: Arc<Barrier>,
4446
projection: ColumnSet,
4547
rf_desc: Arc<RuntimeFiltersDesc>,
48+
instant: Instant,
49+
probe_rows: usize,
4650
runtime_filter_builder: Option<RuntimeFilterLocalBuilder>,
4751
}
4852

4953
impl TransformHashJoin {
5054
pub fn create(
55+
plan_id: u32,
5156
build_port: Arc<InputPort>,
5257
probe_port: Arc<InputPort>,
5358
joined_port: Arc<OutputPort>,
@@ -65,6 +70,7 @@ impl TransformHashJoin {
6570
)?;
6671

6772
Ok(ProcessorPtr::create(Box::new(TransformHashJoin {
73+
plan_id,
6874
build_port,
6975
probe_port,
7076
joined_port,
@@ -78,6 +84,8 @@ impl TransformHashJoin {
7884
finished: false,
7985
build_data: None,
8086
}),
87+
probe_rows: 0,
88+
instant: Instant::now(),
8189
})))
8290
}
8391
}
@@ -156,6 +164,7 @@ impl Processor for TransformHashJoin {
156164
}
157165
Stage::Probe(state) => {
158166
if let Some(probe_data) = state.input_data.take() {
167+
self.probe_rows += probe_data.num_rows();
159168
let stream = self.join.probe_block(probe_data)?;
160169
// This is safe because both join and stream are properties of the struct.
161170
state.stream = Some(unsafe { std::mem::transmute(stream) });
@@ -216,8 +225,19 @@ impl Processor for TransformHashJoin {
216225

217226
Stage::BuildFinal(BuildFinalState::new())
218227
}
219-
Stage::BuildFinal(_) => Stage::Probe(ProbeState::new()),
220-
Stage::Probe(_) => Stage::ProbeFinal(ProbeFinalState::new()),
228+
Stage::BuildFinal(_) => {
229+
self.instant = Instant::now();
230+
Stage::Probe(ProbeState::new())
231+
}
232+
Stage::Probe(_) => {
233+
log::info!(
234+
"[NewHashJoin] {} HashJoin Probe completed: {} rows in {:?}",
235+
self.plan_id,
236+
self.probe_rows,
237+
self.instant.elapsed()
238+
);
239+
Stage::ProbeFinal(ProbeFinalState::new())
240+
}
221241
Stage::ProbeFinal(state) => match state.finished {
222242
true => Stage::Finished,
223243
false => Stage::ProbeFinal(ProbeFinalState {

0 commit comments

Comments
 (0)