diff --git a/src/query/service/src/physical_plans/physical_hash_join.rs b/src/query/service/src/physical_plans/physical_hash_join.rs index 9a2e5802f105f..4525eef3ca374 100644 --- a/src/query/service/src/physical_plans/physical_hash_join.rs +++ b/src/query/service/src/physical_plans/physical_hash_join.rs @@ -452,6 +452,7 @@ impl HashJoin { let joined_output = OutputPort::create(); let hash_join = TransformHashJoin::create( + self.get_id(), build_input.clone(), probe_input.clone(), joined_output.clone(), diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs index 02684f029594c..433173717df9a 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs @@ -16,6 +16,7 @@ use std::any::Any; use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; +use std::time::Instant; use databend_common_exception::Result; use databend_common_expression::DataBlock; @@ -37,17 +38,21 @@ pub struct TransformHashJoin { probe_port: Arc, joined_port: Arc, + plan_id: u32, stage: Stage, join: Box, joined_data: Option, stage_sync_barrier: Arc, projection: ColumnSet, rf_desc: Arc, + instant: Instant, + probe_rows: usize, runtime_filter_builder: Option, } impl TransformHashJoin { pub fn create( + plan_id: u32, build_port: Arc, probe_port: Arc, joined_port: Arc, @@ -65,6 +70,7 @@ impl TransformHashJoin { )?; Ok(ProcessorPtr::create(Box::new(TransformHashJoin { + plan_id, build_port, probe_port, joined_port, @@ -78,6 +84,8 @@ impl TransformHashJoin { finished: false, build_data: None, }), + probe_rows: 0, + instant: Instant::now(), }))) } } @@ -156,6 +164,7 @@ impl Processor for TransformHashJoin { } Stage::Probe(state) => { if let Some(probe_data) = state.input_data.take() { + self.probe_rows += probe_data.num_rows(); let stream = self.join.probe_block(probe_data)?; // This is safe because both join and stream are properties of the struct. state.stream = Some(unsafe { std::mem::transmute(stream) }); @@ -216,8 +225,19 @@ impl Processor for TransformHashJoin { Stage::BuildFinal(BuildFinalState::new()) } - Stage::BuildFinal(_) => Stage::Probe(ProbeState::new()), - Stage::Probe(_) => Stage::ProbeFinal(ProbeFinalState::new()), + Stage::BuildFinal(_) => { + self.instant = Instant::now(); + Stage::Probe(ProbeState::new()) + } + Stage::Probe(_) => { + log::info!( + "[NewHashJoin] {} HashJoin Probe completed: {} rows in {:?}", + self.plan_id, + self.probe_rows, + self.instant.elapsed() + ); + Stage::ProbeFinal(ProbeFinalState::new()) + } Stage::ProbeFinal(state) => match state.finished { true => Stage::Finished, false => Stage::ProbeFinal(ProbeFinalState {