From 8e15c234c60cf8132c490ccf03dd31738cfeaca8 Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Thu, 6 Jun 2024 04:08:46 -0400 Subject: [PATCH] metrics: add `MetricAtomicUsize` for usized-metrics (#6598) --- tokio/src/runtime/blocking/pool.rs | 21 +++++++++++---------- tokio/src/runtime/metrics/worker.rs | 23 ++++------------------- tokio/src/util/metric_atomics.rs | 28 ++++++++++++++++++++++++++++ 3 files changed, 43 insertions(+), 29 deletions(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 3757079f329..2b283a56bba 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -7,11 +7,12 @@ use crate::runtime::blocking::{shutdown, BlockingTask}; use crate::runtime::builder::ThreadNameFn; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{Builder, Callback, Handle}; +use crate::util::metric_atomics::MetricAtomicUsize; use std::collections::{HashMap, VecDeque}; use std::fmt; use std::io; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::Ordering; use std::time::Duration; pub(crate) struct BlockingPool { @@ -26,9 +27,9 @@ pub(crate) struct Spawner { #[derive(Default)] pub(crate) struct SpawnerMetrics { - num_threads: AtomicUsize, - num_idle_threads: AtomicUsize, - queue_depth: AtomicUsize, + num_threads: MetricAtomicUsize, + num_idle_threads: MetricAtomicUsize, + queue_depth: MetricAtomicUsize, } impl SpawnerMetrics { @@ -47,27 +48,27 @@ impl SpawnerMetrics { } fn inc_num_threads(&self) { - self.num_threads.fetch_add(1, Ordering::Relaxed); + self.num_threads.increment(); } fn dec_num_threads(&self) { - self.num_threads.fetch_sub(1, Ordering::Relaxed); + self.num_threads.decrement(); } fn inc_num_idle_threads(&self) { - self.num_idle_threads.fetch_add(1, Ordering::Relaxed); + self.num_idle_threads.increment(); } fn dec_num_idle_threads(&self) -> usize { - self.num_idle_threads.fetch_sub(1, Ordering::Relaxed) + self.num_idle_threads.decrement() } fn inc_queue_depth(&self) { - self.queue_depth.fetch_add(1, Ordering::Relaxed); + self.queue_depth.increment(); } fn dec_queue_depth(&self) { - self.queue_depth.fetch_sub(1, Ordering::Relaxed); + self.queue_depth.decrement(); } } diff --git a/tokio/src/runtime/metrics/worker.rs b/tokio/src/runtime/metrics/worker.rs index a396bf5a391..e5d2c6f17fd 100644 --- a/tokio/src/runtime/metrics/worker.rs +++ b/tokio/src/runtime/metrics/worker.rs @@ -1,9 +1,6 @@ use crate::runtime::metrics::Histogram; use crate::runtime::Config; -use crate::util::metric_atomics::MetricAtomicU64; -// This is NOT the Loom atomic. To avoid an unnecessary state explosion in loom, -// all metrics use regular atomics. -use std::sync::atomic::AtomicUsize; +use crate::util::metric_atomics::{MetricAtomicU64, MetricAtomicUsize}; use std::sync::atomic::Ordering::Relaxed; /// Retrieve runtime worker metrics. @@ -13,7 +10,7 @@ use std::sync::atomic::Ordering::Relaxed; /// features][unstable] for details. /// /// [unstable]: crate#unstable-features -#[derive(Debug)] +#[derive(Debug, Default)] #[repr(align(128))] pub(crate) struct WorkerMetrics { /// Number of times the worker parked. @@ -45,7 +42,7 @@ pub(crate) struct WorkerMetrics { /// Number of tasks currently in the local queue. Used only by the /// current-thread scheduler. - pub(crate) queue_depth: AtomicUsize, + pub(crate) queue_depth: MetricAtomicUsize, /// If `Some`, tracks the number of polls by duration range. pub(super) poll_count_histogram: Option, @@ -62,19 +59,7 @@ impl WorkerMetrics { } pub(crate) fn new() -> WorkerMetrics { - WorkerMetrics { - park_count: MetricAtomicU64::new(0), - noop_count: MetricAtomicU64::new(0), - steal_count: MetricAtomicU64::new(0), - steal_operations: MetricAtomicU64::new(0), - poll_count: MetricAtomicU64::new(0), - mean_poll_time: MetricAtomicU64::new(0), - overflow_count: MetricAtomicU64::new(0), - busy_duration_total: MetricAtomicU64::new(0), - local_schedule_count: MetricAtomicU64::new(0), - queue_depth: AtomicUsize::new(0), - poll_count_histogram: None, - } + WorkerMetrics::default() } pub(crate) fn queue_depth(&self) -> usize { diff --git a/tokio/src/util/metric_atomics.rs b/tokio/src/util/metric_atomics.rs index 3c080298ecf..332d2f268e1 100644 --- a/tokio/src/util/metric_atomics.rs +++ b/tokio/src/util/metric_atomics.rs @@ -45,3 +45,31 @@ impl MetricAtomicU64 { pub(crate) fn new(_value: u64) -> Self { Self { } } } } + +#[cfg_attr(not(all(tokio_unstable, feature = "rt")), allow(dead_code))] +/// `AtomicUsize` for use in metrics. +/// +/// This exposes simplified APIs for use in metrics & uses `std::sync` instead of Loom to avoid polluting loom logs with metric information. +#[derive(Debug, Default)] +pub(crate) struct MetricAtomicUsize { + value: std::sync::atomic::AtomicUsize, +} + +#[cfg_attr(not(all(tokio_unstable, feature = "rt")), allow(dead_code))] +impl MetricAtomicUsize { + pub(crate) fn load(&self, ordering: Ordering) -> usize { + self.value.load(ordering) + } + + pub(crate) fn store(&self, val: usize, ordering: Ordering) { + self.value.store(val, ordering) + } + + pub(crate) fn increment(&self) -> usize { + self.value.fetch_add(1, Ordering::Relaxed) + } + + pub(crate) fn decrement(&self) -> usize { + self.value.fetch_sub(1, Ordering::Relaxed) + } +}