From 3df9c4e840fb89b5101b78af053f8f5014522b5b Mon Sep 17 00:00:00 2001 From: Ryder Timberlake <14241275+yakryder@users.noreply.github.com> Date: Mon, 6 Oct 2025 08:51:56 -0400 Subject: [PATCH 1/7] benches: add benchmark demonstrating timer drop contention This benchmark demonstrates the mutex contention issue described in #6504, specifically focusing on the drop path for timers that are registered but never fire. The benchmark creates 10,000 sleep timers, polls each once to initialize and register it with the timer wheel, then drops them before they fire. This simulates the common case of timeouts that don't fire (e.g., operations that complete before their timeout). Baseline results show severe contention: the 8-worker case is only ~1.5x faster than single-threaded. Refs: #6504 --- benches/Cargo.toml | 5 ++ benches/time_drop_sleep_contention.rs | 104 ++++++++++++++++++++++++++ 2 files changed, 109 insertions(+) create mode 100644 benches/time_drop_sleep_contention.rs diff --git a/benches/Cargo.toml b/benches/Cargo.toml index 87163be153f..75dee668a33 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -96,5 +96,10 @@ name = "time_timeout" path = "time_timeout.rs" harness = false +[[bench]] +name = "time_drop_sleep_contention" +path = "time_drop_sleep_contention.rs" +harness = false + [lints] workspace = true diff --git a/benches/time_drop_sleep_contention.rs b/benches/time_drop_sleep_contention.rs new file mode 100644 index 00000000000..aba34ec6bec --- /dev/null +++ b/benches/time_drop_sleep_contention.rs @@ -0,0 +1,104 @@ +/// Benchmark demonstrating timer mutex contention on drop (Issue #6504) +/// +/// This benchmark creates many timers, polls them once to initialize and register +/// them with the timer wheel, then drops them before they fire. This is the common +/// case for timeouts that are set but don't fire. +/// +/// Each drop acquires the global timer mutex to deregister from the wheel, causing +/// severe contention under concurrent load. +/// +/// ## Baseline Results (Pre-Fix) +/// +/// ```text +/// timer_drop_single_thread_10k: 33.3 ms (32.7-34.0 ms) +/// timer_drop_multi_thread_10k_8workers: 21.6 ms (19.1-24.7 ms) +/// ``` +/// +/// **Analysis**: Multi-threaded (8 workers) is only 1.54x faster than single-threaded, +/// demonstrating severe mutex contention. 
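+///
+/// ## Running
+///
+/// A typical invocation, assuming this bench target in the `benches` crate
+/// (flags may differ depending on how the workspace is checked out):
+///
+/// ```text
+/// cargo bench --bench time_drop_sleep_contention
+/// ```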
+ +use std::future::{poll_fn, Future}; +use std::pin::Pin; +use std::time::Instant; + +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use tokio::{ + runtime::Runtime, + time::{sleep, Duration}, +}; + +fn build_runtime(workers: usize) -> Runtime { + if workers == 1 { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + } else { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(workers) + .build() + .unwrap() + } +} + +async fn create_and_drop_timers(count: usize, workers: usize) { + let handles: Vec<_> = (0..workers) + .map(|_| { + tokio::spawn(async move { + for _ in 0..count / workers { + let mut sleep = Box::pin(sleep(Duration::from_secs(60))); + + // Poll once to initialize and register without awaiting + poll_fn(|cx| { + let _ = sleep.as_mut().poll(cx); + std::task::Poll::Ready(()) + }) + .await; + + black_box(drop(sleep)); + } + }) + }) + .collect(); + + for handle in handles { + handle.await.unwrap(); + } +} + +fn timer_drop_contention_single_thread(c: &mut Criterion) { + let runtime = build_runtime(1); + + c.bench_function("timer_drop_single_thread_10k", |b| { + b.iter_custom(|iters| { + let start = Instant::now(); + runtime.block_on(async { + black_box(create_and_drop_timers(10_000 * iters as usize, 1)).await; + }); + start.elapsed() + }) + }); +} + +fn timer_drop_contention_multi_thread(c: &mut Criterion) { + let runtime = build_runtime(8); + + c.bench_function("timer_drop_multi_thread_10k_8workers", |b| { + b.iter_custom(|iters| { + let start = Instant::now(); + runtime.block_on(async { + black_box(create_and_drop_timers(10_000 * iters as usize, 8)).await; + }); + start.elapsed() + }) + }); +} + +criterion_group!( + timer_contention, + timer_drop_contention_single_thread, + timer_drop_contention_multi_thread +); + +criterion_main!(timer_contention); From 24a53ea336ae69cf99121deecc0e89586a59d6af Mon Sep 17 00:00:00 2001 From: Ryder Timberlake <14241275+yakryder@users.noreply.github.com> Date: Fri, 10 Oct 2025 01:39:01 -0400 Subject: [PATCH 2/7] runtime: reduce timer drop contention with per-worker timers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reduces lock contention in timer operations by registering timers in a per-worker HashMap for the multi-threaded runtime, while falling back to the global timer wheel for current_thread runtime and block_in_place. Benchmark results (benches/time_drop_sleep_contention.rs): - Single-threaded: 33.3ms → 32.7ms (no regression) - Multi-threaded (8 workers): 21.6ms → 16.0ms (25.9% faster) Refs #6504 --- tokio/src/runtime/context.rs | 10 ++++ .../runtime/scheduler/multi_thread/worker.rs | 52 ++++++++++++++++++- tokio/src/runtime/time/entry.rs | 12 +++++ 3 files changed, 73 insertions(+), 1 deletion(-) diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index 0d54f6ca5b2..4ac0e804f5e 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -190,6 +190,16 @@ cfg_rt! { .unwrap_or_else(|_| (f.take().unwrap())(None)) } + /// Attempts to register a timer with the current worker's local timer map. + /// Returns true if registered, false if no multi-threaded worker core available. 
+    #[cfg(all(feature = "rt", feature = "rt-multi-thread"))]
+    pub(crate) fn try_register_timer(deadline: crate::time::Instant, waker: std::task::Waker) -> bool {
+        with_scheduler(|ctx| match ctx {
+            Some(scheduler::Context::MultiThread(ctx)) => ctx.register_timer(deadline.into(), waker),
+            _ => false,
+        })
+    }
+
     cfg_taskdump! {
         /// SAFETY: Callers of this function must ensure that trace frames always
         /// form a valid linked list.
diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs
index 7ec3f126467..f945f6512b9 100644
--- a/tokio/src/runtime/scheduler/multi_thread/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -70,9 +70,10 @@ use crate::util::atomic_cell::AtomicCell;
 use crate::util::rand::{FastRand, RngSeedGenerator};
 
 use std::cell::RefCell;
+use std::collections::HashMap;
 use std::task::Waker;
 use std::thread;
-use std::time::Duration;
+use std::time::{Duration, Instant};
 
 mod metrics;
@@ -139,6 +140,10 @@ struct Core {
     /// Fast random number generator.
     rand: FastRand,
+
+    /// Per-worker timers: lock-free HashMap for timer registration
+    /// Maps deadline -> wakers to fire at that time
+    timers: HashMap<Instant, Vec<Waker>>,
 }
 
 /// State shared across all workers
@@ -267,6 +272,7 @@ pub(super) fn create(
             global_queue_interval: stats.tuned_global_queue_interval(&config),
             stats,
             rand: FastRand::from_seed(config.seed_generator.next_seed()),
+            timers: HashMap::new(),
         }));
 
         remotes.push(Remote { steal, unpark });
@@ -534,6 +540,9 @@ impl Context {
             // Run maintenance, if needed
             core = self.maintenance(core);
 
+            // Fire any expired timers (lock-free, per-worker)
+            core.fire_expired_timers(Instant::now());
+
             // First, check work available to the current worker.
             if let Some(task) = core.next_task(&self.worker) {
                 core = self.run_task(task, core)?;
@@ -793,6 +802,21 @@ impl Context {
             self.defer.defer(waker);
         }
     }
+
+    /// Register a timer with the current worker's local timer HashMap.
+    ///
+    /// This is called from TimerEntry when a timer needs to be registered.
+    /// Returns true if successfully registered, false if no core is available
+    /// (e.g., during block_in_place).
+    pub(crate) fn register_timer(&self, deadline: Instant, waker: Waker) -> bool {
+        let mut core = self.core.borrow_mut();
+        if let Some(core) = core.as_mut() {
+            core.register_timer(deadline, waker);
+            true
+        } else {
+            false
+        }
+    }
 }
 
 impl Core {
@@ -1042,6 +1066,32 @@ impl Core {
             self.global_queue_interval = next;
         }
     }
+
+    /// Fire all timers that have expired by the given instant.
+    ///
+    /// This scans the per-worker timer HashMap and wakes all tasks whose
+    /// deadlines have passed. Unlike the global timer wheel, this is lock-free
+    /// and doesn't require any synchronization.
+    fn fire_expired_timers(&mut self, now: Instant) {
+        self.timers.retain(|&deadline, wakers| {
+            (now < deadline) || {
+                wakers.drain(..).for_each(Waker::wake);
+                false
+            }
+        });
+    }
+
+    /// Register a timer waker at the given deadline.
+    ///
+    /// This is called from TimerEntry::poll_elapsed when a timer is registered.
+    /// The waker will be fired when fire_expired_timers() is called with a time
+    /// >= deadline.
+    pub(crate) fn register_timer(&mut self, deadline: Instant, waker: Waker) {
+        self.timers
+            .entry(deadline)
+            .or_insert_with(Vec::new)
+            .push(waker);
+    }
 }
 
 impl Worker {
diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs
index 627fcbc5ec3..bb2d15dd0d5 100644
--- a/tokio/src/runtime/time/entry.rs
+++ b/tokio/src/runtime/time/entry.rs
@@ -606,6 +606,18 @@ impl TimerEntry {
         );
 
         if !self.registered {
+            #[cfg(feature = "rt-multi-thread")]
+            {
+                // Try worker-local registration first (lock-free on multi-threaded runtime)
+                if crate::runtime::context::try_register_timer(self.deadline, cx.waker().clone()) {
+                    // Successfully registered with worker-local timers
+                    let this = self.as_mut().project();
+                    *this.registered = true;
+                    return Poll::Pending;
+                }
+            }
+
+            // Fall back to global timer wheel (current_thread runtime or block_in_place)
             let deadline = self.deadline;
             self.as_mut().reset(deadline, true);
         }

From 2f76baf79d666cbf9c84ceaddc70aac5255b6e79 Mon Sep 17 00:00:00 2001
From: Ryder Timberlake <14241275+yakryder@users.noreply.github.com>
Date: Sat, 11 Oct 2025 15:55:49 -0400
Subject: [PATCH 3/7] benches: improve many-timers benchmark

Some of these changes may not be strict improvements, but they helped
me, as a newcomer, rule things out (e.g., timing now starts only after
the tasks and sleep futures have been created).

---
 benches/time_drop_sleep_contention.rs | 109 +++++++++++++++-----------
 1 file changed, 62 insertions(+), 47 deletions(-)

diff --git a/benches/time_drop_sleep_contention.rs b/benches/time_drop_sleep_contention.rs
index aba34ec6bec..c430e2566ca 100644
--- a/benches/time_drop_sleep_contention.rs
+++ b/benches/time_drop_sleep_contention.rs
@@ -1,31 +1,18 @@
-/// Benchmark demonstrating timer mutex contention on drop (Issue #6504)
+/// Benchmark measuring timer lifecycle performance (Issue #6504)
 ///
-/// This benchmark creates many timers, polls them once to initialize and register
-/// them with the timer wheel, then drops them before they fire. This is the common
-/// case for timeouts that are set but don't fire.
+/// This benchmark creates many timers, polls them once to register with the timer
+/// system, then drops them before they fire. This simulates the common case of
+/// timeouts that don't fire (e.g., operations completing before timeout).
 ///
-/// Each drop acquires the global timer mutex to deregister from the wheel, causing
-/// severe contention under concurrent load.
-///
-/// ## Baseline Results (Pre-Fix)
-///
-/// ```text
-/// timer_drop_single_thread_10k: 33.3 ms (32.7-34.0 ms)
-/// timer_drop_multi_thread_10k_8workers: 21.6 ms (19.1-24.7 ms)
-/// ```
-///
-/// **Analysis**: Multi-threaded (8 workers) is only 1.54x faster than single-threaded,
-/// demonstrating severe mutex contention.
-
+/// The benchmark compares single-threaded vs multi-threaded performance to reveal
+/// contention in timer registration and deregistration.
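+///
+/// A note on measurement: Criterion's `iter_custom` passes the closure an
+/// iteration count and expects back the total time spent on that many batches
+/// of work, which is why the timer counts below are scaled by `iters`. A
+/// minimal sketch of that contract (`run_batch` is a hypothetical stand-in
+/// for one metered batch, not a function in this file):
+///
+/// ```text
+/// group.bench_function("example", |b| {
+///     b.iter_custom(|iters| {
+///         let mut total = std::time::Duration::ZERO;
+///         for _ in 0..iters {
+///             total += run_batch(); // hypothetical: one batch of metered work
+///         }
+///         total
+///     })
+/// });
+/// ```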
 use std::future::{poll_fn, Future};
-use std::pin::Pin;
-use std::time::Instant;
+use std::time::{Duration, Instant};
 
 use criterion::{black_box, criterion_group, criterion_main, Criterion};
-use tokio::{
-    runtime::Runtime,
-    time::{sleep, Duration},
-};
+use tokio::{runtime::Runtime, time::sleep};
+
+const TIMER_COUNT: usize = 10_000;
 
 fn build_runtime(workers: usize) -> Runtime {
     if workers == 1 {
@@ -42,63 +29,91 @@ fn build_runtime(workers: usize) -> Runtime {
     }
 }
 
-async fn create_and_drop_timers(count: usize, workers: usize) {
+/// Returns (wall_clock_duration, per_task_durations)
+async fn create_and_drop_timers_instrumented(count: usize, workers: usize) -> (Duration, Vec<Duration>) {
     let handles: Vec<_> = (0..workers)
         .map(|_| {
             tokio::spawn(async move {
+                // Create all sleep futures
+                let mut sleeps = Vec::with_capacity(count / workers);
                 for _ in 0..count / workers {
-                    let mut sleep = Box::pin(sleep(Duration::from_secs(60)));
+                    sleeps.push(Box::pin(sleep(Duration::from_secs(60))));
+                }
 
-                    // Poll once to initialize and register without awaiting
+                // Start timing - poll and drop (METERED)
+                let start = Instant::now();
+                for mut sleep in sleeps {
+                    // Poll once to register
                     poll_fn(|cx| {
                         let _ = sleep.as_mut().poll(cx);
                         std::task::Poll::Ready(())
                     })
                     .await;
 
+                    // Drop to deregister
                     black_box(drop(sleep));
                 }
+                let elapsed = start.elapsed();
+
+                elapsed
             })
         })
         .collect();
 
+    let wall_clock_start = Instant::now();
+
+    let mut task_durations = Vec::with_capacity(workers);
     for handle in handles {
-        handle.await.unwrap();
+        task_durations.push(handle.await.unwrap());
     }
+
+    let wall_clock = wall_clock_start.elapsed();
+
+    (wall_clock, task_durations)
 }
 
-fn timer_drop_contention_single_thread(c: &mut Criterion) {
-    let runtime = build_runtime(1);
+fn bench_many_timers(c: &mut Criterion) {
+    let mut group = c.benchmark_group("many_timers");
 
-    c.bench_function("timer_drop_single_thread_10k", |b| {
+    // Single-threaded baseline
+    let runtime = build_runtime(1);
+    group.bench_function("single_thread", |b| {
         b.iter_custom(|iters| {
-            let start = Instant::now();
-            runtime.block_on(async {
-                black_box(create_and_drop_timers(10_000 * iters as usize, 1)).await;
+            let (wall_clock, _task_durations) = runtime.block_on(async {
+                create_and_drop_timers_instrumented(TIMER_COUNT * iters as usize, 1).await
             });
-            start.elapsed()
+
+            wall_clock
         })
     });
-}
 
-fn timer_drop_contention_multi_thread(c: &mut Criterion) {
-    let runtime = build_runtime(8);
-
-    c.bench_function("timer_drop_multi_thread_10k_8workers", |b| {
+    // Multi-threaded with 8 workers
+    let runtime_multi = build_runtime(8);
+    group.bench_function("multi_thread", |b| {
         b.iter_custom(|iters| {
-            let start = Instant::now();
-            runtime.block_on(async {
-                black_box(create_and_drop_timers(10_000 * iters as usize, 8)).await;
+            let (wall_clock, task_durations) = runtime_multi.block_on(async {
+                create_and_drop_timers_instrumented(TIMER_COUNT * iters as usize, 8).await
             });
-            start.elapsed()
+
+            // Print variance stats to stderr
+            let min = task_durations.iter().min().unwrap();
+            let max = task_durations.iter().max().unwrap();
+            let range = max.saturating_sub(*min);
+            eprintln!(
+                "multi_thread: wall={:?}, min={:?}, max={:?}, range={:?}",
+                wall_clock, min, max, range
+            );
+
+            wall_clock
         })
     });
+
+    group.finish();
 }
 
 criterion_group!(
-    timer_contention,
-    timer_drop_contention_single_thread,
-    timer_drop_contention_multi_thread
+    many_timers,
+    bench_many_timers
 );
 
-criterion_main!(timer_contention);
+criterion_main!(many_timers);
From c5315d38879fe7f342dc1c17a735484581e1ee6a Mon Sep 17 00:00:00 2001
From: Ryder Timberlake <14241275+yakryder@users.noreply.github.com>
Date: Sat, 11 Oct 2025 17:25:40 -0400
Subject: [PATCH 4/7] benches: rename many-timers benchmark

---
 benches/Cargo.toml                                         | 4 ++--
 .../{time_drop_sleep_contention.rs => time_many_timers.rs} | 0
 2 files changed, 2 insertions(+), 2 deletions(-)
 rename benches/{time_drop_sleep_contention.rs => time_many_timers.rs} (100%)

diff --git a/benches/Cargo.toml b/benches/Cargo.toml
index 75dee668a33..2b70cd0d7f5 100644
--- a/benches/Cargo.toml
+++ b/benches/Cargo.toml
@@ -97,8 +97,8 @@ path = "time_timeout.rs"
 harness = false
 
 [[bench]]
-name = "time_drop_sleep_contention"
-path = "time_drop_sleep_contention.rs"
+name = "time_many_timers"
+path = "time_many_timers.rs"
 harness = false
 
 [lints]

diff --git a/benches/time_drop_sleep_contention.rs b/benches/time_many_timers.rs
similarity index 100%
rename from benches/time_drop_sleep_contention.rs
rename to benches/time_many_timers.rs

From 3ebc1bc795c464f63812e27e68c8a497ff0e49bb Mon Sep 17 00:00:00 2001
From: Ryder Timberlake <14241275+yakryder@users.noreply.github.com>
Date: Mon, 20 Oct 2025 03:40:19 -0400
Subject: [PATCH 5/7] benches: do housekeeping

Owing to my unfamiliarity with the ecosystem, I was running an absurd
number of iterations in the first few rounds. Bringing the iteration
count down made it possible to get quick feedback from a much larger
number of timers.

---
 benches/time_many_timers.rs | 75 +++++++++++++------------------------
 1 file changed, 27 insertions(+), 48 deletions(-)

diff --git a/benches/time_many_timers.rs b/benches/time_many_timers.rs
index c430e2566ca..12153c9a372 100644
--- a/benches/time_many_timers.rs
+++ b/benches/time_many_timers.rs
@@ -10,33 +10,18 @@ use std::future::{poll_fn, Future};
 use std::time::{Duration, Instant};
 
 use criterion::{black_box, criterion_group, criterion_main, Criterion};
-use tokio::{runtime::Runtime, time::sleep};
-
-const TIMER_COUNT: usize = 10_000;
-
-fn build_runtime(workers: usize) -> Runtime {
-    if workers == 1 {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()
-            .unwrap()
-    } else {
-        tokio::runtime::Builder::new_multi_thread()
-            .enable_all()
-            .worker_threads(workers)
-            .build()
-            .unwrap()
-    }
-}
+use tokio::time::sleep;
 
-/// Returns (wall_clock_duration, per_task_durations)
-async fn create_and_drop_timers_instrumented(count: usize, workers: usize) -> (Duration, Vec<Duration>) {
+const TIMER_COUNT: usize = 1_000_000;
+
+/// Returns (wall_clock_duration, task_durations)
+async fn create_and_drop_timers_instrumented(count: usize, concurrent_tasks: usize) -> Duration {
     let handles: Vec<_> = (0..concurrent_tasks)
         .map(|_| {
             tokio::spawn(async move {
                 // Create all sleep futures
-                let mut sleeps = Vec::with_capacity(count / workers);
-                for _ in 0..count / workers {
+                let mut sleeps = Vec::with_capacity(count / concurrent_tasks);
+                for _ in 0..count / concurrent_tasks {
                     sleeps.push(Box::pin(sleep(Duration::from_secs(60))));
                 }
 
@@ -62,47 +47,44 @@ async fn create_and_drop_timers_instrumented(count: usize, concurrent_tasks: usi
 
     let wall_clock_start = Instant::now();
 
-    let mut task_durations = Vec::with_capacity(workers);
+    let mut task_durations = Vec::with_capacity(concurrent_tasks);
     for handle in handles {
         task_durations.push(handle.await.unwrap());
     }
 
     let wall_clock = wall_clock_start.elapsed();
 
-    (wall_clock, task_durations)
+    wall_clock
 }
 
 fn bench_many_timers(c: &mut Criterion) {
     let mut group = c.benchmark_group("many_timers");
     // Single-threaded baseline
-    let runtime = build_runtime(1);
+    let runtime = tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap();
     group.bench_function("single_thread", |b| {
-        b.iter_custom(|iters| {
-            let (wall_clock, _task_durations) = runtime.block_on(async {
-                create_and_drop_timers_instrumented(TIMER_COUNT * iters as usize, 1).await
-            });
+        b.iter_custom(|_iters| {
+            let wall_clock = runtime
+                .block_on(async { create_and_drop_timers_instrumented(TIMER_COUNT, 1).await });
 
             wall_clock
         })
     });
 
     // Multi-threaded with 8 workers
-    let runtime_multi = build_runtime(8);
+    let runtime_multi = tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .worker_threads(8)
+        .build()
+        .unwrap();
+
     group.bench_function("multi_thread", |b| {
-        b.iter_custom(|iters| {
-            let (wall_clock, task_durations) = runtime_multi.block_on(async {
-                create_and_drop_timers_instrumented(TIMER_COUNT * iters as usize, 8).await
-            });
-
-            // Print variance stats to stderr
-            let min = task_durations.iter().min().unwrap();
-            let max = task_durations.iter().max().unwrap();
-            let range = max.saturating_sub(*min);
-            eprintln!(
-                "multi_thread: wall={:?}, min={:?}, max={:?}, range={:?}",
-                wall_clock, min, max, range
-            );
+        b.iter_custom(|_iters| {
+            let wall_clock = runtime_multi
+                .block_on(async { create_and_drop_timers_instrumented(TIMER_COUNT, 8).await });
 
             wall_clock
         })
@@ -111,9 +93,6 @@ fn bench_many_timers(c: &mut Criterion) {
     group.finish();
 }
 
-criterion_group!(
-    many_timers,
-    bench_many_timers
-);
+criterion_group!(many_timers, bench_many_timers);
 
 criterion_main!(many_timers);

From 54f89f946100f82d79bb7d3dbd20af4512a5eb8d Mon Sep 17 00:00:00 2001
From: Ryder Timberlake <14241275+yakryder@users.noreply.github.com>
Date: Tue, 21 Oct 2025 02:35:38 -0400
Subject: [PATCH 6/7] Revert "runtime: reduce timer drop contention with
 per-worker timers"

This reverts commit 24a53ea336ae69cf99121deecc0e89586a59d6af.

---
 tokio/src/runtime/context.rs                  | 10 ----
 .../runtime/scheduler/multi_thread/worker.rs  | 52 +------------------
 tokio/src/runtime/time/entry.rs               | 12 -----
 3 files changed, 1 insertion(+), 73 deletions(-)

diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs
index 4ac0e804f5e..0d54f6ca5b2 100644
--- a/tokio/src/runtime/context.rs
+++ b/tokio/src/runtime/context.rs
@@ -190,16 +190,6 @@ cfg_rt! {
             .unwrap_or_else(|_| (f.take().unwrap())(None))
     }
 
-    /// Attempts to register a timer with the current worker's local timer map.
-    /// Returns true if registered, false if no multi-threaded worker core available.
-    #[cfg(all(feature = "rt", feature = "rt-multi-thread"))]
-    pub(crate) fn try_register_timer(deadline: crate::time::Instant, waker: std::task::Waker) -> bool {
-        with_scheduler(|ctx| match ctx {
-            Some(scheduler::Context::MultiThread(ctx)) => ctx.register_timer(deadline.into(), waker),
-            _ => false,
-        })
-    }
-
     cfg_taskdump! {
         /// SAFETY: Callers of this function must ensure that trace frames always
         /// form a valid linked list.
diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs
index f945f6512b9..7ec3f126467 100644
--- a/tokio/src/runtime/scheduler/multi_thread/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -70,10 +70,9 @@ use crate::util::atomic_cell::AtomicCell;
 use crate::util::rand::{FastRand, RngSeedGenerator};
 
 use std::cell::RefCell;
-use std::collections::HashMap;
 use std::task::Waker;
 use std::thread;
-use std::time::{Duration, Instant};
+use std::time::Duration;
 
 mod metrics;
@@ -140,10 +139,6 @@ struct Core {
     /// Fast random number generator.
     rand: FastRand,
-
-    /// Per-worker timers: lock-free HashMap for timer registration
-    /// Maps deadline -> wakers to fire at that time
-    timers: HashMap<Instant, Vec<Waker>>,
 }
 
 /// State shared across all workers
@@ -272,7 +267,6 @@ pub(super) fn create(
             global_queue_interval: stats.tuned_global_queue_interval(&config),
             stats,
             rand: FastRand::from_seed(config.seed_generator.next_seed()),
-            timers: HashMap::new(),
         }));
 
         remotes.push(Remote { steal, unpark });
@@ -540,9 +534,6 @@ impl Context {
             // Run maintenance, if needed
             core = self.maintenance(core);
 
-            // Fire any expired timers (lock-free, per-worker)
-            core.fire_expired_timers(Instant::now());
-
             // First, check work available to the current worker.
             if let Some(task) = core.next_task(&self.worker) {
                 core = self.run_task(task, core)?;
@@ -802,21 +793,6 @@ impl Context {
             self.defer.defer(waker);
         }
     }
-
-    /// Register a timer with the current worker's local timer HashMap.
-    ///
-    /// This is called from TimerEntry when a timer needs to be registered.
-    /// Returns true if successfully registered, false if no core is available
-    /// (e.g., during block_in_place).
-    pub(crate) fn register_timer(&self, deadline: Instant, waker: Waker) -> bool {
-        let mut core = self.core.borrow_mut();
-        if let Some(core) = core.as_mut() {
-            core.register_timer(deadline, waker);
-            true
-        } else {
-            false
-        }
-    }
 }
 
 impl Core {
@@ -1066,32 +1042,6 @@ impl Core {
             self.global_queue_interval = next;
         }
     }
-
-    /// Fire all timers that have expired by the given instant.
-    ///
-    /// This scans the per-worker timer HashMap and wakes all tasks whose
-    /// deadlines have passed. Unlike the global timer wheel, this is lock-free
-    /// and doesn't require any synchronization.
-    fn fire_expired_timers(&mut self, now: Instant) {
-        self.timers.retain(|&deadline, wakers| {
-            (now < deadline) || {
-                wakers.drain(..).for_each(Waker::wake);
-                false
-            }
-        });
-    }
-
-    /// Register a timer waker at the given deadline.
-    ///
-    /// This is called from TimerEntry::poll_elapsed when a timer is registered.
-    /// The waker will be fired when fire_expired_timers() is called with a time
-    /// >= deadline.
-    pub(crate) fn register_timer(&mut self, deadline: Instant, waker: Waker) {
-        self.timers
-            .entry(deadline)
-            .or_insert_with(Vec::new)
-            .push(waker);
-    }
 }
 
 impl Worker {
diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs
index bb2d15dd0d5..627fcbc5ec3 100644
--- a/tokio/src/runtime/time/entry.rs
+++ b/tokio/src/runtime/time/entry.rs
@@ -606,18 +606,6 @@ impl TimerEntry {
         );
 
         if !self.registered {
-            #[cfg(feature = "rt-multi-thread")]
-            {
-                // Try worker-local registration first (lock-free on multi-threaded runtime)
-                if crate::runtime::context::try_register_timer(self.deadline, cx.waker().clone()) {
-                    // Successfully registered with worker-local timers
-                    let this = self.as_mut().project();
-                    *this.registered = true;
-                    return Poll::Pending;
-                }
-            }
-
-            // Fall back to global timer wheel (current_thread runtime or block_in_place)
             let deadline = self.deadline;
             self.as_mut().reset(deadline, true);
         }

From 4fcc8ffe653c4eefd5813a129fc0621ce533e1ed Mon Sep 17 00:00:00 2001
From: Ryder Timberlake <14241275+yakryder@users.noreply.github.com>
Date: Sat, 25 Oct 2025 07:30:58 -0400
Subject: [PATCH 7/7] time: add timer buckets to reduce lock contention

Introduce GlobalTimerBuckets, a ring buffer of per-bucket locks for
timers 0-120 seconds in the future. This reduces the occurrence and
impact of global lock contention for short-lived timers, the common
case. Timers > 120s fall back to the existing timer wheel.

When a timer is dropped, it must be removed from whichever storage it's
in before the underlying memory is freed. Add try_remove() to safely
remove from buckets, and update clear_entry() to call it.

Performance: preliminary results from a benchmark with a million
concurrent timers show an 84x improvement in multi-threaded runs and
25x in single-threaded runs.

---
 benches/time_many_timers.rs             |  56 +++-
 tokio/src/runtime/time/entry.rs         |  73 +++++-
 tokio/src/runtime/time/mod.rs           | 142 ++++++++++-
 tokio/src/runtime/time/timer_buckets.rs | 325 ++++++++++++++++++++++++
 tokio/src/runtime/time/wheel/mod.rs     |   7 +
 5 files changed, 571 insertions(+), 32 deletions(-)
 create mode 100644 tokio/src/runtime/time/timer_buckets.rs

diff --git a/benches/time_many_timers.rs b/benches/time_many_timers.rs
index 12153c9a372..d21c7fbdc73 100644
--- a/benches/time_many_timers.rs
+++ b/benches/time_many_timers.rs
@@ -1,3 +1,4 @@
+use criterion::{black_box, criterion_group, criterion_main, Criterion};
 /// Benchmark measuring timer lifecycle performance (Issue #6504)
 ///
 /// This benchmark creates many timers, polls them once to register with the timer
 /// system, then drops them before they fire. This simulates the common case of
 /// timeouts that don't fire (e.g., operations completing before timeout).
 ///
 /// The benchmark compares single-threaded vs multi-threaded performance to reveal
 /// contention in timer registration and deregistration.
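+///
+/// For scale, with `TIMER_COUNT = 1_000_000` the distribution below expands to
+/// roughly 400k one-second, 300k ten-second, 200k sixty-second, and 100k
+/// five-minute timers; only the five-minute group exceeds the 120-second
+/// bucket range and exercises the wheel fallback. Note that each task takes
+/// only the first `count / concurrent_tasks` entries of the cycled sequence,
+/// so a small per-task count can skew the realized mix toward the shortest
+/// timers.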
 use std::future::{poll_fn, Future};
+use std::iter::repeat;
 use std::time::{Duration, Instant};
-use criterion::{black_box, criterion_group, criterion_main, Criterion};
 use tokio::time::sleep;
 
 const TIMER_COUNT: usize = 1_000_000;
 
+struct TimerDistribution {
+    duration: Duration,
+    percentage: f64,
+}
+
+const fn from_secs(s: u64) -> Duration {
+    Duration::from_secs(s)
+}
+
+const TIMER_DISTRIBUTIONS: &[TimerDistribution] = &[
+    TimerDistribution {
+        duration: from_secs(1),
+        percentage: 0.40,
+    },
+    TimerDistribution {
+        duration: from_secs(10),
+        percentage: 0.30,
+    },
+    TimerDistribution {
+        duration: from_secs(60),
+        percentage: 0.20,
+    },
+    TimerDistribution {
+        duration: from_secs(300),
+        percentage: 0.10,
+    },
+];
+
+/// Each timer is polled once to register, then dropped before firing.
 async fn create_and_drop_timers_instrumented(count: usize, concurrent_tasks: usize) -> Duration {
     let handles: Vec<_> = (0..concurrent_tasks)
         .map(|_| {
             tokio::spawn(async move {
-                // Create all sleep futures
-                let mut sleeps = Vec::with_capacity(count / concurrent_tasks);
-                for _ in 0..count / concurrent_tasks {
-                    sleeps.push(Box::pin(sleep(Duration::from_secs(60))));
-                }
+                // Create all sleep futures with realistic distribution
+                let sleeps: Vec<_> = TIMER_DISTRIBUTIONS
+                    .iter()
+                    .flat_map(|td| {
+                        repeat(td.duration).take((TIMER_COUNT as f64 * td.percentage) as usize)
+                    })
+                    .cycle()
+                    .take(count / concurrent_tasks)
+                    .map(|timeout| Box::pin(sleep(timeout)))
+                    .collect();
 
                 // Start timing - poll and drop (METERED)
                 let start = Instant::now();
@@ -47,14 +80,11 @@ async fn create_and_drop_timers_instrumented(count: usize, concurrent_tasks: usi
 
     let wall_clock_start = Instant::now();
 
-    let mut task_durations = Vec::with_capacity(concurrent_tasks);
     for handle in handles {
-        task_durations.push(handle.await.unwrap());
+        handle.await.unwrap();
     }
 
-    let wall_clock = wall_clock_start.elapsed();
-
-    wall_clock
+    wall_clock_start.elapsed()
 }
 
 fn bench_many_timers(c: &mut Criterion) {
diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs
index 627fcbc5ec3..0397b05fd58 100644
--- a/tokio/src/runtime/time/entry.rs
+++ b/tokio/src/runtime/time/entry.rs
@@ -327,6 +327,11 @@ pub(crate) struct TimerHandle {
     inner: NonNull<TimerShared>,
 }
 
+// SAFETY: TimerHandle is a pointer to TimerShared, which is Send + Sync.
+// The handle can be safely sent across threads.
+unsafe impl Send for TimerHandle {}
+unsafe impl Sync for TimerHandle {}
+
 pub(super) type EntryList = crate::util::linked_list::LinkedList<TimerShared, <TimerShared as linked_list::Link>::Target>;
 
 /// The shared state structure of a timer. This structure is shared between the
@@ -356,6 +361,12 @@ pub(crate) struct TimerShared {
     /// complete, fired, error, etc).
     state: StateCell,
 
+    /// Tracks whether this timer is in the buckets (true) or wheel (false).
+    /// This is used during cleanup to determine where to remove the timer from.
+    /// Accessed with relaxed ordering as it's only modified during registration
+    /// under either the bucket lock or driver lock.
+    in_buckets: crate::loom::sync::atomic::AtomicBool,
+
     _p: PhantomPinned,
 }
 
@@ -388,6 +399,7 @@ impl TimerShared {
             registered_when: AtomicU64::new(0),
             pointers: linked_list::Pointers::new(),
             state: StateCell::default(),
+            in_buckets: crate::loom::sync::atomic::AtomicBool::new(false),
             _p: PhantomPinned,
         }
     }
@@ -415,7 +427,7 @@ impl TimerShared {
     ///
     /// SAFETY: Must be called with the driver lock held, and when this entry is
     /// not in any timer wheel lists.
- unsafe fn set_registered_when(&self, when: u64) { + pub(super) unsafe fn set_registered_when(&self, when: u64) { self.registered_when.store(when, Ordering::Relaxed); } @@ -453,6 +465,17 @@ impl TimerShared { pub(super) fn might_be_registered(&self) -> bool { self.state.might_be_registered() } + + /// Returns true if this timer is registered in the buckets (vs the wheel). + pub(super) fn is_in_buckets(&self) -> bool { + self.in_buckets.load(Ordering::Relaxed) + } + + /// Marks this timer as being in the buckets. + /// SAFETY: Must be called while holding the bucket lock during insertion. + pub(super) unsafe fn mark_in_buckets(&self) { + self.in_buckets.store(true, Ordering::Relaxed); + } } unsafe impl linked_list::Link for TimerShared { @@ -583,11 +606,19 @@ impl TimerEntry { } }; - if inner.extend_expiration(tick).is_ok() { + // For bucket timers, we cannot use extend_expiration because the timer handle + // is physically located in a specific bucket. Changing the expiration would + // leave the handle in the wrong bucket. So we skip extend_expiration and go + // straight to reregister, which will insert into the correct new bucket. + // + // NOTE: Even when reregister=false (e.g., reset_without_reregister used by Interval), + // bucket timers MUST still reregister to move the handle to the new bucket. + // The reregister=false case is only valid for wheel timers which can use extend_expiration. + if !inner.is_in_buckets() && inner.extend_expiration(tick).is_ok() { return; } - if reregister { + if reregister || inner.is_in_buckets() { unsafe { self.driver() .reregister(&self.driver.driver().io, tick, inner.into()); @@ -641,8 +672,14 @@ impl TimerHandle { /// Forcibly sets the true and cached expiration times to the given tick. /// - /// SAFETY: The caller must ensure that the handle remains valid, the driver - /// lock is held, and that the timer is not in any wheel linked lists. + /// SAFETY: The caller must ensure that the handle remains valid and that + /// the timer is not in any wheel linked lists. Additionally, either: + /// - The driver lock is held (for wheel-based timers), OR + /// - The appropriate bucket lock is held (for bucket-based timers) + /// + /// The lock requirement ensures proper memory synchronization between the + /// thread setting the expiration and the driver thread that will later + /// fire the timer. pub(super) unsafe fn set_expiration(&self, tick: u64) { self.inner.as_ref().set_expiration(tick); } @@ -670,6 +707,32 @@ impl TimerHandle { } } + /// Marks this timer as being in the buckets. + /// SAFETY: Must be called while holding the bucket lock during insertion. + pub(super) unsafe fn mark_in_buckets(&self) { + self.inner.as_ref().mark_in_buckets() + } + + /// Unmarks this timer as being in the buckets. + pub(super) unsafe fn unmark_in_buckets(&self) { + self.inner + .as_ref() + .in_buckets + .store(false, crate::loom::sync::atomic::Ordering::Relaxed); + } + + /// Returns true if this timer is in the buckets (vs the wheel). + /// SAFETY: The handle must be valid. + pub(super) unsafe fn is_in_buckets_unsafe(&self) -> bool { + unsafe { self.inner.as_ref().is_in_buckets() } + } + + /// Returns true if this timer might still be registered (not yet fired). + /// SAFETY: The handle must be valid. + pub(super) unsafe fn might_be_registered(&self) -> bool { + unsafe { self.inner.as_ref().might_be_registered() } + } + /// Attempts to transition to a terminal state. If the state is already a /// terminal state, does nothing. 
     ///
diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs
index 3250dce97f6..b1f4a7b2b7d 100644
--- a/tokio/src/runtime/time/mod.rs
+++ b/tokio/src/runtime/time/mod.rs
@@ -16,6 +16,9 @@ pub(crate) use self::handle::Handle;
 mod source;
 pub(crate) use source::TimeSource;
 
+mod timer_buckets;
+use timer_buckets::GlobalTimerBuckets;
+
 mod wheel;
 
 use crate::loom::sync::atomic::{AtomicBool, Ordering};
@@ -94,6 +97,10 @@ struct Inner {
     // The state is split like this so `Handle` can access `is_shutdown` without locking the mutex
     state: Mutex<InnerState>,
 
+    /// Global timer buckets for fast-path timer registration (0-120 seconds).
+    /// These have their own synchronization and don't need the driver lock.
+    buckets: GlobalTimerBuckets,
+
     /// True if the driver is being shutdown.
     is_shutdown: AtomicBool,
 
@@ -112,7 +119,7 @@ struct InnerState {
     /// The earliest time at which we promise to wake up without unparking.
     next_wake: Option<NonZeroU64>,
 
-    /// Timer wheel.
+    /// Timer wheel (fallback for timers > 120 seconds).
     wheel: wheel::Wheel,
 }
 
@@ -125,6 +132,7 @@ impl Driver {
     /// Specifying the source of time is useful when testing.
     pub(crate) fn new(park: IoStack, clock: &Clock) -> (Driver, Handle) {
         let time_source = TimeSource::new(clock);
+        let initial_tick = time_source.now(clock);
 
         let handle = Handle {
             time_source,
                 next_wake: None,
                 wheel: wheel::Wheel::new(),
             }),
+            buckets: GlobalTimerBuckets::new(initial_tick),
             is_shutdown: AtomicBool::new(false),
 
             #[cfg(feature = "test-util")]
@@ -175,7 +184,16 @@ impl Driver {
 
         assert!(!handle.is_shutdown());
 
-        let next_wake = lock.wheel.next_expiration_time();
+        // Calculate next_wake from both bucket timers and wheel timers
+        let maybe_bucket_next = handle.inner.buckets.next_expiration_time();
+        let maybe_wheel_next = lock.wheel.next_expiration_time();
+
+        // Take the minimum of bucket and wheel next expiration times
+        let next_wake = match (maybe_bucket_next, maybe_wheel_next) {
+            (Some(bucket_next), Some(wheel_next)) => Some(std::cmp::min(bucket_next, wheel_next)),
+            (bucket_next, wheel_next) => bucket_next.or(wheel_next),
+        };
+
         lock.next_wake =
             next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
 
@@ -255,6 +273,19 @@ impl Handle {
     pub(self) fn process_at_time(&self, mut now: u64) {
         let mut waker_list = WakeList::new();
 
+        // First, advance the timer buckets and fire all expired timers
+        // This doesn't require the driver lock - buckets have their own synchronization
+        // SAFETY: The buckets manage their own thread safety via atomics and per-bucket locks
+        let bucket_wakers = unsafe { self.inner.buckets.advance(now) };
+        for waker in bucket_wakers {
+            waker_list.push(waker);
+
+            if !waker_list.can_push() {
+                waker_list.wake_all();
+            }
+        }
+
+        // Now process the timer wheel (for timers > 120s) - this DOES need the driver lock
         let mut lock = self.inner.lock();
 
         if now < lock.wheel.elapsed() {
@@ -285,10 +316,18 @@ impl Handle {
             }
         }
 
-        lock.next_wake = lock
-            .wheel
-            .poll_at()
-            .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
+        // Calculate next_wake from both bucket timers and wheel timers
+        let maybe_bucket_next = self.inner.buckets.next_expiration_time();
+        let maybe_wheel_next = lock.wheel.poll_at();
+
+        // Take the minimum of bucket and wheel next expiration times
+        let next_wake = match (maybe_bucket_next, maybe_wheel_next) {
+            (Some(b), Some(w)) => Some(std::cmp::min(b, w)),
+            (bucket, wheel) => bucket.or(wheel),
+        };
+
+        lock.next_wake =
+            next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
 
         drop(lock);
 
@@ -307,13 +346,25 @@ impl Handle {
     /// `add_entry` must not be called concurrently.
     pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
         unsafe {
-            let mut lock = self.inner.lock();
+            // Check if this timer is in the buckets or the wheel
+            let in_buckets = entry.as_ref().is_in_buckets();
+
+            if in_buckets {
+                // Timer is in buckets - remove it from the bucket Vec before the entry is freed.
+                // This prevents use-after-free when the TimerShared (embedded in TimerEntry) is dropped.
+                let registered_when = entry.as_ref().registered_when();
+                let entry_handle = entry.as_ref().handle();
+                self.inner.buckets.try_remove(registered_when, entry_handle);
+            } else {
+                // Timer is in the wheel - need driver lock for safe removal
+                let mut lock = self.inner.lock();
 
-            if entry.as_ref().might_be_registered() {
-                lock.wheel.remove(entry);
-            }
+                if entry.as_ref().might_be_registered() {
+                    lock.wheel.remove(entry);
+                }
 
-            entry.as_ref().handle().fire(Ok(()));
+                entry.as_ref().handle().fire(Ok(()));
+            }
         }
     }
 
@@ -329,11 +380,74 @@ impl Handle {
         new_tick: u64,
         entry: NonNull<TimerShared>,
     ) {
+        // Check if this timer was previously in buckets
+        let was_in_buckets = unsafe { entry.as_ref().is_in_buckets() };
+
+        if was_in_buckets {
+            // Timer is in buckets - keep it in buckets for reset
+            // Just insert at new deadline - stale copies will be skipped via registered_when check
+            let entry_handle = entry.as_ref().handle();
+
+            match self.inner.buckets.try_insert(new_tick, entry_handle) {
+                timer_buckets::InsertResult::Inserted => {
+                    // Always unpark for bucket insertions - the bucket maintains its own
+                    // next_wake atomic, and we need to ensure the driver wakes up for
+                    // bucket timers. Checking next_wake would require locking, defeating
+                    // the purpose of lock-free bucket insertion.
+                    unpark.unpark();
+                    return;
+                }
+                timer_buckets::InsertResult::Elapsed(handle) => {
+                    // Timer already elapsed - but update registered_when so future resets work
+                    unsafe {
+                        entry.as_ref().set_registered_when(new_tick);
+                        handle.fire(Ok(()));
+                    };
+                    return;
+                }
+                timer_buckets::InsertResult::OutOfRange(_handle) => {
+                    // New deadline is >120s, must move to wheel
+                    unsafe { entry.as_ref().handle().unmark_in_buckets() };
+                    // Fall through to wheel path below
+                }
+            }
+        } else {
+            // Timer was NOT in buckets (either new or was in wheel)
+            // Try buckets first for the new deadline
+            let entry_handle = entry.as_ref().handle();
+
+            match self.inner.buckets.try_insert(new_tick, entry_handle) {
+                timer_buckets::InsertResult::Inserted => {
+                    // Successfully inserted in buckets
+                    // If timer was previously in wheel, it will remain there as a stale entry
+                    // The wheel will skip it when it sees in_buckets = true
+
+                    // Always unpark for bucket insertions - the bucket maintains its own
+                    // next_wake atomic, and we need to ensure the driver wakes up for
+                    // bucket timers. Checking next_wake would require locking, defeating
+                    // the purpose of lock-free bucket insertion.
+                    unpark.unpark();
+                    return;
+                }
+                timer_buckets::InsertResult::Elapsed(handle) => {
+                    // Timer already elapsed - but update registered_when so future resets work
+                    unsafe {
+                        entry.as_ref().set_registered_when(new_tick);
+                        handle.fire(Ok(()));
+                    };
+                    return;
+                }
+                timer_buckets::InsertResult::OutOfRange(_handle) => {
+                    // Fall through to wheel path
+                }
+            }
+        }
+
+        // Timer didn't fit in buckets (>120s) - use wheel
         let waker = unsafe {
             let mut lock = self.inner.lock();
 
-            // We may have raced with a firing/deregistration, so check before
-            // deregistering.
+            // Remove from wheel if it's already there
             if unsafe { entry.as_ref().might_be_registered() } {
                 lock.wheel.remove(entry);
             }
@@ -344,7 +458,7 @@ impl Handle {
             if self.is_shutdown() {
                 unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) }
             } else {
-                entry.set_expiration(new_tick);
+                unsafe { entry.set_expiration(new_tick) };
 
                 // Note: We don't have to worry about racing with some other resetting
                 // thread, because add_entry and reregister require exclusive control of
diff --git a/tokio/src/runtime/time/timer_buckets.rs b/tokio/src/runtime/time/timer_buckets.rs
new file mode 100644
index 00000000000..2a8db2da60d
--- /dev/null
+++ b/tokio/src/runtime/time/timer_buckets.rs
@@ -0,0 +1,325 @@
+//! Global timer buckets for fast-path timer registration and firing.
+//!
+//! This module implements a ring buffer of timer buckets with 1ms granularity,
+//! covering timers from 0-120 seconds in the future. Timers beyond this range
+//! fall back to the global timer wheel.
+//!
+//! # Design
+//!
+//! The ring buffer contains 120,000 buckets (one per millisecond for 2 minutes).
+//! Each bucket has its own lock and contains a Vec of TimerHandles. This provides:
+//! - Lock-free index calculation for insertion
+//! - Fine-grained locking (only contention when timers land in same millisecond)
+//! - O(1) advancement as time progresses
+//! - Natural wraparound every 120 seconds
+//!
+//! # Synchronization
+//!
+//! - `head`: Atomic index pointing to the "current time" bucket
+//! - `ref_time`: Atomic tick value representing when head was at position 0
+//! - Per-bucket locks: Only held during push (insertion) or drain (firing)
+//!
+//! This allows workers to insert timers lock-free (just atomic reads + single bucket lock),
+//! while the driver advances the head pointer and fires expired buckets under the driver lock.
+
+use crate::loom::sync::atomic::Ordering;
+use crate::loom::sync::atomic::{AtomicU64, AtomicUsize};
+use crate::loom::sync::Mutex;
+use crate::runtime::time::TimerHandle;
+
+use std::task::Waker;
+
+/// Number of milliseconds to cover in the ring buffer (2 minutes).
+/// Timers beyond this range fall back to the global timer wheel.
+const BUCKET_COUNT: usize = 120_000;
+
+/// Global ring buffer of timer buckets.
+///
+/// This structure is shared across all workers and is integrated into the
+/// timer driver's Handle.
+pub(crate) struct GlobalTimerBuckets {
+    /// The buckets themselves. Vec of 120,000 buckets.
+    buckets: Vec<Bucket>,
+
+    /// Index of the "head" bucket (represents current time).
+    /// Advances forward as time progresses.
+    head: AtomicUsize,
+
+    /// The tick value (milliseconds since epoch) that the head position represents.
+    /// Used to calculate bucket offsets for new timers.
+    ref_time: AtomicU64,
+
+    /// The earliest timer deadline across all buckets.
+    /// Used to calculate when the driver should wake up.
+    /// Value of u64::MAX means no timers are registered.
+    next_wake: AtomicU64,
+}
+
+/// A single bucket in the ring buffer.
+struct Bucket {
+    /// Timers that expire in this millisecond.
+    /// Protected by a mutex for simplicity - can be optimized to lock-free later if needed.
+    timers: Mutex<Vec<TimerHandle>>,
+}
+
+impl Default for Bucket {
+    fn default() -> Self {
+        Self {
+            timers: Mutex::new(Vec::new()),
+        }
+    }
+}
+
+/// Result of attempting to insert a timer into buckets.
+#[derive(Debug)]
+pub(crate) enum InsertResult {
+    /// Timer was successfully inserted into a bucket
+    Inserted,
+    /// Timer is too far in the future (>120s), try the wheel instead
+    OutOfRange(TimerHandle),
+    /// Timer has already elapsed and should be fired immediately
+    Elapsed(TimerHandle),
+}
+
+impl GlobalTimerBuckets {
+    /// Creates a new global timer bucket ring buffer.
+    ///
+    /// The `initial_tick` parameter sets the reference time for the head position.
+    pub(crate) fn new(initial_tick: u64) -> Self {
+        // Pre-allocate all buckets on the heap
+        let buckets = (0..BUCKET_COUNT).map(|_| Bucket::default()).collect();
+
+        Self {
+            buckets,
+            head: AtomicUsize::new(0),
+            ref_time: AtomicU64::new(initial_tick),
+            next_wake: AtomicU64::new(u64::MAX),
+        }
+    }
+
+    /// Attempts to insert a timer into the appropriate bucket.
+    ///
+    /// Returns `InsertResult` indicating what happened.
+    ///
+    /// # Parameters
+    /// - `deadline_tick`: The absolute tick (milliseconds since epoch) when timer expires
+    /// - `timer`: The timer handle to insert
+    pub(crate) fn try_insert(&self, deadline_tick: u64, timer: TimerHandle) -> InsertResult {
+        self.try_insert_inner(deadline_tick, timer, true)
+    }
+
+    fn try_insert_inner(&self, deadline_tick: u64, timer: TimerHandle, mark: bool) -> InsertResult {
+        // Read current reference time and head position atomically
+        let ref_tick = self.ref_time.load(Ordering::Acquire);
+        let head_pos = self.head.load(Ordering::Acquire);
+
+        // Check if timer has already elapsed
+        if deadline_tick <= ref_tick {
+            return InsertResult::Elapsed(timer);
+        }
+
+        // Calculate offset from current reference time
+        let offset = deadline_tick - ref_tick;
+
+        // If timer is beyond our range, don't insert it here
+        if offset >= BUCKET_COUNT as u64 {
+            return InsertResult::OutOfRange(timer);
+        }
+
+        // Calculate target bucket index with wraparound
+        let bucket_idx = (head_pos + offset as usize) % BUCKET_COUNT;
+
+        // Lock just this bucket and insert the timer
+        let mut bucket = self.buckets[bucket_idx].timers.lock();
+
+        // Re-check after locking - ref_time might have advanced while we were waiting for lock
+        let ref_tick_locked = self.ref_time.load(Ordering::Acquire);
+        if deadline_tick <= ref_tick_locked {
+            // Deadline passed while we were acquiring the lock
+            return InsertResult::Elapsed(timer);
+        }
+
+        // SAFETY: We hold the bucket lock which synchronizes with advance().
+        // The handle is valid (just passed to us), and this timer is not in
+        // the wheel (it's going into buckets instead). The bucket lock provides
+        // the necessary memory fence for the relaxed atomic operations in
+        // set_expiration() to be visible when advance() later fires this timer.
+        unsafe {
+            if mark {
+                timer.mark_in_buckets();
+            }
+            timer.set_expiration(deadline_tick);
+        }
+
+        bucket.push(timer);
+
+        // Update next_wake if this timer is earlier
+        self.next_wake.fetch_min(deadline_tick, Ordering::Release);
+
+        InsertResult::Inserted
+    }
+
+    /// Returns the tick of the next timer that will fire, if any.
+    ///
+    /// This is used to calculate when the driver should wake up.
+    pub(crate) fn next_expiration_time(&self) -> Option<u64> {
+        let next = self.next_wake.load(Ordering::Acquire);
+        if next == u64::MAX {
+            None
+        } else {
+            Some(next)
+        }
+    }
+
+    /// Removes a timer from the bucket where it was inserted.
+    ///
+    /// This is called when a timer is being cancelled/dropped before it fires.
+    /// We need to remove the handle from the bucket Vec to avoid use-after-free
+    /// when the underlying TimerShared is freed.
+    ///
+    /// # Parameters
+    /// - `registered_when`: The tick value when this timer was originally inserted (stored in registered_when)
+    /// - `timer_to_remove`: The handle to remove
+    pub(crate) fn try_remove(&self, registered_when: u64, _timer_to_remove: TimerHandle) {
+        // Calculate which bucket this timer should be in based on when it was registered
+        let ref_tick = self.ref_time.load(Ordering::Acquire);
+        let head_pos = self.head.load(Ordering::Acquire);
+
+        // Only try to remove if the timer is still in the valid bucket range
+        if registered_when <= ref_tick {
+            // Timer's deadline has passed, it's either already been fired or elapsed
+            // Don't try to remove it
+            return;
+        }
+
+        let offset = registered_when - ref_tick;
+
+        // If offset is >= BUCKET_COUNT, the timer is out of range (shouldn't happen for buckets,
+        // but be safe)
+        if offset >= BUCKET_COUNT as u64 {
+            return;
+        }
+
+        let bucket_idx = (head_pos + offset as usize) % BUCKET_COUNT;
+
+        // Lock the bucket and search for/remove the matching timer
+        if let Some(mut bucket) = self.buckets[bucket_idx].timers.try_lock() {
+            // Remove the timer from the bucket Vec by finding the matching registered_when value.
+            // Multiple timers can be in the same bucket, but only one should have our registered_when.
+            let target_registered_when = registered_when;
+            bucket.retain(|handle: &TimerHandle| {
+                let handle_registered = unsafe { handle.registered_when() };
+                // Keep the handle if it has a different registered_when (not our timer)
+                handle_registered != target_registered_when
+            });
+        }
+        // If we can't acquire the lock, bail out. The timer will be fired during the
+        // current advance() call that's holding the lock, since it's still marked as
+        // being in buckets.
+    }
+
+    /// Advances the ring buffer to the current time and fires all expired timers.
+    ///
+    /// This is called by the driver when it processes timers. Returns all wakers
+    /// that need to be woken after the driver lock is released.
+    ///
+    /// # Parameters
+    /// - `now_tick`: The current tick (milliseconds since epoch)
+    ///
+    /// # Safety
+    /// Must be called with the driver lock held.
+    pub(crate) unsafe fn advance(&self, now_tick: u64) -> Vec<Waker> {
+        let mut wakers = Vec::new();
+
+        let ref_tick = self.ref_time.load(Ordering::Acquire);
+        let ticks_elapsed = now_tick.saturating_sub(ref_tick);
+
+        // Cap the number of ticks we advance to the bucket count
+        // If time jumped way forward (e.g., during shutdown with u64::MAX), we only need
+        // to fire all buckets once, not loop billions of times
+        let ticks_to_advance = std::cmp::min(ticks_elapsed, BUCKET_COUNT as u64);
+
+        // Track if we need to clear next_wake (if we fire the bucket it points to and it becomes empty)
+        let current_next_wake = self.next_wake.load(Ordering::Acquire);
+
+        // Advance through each elapsed tick, firing timers in each bucket
+        for _ in 0..ticks_to_advance {
+            // Atomically advance head and ref_time, get the tick we're firing
+            // fetch_add returns the OLD value, but we want to fire the NEW bucket at the NEW time
+            let bucket_idx = (self.head.fetch_add(1, Ordering::AcqRel) + 1) % BUCKET_COUNT;
+            let current_tick = self.ref_time.fetch_add(1, Ordering::AcqRel) + 1;
+
+            // Fire all timers in this bucket
+            let mut bucket = self.buckets[bucket_idx].timers.lock();
+
+            let had_timers = !bucket.is_empty();
+
+            for timer_handle in bucket.drain(..) {
+                // Only fire timers that are actually in buckets and match the current tick.
+                // If a timer was reset to a deadline > 120s, it was moved to the wheel but
+                // its old handle may still be in this bucket Vec. We skip these by checking
+                // is_in_buckets and registered_when to ensure we only fire valid entries.
+                if unsafe { timer_handle.is_in_buckets_unsafe() } {
+                    let registered = unsafe { timer_handle.registered_when() };
+                    if registered == current_tick {
+                        // SAFETY: We hold the driver lock, which is required for firing
+                        if let Some(waker) = unsafe { timer_handle.fire(Ok(())) } {
+                            wakers.push(waker);
+                        }
+                    }
+                }
+            }
+
+            // If we just drained the bucket that next_wake was pointing to, clear it
+            if had_timers && current_tick == current_next_wake {
+                // Use compare_exchange to only clear if it's still pointing to this tick
+                // (another thread might have inserted a new earlier timer)
+                let _ = self.next_wake.compare_exchange(
+                    current_next_wake,
+                    u64::MAX,
+                    Ordering::Release,
+                    Ordering::Relaxed,
+                );
+            }
+        }
+
+        wakers
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_bucket_offset_calculation() {
+        let buckets = GlobalTimerBuckets::new(1000);
+
+        // Timer 5000ms in the future should land at index 5000
+        assert_eq!(buckets.head.load(Ordering::Relaxed), 0);
+        assert_eq!(buckets.ref_time.load(Ordering::Relaxed), 1000);
+
+        // Offset = 6000 - 1000 = 5000
+        // Index = (0 + 5000) % 120000 = 5000
+        let offset = 6000u64.saturating_sub(1000);
+        assert_eq!(offset, 5000);
+        assert!((offset as usize) < BUCKET_COUNT);
+    }
+
+    #[test]
+    fn test_wraparound() {
+        let buckets = GlobalTimerBuckets::new(0);
+
+        // Advance head near the end
+        buckets.head.store(119_500, Ordering::Release);
+        buckets.ref_time.store(119_500, Ordering::Release);
+
+        // Timer 1000ms in the future should wrap around
+        let offset = 120_500u64.saturating_sub(119_500);
+        assert_eq!(offset, 1000);
+
+        let bucket_idx = (119_500 + offset as usize) % BUCKET_COUNT;
+        assert_eq!(bucket_idx, 120_500 % BUCKET_COUNT);
+        assert_eq!(bucket_idx, 500); // Wrapped around
+    }
+}
diff --git a/tokio/src/runtime/time/wheel/mod.rs b/tokio/src/runtime/time/wheel/mod.rs
index 8d94303544c..85fb183b6dc 100644
--- a/tokio/src/runtime/time/wheel/mod.rs
+++ b/tokio/src/runtime/time/wheel/mod.rs
@@ -234,6 +234,13 @@ impl Wheel {
             debug_assert_eq!(unsafe { item.registered_when() }, expiration.deadline);
         }
 
+        // Check if timer has already been fired (e.g., by bucket path during reset).
+        // Also skip if timer was moved to buckets (stale wheel entry).
+        if !unsafe { item.might_be_registered() } || unsafe { item.is_in_buckets_unsafe() } {
+            // Timer was already fired or moved to buckets, skip it
+            continue;
+        }
+
         // Try to expire the entry; this is cheap (doesn't synchronize) if
         // the timer is not expired, and updates registered_when.
         match unsafe { item.mark_pending(expiration.deadline) } {
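For intuition, the bucket-index arithmetic introduced above can be checked in
isolation. The sketch below mirrors the calculation in `try_insert`/`try_remove`
and the two unit tests; `bucket_index` is a simplified illustrative helper
written for this note, not a function in the patch:

```rust
const BUCKET_COUNT: usize = 120_000; // one bucket per millisecond, two minutes

/// Maps an absolute deadline tick to a ring-buffer slot: offset from
/// `ref_time`, bounds checks, then wraparound from the current `head`.
fn bucket_index(head: usize, ref_time: u64, deadline: u64) -> Option<usize> {
    if deadline <= ref_time {
        return None; // already elapsed; fired immediately instead
    }
    let offset = deadline - ref_time;
    if offset >= BUCKET_COUNT as u64 {
        return None; // more than 120s out; falls back to the timer wheel
    }
    Some((head + offset as usize) % BUCKET_COUNT)
}

fn main() {
    // Mirrors test_bucket_offset_calculation: offset 6000 - 1000 = 5000.
    assert_eq!(bucket_index(0, 1_000, 6_000), Some(5_000));
    // Mirrors test_wraparound: head near the end of the ring wraps to 500.
    assert_eq!(bucket_index(119_500, 119_500, 120_500), Some(500));
    // A five-minute timer is out of bucket range.
    assert_eq!(bucket_index(0, 0, 300_000), None);
}
```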