diff --git a/src/common/base/src/runtime/profile/profiles.rs b/src/common/base/src/runtime/profile/profiles.rs
index ed2a1e4e8828f..4a5f1cfaf50f2 100644
--- a/src/common/base/src/runtime/profile/profiles.rs
+++ b/src/common/base/src/runtime/profile/profiles.rs
@@ -65,6 +65,7 @@ pub enum ProfileStatisticsName {
     MemoryUsage,
     ExternalServerRetryCount,
     ExternalServerRequestCount,
+    ScheduleTime,
 }
 
 #[derive(Clone, Hash, Eq, PartialEq, serde::Serialize, serde::Deserialize, Debug)]
@@ -368,6 +369,13 @@ pub fn get_statistics_desc() -> Arc
                 unit: StatisticsUnit::Count,
                 plain_statistics: true,
             }),
+            (ProfileStatisticsName::ScheduleTime, ProfileDesc {
+                display_name: "schedule time",
+                desc: "The time spent to wait in nanoseconds, usually used to measure the time spent on waiting for Executor",
+                index: ProfileStatisticsName::ScheduleTime as usize,
+                unit: StatisticsUnit::NanoSeconds,
+                plain_statistics: false,
+            }),
         ]))
     }).clone()
 }
diff --git a/src/query/pipeline/src/core/profile.rs b/src/query/pipeline/src/core/profile.rs
index 850e5ce59a706..14967ee3651b6 100644
--- a/src/query/pipeline/src/core/profile.rs
+++ b/src/query/pipeline/src/core/profile.rs
@@ -18,6 +18,7 @@ use std::collections::btree_map::Entry;
 use std::sync::Arc;
 use std::sync::atomic::Ordering;
 
+use databend_common_base::hints::assume;
 use databend_common_base::runtime::error_info::NodeErrorType;
 use databend_common_base::runtime::metrics::MetricSample;
 use databend_common_base::runtime::metrics::ScopedRegistry;
@@ -76,7 +77,7 @@ pub struct PlanProfile {
     pub title: Arc,
     pub labels: Arc>,
 
-    pub statistics: [usize; std::mem::variant_count::<ProfileStatisticsName>()],
+    pub statistics: Vec<usize>,
     #[serde(skip_serializing_if = "std::collections::BTreeMap::is_empty")]
     #[serde(default)]
     pub metrics: BTreeMap>,
@@ -114,12 +115,13 @@ impl PlanProfile {
             title: profile.title.clone(),
             labels: profile.labels.clone(),
             metrics: BTreeMap::new(),
-            statistics: std::array::from_fn(|_| 0),
+            statistics: vec![0; std::mem::variant_count::<ProfileStatisticsName>()],
             errors: Self::get_profile_error(profile),
         }
     }
 
     pub fn accumulate(&mut self, profile: &Profile) {
+        assume(self.statistics.len() == std::mem::variant_count::<ProfileStatisticsName>());
         for index in 0..std::mem::variant_count::<ProfileStatisticsName>() {
             self.statistics[index] += profile.statistics[index].load(Ordering::SeqCst);
         }
@@ -132,6 +134,8 @@ impl PlanProfile {
             self.parent_id = profile.parent_id;
         }
 
+        assume(self.statistics.len() == std::mem::variant_count::<ProfileStatisticsName>());
+        assume(profile.statistics.len() == std::mem::variant_count::<ProfileStatisticsName>());
         for index in 0..std::mem::variant_count::<ProfileStatisticsName>() {
             self.statistics[index] += profile.statistics[index];
         }
diff --git a/src/query/service/src/physical_plans/format/common.rs b/src/query/service/src/physical_plans/format/common.rs
index 22e353080d9fb..50ba2f04879aa 100644
--- a/src/query/service/src/physical_plans/format/common.rs
+++ b/src/query/service/src/physical_plans/format/common.rs
@@ -16,6 +16,8 @@ use std::collections::HashMap;
 
 use databend_common_ast::ast::FormatTreeNode;
 use databend_common_base::base::format_byte_size;
+use databend_common_base::hints::assume;
+use databend_common_base::runtime::profile::ProfileStatisticsName;
 use databend_common_base::runtime::profile::get_statistics_desc;
 use databend_common_catalog::plan::PartStatistics;
 use databend_common_catalog::runtime_filter_info::RuntimeFilterReport;
@@ -212,6 +214,7 @@ pub fn append_output_rows_info(
     plan_id: u32,
 ) {
     if let Some(prof) = profs.get(&plan_id) {
+        assume(prof.statistics.len() == std::mem::variant_count::<ProfileStatisticsName>());
         for (_, desc) in get_statistics_desc().iter() {
             if desc.display_name != "output rows" {
                 continue;
diff --git a/src/query/service/src/physical_plans/format/physical_format.rs b/src/query/service/src/physical_plans/format/physical_format.rs
index 0d96916898113..2e90cb905d9cd 100644
--- a/src/query/service/src/physical_plans/format/physical_format.rs
+++ b/src/query/service/src/physical_plans/format/physical_format.rs
@@ -13,6 +13,8 @@
 // limitations under the License.
 
 use databend_common_ast::ast::FormatTreeNode;
+use databend_common_base::hints::assume;
+use databend_common_base::runtime::profile::ProfileStatisticsName;
 use databend_common_base::runtime::profile::get_statistics_desc;
 use databend_common_exception::Result;
 
@@ -27,6 +29,7 @@ pub trait PhysicalFormat {
         // explain analyze
         if let Some(prof) = ctx.profs.get(&self.get_meta().plan_id) {
             let mut children = Vec::with_capacity(format_node.children.len() + 10);
+            assume(prof.statistics.len() == std::mem::variant_count::<ProfileStatisticsName>());
             for (_, desc) in get_statistics_desc().iter() {
                 if prof.statistics[desc.index] != 0 {
                     children.push(FormatTreeNode::new(format!(
diff --git a/src/query/service/src/physical_plans/physical_hash_join.rs b/src/query/service/src/physical_plans/physical_hash_join.rs
index 9a2e5802f105f..4525eef3ca374 100644
--- a/src/query/service/src/physical_plans/physical_hash_join.rs
+++ b/src/query/service/src/physical_plans/physical_hash_join.rs
@@ -452,6 +452,7 @@ impl HashJoin {
         let joined_output = OutputPort::create();
 
         let hash_join = TransformHashJoin::create(
+            self.get_id(),
             build_input.clone(),
             probe_input.clone(),
             joined_output.clone(),
diff --git a/src/query/service/src/pipelines/executor/executor_graph.rs b/src/query/service/src/pipelines/executor/executor_graph.rs
index 43d4659187955..15d8f6f4bb569 100644
--- a/src/query/service/src/pipelines/executor/executor_graph.rs
+++ b/src/query/service/src/pipelines/executor/executor_graph.rs
@@ -26,6 +26,7 @@ use std::sync::atomic::Ordering;
 use std::time::SystemTime;
 
 use databend_common_base::base::WatchNotify;
+use databend_common_base::hints::assume;
 use databend_common_base::runtime::ExecutorStats;
 use databend_common_base::runtime::ExecutorStatsSnapshot;
 use databend_common_base::runtime::QueryTimeSeriesProfileBuilder;
@@ -761,6 +762,10 @@ impl RunningGraph {
                 match plans_profile.entry(*plan_id) {
                     Entry::Occupied(mut v) => {
                         let plan_profile = v.get_mut();
+                        assume(
+                            plan_profile.statistics.len()
+                                == std::mem::variant_count::<ProfileStatisticsName>(),
+                        );
                         for index in 0..std::mem::variant_count::<ProfileStatisticsName>() {
                             plan_profile.statistics[index] +=
                                 profile.statistics[index].fetch_min(0, Ordering::SeqCst);
@@ -769,6 +774,10 @@ impl RunningGraph {
                     Entry::Vacant(v) => {
                         let plan_profile = v.insert(PlanProfile::create(profile));
 
+                        assume(
+                            plan_profile.statistics.len()
+                                == std::mem::variant_count::<ProfileStatisticsName>(),
+                        );
                         for index in 0..std::mem::variant_count::<ProfileStatisticsName>() {
                             plan_profile.statistics[index] +=
                                 profile.statistics[index].fetch_min(0, Ordering::SeqCst);
diff --git a/src/query/service/src/pipelines/executor/executor_worker_context.rs b/src/query/service/src/pipelines/executor/executor_worker_context.rs
index 17b6dd343d6e9..7677cd4031c93 100644
--- a/src/query/service/src/pipelines/executor/executor_worker_context.rs
+++ b/src/query/service/src/pipelines/executor/executor_worker_context.rs
@@ -60,6 +60,7 @@ pub struct CompletedAsyncTask {
     pub id: NodeIndex,
     pub worker_id: usize,
     pub res: Result<()>,
+    pub instant: Instant,
     pub graph: Arc,
 }
 
@@ -75,6 +76,7 @@ impl CompletedAsyncTask {
             worker_id,
             res,
             graph,
+            instant: Instant::now(),
         }
     }
 }
@@ -152,7 +154,18 @@ impl ExecutorWorkerContext {
                 }
             }
             ExecutorTask::AsyncCompleted(task) => match task.res {
-                Ok(_) => Ok(Some((task.id, task.graph))),
+                Ok(_) => {
+                    let payload = task.graph.get_node_tracking_payload(task.id);
+                    let _guard = ThreadTracker::tracking(payload.clone());
+                    let nanos = task.instant.elapsed().as_nanos();
+                    assume(nanos < 18446744073709551615_u128);
+                    Profile::record_usize_profile(
+                        ProfileStatisticsName::ScheduleTime,
+                        nanos as usize,
+                    );
+
+                    Ok(Some((task.id, task.graph)))
+                }
                 Err(cause) => Err(Box::new(NodeErrorType::AsyncProcessError(cause))),
             },
         }
diff --git a/src/query/service/src/pipelines/executor/processor_async_task.rs b/src/query/service/src/pipelines/executor/processor_async_task.rs
index d040a62e58044..692f666c75d1e 100644
--- a/src/query/service/src/pipelines/executor/processor_async_task.rs
+++ b/src/query/service/src/pipelines/executor/processor_async_task.rs
@@ -90,6 +90,7 @@ pub struct ProcessorAsyncTask {
     workers_condvar: Arc,
     instant: Instant,
     last_nanos: usize,
+    first_poll: bool,
     graph: Arc,
     inner: BoxFuture<'static, Result<()>>,
 }
@@ -157,6 +158,7 @@ impl ProcessorAsyncTask {
             instant,
             graph,
             inner: inner.boxed(),
+            first_poll: true,
         }
     }
 }
@@ -171,11 +173,17 @@ impl Future for ProcessorAsyncTask {
 
         let last_nanos = self.last_nanos;
         let last_instant = self.instant;
+        let is_first_poll = std::mem::take(&mut self.first_poll);
         let inner = self.inner.as_mut();
 
         let before_poll_nanos = elapsed_nanos(last_instant);
         let wait_nanos = before_poll_nanos - last_nanos;
-        Profile::record_usize_profile(ProfileStatisticsName::WaitTime, wait_nanos);
+
+        if is_first_poll {
+            Profile::record_usize_profile(ProfileStatisticsName::ScheduleTime, wait_nanos);
+        } else {
+            Profile::record_usize_profile(ProfileStatisticsName::WaitTime, wait_nanos);
+        }
 
         let poll_res = catch_unwind(move || inner.poll(cx));
 
diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs
index 76df5ca47280f..c1a23b265224b 100644
--- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs
+++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs
@@ -16,6 +16,7 @@ use std::any::Any;
 use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::sync::Arc;
+use std::time::Duration;
 use std::time::Instant;
 
 use databend_common_exception::Result;
@@ -45,11 +46,13 @@ pub struct TransformHashJoin {
     projection: ColumnSet,
     rf_desc: Arc,
     runtime_filter_builder: Option,
+    plan_id: u32,
     instant: Instant,
 }
 
 impl TransformHashJoin {
     pub fn create(
+        plan_id: u32,
         build_port: Arc,
         probe_port: Arc,
         joined_port: Arc,
@@ -72,6 +75,7 @@ impl TransformHashJoin {
             joined_port,
             join,
             rf_desc,
+            plan_id,
             projection,
             stage_sync_barrier,
             joined_data: None,
@@ -158,6 +162,7 @@ impl Processor for TransformHashJoin {
                 Ok(())
             }
             Stage::Probe(state) => {
+                let instant = Instant::now();
                 if let Some(probe_data) = state.input_data.take() {
                     let stream = self.join.probe_block(probe_data)?;
                     // This is safe because both join and stream are properties of the struct.
@@ -171,6 +176,8 @@ impl Processor for TransformHashJoin {
                     }
                 }
 
+                state.cpu_time += instant.elapsed().as_nanos() as u64;
+
                 Ok(())
             }
             Stage::ProbeFinal(state) => {
@@ -222,7 +229,8 @@ impl Processor for TransformHashJoin {
                    let wait_rf_elapsed = self.instant.elapsed() - before_wait;
 
                    log::info!(
-                        "HashJoin build stage, sync work elapsed: {:?}, build rf elapsed: {:?}, wait other node rf elapsed: {:?}",
+                        "HashJoin({}) build stage, sync work elapsed: {:?}, build rf elapsed: {:?}, wait other node rf elapsed: {:?}",
+                        self.plan_id,
                         elapsed,
                         rf_build_elapsed,
                         wait_rf_elapsed
@@ -234,7 +242,8 @@ impl Processor for TransformHashJoin {
                Stage::BuildFinal(_) => {
                    let wait_elapsed = self.instant.elapsed() - elapsed;
                    log::info!(
-                        "HashJoin build final stage, sync work elapsed: {:?}, wait elapsed: {:?}",
+                        "HashJoin({}) build final stage, sync work elapsed: {:?}, wait elapsed: {:?}",
+                        self.plan_id,
                         elapsed,
                         wait_elapsed
                     );
@@ -242,12 +251,15 @@ impl Processor for TransformHashJoin {
                     self.instant = Instant::now();
                     Stage::Probe(ProbeState::new())
                 }
-                Stage::Probe(_) => {
+                Stage::Probe(state) => {
                     let wait_elapsed = self.instant.elapsed() - elapsed;
                     log::info!(
-                        "HashJoin probe stage, sync work elapsed: {:?}, wait elapsed: {:?}",
+                        "HashJoin({}) probe stage, sync work elapsed: {:?}, wait elapsed: {:?}, cpu time: {:?}, probe rows: {}",
+                        self.plan_id,
                         elapsed,
-                        wait_elapsed
+                        wait_elapsed,
+                        Duration::from_nanos(state.cpu_time),
+                        state.probe_rows
                     );
 
                     self.instant = Instant::now();
@@ -257,7 +269,8 @@ impl Processor for TransformHashJoin {
                     true => {
                         let wait_elapsed = self.instant.elapsed() - elapsed;
                         log::info!(
-                            "HashJoin probe final stage, sync work elapsed: {:?}, wait elapsed: {:?}",
+                            "HashJoin({}) probe final stage, sync work elapsed: {:?}, wait elapsed: {:?}",
+                            self.plan_id,
                             elapsed,
                             wait_elapsed
                         );
@@ -268,7 +281,8 @@ impl Processor for TransformHashJoin {
                     false => {
                         let wait_elapsed = self.instant.elapsed() - elapsed;
                         log::info!(
-                            "HashJoin probe final stage, sync work elapsed: {:?}, wait elapsed: {:?}",
+                            "HashJoin({}) probe final stage, sync work elapsed: {:?}, wait elapsed: {:?}",
+                            self.plan_id,
                             elapsed,
                             wait_elapsed
                         );
@@ -345,8 +359,10 @@ impl BuildFinalState {
 }
 
 struct ProbeState {
+    probe_rows: usize,
     input_data: Option,
     stream: Option>,
+    cpu_time: u64,
 }
 
 impl Debug for ProbeState {
@@ -358,8 +374,10 @@ impl Debug for ProbeState {
 impl ProbeState {
     pub fn new() -> ProbeState {
         ProbeState {
+            probe_rows: 0,
             input_data: None,
             stream: None,
+            cpu_time: 0,
         }
     }
 
@@ -369,7 +387,9 @@ impl ProbeState {
         }
 
         if input.has_data() {
-            self.input_data = Some(input.pull_data().unwrap()?);
+            let data_block = input.pull_data().unwrap()?;
+            self.probe_rows += data_block.num_rows();
+            self.input_data = Some(data_block);
             return Ok(Event::Sync);
         }
 
diff --git a/src/query/storages/fuse/src/operations/read/runtime_filter_wait.rs b/src/query/storages/fuse/src/operations/read/runtime_filter_wait.rs
index 40d4bfeb464c1..fb388b6919ec2 100644
--- a/src/query/storages/fuse/src/operations/read/runtime_filter_wait.rs
+++ b/src/query/storages/fuse/src/operations/read/runtime_filter_wait.rs
@@ -15,6 +15,7 @@
 use std::any::Any;
 use std::sync::Arc;
 use std::time::Duration;
+use std::time::Instant;
 
 use databend_common_base::base::tokio::time::timeout;
 use databend_common_catalog::runtime_filter_info::RuntimeFilterReady;
@@ -96,6 +97,7 @@ impl Processor for TransformRuntimeFilterWait {
 
     #[async_backtrace::framed]
     async fn async_process(&mut self) -> Result<()> {
+        let instant = Instant::now();
         if self.runtime_filter_ready.is_empty() {
             self.runtime_filter_ready = self.ctx.get_runtime_filter_ready(self.scan_id);
         }
@@ -135,6 +137,13 @@ impl Processor for TransformRuntimeFilterWait {
 
         self.runtime_filter_ready.clear();
         self.wait_finished = true;
+
+        log::info!(
+            "RUNTIME-FILTER: scan_id={} ready_count={} elapsed={:?}",
+            self.scan_id,
+            self.runtime_filter_ready.len(),
+            instant.elapsed()
+        );
         Ok(())
     }
 }
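
Note on the `assume(...)` calls added throughout this patch: with `PlanProfile::statistics` now a `Vec<usize>` rather than a fixed-size array, every `statistics[index]` loop would otherwise pick up a slice bounds check. `databend_common_base::hints::assume` tells the optimizer that the vector length always equals `std::mem::variant_count::<ProfileStatisticsName>()`, so those checks can be elided. A minimal sketch of what such a hint could look like is below; it assumes the helper is a thin wrapper over `std::hint::unreachable_unchecked`, which is an illustration rather than the actual definition in `databend_common_base`.

    /// Optimizer hint: the caller promises `cond` always holds.
    /// Indexing guarded by an equal-length invariant (here,
    /// statistics.len() == variant_count::<ProfileStatisticsName>())
    /// can then be compiled without bounds checks.
    #[inline(always)]
    pub fn assume(cond: bool) {
        debug_assert!(cond, "assume() was given a false condition");
        if !cond {
            // SAFETY: reaching this branch would mean the stated invariant is
            // violated; the caller guarantees that never happens.
            unsafe { std::hint::unreachable_unchecked() }
        }
    }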
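
The `first_poll` flag in `ProcessorAsyncTask` splits what was previously all recorded as `WaitTime`: the gap before the very first poll is time the future spent queued waiting for an executor thread (now reported as `ScheduleTime`), while gaps between later polls remain genuine async `WaitTime`. The standalone sketch below only mirrors that first-poll split; the `PollTimer` type is hypothetical and not part of the patch.

    use std::time::Instant;

    struct PollTimer {
        created: Instant,
        last_poll: Option<Instant>,
    }

    impl PollTimer {
        /// Call at the start of every poll. Returns (is_first_poll, waited_nanos):
        /// the first gap is schedule delay, later gaps are async wait time.
        fn on_poll(&mut self) -> (bool, u128) {
            let now = Instant::now();
            let since = self.last_poll.unwrap_or(self.created);
            let is_first = self.last_poll.is_none();
            self.last_poll = Some(now);
            (is_first, now.duration_since(since).as_nanos())
        }
    }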