Skip to content

Commit

Permalink
metrics: add worker_park_unpark_count (#6696)
Browse files Browse the repository at this point in the history
  • Loading branch information
surban committed Jul 23, 2024
1 parent 6e845b7 commit b69f16a
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 0 deletions.
13 changes: 13 additions & 0 deletions tokio/src/runtime/metrics/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ pub(crate) struct MetricsBatch {
/// Number of times the worker parked.
park_count: u64,

/// Number of times the worker parked and unparked.
park_unpark_count: u64,

/// Number of times the worker woke w/o doing work.
noop_count: u64,

Expand Down Expand Up @@ -54,6 +57,7 @@ impl MetricsBatch {

MetricsBatch {
park_count: 0,
park_unpark_count: 0,
noop_count: 0,
steal_count: 0,
steal_operations: 0,
Expand All @@ -76,6 +80,9 @@ impl MetricsBatch {
pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) {
worker.mean_poll_time.store(mean_poll_time, Relaxed);
worker.park_count.store(self.park_count, Relaxed);
worker
.park_unpark_count
.store(self.park_unpark_count, Relaxed);
worker.noop_count.store(self.noop_count, Relaxed);
worker.steal_count.store(self.steal_count, Relaxed);
worker
Expand All @@ -101,6 +108,7 @@ impl MetricsBatch {
/// The worker is about to park.
pub(crate) fn about_to_park(&mut self) {
self.park_count += 1;
self.park_unpark_count += 1;

if self.poll_count_on_last_park == self.poll_count {
self.noop_count += 1;
Expand All @@ -109,6 +117,11 @@ impl MetricsBatch {
}
}

/// The worker was unparked.
pub(crate) fn unparked(&mut self) {
self.park_unpark_count += 1;
}

/// Start processing a batch of tasks
pub(crate) fn start_processing_scheduled_tasks(&mut self) {
self.processing_scheduled_tasks_started_at = Instant::now();
Expand Down
1 change: 1 addition & 0 deletions tokio/src/runtime/metrics/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ impl MetricsBatch {

pub(crate) fn submit(&mut self, _to: &WorkerMetrics, _mean_poll_time: u64) {}
pub(crate) fn about_to_park(&mut self) {}
pub(crate) fn unparked(&mut self) {}
pub(crate) fn inc_local_schedule_count(&mut self) {}
pub(crate) fn start_processing_scheduled_tasks(&mut self) {}
pub(crate) fn end_processing_scheduled_tasks(&mut self) {}
Expand Down
55 changes: 55 additions & 0 deletions tokio/src/runtime/metrics/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,61 @@ impl RuntimeMetrics {
.load(Relaxed)
}

/// Returns the total number of times the given worker thread has parked
/// and unparked.
///
/// The worker park/unpark count starts at zero when the runtime is created
/// and increases by one each time the worker parks the thread waiting for
/// new inbound events to process. This usually means the worker has processed
/// all pending work and is currently idle. When new work becomes available,
/// the worker is unparked and the park/unpark count is again increased by one.
///
/// An odd count means that the worker is currently parked.
/// An even count means that the worker is currently active.
///
/// The counter is monotonically increasing. It is never decremented or
/// reset to zero.
///
/// # Arguments
///
/// `worker` is the index of the worker being queried. The given value must
/// be between 0 and `num_workers()`. The index uniquely identifies a single
/// worker and will continue to identify the worker throughout the lifetime
/// of the runtime instance.
///
/// # Panics
///
/// The method panics when `worker` represents an invalid worker, i.e. is
/// greater than or equal to `num_workers()`.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
/// let n = metrics.worker_park_unpark_count(0);
///
/// println!("worker 0 parked and unparked {} times", n);
///
/// if n % 2 == 0 {
/// println!("worker 0 is active");
/// } else {
/// println!("worker 0 is parked");
/// }
/// }
/// ```
pub fn worker_park_unpark_count(&self, worker: usize) -> u64 {
self.handle
.inner
.worker_metrics(worker)
.park_unpark_count
.load(Relaxed)
}


/// Returns the number of times the given worker thread unparked but
/// performed no work before parking again.
///
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/runtime/metrics/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub(crate) struct WorkerMetrics {
/// Number of times the worker parked.
pub(crate) park_count: MetricAtomicU64,

/// Number of times the worker parked and unparked.
pub(crate) park_unpark_count: MetricAtomicU64,

/// Number of times the worker woke then parked again without doing work.
pub(crate) noop_count: MetricAtomicU64,

Expand Down
3 changes: 3 additions & 0 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,9 @@ impl Context {
});

core = c;

core.metrics.unparked();
core.submit_metrics(handle);
}

if let Some(f) = &handle.shared.config.after_unpark {
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ impl Stats {
self.batch.about_to_park();
}

pub(crate) fn unparked(&mut self) {
self.batch.unparked();
}

pub(crate) fn inc_local_schedule_count(&mut self) {
self.batch.inc_local_schedule_count();
}
Expand Down
5 changes: 5 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,8 +699,13 @@ impl Context {
if core.transition_to_parked(&self.worker) {
while !core.is_shutdown && !core.is_traced {
core.stats.about_to_park();
core.stats
.submit(&self.worker.handle.shared.worker_metrics[self.worker.index]);

core = self.park_timeout(core, None);

core.stats.unparked();

// Run regularly scheduled maintenance
core.maintenance(&self.worker);

Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread_alt/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ impl Stats {
self.batch.about_to_park();
}

pub(crate) fn unparked(&mut self) {
self.batch.unparked();
}

pub(crate) fn inc_local_schedule_count(&mut self) {
self.batch.inc_local_schedule_count();
}
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,9 @@ impl Worker {
let n = cmp::max(core.run_queue.remaining_slots() / 2, 1);
let maybe_task = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, n);

core.stats.unparked();
self.flush_metrics(cx, &mut core);

Ok((maybe_task, core))
}

Expand Down
38 changes: 38 additions & 0 deletions tokio/tests/rt_unstable_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,44 @@ fn worker_park_count() {
assert!(1 <= metrics.worker_park_count(1));
}

#[test]
fn worker_park_unpark_count() {
let rt = current_thread();
let metrics = rt.metrics();
rt.block_on(rt.spawn(async {})).unwrap();
drop(rt);
assert!(2 <= metrics.worker_park_unpark_count(0));

let rt = threaded();
let metrics = rt.metrics();

// Wait for workers to be parked after runtime startup.
for _ in 0..100 {
if 1 <= metrics.worker_park_unpark_count(0) && 1 <= metrics.worker_park_unpark_count(1) {
break;
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
assert_eq!(1, metrics.worker_park_unpark_count(0));
assert_eq!(1, metrics.worker_park_unpark_count(1));

// Spawn a task to unpark and then park a worker.
rt.block_on(rt.spawn(async {})).unwrap();
for _ in 0..100 {
if 3 <= metrics.worker_park_unpark_count(0) || 3 <= metrics.worker_park_unpark_count(1) {
break;
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
assert!(3 <= metrics.worker_park_unpark_count(0) || 3 <= metrics.worker_park_unpark_count(1));

// Both threads unpark for runtime shutdown.
drop(rt);
assert_eq!(0, metrics.worker_park_unpark_count(0) % 2);
assert_eq!(0, metrics.worker_park_unpark_count(1) % 2);
assert!(4 <= metrics.worker_park_unpark_count(0) || 4 <= metrics.worker_park_unpark_count(1));
}

#[test]
fn worker_noop_count() {
// There isn't really a great way to generate no-op parks as they happen as
Expand Down

0 comments on commit b69f16a

Please sign in to comment.