Skip to content

Commit 964a6e4

Browse files
authored
chore(query): log hash join stage timings (#19179)
1 parent 4a0b82a commit 964a6e4

File tree

1 file changed

+63
-8
lines changed

1 file changed

+63
-8
lines changed

src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs

Lines changed: 63 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::any::Any;
1616
use std::fmt::Debug;
1717
use std::fmt::Formatter;
1818
use std::sync::Arc;
19+
use std::time::Instant;
1920

2021
use databend_common_exception::Result;
2122
use databend_common_expression::DataBlock;
@@ -44,6 +45,7 @@ pub struct TransformHashJoin {
4445
projection: ColumnSet,
4546
rf_desc: Arc<RuntimeFiltersDesc>,
4647
runtime_filter_builder: Option<RuntimeFilterLocalBuilder>,
48+
instant: Instant,
4749
}
4850

4951
impl TransformHashJoin {
@@ -78,6 +80,7 @@ impl TransformHashJoin {
7880
finished: false,
7981
build_data: None,
8082
}),
83+
instant: Instant::now(),
8184
})))
8285
}
8386
}
@@ -196,6 +199,7 @@ impl Processor for TransformHashJoin {
196199
}
197200

198201
async fn async_process(&mut self) -> Result<()> {
202+
let elapsed = self.instant.elapsed();
199203
let wait_res = self.stage_sync_barrier.wait().await;
200204

201205
self.stage = match &mut self.stage {
@@ -205,26 +209,77 @@ impl Processor for TransformHashJoin {
205209
self.join.add_runtime_filter_packet(packet);
206210
}
207211

212+
let rf_build_elapsed = self.instant.elapsed() - elapsed;
208213
let _wait_res = self.stage_sync_barrier.wait().await;
214+
let before_wait = self.instant.elapsed();
209215

210216
if wait_res.is_leader() {
211217
let packet = self.join.build_runtime_filter()?;
212218
self.rf_desc.globalization(packet).await?;
213219
}
214220

215221
let _wait_res = self.stage_sync_barrier.wait().await;
222+
let wait_rf_elapsed = self.instant.elapsed() - before_wait;
216223

224+
log::info!(
225+
"HashJoin build stage, sync work elapsed: {:?}, build rf elapsed: {:?}, wait other node rf elapsed: {:?}",
226+
elapsed,
227+
rf_build_elapsed,
228+
wait_rf_elapsed
229+
);
230+
231+
self.instant = Instant::now();
217232
Stage::BuildFinal(BuildFinalState::new())
218233
}
219-
Stage::BuildFinal(_) => Stage::Probe(ProbeState::new()),
220-
Stage::Probe(_) => Stage::ProbeFinal(ProbeFinalState::new()),
234+
Stage::BuildFinal(_) => {
235+
let wait_elapsed = self.instant.elapsed() - elapsed;
236+
log::info!(
237+
"HashJoin build final stage, sync work elapsed: {:?}, wait elapsed: {:?}",
238+
elapsed,
239+
wait_elapsed
240+
);
241+
242+
self.instant = Instant::now();
243+
Stage::Probe(ProbeState::new())
244+
}
245+
Stage::Probe(_) => {
246+
let wait_elapsed = self.instant.elapsed() - elapsed;
247+
log::info!(
248+
"HashJoin probe stage, sync work elapsed: {:?}, wait elapsed: {:?}",
249+
elapsed,
250+
wait_elapsed
251+
);
252+
253+
self.instant = Instant::now();
254+
Stage::ProbeFinal(ProbeFinalState::new())
255+
}
221256
Stage::ProbeFinal(state) => match state.finished {
222-
true => Stage::Finished,
223-
false => Stage::ProbeFinal(ProbeFinalState {
224-
initialize: true,
225-
finished: state.finished,
226-
stream: state.stream.take(),
227-
}),
257+
true => {
258+
let wait_elapsed = self.instant.elapsed() - elapsed;
259+
log::info!(
260+
"HashJoin probe final stage, sync work elapsed: {:?}, wait elapsed: {:?}",
261+
elapsed,
262+
wait_elapsed
263+
);
264+
265+
self.instant = Instant::now();
266+
Stage::Finished
267+
}
268+
false => {
269+
let wait_elapsed = self.instant.elapsed() - elapsed;
270+
log::info!(
271+
"HashJoin probe final stage, sync work elapsed: {:?}, wait elapsed: {:?}",
272+
elapsed,
273+
wait_elapsed
274+
);
275+
276+
self.instant = Instant::now();
277+
Stage::ProbeFinal(ProbeFinalState {
278+
initialize: true,
279+
finished: state.finished,
280+
stream: state.stream.take(),
281+
})
282+
}
228283
},
229284
Stage::Finished => Stage::Finished,
230285
};

0 commit comments

Comments
 (0)