From 5897e9a238c9c3b22525eb1338df5877209175ee Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Mon, 29 Dec 2025 13:05:32 +0800 Subject: [PATCH] chore(query): log hash join stage timings --- .../new_hash_join/transform_hash_join.rs | 71 ++++++++++++++++--- 1 file changed, 63 insertions(+), 8 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs index 02684f029594c..76df5ca47280f 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs @@ -16,6 +16,7 @@ use std::any::Any; use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; +use std::time::Instant; use databend_common_exception::Result; use databend_common_expression::DataBlock; @@ -44,6 +45,7 @@ pub struct TransformHashJoin { projection: ColumnSet, rf_desc: Arc, runtime_filter_builder: Option, + instant: Instant, } impl TransformHashJoin { @@ -78,6 +80,7 @@ impl TransformHashJoin { finished: false, build_data: None, }), + instant: Instant::now(), }))) } } @@ -196,6 +199,7 @@ impl Processor for TransformHashJoin { } async fn async_process(&mut self) -> Result<()> { + let elapsed = self.instant.elapsed(); let wait_res = self.stage_sync_barrier.wait().await; self.stage = match &mut self.stage { @@ -205,7 +209,9 @@ impl Processor for TransformHashJoin { self.join.add_runtime_filter_packet(packet); } + let rf_build_elapsed = self.instant.elapsed() - elapsed; let _wait_res = self.stage_sync_barrier.wait().await; + let before_wait = self.instant.elapsed(); if wait_res.is_leader() { let packet = self.join.build_runtime_filter()?; @@ -213,18 +219,67 @@ impl Processor for TransformHashJoin { } let _wait_res = self.stage_sync_barrier.wait().await; + let wait_rf_elapsed = self.instant.elapsed() - before_wait; + log::info!( + "HashJoin build stage, sync work elapsed: {:?}, build rf elapsed: {:?}, wait other node rf elapsed: {:?}", + elapsed, + rf_build_elapsed, + wait_rf_elapsed + ); + + self.instant = Instant::now(); Stage::BuildFinal(BuildFinalState::new()) } - Stage::BuildFinal(_) => Stage::Probe(ProbeState::new()), - Stage::Probe(_) => Stage::ProbeFinal(ProbeFinalState::new()), + Stage::BuildFinal(_) => { + let wait_elapsed = self.instant.elapsed() - elapsed; + log::info!( + "HashJoin build final stage, sync work elapsed: {:?}, wait elapsed: {:?}", + elapsed, + wait_elapsed + ); + + self.instant = Instant::now(); + Stage::Probe(ProbeState::new()) + } + Stage::Probe(_) => { + let wait_elapsed = self.instant.elapsed() - elapsed; + log::info!( + "HashJoin probe stage, sync work elapsed: {:?}, wait elapsed: {:?}", + elapsed, + wait_elapsed + ); + + self.instant = Instant::now(); + Stage::ProbeFinal(ProbeFinalState::new()) + } Stage::ProbeFinal(state) => match state.finished { - true => Stage::Finished, - false => Stage::ProbeFinal(ProbeFinalState { - initialize: true, - finished: state.finished, - stream: state.stream.take(), - }), + true => { + let wait_elapsed = self.instant.elapsed() - elapsed; + log::info!( + "HashJoin probe final stage, sync work elapsed: {:?}, wait elapsed: {:?}", + elapsed, + wait_elapsed + ); + + self.instant = Instant::now(); + Stage::Finished + } + false => { + let wait_elapsed = self.instant.elapsed() - elapsed; + log::info!( + "HashJoin probe final stage, sync work elapsed: {:?}, wait elapsed: {:?}", + elapsed, + wait_elapsed + ); + + self.instant = Instant::now(); + Stage::ProbeFinal(ProbeFinalState { + initialize: true, + finished: state.finished, + stream: state.stream.take(), + }) + } }, Stage::Finished => Stage::Finished, };