Skip to content

Commit

Permalink
metrics: use MetricAtomic* for task counters (#6624)
Browse files Browse the repository at this point in the history
  • Loading branch information
Darksonn committed Jun 17, 2024
1 parent 3bf4f93 commit 9a75d6f
Show file tree
Hide file tree
Showing 8 changed files with 331 additions and 314 deletions.
566 changes: 283 additions & 283 deletions tokio/src/runtime/metrics/runtime.rs

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,10 @@ cfg_unstable_metrics! {
self.shared.owned.active_tasks_count()
}

pub(crate) fn spawned_tasks_count(&self) -> u64 {
self.shared.owned.spawned_tasks_count()
cfg_64bit_metrics! {
pub(crate) fn spawned_tasks_count(&self) -> u64 {
self.shared.owned.spawned_tasks_count()
}
}
}
}
Expand Down
10 changes: 6 additions & 4 deletions tokio/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@ cfg_rt! {
use crate::runtime::{SchedulerMetrics, WorkerMetrics};

impl Handle {
cfg_64bit_metrics! {
pub(crate) fn spawned_tasks_count(&self) -> u64 {
match_flavor!(self, Handle(handle) => handle.spawned_tasks_count())
}
}

pub(crate) fn num_blocking_threads(&self) -> usize {
match_flavor!(self, Handle(handle) => handle.num_blocking_threads())
}
Expand All @@ -191,10 +197,6 @@ cfg_rt! {
match_flavor!(self, Handle(handle) => handle.active_tasks_count())
}

pub(crate) fn spawned_tasks_count(&self) -> u64 {
match_flavor!(self, Handle(handle) => handle.spawned_tasks_count())
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
match_flavor!(self, Handle(handle) => handle.scheduler_metrics())
}
Expand Down
10 changes: 6 additions & 4 deletions tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ impl Handle {
}

cfg_unstable_metrics! {
cfg_64bit_metrics! {
pub(crate) fn spawned_tasks_count(&self) -> u64 {
self.shared.owned.spawned_tasks_count()
}
}

pub(crate) fn num_blocking_threads(&self) -> usize {
// workers are currently spawned using spawn_blocking
self.blocking_spawner
Expand All @@ -25,10 +31,6 @@ impl Handle {
self.shared.owned.active_tasks_count()
}

pub(crate) fn spawned_tasks_count(&self) -> u64 {
self.shared.owned.spawned_tasks_count()
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.shared.scheduler_metrics
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ impl Handle {
self.shared.owned.active_tasks_count()
}

pub(crate) fn spawned_tasks_count(&self) -> u64 {
self.shared.owned.spawned_tasks_count()
cfg_64bit_metrics! {
pub(crate) fn spawned_tasks_count(&self) -> u64 {
self.shared.owned.spawned_tasks_count()
}
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
Expand Down
6 changes: 4 additions & 2 deletions tokio/src/runtime/task/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,10 @@ impl<S: 'static> OwnedTasks<S> {
self.list.len()
}

pub(crate) fn spawned_tasks_count(&self) -> u64 {
self.list.added()
cfg_64bit_metrics! {
pub(crate) fn spawned_tasks_count(&self) -> u64 {
self.list.added()
}
}

pub(crate) fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
Expand Down
10 changes: 8 additions & 2 deletions tokio/src/util/metric_atomics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicUsize, Ordering};

cfg_64bit_metrics! {
use std::sync::atomic::AtomicU64;
Expand Down Expand Up @@ -52,11 +52,17 @@ impl MetricAtomicU64 {
/// This exposes simplified APIs for use in metrics & uses `std::sync` instead of Loom to avoid polluting loom logs with metric information.
#[derive(Debug, Default)]
pub(crate) struct MetricAtomicUsize {
value: std::sync::atomic::AtomicUsize,
value: AtomicUsize,
}

#[cfg_attr(not(all(tokio_unstable, feature = "rt")), allow(dead_code))]
impl MetricAtomicUsize {
pub(crate) fn new(value: usize) -> Self {
Self {
value: AtomicUsize::new(value),
}
}

pub(crate) fn load(&self, ordering: Ordering) -> usize {
self.value.load(ordering)
}
Expand Down
31 changes: 16 additions & 15 deletions tokio/src/util/sharded_list.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::ptr::NonNull;
use std::sync::atomic::Ordering;

use crate::loom::sync::atomic::AtomicU64;
use crate::loom::sync::{Mutex, MutexGuard};
use std::sync::atomic::AtomicUsize;
use crate::util::metric_atomics::{MetricAtomicU64, MetricAtomicUsize};

use super::linked_list::{Link, LinkedList};

Expand All @@ -15,8 +14,8 @@ use super::linked_list::{Link, LinkedList};
/// Note: Due to its inner sharded design, the order of nodes cannot be guaranteed.
pub(crate) struct ShardedList<L, T> {
lists: Box<[Mutex<LinkedList<L, T>>]>,
added: AtomicU64,
count: AtomicUsize,
added: MetricAtomicU64,
count: MetricAtomicUsize,
shard_mask: usize,
}

Expand Down Expand Up @@ -44,8 +43,8 @@ impl<L, T> ShardedList<L, T> {
}
Self {
lists: lists.into_boxed_slice(),
added: AtomicU64::new(0),
count: AtomicUsize::new(0),
added: MetricAtomicU64::new(0),
count: MetricAtomicUsize::new(0),
shard_mask,
}
}
Expand All @@ -54,8 +53,8 @@ impl<L, T> ShardedList<L, T> {
/// Used to get the lock of shard.
pub(crate) struct ShardGuard<'a, L, T> {
lock: MutexGuard<'a, LinkedList<L, T>>,
added: &'a AtomicU64,
count: &'a AtomicUsize,
added: &'a MetricAtomicU64,
count: &'a MetricAtomicUsize,
id: usize,
}

Expand All @@ -66,7 +65,7 @@ impl<L: ShardedListItem> ShardedList<L, L::Target> {
let mut lock = self.shard_inner(shard_id);
let node = lock.pop_back();
if node.is_some() {
self.count.fetch_sub(1, Ordering::Relaxed);
self.count.decrement();
}
node
}
Expand All @@ -86,7 +85,7 @@ impl<L: ShardedListItem> ShardedList<L, L::Target> {
// to be in any other list of the same sharded list.
let node = unsafe { lock.remove(node) };
if node.is_some() {
self.count.fetch_sub(1, Ordering::Relaxed);
self.count.decrement();
}
node
}
Expand All @@ -107,9 +106,11 @@ impl<L: ShardedListItem> ShardedList<L, L::Target> {
self.count.load(Ordering::Relaxed)
}

/// Gets the total number of elements added to this list.
pub(crate) fn added(&self) -> u64 {
self.added.load(Ordering::Relaxed)
cfg_64bit_metrics! {
/// Gets the total number of elements added to this list.
pub(crate) fn added(&self) -> u64 {
self.added.load(Ordering::Relaxed)
}
}

/// Returns whether the linked list does not contain any node.
Expand Down Expand Up @@ -137,8 +138,8 @@ impl<'a, L: ShardedListItem> ShardGuard<'a, L, L::Target> {
let id = unsafe { L::get_shard_id(L::as_raw(&val)) };
assert_eq!(id, self.id);
self.lock.push_front(val);
self.added.fetch_add(1, Ordering::Relaxed);
self.count.fetch_add(1, Ordering::Relaxed);
self.added.add(1, Ordering::Relaxed);
self.count.increment();
}
}

Expand Down

0 comments on commit 9a75d6f

Please sign in to comment.