Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/common/base/src/runtime/profile/profiles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub enum ProfileStatisticsName {
MemoryUsage,
ExternalServerRetryCount,
ExternalServerRequestCount,
ScheduleTime,
}

#[derive(Clone, Hash, Eq, PartialEq, serde::Serialize, serde::Deserialize, Debug)]
Expand Down Expand Up @@ -368,6 +369,13 @@ pub fn get_statistics_desc() -> Arc<BTreeMap<ProfileStatisticsName, ProfileDesc>
unit: StatisticsUnit::Count,
plain_statistics: true,
}),
(ProfileStatisticsName::ScheduleTime, ProfileDesc {
display_name: "schedule time",
desc: "The time spent to wait in nanoseconds, usually used to measure the time spent on waiting for Executor",
index: ProfileStatisticsName::ScheduleTime as usize,
unit: StatisticsUnit::NanoSeconds,
plain_statistics: false,
}),
]))
}).clone()
}
8 changes: 6 additions & 2 deletions src/query/pipeline/src/core/profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::collections::btree_map::Entry;
use std::sync::Arc;
use std::sync::atomic::Ordering;

use databend_common_base::hints::assume;
use databend_common_base::runtime::error_info::NodeErrorType;
use databend_common_base::runtime::metrics::MetricSample;
use databend_common_base::runtime::metrics::ScopedRegistry;
Expand Down Expand Up @@ -76,7 +77,7 @@ pub struct PlanProfile {
pub title: Arc<String>,
pub labels: Arc<Vec<ProfileLabel>>,

pub statistics: [usize; std::mem::variant_count::<ProfileStatisticsName>()],
pub statistics: Vec<usize>,
#[serde(skip_serializing_if = "std::collections::BTreeMap::is_empty")]
#[serde(default)]
pub metrics: BTreeMap<String, Vec<MetricSample>>,
Expand Down Expand Up @@ -114,12 +115,13 @@ impl PlanProfile {
title: profile.title.clone(),
labels: profile.labels.clone(),
metrics: BTreeMap::new(),
statistics: std::array::from_fn(|_| 0),
statistics: vec![0; std::mem::variant_count::<ProfileStatisticsName>()],
errors: Self::get_profile_error(profile),
}
}

pub fn accumulate(&mut self, profile: &Profile) {
assume(self.statistics.len() == std::mem::variant_count::<ProfileStatisticsName>());
for index in 0..std::mem::variant_count::<ProfileStatisticsName>() {
self.statistics[index] += profile.statistics[index].load(Ordering::SeqCst);
}
Expand All @@ -132,6 +134,8 @@ impl PlanProfile {
self.parent_id = profile.parent_id;
}

assume(self.statistics.len() == std::mem::variant_count::<ProfileStatisticsName>());
assume(profile.statistics.len() == std::mem::variant_count::<ProfileStatisticsName>());
for index in 0..std::mem::variant_count::<ProfileStatisticsName>() {
self.statistics[index] += profile.statistics[index];
}
Expand Down
3 changes: 3 additions & 0 deletions src/query/service/src/physical_plans/format/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::collections::HashMap;

use databend_common_ast::ast::FormatTreeNode;
use databend_common_base::base::format_byte_size;
use databend_common_base::hints::assume;
use databend_common_base::runtime::profile::ProfileStatisticsName;
use databend_common_base::runtime::profile::get_statistics_desc;
use databend_common_catalog::plan::PartStatistics;
use databend_common_catalog::runtime_filter_info::RuntimeFilterReport;
Expand Down Expand Up @@ -212,6 +214,7 @@ pub fn append_output_rows_info(
plan_id: u32,
) {
if let Some(prof) = profs.get(&plan_id) {
assume(prof.statistics.len() == std::mem::variant_count::<ProfileStatisticsName>());
for (_, desc) in get_statistics_desc().iter() {
if desc.display_name != "output rows" {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
// limitations under the License.

use databend_common_ast::ast::FormatTreeNode;
use databend_common_base::hints::assume;
use databend_common_base::runtime::profile::ProfileStatisticsName;
use databend_common_base::runtime::profile::get_statistics_desc;
use databend_common_exception::Result;

Expand All @@ -27,6 +29,7 @@ pub trait PhysicalFormat {
// explain analyze
if let Some(prof) = ctx.profs.get(&self.get_meta().plan_id) {
let mut children = Vec::with_capacity(format_node.children.len() + 10);
assume(prof.statistics.len() == std::mem::variant_count::<ProfileStatisticsName>());
for (_, desc) in get_statistics_desc().iter() {
if prof.statistics[desc.index] != 0 {
children.push(FormatTreeNode::new(format!(
Expand Down
1 change: 1 addition & 0 deletions src/query/service/src/physical_plans/physical_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ impl HashJoin {
let joined_output = OutputPort::create();

let hash_join = TransformHashJoin::create(
self.get_id(),
build_input.clone(),
probe_input.clone(),
joined_output.clone(),
Expand Down
9 changes: 9 additions & 0 deletions src/query/service/src/pipelines/executor/executor_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::sync::atomic::Ordering;
use std::time::SystemTime;

use databend_common_base::base::WatchNotify;
use databend_common_base::hints::assume;
use databend_common_base::runtime::ExecutorStats;
use databend_common_base::runtime::ExecutorStatsSnapshot;
use databend_common_base::runtime::QueryTimeSeriesProfileBuilder;
Expand Down Expand Up @@ -761,6 +762,10 @@ impl RunningGraph {
match plans_profile.entry(*plan_id) {
Entry::Occupied(mut v) => {
let plan_profile = v.get_mut();
assume(
plan_profile.statistics.len()
== std::mem::variant_count::<ProfileStatisticsName>(),
);
for index in 0..std::mem::variant_count::<ProfileStatisticsName>() {
plan_profile.statistics[index] +=
profile.statistics[index].fetch_min(0, Ordering::SeqCst);
Expand All @@ -769,6 +774,10 @@ impl RunningGraph {
Entry::Vacant(v) => {
let plan_profile = v.insert(PlanProfile::create(profile));

assume(
plan_profile.statistics.len()
== std::mem::variant_count::<ProfileStatisticsName>(),
);
for index in 0..std::mem::variant_count::<ProfileStatisticsName>() {
plan_profile.statistics[index] +=
profile.statistics[index].fetch_min(0, Ordering::SeqCst);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub struct CompletedAsyncTask {
pub id: NodeIndex,
pub worker_id: usize,
pub res: Result<()>,
pub instant: Instant,
pub graph: Arc<RunningGraph>,
}

Expand All @@ -75,6 +76,7 @@ impl CompletedAsyncTask {
worker_id,
res,
graph,
instant: Instant::now(),
}
}
}
Expand Down Expand Up @@ -152,7 +154,18 @@ impl ExecutorWorkerContext {
}
}
ExecutorTask::AsyncCompleted(task) => match task.res {
Ok(_) => Ok(Some((task.id, task.graph))),
Ok(_) => {
let payload = task.graph.get_node_tracking_payload(task.id);
let _guard = ThreadTracker::tracking(payload.clone());
let nanos = task.instant.elapsed().as_nanos();
assume(nanos < 18446744073709551615_u128);
Profile::record_usize_profile(
ProfileStatisticsName::ScheduleTime,
nanos as usize,
);

Ok(Some((task.id, task.graph)))
}
Err(cause) => Err(Box::new(NodeErrorType::AsyncProcessError(cause))),
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ pub struct ProcessorAsyncTask {
workers_condvar: Arc<WorkersCondvar>,
instant: Instant,
last_nanos: usize,
first_poll: bool,
graph: Arc<RunningGraph>,
inner: BoxFuture<'static, Result<()>>,
}
Expand Down Expand Up @@ -157,6 +158,7 @@ impl ProcessorAsyncTask {
instant,
graph,
inner: inner.boxed(),
first_poll: true,
}
}
}
Expand All @@ -171,11 +173,17 @@ impl Future for ProcessorAsyncTask {

let last_nanos = self.last_nanos;
let last_instant = self.instant;
let is_first_poll = std::mem::take(&mut self.first_poll);
let inner = self.inner.as_mut();

let before_poll_nanos = elapsed_nanos(last_instant);
let wait_nanos = before_poll_nanos - last_nanos;
Profile::record_usize_profile(ProfileStatisticsName::WaitTime, wait_nanos);

if is_first_poll {
Profile::record_usize_profile(ProfileStatisticsName::ScheduleTime, wait_nanos);
} else {
Profile::record_usize_profile(ProfileStatisticsName::WaitTime, wait_nanos);
}

let poll_res = catch_unwind(move || inner.poll(cx));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::any::Any;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

use databend_common_exception::Result;
Expand Down Expand Up @@ -45,11 +46,13 @@ pub struct TransformHashJoin {
projection: ColumnSet,
rf_desc: Arc<RuntimeFiltersDesc>,
runtime_filter_builder: Option<RuntimeFilterLocalBuilder>,
plan_id: u32,
instant: Instant,
}

impl TransformHashJoin {
pub fn create(
plan_id: u32,
build_port: Arc<InputPort>,
probe_port: Arc<InputPort>,
joined_port: Arc<OutputPort>,
Expand All @@ -72,6 +75,7 @@ impl TransformHashJoin {
joined_port,
join,
rf_desc,
plan_id,
projection,
stage_sync_barrier,
joined_data: None,
Expand Down Expand Up @@ -158,6 +162,7 @@ impl Processor for TransformHashJoin {
Ok(())
}
Stage::Probe(state) => {
let instant = Instant::now();
if let Some(probe_data) = state.input_data.take() {
let stream = self.join.probe_block(probe_data)?;
// This is safe because both join and stream are properties of the struct.
Expand All @@ -171,6 +176,8 @@ impl Processor for TransformHashJoin {
}
}

state.cpu_time += instant.elapsed().as_nanos() as u64;

Ok(())
}
Stage::ProbeFinal(state) => {
Expand Down Expand Up @@ -222,7 +229,8 @@ impl Processor for TransformHashJoin {
let wait_rf_elapsed = self.instant.elapsed() - before_wait;

log::info!(
"HashJoin build stage, sync work elapsed: {:?}, build rf elapsed: {:?}, wait other node rf elapsed: {:?}",
"HashJoin({}) build stage, sync work elapsed: {:?}, build rf elapsed: {:?}, wait other node rf elapsed: {:?}",
self.plan_id,
elapsed,
rf_build_elapsed,
wait_rf_elapsed
Expand All @@ -234,20 +242,24 @@ impl Processor for TransformHashJoin {
Stage::BuildFinal(_) => {
let wait_elapsed = self.instant.elapsed() - elapsed;
log::info!(
"HashJoin build final stage, sync work elapsed: {:?}, wait elapsed: {:?}",
"HashJoin({}) build final stage, sync work elapsed: {:?}, wait elapsed: {:?}",
self.plan_id,
elapsed,
wait_elapsed
);

self.instant = Instant::now();
Stage::Probe(ProbeState::new())
}
Stage::Probe(_) => {
Stage::Probe(state) => {
let wait_elapsed = self.instant.elapsed() - elapsed;
log::info!(
"HashJoin probe stage, sync work elapsed: {:?}, wait elapsed: {:?}",
"HashJoin({}) probe stage, sync work elapsed: {:?}, wait elapsed: {:?}, cpu time: {:?}, probe rows: {}",
self.plan_id,
elapsed,
wait_elapsed
wait_elapsed,
Duration::from_nanos(state.cpu_time),
state.probe_rows
);

self.instant = Instant::now();
Expand All @@ -257,7 +269,8 @@ impl Processor for TransformHashJoin {
true => {
let wait_elapsed = self.instant.elapsed() - elapsed;
log::info!(
"HashJoin probe final stage, sync work elapsed: {:?}, wait elapsed: {:?}",
"HashJoin({}) probe final stage, sync work elapsed: {:?}, wait elapsed: {:?}",
self.plan_id,
elapsed,
wait_elapsed
);
Expand All @@ -268,7 +281,8 @@ impl Processor for TransformHashJoin {
false => {
let wait_elapsed = self.instant.elapsed() - elapsed;
log::info!(
"HashJoin probe final stage, sync work elapsed: {:?}, wait elapsed: {:?}",
"HashJoin({}) probe final stage, sync work elapsed: {:?}, wait elapsed: {:?}",
self.plan_id,
elapsed,
wait_elapsed
);
Expand Down Expand Up @@ -345,8 +359,10 @@ impl BuildFinalState {
}

/// Per-stage state used while the hash join processor is in its probe stage.
struct ProbeState {
/// Running total of rows pulled from the probe input (each pulled block adds
/// its `num_rows()`); reported as "probe rows" in the probe-stage log line.
probe_rows: usize,
/// Block pulled from the input port, parked here until the synchronous probe
/// step takes it.
input_data: Option<DataBlock>,
/// Join result stream for the block being probed.
/// NOTE(review): the code that stores and drains this stream lies outside the
/// visible hunks — confirm its exact lifecycle.
stream: Option<Box<dyn JoinStream>>,
/// Nanoseconds accumulated over the synchronous probe steps. Despite the name
/// it is measured with `Instant::elapsed()` (wall clock), and it is logged as
/// a `Duration`.
cpu_time: u64,
}

impl Debug for ProbeState {
Expand All @@ -358,8 +374,10 @@ impl Debug for ProbeState {
impl ProbeState {
/// Builds the initial probe state: no parked input block, no active join
/// stream, and both the row counter and the time counter at zero.
pub fn new() -> ProbeState {
    // Counters start from zero; they are accumulated while probing.
    let (probe_rows, cpu_time) = (0, 0);
    // Nothing has been pulled or probed yet.
    let (input_data, stream) = (None, None);

    ProbeState {
        probe_rows,
        input_data,
        stream,
        cpu_time,
    }
}

Expand All @@ -369,7 +387,9 @@ impl ProbeState {
}

if input.has_data() {
self.input_data = Some(input.pull_data().unwrap()?);
let data_block = input.pull_data().unwrap()?;
self.probe_rows += data_block.num_rows();
self.input_data = Some(data_block);
return Ok(Event::Sync);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::any::Any;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

use databend_common_base::base::tokio::time::timeout;
use databend_common_catalog::runtime_filter_info::RuntimeFilterReady;
Expand Down Expand Up @@ -96,6 +97,7 @@ impl Processor for TransformRuntimeFilterWait {

#[async_backtrace::framed]
async fn async_process(&mut self) -> Result<()> {
let instant = Instant::now();
if self.runtime_filter_ready.is_empty() {
self.runtime_filter_ready = self.ctx.get_runtime_filter_ready(self.scan_id);
}
Expand Down Expand Up @@ -135,6 +137,13 @@ impl Processor for TransformRuntimeFilterWait {

self.runtime_filter_ready.clear();
self.wait_finished = true;

log::info!(
"RUNTIME-FILTER: scan_id={} ready_count={} elapsed={:?}",
self.scan_id,
self.runtime_filter_ready.len(),
instant.elapsed()
);
Ok(())
}
}
Loading