diff --git a/Cargo.toml b/Cargo.toml index cc982cf1f..2686fda90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ rust-version.workspace = true # # Only useful on the `wasm32-unknown-unknown` target. web_spin_lock = ["dep:wasm_sync", "rayon-core/web_spin_lock"] +tracing = ["rayon-core/tracing"] [dependencies] # These are both public dependencies! diff --git a/ci/compat-Cargo.lock b/ci/compat-Cargo.lock index 613564438..26a1474f7 100644 --- a/ci/compat-Cargo.lock +++ b/ci/compat-Cargo.lock @@ -1303,6 +1303,8 @@ dependencies = [ "rand", "rand_xorshift", "scoped-tls", + "tracing", + "tracing-subscriber", "wasm_sync", ] @@ -1459,6 +1461,15 @@ dependencies = [ "syn", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -1554,6 +1565,15 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "tiny-skia" version = "0.11.4" @@ -1611,6 +1631,20 @@ name = "tracing-core" version = "0.1.34" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" +dependencies = [ + "once_cell", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5" +dependencies = [ + "sharded-slab", + "thread_local", + "tracing-core", +] [[package]] name = "ttf-parser" diff --git a/rayon-core/Cargo.toml b/rayon-core/Cargo.toml index 5f34e257b..ed1c7ef85 100644 --- a/rayon-core/Cargo.toml +++ b/rayon-core/Cargo.toml @@ -24,12 +24,19 @@ web_spin_lock = ["dep:wasm_sync"] [dependencies] crossbeam-deque.workspace = true crossbeam-utils.workspace = true +tracing = { version = "0.1", default-features = false, features = [ + "std", +], optional = true } wasm_sync = { workspace = true, optional = true } [dev-dependencies] rand.workspace = true rand_xorshift.workspace = true scoped-tls.workspace = true +tracing-subscriber = { version = "0.3", default-features = false, features = [ + "std", + "registry", +] } [target.'cfg(unix)'.dev-dependencies] libc.workspace = true diff --git a/rayon-core/src/instrumentation.rs b/rayon-core/src/instrumentation.rs new file mode 100644 index 000000000..ab20efbe7 --- /dev/null +++ b/rayon-core/src/instrumentation.rs @@ -0,0 +1,298 @@ +//! Tracing instrumentation for rayon-core. +//! +//! This module provides optional tracing support, enabled via the `tracing` feature. +//! All instrumentation compiles to no-ops when the feature is disabled. +//! +//! # Spans +//! +//! - `rayon::worker_thread` (INFO) - Wraps the entire lifetime of a worker thread. +//! Fields: `worker` (thread index), `pool_id`. +//! +//! - `rayon::job_execute` (INFO) - Wraps the execution of a job. +//! Fields: `job_id`, `worker` (executing thread index). +//! Parent: The span that was active when the job was created (enables cross-thread +//! context propagation). +//! +//! # Events +//! +//! - `rayon::thread_idle` (DEBUG) - Emitted when a worker thread goes idle. +//! - `rayon::thread_active` (DEBUG) - Emitted when a worker thread wakes up. +//! - `rayon::job_injected` (DEBUG) - Emitted when a job is injected into the global queue. +//! Fields: `job_id`, `pool_id`. +//! - `rayon::job_stolen` (DEBUG) - Emitted when a job is stolen from another thread. +//! Fields: `job_id`, `victim` (thread stolen from). +//! +//! # Context Propagation +//! +//! Jobs capture the current span context at creation time via [`JobContext`]. When +//! a job executes (potentially on a different thread), it re-enters the captured +//! context before creating the `job_execute` span. This allows tracing tools to +//! correctly attribute work to the logical operation that spawned it, even when +//! executed by a different worker thread. + +#[cfg(feature = "tracing")] +#[macro_use] +mod inner { + /// Emits a tracing event when the `tracing` feature is enabled. + /// Compiles to nothing when disabled. + macro_rules! trace_event { + ($($arg:tt)*) => { + tracing::event!($($arg)*) + }; + } + + /// Creates and enters a tracing span when the `tracing` feature is enabled. + /// Returns an `EnteredSpan` that will exit when dropped. + /// Compiles to nothing when disabled. + macro_rules! trace_span { + ($($arg:tt)*) => { + tracing::span!($($arg)*).entered() + }; + } + + use std::sync::atomic::{AtomicU64, Ordering}; + + static NEXT_JOB_ID: AtomicU64 = AtomicU64::new(0); + + /// Guard returned by entering a job context. + pub(crate) type ContextGuard<'a> = tracing::span::Entered<'a>; + + /// Captured context for a job, used to propagate span context across threads. + #[derive(Clone)] + pub(crate) struct JobContext { + span: tracing::Span, + id: u64, + } + + impl JobContext { + /// Captures the current span context and assigns a unique job ID. + pub(crate) fn current() -> Self { + Self { + span: tracing::Span::current(), + id: NEXT_JOB_ID.fetch_add(1, Ordering::Relaxed), + } + } + + /// Returns the unique job ID. + pub(crate) fn id(&self) -> u64 { + self.id + } + + /// Enters the captured span context. + pub(crate) fn enter(&self) -> ContextGuard<'_> { + self.span.enter() + } + } +} + +#[cfg(not(feature = "tracing"))] +#[macro_use] +mod inner { + macro_rules! trace_event { + ($($arg:tt)*) => {}; + } + + macro_rules! trace_span { + ($($arg:tt)*) => { + () + }; + } + + /// Guard returned by entering a job context (no-op). + pub(crate) struct ContextGuard; + + /// Captured context for a job (no-op when tracing is disabled). + #[derive(Clone)] + pub(crate) struct JobContext; + + impl JobContext { + /// Captures the current span context (no-op). + pub(crate) fn current() -> Self { + Self + } + + /// Returns a placeholder job ID. + /// + /// This method exists for API compatibility with the tracing-enabled + /// version. The value is never used because trace macros expand to + /// nothing when the feature is disabled. + #[allow(dead_code)] + pub(crate) fn id(&self) -> u64 { + 0 + } + + /// No-op context entry. + pub(crate) fn enter(&self) -> ContextGuard { + ContextGuard + } + } +} + +pub(crate) use inner::*; + +#[cfg(all(test, feature = "tracing"))] +mod tests { + //! Note: These tests use a global subscriber because rayon's worker threads + //! don't inherit thread-local subscribers. + + use std::collections::HashMap; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::{Arc, Mutex, OnceLock}; + + use tracing::span::{Attributes, Id}; + use tracing::{Event, Subscriber}; + use tracing_subscriber::layer::{Context, SubscriberExt}; + use tracing_subscriber::registry::LookupSpan; + use tracing_subscriber::util::SubscriberInitExt; + use tracing_subscriber::Layer; + + use crate::ThreadPoolBuilder; + + /// Shared test state for tracking spans and events. + struct TestState { + // Span/event counters + worker_thread_spans: AtomicUsize, + job_execute_spans: AtomicUsize, + total_events: AtomicUsize, + + // Parent tracking for context propagation tests + parents: Mutex>>, + user_span_id: Mutex>, + job_spans_with_user_parent: AtomicUsize, + } + + impl TestState { + fn new() -> Arc { + Arc::new(Self { + worker_thread_spans: AtomicUsize::new(0), + job_execute_spans: AtomicUsize::new(0), + total_events: AtomicUsize::new(0), + parents: Mutex::new(HashMap::new()), + user_span_id: Mutex::new(None), + job_spans_with_user_parent: AtomicUsize::new(0), + }) + } + + /// Check if `span_id` has `ancestor_id` as an ancestor. + fn has_ancestor(&self, span_id: &Id, ancestor_id: &Id) -> bool { + let parents = self.parents.lock().unwrap(); + let mut current = Some(span_id.clone()); + while let Some(id) = current { + if &id == ancestor_id { + return true; + } + current = parents.get(&id).and_then(|p| p.clone()); + } + false + } + } + + /// Layer that tracks spans and events for testing. + struct TestLayer(Arc); + + impl Layer for TestLayer + where + S: Subscriber + for<'lookup> LookupSpan<'lookup>, + { + fn on_event(&self, _event: &Event<'_>, _ctx: Context<'_, S>) { + self.0.total_events.fetch_add(1, Ordering::Relaxed); + } + + fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) { + let name = attrs.metadata().name(); + + // Record parent relationship + let parent_id = ctx.span(id).and_then(|span| span.parent()).map(|p| p.id()); + self.0.parents.lock().unwrap().insert(id.clone(), parent_id); + + // Count span types + if name.contains("worker_thread") { + self.0.worker_thread_spans.fetch_add(1, Ordering::Relaxed); + } else if name.contains("job_execute") { + self.0.job_execute_spans.fetch_add(1, Ordering::Relaxed); + + // Check if this job_execute has user_operation as ancestor + if let Some(ref user_id) = *self.0.user_span_id.lock().unwrap() { + if self.0.has_ancestor(id, user_id) { + self.0 + .job_spans_with_user_parent + .fetch_add(1, Ordering::Relaxed); + } + } + } else if name == "user_operation" { + *self.0.user_span_id.lock().unwrap() = Some(id.clone()); + } + } + } + + /// Returns the shared test state, initializing the global subscriber on first call. + fn test_state() -> Arc { + static STATE: OnceLock> = OnceLock::new(); + STATE + .get_or_init(|| { + let state = TestState::new(); + tracing_subscriber::registry() + .with(TestLayer(Arc::clone(&state))) + .init(); + state + }) + .clone() + } + + /// Test that worker thread and job execution spans are created. + #[test] + fn test_tracing_instrumentation() { + let state = test_state(); + + let pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap(); + let registry = Arc::clone(pool.registry()); + + pool.install(|| { + crate::join(|| (0..100).sum::(), || (100..200).sum::()); + }); + + drop(pool); + registry.wait_until_stopped(); + + // Verify worker thread spans + let worker_spans = state.worker_thread_spans.load(Ordering::Relaxed); + assert!( + worker_spans >= 2, + "Expected at least 2 worker_thread spans, got {worker_spans}", + ); + + // Verify job execution spans + let job_spans = state.job_execute_spans.load(Ordering::Relaxed); + assert!( + job_spans > 0, + "Expected some job_execute spans, got {job_spans}" + ); + } + + /// Test that span context is propagated from job creation site to execution site. + #[test] + fn test_context_propagation() { + let state = test_state(); + + let pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap(); + let registry = Arc::clone(pool.registry()); + + // Create a user span and spawn work inside it + let user_span = tracing::span!(tracing::Level::INFO, "user_operation"); + let _enter = user_span.enter(); + + pool.install(|| { + crate::join(|| (0..100).sum::(), || (100..200).sum::()); + }); + + drop(pool); + registry.wait_until_stopped(); + + // Verify that job_execute spans have user_operation as ancestor + let jobs_with_parent = state.job_spans_with_user_parent.load(Ordering::Relaxed); + assert!( + jobs_with_parent > 0, + "Expected job_execute spans to have user_operation as ancestor, got {jobs_with_parent}", + ); + } +} diff --git a/rayon-core/src/job.rs b/rayon-core/src/job.rs index 5664bb385..46b018423 100644 --- a/rayon-core/src/job.rs +++ b/rayon-core/src/job.rs @@ -1,3 +1,4 @@ +use crate::instrumentation::JobContext; use crate::latch::Latch; use crate::unwind; use crossbeam_deque::{Injector, Steal}; @@ -24,14 +25,21 @@ pub(super) trait Job { unsafe fn execute(this: *const ()); } -/// Effectively a Job trait object. Each JobRef **must** be executed -/// exactly once, or else data may leak. +/// Effectively a decorated Job trait object. Each JobRef **must** be +/// executed exactly once, or else data may leak. /// /// Internally, we store the job's data in a `*const ()` pointer. The /// true type is something like `*const StackJob<...>`, but we hide /// it. We also carry the "execute fn" from the `Job` trait. +/// +/// When the `tracing` feature is enabled, JobRef also captures the +/// current span context at creation time. This context is restored +/// when the job executes, enabling proper parent-child span +/// relationships even when jobs are stolen and executed by different +/// worker threads. pub(super) struct JobRef { pointer: *const (), + context: JobContext, execute_fn: unsafe fn(*const ()), } @@ -48,6 +56,7 @@ impl JobRef { // erase types: JobRef { pointer: data as *const (), + context: JobContext::current(), execute_fn: ::execute, } } @@ -59,8 +68,34 @@ impl JobRef { (self.pointer, self.execute_fn) } + #[inline] + #[cfg_attr(not(feature = "tracing"), allow(dead_code))] + pub(super) fn context(&self) -> &JobContext { + &self.context + } + #[inline] pub(super) unsafe fn execute(self) { + let _context_guard = self.context.enter(); + // We use `enter()` to set the parent context rather than + // `parent: ...` because `parent:` only works if the span is + // recorded. If the span is filtered out (e.g., max level set + // to WARN), context would be lost. + let _span = trace_span!( + tracing::Level::INFO, + "rayon::job_execute", + job_id = self.context.id(), + worker = { + // We find the worker id in the macro to prevent + // overhead when the `tracing` feature is disabled. + let worker = crate::registry::WorkerThread::current(); + if worker.is_null() { + 0 + } else { + (*worker).index() + } + } + ); (self.execute_fn)(self.pointer) } } diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index 374507632..621484961 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -53,6 +53,34 @@ //! restrictive tilde or inequality requirements for `rayon-core`. The //! conflicting requirements will need to be resolved before the build will //! succeed. +//! +//! # Tracing +//! +//! Rayon supports the [`tracing`](https://docs.rs/tracing) crate for instrumentation, +//! enabled via the `tracing` Cargo feature. This provides visibility into the runtime +//! behavior of the thread pool. +//! +//! ## Spans (INFO level) +//! +//! - `rayon::worker_thread` - Wraps the lifetime of each worker thread. +//! Fields: `worker` (thread index), `pool_id`. +//! +//! - `rayon::job_execute` - Wraps each job execution. +//! Fields: `job_id`, `worker`. +//! Parent: The span active when the job was created (enables cross-thread context propagation). +//! +//! ## Events (DEBUG level) +//! +//! - `rayon::thread_idle` - Worker thread going idle. +//! - `rayon::thread_active` - Worker thread waking up. +//! - `rayon::job_injected` - Job injected into global queue. Fields: `job_id`, `pool_id`. +//! - `rayon::job_stolen` - Job stolen from another thread. Fields: `job_id`, `victim`. +//! +//! ## Context Propagation +//! +//! Jobs automatically capture the current span context when created. When executed +//! (potentially on a different thread), they restore this context, so `job_execute` +//! spans appear as children of the span that spawned the work. #![deny(missing_debug_implementations)] #![deny(missing_docs)] @@ -70,6 +98,8 @@ use std::thread; #[macro_use] mod private; +#[macro_use] +mod instrumentation; mod broadcast; mod job; diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs index 809976a08..dd6b293f2 100644 --- a/rayon-core/src/registry.rs +++ b/rayon-core/src/registry.rs @@ -437,6 +437,13 @@ impl Registry { "inject() sees state.terminate as true" ); + trace_event!( + tracing::Level::DEBUG, + pool_id = self.id().addr, + job_id = injected_job.context().id(), + "rayon::job_injected" + ); + let queue_was_empty = self.injected_jobs.is_empty(); self.injected_jobs.push(injected_job); @@ -890,7 +897,15 @@ impl WorkerThread { .find_map(|victim_index| { let victim = &thread_infos[victim_index]; match victim.stealer.steal() { - Steal::Success(job) => Some(job), + Steal::Success(job) => { + trace_event!( + tracing::Level::DEBUG, + victim = victim_index, + job_id = job.context().id(), + "rayon::job_stolen" + ); + Some(job) + } Steal::Empty => None, Steal::Retry => { retry = true; @@ -926,6 +941,13 @@ unsafe fn main_loop(thread: ThreadBuilder) { registry.catch_unwind(|| handler(index)); } + let _span = trace_span!( + tracing::Level::INFO, + "rayon::worker_thread", + worker = index, + pool_id = registry.id().addr, + ); + worker_thread.wait_until_out_of_work(); // Normal termination, do not abort. diff --git a/rayon-core/src/sleep/mod.rs b/rayon-core/src/sleep/mod.rs index 9b02b39b5..9bbdb855c 100644 --- a/rayon-core/src/sleep/mod.rs +++ b/rayon-core/src/sleep/mod.rs @@ -69,6 +69,8 @@ impl Sleep { pub(super) fn start_looking(&self, worker_index: usize) -> IdleState { self.counters.add_inactive_thread(); + trace_event!(tracing::Level::DEBUG, "rayon::thread_idle"); + IdleState { worker_index, rounds: 0, @@ -78,6 +80,8 @@ impl Sleep { #[inline] pub(super) fn work_found(&self) { + trace_event!(tracing::Level::DEBUG, "rayon::thread_active"); + // If we were the last idle thread and other threads are still sleeping, // then we should wake up another thread. let threads_to_wake = self.counters.sub_inactive_thread(); diff --git a/rayon-core/src/thread_pool/mod.rs b/rayon-core/src/thread_pool/mod.rs index 58f2c9233..c7e75b6c1 100644 --- a/rayon-core/src/thread_pool/mod.rs +++ b/rayon-core/src/thread_pool/mod.rs @@ -394,6 +394,13 @@ impl ThreadPool { let curr = self.registry.current_thread()?; Some(curr.yield_local()) } + + /// Returns the registry for this thread pool. Only available for tests. + #[cfg(test)] + #[cfg_attr(not(feature = "tracing"), allow(dead_code))] + pub(crate) fn registry(&self) -> &Arc { + &self.registry + } } impl Drop for ThreadPool { diff --git a/src/lib.rs b/src/lib.rs index 52fc01fe1..8328db1cc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -69,8 +69,26 @@ //! //! # Targets without threading //! -//! Rayon has limited support for targets without `std` threading implementations. -//! See the [`rayon_core`] documentation for more information about its global fallback. +//! Rayon has limited support for targets without `std` threading +//! implementations. See the [`rayon_core`] documentation for more +//! information about its global fallback. +//! +//! # Tracing +//! +//! Rayon has optional support for the [`tracing`] crate, enabled via +//! the `tracing` Cargo feature. When enabled, Rayon emits spans and +//! events for observability: +//! +//! - **Spans**: `rayon::worker_thread` (per-thread lifetime), +//! `rayon::job_execute` (per-job) +//! - **Events**: `rayon::thread_idle`, `rayon::thread_active`, +//! `rayon::job_injected`, `rayon::job_stolen` +//! +//! Job spans automatically propagate context across thread boundaries, +//! so jobs appear as children of the span that created them, even when +//! executed by a different worker. +//! +//! [`tracing`]: https://docs.rs/tracing //! //! # Other questions? //!