diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c2f89cb6691..3b758ad3bed 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -258,6 +258,9 @@ jobs: # only Linux supports io_uring - { os: ubuntu-latest, extra_features: io-uring } - { os: macos-latest, extra_features: "" } + # only Linux and macOS support usdt + - { os: ubuntu-latest, extra_features: usdt } + - { os: macos-latest, extra_features: usdt } steps: - uses: actions/checkout@v5 - name: Install Rust ${{ env.rust_stable }} @@ -527,7 +530,7 @@ jobs: os: ubuntu-24.04-arm - target: aarch64-unknown-linux-gnu os: ubuntu-24.04-arm - extra_features: "io-uring,taskdump" + extra_features: "io-uring,taskdump,usdt" - target: aarch64-pc-windows-msvc os: windows-11-arm steps: @@ -577,7 +580,7 @@ jobs: os: ubuntu-24.04-arm - target: aarch64-unknown-linux-gnu os: ubuntu-24.04-arm - extra_features: "io-uring,taskdump" + extra_features: "io-uring,taskdump,usdt" - target: aarch64-pc-windows-msvc os: windows-11-arm steps: @@ -671,7 +674,8 @@ jobs: # https://github.com/tokio-rs/tokio/issues/5373 - name: Check # We use `--skip io-uring` since io-uring crate doesn't provide a binding for the i686 target. - run: cargo hack check -Zbuild-std --target target-specs/i686-unknown-linux-gnu.json -p tokio --feature-powerset --skip io-uring --depth 2 --keep-going + # We use `--skip usdt` since usdt is not supported on the i686 target. + run: cargo hack check -Zbuild-std --target target-specs/i686-unknown-linux-gnu.json -p tokio --feature-powerset --skip io-uring --skip usdt --depth 2 --keep-going env: RUSTFLAGS: --cfg tokio_unstable -Dwarnings @@ -684,11 +688,11 @@ jobs: include: - name: "" rustflags: "" - exclude_features: "io-uring,taskdump" + exclude_features: "io-uring,taskdump,usdt" - name: "--unstable" rustflags: "--cfg tokio_unstable -Dwarnings" - exclude_features: "io-uring,taskdump" - - name: "--unstable io-uring,taskdump" + exclude_features: "io-uring,taskdump,usdt" + - name: "--unstable io-uring,taskdump,usdt" rustflags: "--cfg tokio_unstable -Dwarnings" exclude_features: "" steps: @@ -1091,11 +1095,11 @@ jobs: matrix: include: - os: windows-latest - # Windows neither supports io-uring nor taskdump. + # Windows neither supports io-uring nor taskdump nor usdt. extra_features: "tracing" - os: ubuntu-latest # includes all unstable features. - extra_features: "tracing,io-uring,taskdump" + extra_features: "tracing,io-uring,taskdump,usdt" steps: - uses: actions/checkout@v5 - name: Install Rust ${{ matrix.rust }} @@ -1112,7 +1116,7 @@ jobs: - name: check-external-types env: RUSTFLAGS: --cfg tokio_unstable -Dwarnings - RUSTDOCFLAGS: --cfg tokio_unstable + RUSTDOCFLAGS: --cfg tokio_unstable ${{ matrix.extra_flags }} run: cargo check-external-types --features $TOKIO_STABLE_FEATURES,${{ matrix.extra_features }} working-directory: tokio diff --git a/spellcheck.dic b/spellcheck.dic index 2baf2df351f..48eca78fcc3 100644 --- a/spellcheck.dic +++ b/spellcheck.dic @@ -1,4 +1,4 @@ -308 +310 & + < @@ -293,6 +293,8 @@ Unsets unsynchronized untrusted uring +usdt +USDT usecases Valgrind Varghese diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index e87d2ad0381..36b9cf0943e 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -88,6 +88,8 @@ time = [] io-uring = ["dep:io-uring", "libc", "mio/os-poll", "mio/os-ext", "dep:slab"] # Unstable feature. Requires `--cfg tokio_unstable` to enable. taskdump = ["dep:backtrace"] +# Unstable feature. Requires `--cfg tokio_unstable` to enable. +usdt = [] [dependencies] tokio-macros = { version = "~2.6.0", path = "../tokio-macros", optional = true } diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index b24a7705935..7037dd18118 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -351,6 +351,7 @@ //! Some feature flags are only available when specifying the `tokio_unstable` flag: //! //! - `tracing`: Enables tracing events. +//! - `usdt`: Enables USDT probes. //! //! Likewise, some parts of the API are only available with the same flag: //! @@ -499,6 +500,9 @@ compile_error!( linux, on `aarch64`, `x86` and `x86_64`." ); +#[cfg(all(not(tokio_unstable), feature = "usdt"))] +compile_error!("The `usdt` feature requires `--cfg tokio_unstable`."); + // Includes re-exports used by macros. // // This module is not intended to be part of the public API. In general, any diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 9af23b01cbd..a16ff3bb87b 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -593,6 +593,35 @@ macro_rules! cfg_not_trace { } } +macro_rules! cfg_usdt { + ($($item:item)*) => { + $( + #[cfg(all(tokio_unstable, feature = "usdt"))] + #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "usdt"))))] + $item + )* + }; +} + +macro_rules! cfg_not_usdt { + ($($item:item)*) => { + $( + #[cfg(any(not(tokio_unstable), not(feature = "usdt")))] + $item + )* + } +} + +macro_rules! cfg_trace_or_usdt { + ($($item:item)*) => { + $( + #[cfg(all(tokio_unstable, any(feature = "tracing", feature = "usdt")))] + #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, any(feature = "tracing", feature = "usdt")))))] + $item + )* + }; +} + macro_rules! cfg_coop { ($($item:item)*) => { $( diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index c42924be77d..57ff677c11a 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -10,7 +10,7 @@ cfg_fs! { pub(crate) use pool::spawn_mandatory_blocking; } -cfg_trace! { +cfg_trace_or_usdt! { pub(crate) use pool::Mandatory; } diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 9aae69ab98f..013d93aef28 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -940,6 +940,8 @@ impl Builder { /// # } /// ``` pub fn build(&mut self) -> io::Result { + crate::util::usdt::register_probes()?; + match &self.kind { Kind::CurrentThread => self.build_current_thread_runtime(), #[cfg(feature = "rt-multi-thread")] diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 756151690cf..c0335c93bd2 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -19,6 +19,7 @@ use crate::runtime::task::JoinHandle; use crate::runtime::BOX_FUTURE_THRESHOLD; use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR}; use crate::util::trace::SpawnMeta; +use crate::util::usdt; use std::future::Future; use std::marker::PhantomData; @@ -359,9 +360,12 @@ impl Handle { ))] let future = super::task::trace::Trace::root(future); + #[cfg(all(tokio_unstable, any(feature = "tracing", feature = "usdt")))] + let id = super::task::Id::next(); + #[cfg(all(tokio_unstable, feature = "usdt"))] + let future = usdt::block_on(future, _meta, id); #[cfg(all(tokio_unstable, feature = "tracing"))] - let future = - crate::util::trace::task(future, "block_on", _meta, super::task::Id::next().as_u64()); + let future = crate::util::trace::task(future, "block_on", _meta, id.as_u64()); // Enter the runtime context. This sets the current driver handles and // prevents blocking an existing runtime. @@ -385,6 +389,7 @@ impl Handle { any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64") ))] let future = super::task::trace::Trace::root(future); + usdt::start_task(usdt::TaskKind::Spawn, meta, id, std::mem::size_of::()); #[cfg(all(tokio_unstable, feature = "tracing"))] let future = crate::util::trace::task(future, "task", meta, id.as_u64()); self.inner.spawn(future, id, meta.spawned_at) @@ -410,6 +415,12 @@ impl Handle { any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64") ))] let future = super::task::trace::Trace::root(future); + usdt::start_task( + usdt::TaskKind::SpawnLocal, + meta, + id, + std::mem::size_of::(), + ); #[cfg(all(tokio_unstable, feature = "tracing"))] let future = crate::util::trace::task(future, "task", meta, id.as_u64()); self.inner.spawn_local(future, id, meta.spawned_at) diff --git a/tokio/src/runtime/local_runtime/runtime.rs b/tokio/src/runtime/local_runtime/runtime.rs index 8c47445d850..b7b10f38fd8 100644 --- a/tokio/src/runtime/local_runtime/runtime.rs +++ b/tokio/src/runtime/local_runtime/runtime.rs @@ -6,6 +6,8 @@ use crate::runtime::{context, Builder, EnterGuard, Handle, BOX_FUTURE_THRESHOLD} use crate::task::JoinHandle; use crate::util::trace::SpawnMeta; +#[cfg(all(tokio_unstable, feature = "usdt"))] +use crate::util::usdt; use std::future::Future; use std::marker::PhantomData; use std::mem; @@ -239,13 +241,12 @@ impl LocalRuntime { ))] let future = crate::runtime::task::trace::Trace::root(future); + #[cfg(all(tokio_unstable, any(feature = "tracing", feature = "usdt")))] + let id = crate::runtime::task::Id::next(); + #[cfg(all(tokio_unstable, feature = "usdt"))] + let future = usdt::block_on(future, _meta, id); #[cfg(all(tokio_unstable, feature = "tracing"))] - let future = crate::util::trace::task( - future, - "block_on", - _meta, - crate::runtime::task::Id::next().as_u64(), - ); + let future = crate::util::trace::task(future, "block_on", _meta, id.as_u64()); let _enter = self.enter(); diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index ae58ce6da86..f7cb7ef83ce 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -407,7 +407,7 @@ cfg_rt! { #[cfg_attr(target_os = "wasi", allow(unused_imports))] pub(crate) use blocking::spawn_blocking; - cfg_trace! { + cfg_trace_or_usdt! { pub(crate) use blocking::Mandatory; } diff --git a/tokio/src/runtime/runtime.rs b/tokio/src/runtime/runtime.rs index 609b27831e8..2f45bff2d00 100644 --- a/tokio/src/runtime/runtime.rs +++ b/tokio/src/runtime/runtime.rs @@ -4,6 +4,8 @@ use crate::runtime::scheduler::CurrentThread; use crate::runtime::{context, EnterGuard, Handle}; use crate::task::JoinHandle; use crate::util::trace::SpawnMeta; +#[cfg(all(tokio_unstable, feature = "usdt"))] +use crate::util::usdt; use std::future::Future; use std::mem; @@ -354,13 +356,12 @@ impl Runtime { ))] let future = super::task::trace::Trace::root(future); + #[cfg(all(tokio_unstable, any(feature = "tracing", feature = "usdt")))] + let id = crate::runtime::task::Id::next(); + #[cfg(all(tokio_unstable, feature = "usdt"))] + let future = usdt::block_on(future, _meta, id); #[cfg(all(tokio_unstable, feature = "tracing"))] - let future = crate::util::trace::task( - future, - "block_on", - _meta, - crate::runtime::task::Id::next().as_u64(), - ); + let future = crate::util::trace::task(future, "block_on", _meta, id.as_u64()); let _enter = self.enter(); diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index e91e8be4025..367c3f56d66 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -15,7 +15,7 @@ use crate::runtime::context; use crate::runtime::task::raw::{self, Vtable}; use crate::runtime::task::state::State; use crate::runtime::task::{Id, Schedule, TaskHarnessScheduleHooks}; -use crate::util::linked_list; +use crate::util::{linked_list, usdt}; use std::num::NonZeroU64; #[cfg(tokio_unstable)] @@ -362,11 +362,13 @@ impl Core { let future = unsafe { Pin::new_unchecked(future) }; let _guard = TaskIdGuard::enter(self.task_id); + let _usdt_guard = crate::util::usdt::PollGuard::new(self.task_id); future.poll(&mut cx) }) }; if res.is_ready() { + usdt::finish_task(self.task_id, usdt::TerminateKind::Success); self.drop_future_or_output(); } diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index 6f20d66efc6..1578610c2a8 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -3,6 +3,7 @@ use crate::runtime::task::core::{Cell, Core, Header, Trailer}; use crate::runtime::task::state::{Snapshot, State}; use crate::runtime::task::waker::waker_ref; use crate::runtime::task::{Id, JoinError, Notified, RawTask, Schedule, Task}; +use crate::util::usdt; #[cfg(tokio_unstable)] use crate::runtime::TaskMeta; @@ -498,6 +499,8 @@ enum PollFuture { /// Cancels the task and store the appropriate error in the stage field. fn cancel_task(core: &Core) { + usdt::finish_task(core.task_id, usdt::TerminateKind::Cancelled); + // Drop the future from a panic guard. let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { core.drop_future_or_output(); @@ -526,6 +529,8 @@ fn poll_future(core: &Core, cx: Context<'_>) -> Po } impl<'a, T: Future, S: Schedule> Drop for Guard<'a, T, S> { fn drop(&mut self) { + usdt::finish_task(self.core.task_id, usdt::TerminateKind::Panicked); + // If the future panics on poll, we drop it inside the panic // guard. self.core.drop_future_or_output(); diff --git a/tokio/src/runtime/task/waker.rs b/tokio/src/runtime/task/waker.rs index 2a1568fe8f7..113a1aa79aa 100644 --- a/tokio/src/runtime/task/waker.rs +++ b/tokio/src/runtime/task/waker.rs @@ -64,9 +64,37 @@ cfg_not_trace! { } } +cfg_usdt! { + macro_rules! usdt { + ($header:expr, $op:expr) => { + let id = Header::get_id($header).as_u64(); + match $op { + "clone" => crate::util::usdt::waker_clone(id), + "drop" => crate::util::usdt::waker_drop(id), + "wake" => { + crate::util::usdt::waker_wake(id); + crate::util::usdt::waker_drop(id); + } + "wake_by_ref" => crate::util::usdt::waker_wake(id), + _ => {} + } + } + } +} + +cfg_not_usdt! { + macro_rules! usdt { + ($header:expr, $op:expr) => { + // noop + let _ = &$header; + } + } +} + unsafe fn clone_waker(ptr: *const ()) -> RawWaker { let header = NonNull::new_unchecked(ptr as *mut Header); trace!(header, "waker.clone"); + usdt!(header, "clone"); header.as_ref().state.ref_inc(); raw_waker(header) } @@ -74,6 +102,7 @@ unsafe fn clone_waker(ptr: *const ()) -> RawWaker { unsafe fn drop_waker(ptr: *const ()) { let ptr = NonNull::new_unchecked(ptr as *mut Header); trace!(ptr, "waker.drop"); + usdt!(ptr, "drop"); let raw = RawTask::from_raw(ptr); raw.drop_reference(); } @@ -81,6 +110,7 @@ unsafe fn drop_waker(ptr: *const ()) { unsafe fn wake_by_val(ptr: *const ()) { let ptr = NonNull::new_unchecked(ptr as *mut Header); trace!(ptr, "waker.wake"); + usdt!(ptr, "wake"); let raw = RawTask::from_raw(ptr); raw.wake_by_val(); } @@ -89,6 +119,7 @@ unsafe fn wake_by_val(ptr: *const ()) { unsafe fn wake_by_ref(ptr: *const ()) { let ptr = NonNull::new_unchecked(ptr as *mut Header); trace!(ptr, "waker.wake_by_ref"); + usdt!(ptr, "wake_by_ref"); let raw = RawTask::from_raw(ptr); raw.wake_by_ref(); } diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs index 467a700646e..8dfa4aff6f3 100644 --- a/tokio/src/task/builder.rs +++ b/tokio/src/task/builder.rs @@ -59,7 +59,10 @@ use std::{future::Future, io, mem}; /// [`spawn`]: Builder::spawn /// [`spawn_blocking`]: Builder::spawn_blocking #[derive(Default, Debug)] -#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] +#[cfg_attr( + docsrs, + doc(cfg(all(tokio_unstable, any(feature = "tracing", feature = "usdt")))) +)] pub struct Builder<'a> { name: Option<&'a str>, } diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index 544d6a23467..6d5d1cc1920 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -69,8 +69,11 @@ pub struct JoinSet { /// than on the current default runtime. /// /// [`task::Builder`]: crate::task::Builder -#[cfg(all(tokio_unstable, feature = "tracing"))] -#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] +#[cfg(all(tokio_unstable, any(feature = "tracing", feature = "usdt")))] +#[cfg_attr( + docsrs, + doc(cfg(all(tokio_unstable, any(feature = "tracing", feature = "usdt")))) +)] #[must_use = "builders do nothing unless used to spawn a task"] pub struct Builder<'a, T> { joinset: &'a mut JoinSet, @@ -117,8 +120,11 @@ impl JoinSet { /// Ok(()) /// } /// ``` - #[cfg(all(tokio_unstable, feature = "tracing"))] - #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] + #[cfg(all(tokio_unstable, any(feature = "tracing", feature = "usdt")))] + #[cfg_attr( + docsrs, + doc(cfg(all(tokio_unstable, any(feature = "tracing", feature = "usdt")))) + )] pub fn build_task(&mut self) -> Builder<'_, T> { Builder { builder: super::Builder::new(), @@ -651,8 +657,11 @@ where // === impl Builder === -#[cfg(all(tokio_unstable, feature = "tracing"))] -#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] +#[cfg(all(tokio_unstable, any(feature = "tracing", feature = "usdt")))] +#[cfg_attr( + docsrs, + doc(cfg(all(tokio_unstable, any(feature = "tracing", feature = "usdt")))) +)] impl<'a, T: 'static> Builder<'a, T> { /// Assigns a name to the task which will be spawned. pub fn name(self, name: &'a str) -> Self { @@ -794,8 +803,11 @@ impl<'a, T: 'static> Builder<'a, T> { // Manual `Debug` impl so that `Builder` is `Debug` regardless of whether `T` is // `Debug`. -#[cfg(all(tokio_unstable, feature = "tracing"))] -#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] +#[cfg(all(tokio_unstable, any(feature = "tracing", feature = "usdt")))] +#[cfg_attr( + docsrs, + doc(cfg(all(tokio_unstable, any(feature = "tracing", feature = "usdt")))) +)] impl<'a, T> fmt::Debug for Builder<'a, T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("join_set::Builder") diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 021e6277534..8e458101a41 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -9,7 +9,7 @@ use crate::runtime::task::{ use crate::runtime::{context, ThreadId, BOX_FUTURE_THRESHOLD}; use crate::sync::AtomicWaker; use crate::util::trace::SpawnMeta; -use crate::util::RcCell; +use crate::util::{usdt, RcCell}; use std::cell::Cell; use std::collections::VecDeque; @@ -441,6 +441,7 @@ cfg_rt! { ))] let future = task::trace::Trace::root(future); let id = task::Id::next(); + usdt::start_task(usdt::TaskKind::Spawn, meta, id, std::mem::size_of::()); let task = crate::util::trace::task(future, "task", meta, id.as_u64()); // safety: we have verified that this is a `LocalRuntime` owned by the current thread @@ -1040,6 +1041,12 @@ impl Context { F::Output: 'static, { let id = crate::runtime::task::Id::next(); + usdt::start_task( + usdt::TaskKind::SpawnLocal, + meta, + id, + std::mem::size_of::(), + ); let future = crate::util::trace::task(future, "local", meta, id.as_u64()); // Safety: called from the thread that owns the `LocalSet` diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs index ebd3a1bcecd..b013972e323 100644 --- a/tokio/src/task/mod.rs +++ b/tokio/src/task/mod.rs @@ -319,7 +319,7 @@ cfg_rt! { pub use crate::runtime::task::{Id, id, try_id}; - cfg_trace! { + cfg_trace_or_usdt! { mod builder; pub use builder::Builder; } diff --git a/tokio/src/task/spawn.rs b/tokio/src/task/spawn.rs index 8ed288034d9..a11db214109 100644 --- a/tokio/src/task/spawn.rs +++ b/tokio/src/task/spawn.rs @@ -1,6 +1,7 @@ use crate::runtime::BOX_FUTURE_THRESHOLD; use crate::task::JoinHandle; use crate::util::trace::SpawnMeta; +use crate::util::usdt; use std::future::Future; @@ -200,6 +201,8 @@ cfg_rt! { ))] let future = task::trace::Trace::root(future); let id = task::Id::next(); + + usdt::start_task(usdt::TaskKind::Spawn, meta, id, std::mem::size_of::()); let task = crate::util::trace::task(future, "task", meta, id.as_u64()); match context::with_current(|handle| handle.spawn(task, id, meta.spawned_at)) { diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index c671fd6a1da..6aeb41e1cd8 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -85,6 +85,10 @@ cfg_rt_multi_thread! { pub(crate) mod trace; +cfg_rt! { + pub(crate) mod usdt; +} + #[cfg(feature = "fs")] pub(crate) mod typeid; diff --git a/tokio/src/util/trace.rs b/tokio/src/util/trace.rs index b22c2aeb593..f38c0eef11c 100644 --- a/tokio/src/util/trace.rs +++ b/tokio/src/util/trace.rs @@ -4,10 +4,10 @@ cfg_rt! { #[derive(Copy, Clone)] pub(crate) struct SpawnMeta<'a> { /// The name of the task - #[cfg(all(tokio_unstable, feature = "tracing"))] + #[cfg(all(tokio_unstable, any(feature = "tracing", feature = "usdt")))] pub(crate) name: Option<&'a str>, /// The original size of the future or function being spawned - #[cfg(all(tokio_unstable, feature = "tracing"))] + #[cfg(all(tokio_unstable, any(feature = "tracing", feature = "usdt")))] pub(crate) original_size: usize, /// The source code location where the task was spawned. /// @@ -19,7 +19,7 @@ cfg_rt! { impl<'a> SpawnMeta<'a> { /// Create new spawn meta with a name and original size (before possible auto-boxing) - #[cfg(all(tokio_unstable, feature = "tracing"))] + #[cfg(all(tokio_unstable, any(feature = "tracing", feature = "usdt")))] #[track_caller] pub(crate) fn new(name: Option<&'a str>, original_size: usize) -> Self { Self { @@ -33,13 +33,13 @@ cfg_rt! { /// Create a new unnamed spawn meta with the original size (before possible auto-boxing) #[track_caller] pub(crate) fn new_unnamed(original_size: usize) -> Self { - #[cfg(not(all(tokio_unstable, feature = "tracing")))] + #[cfg(not(all(tokio_unstable, any(feature = "tracing", feature = "usdt"))))] let _original_size = original_size; Self { - #[cfg(all(tokio_unstable, feature = "tracing"))] + #[cfg(all(tokio_unstable, any(feature = "tracing", feature = "usdt")))] name: None, - #[cfg(all(tokio_unstable, feature = "tracing"))] + #[cfg(all(tokio_unstable, any(feature = "tracing", feature = "usdt")))] original_size, spawned_at: crate::runtime::task::SpawnLocation::capture(), _pd: PhantomData, diff --git a/tokio/src/util/usdt/macos.rs b/tokio/src/util/usdt/macos.rs new file mode 100644 index 00000000000..3ca2d3847d3 --- /dev/null +++ b/tokio/src/util/usdt/macos.rs @@ -0,0 +1,190 @@ +//! USDT for macOS. +//! +//! Probe discovery is based on link names +//! +//! To generate the following extern "C" block, run the following: +//! +//! ```sh +//! dtrace -h -s provider.d -o /dev/stdout +//! ``` +//! +//! How to construct the probe names manually: +//! ```rust,ignore +//! let provider = "tokio"; +//! // dtrace convention: `__` implies a `-`, +//! let probe = "task__details" +//! // the probe arguments as dtrace types. +//! let args = ["uint64_t", "char *", "char *", "uint32_t", "uint32_t"]; +//! +//! let mut link_name = format!("__dtrace_probe${provider}${probe}$v1"); +//! for arg in args { +//! link_name.push('$'); +//! link_name.push_str(hex::encode(arg)); +//! } +//! ``` +//! +//! A list of common types: +//! * `*const core::ffi::c_char` -> `char *` -> `63686172202a` +//! * `usize` -> `uintptr_t` -> `75696e747074725f74` +//! * `u64` -> `uint64_t` -> `75696e7436345f74` +//! * `u32` -> `uint32_t` -> `75696e7433325f74` +//! * `u8` -> `uint8_t` -> `75696e74385f74` +//! +//! Since these are regular functions, the arguments must be passed in + +use std::arch::global_asm; + +#[inline(always)] +pub(super) fn task_details(task_id: u64, name: &str, file: &str, line: u32, col: u32) { + unsafe extern "C" { + #[link_name = "__dtrace_isenabled$tokio$task__details$v1"] + fn task_details_enabled() -> i32; + + #[link_name = "__dtrace_probe$tokio$task__details$v1$75696e7436345f74$63686172202a$63686172202a$75696e7433325f74$75696e7433325f74"] + #[cold] + fn __task_details( + task_id: u64, + name: *const core::ffi::c_char, + file: *const core::ffi::c_char, + line: u32, + col: u32, + ); + } + + if unsafe { task_details_enabled() } != 0 { + // add nul bytes + let name0 = [name.as_bytes(), b"\0"].concat(); + let file0 = [file.as_bytes(), b"\0"].concat(); + + unsafe { + __task_details( + task_id, + name0.as_ptr() as *const std::ffi::c_char, + file0.as_ptr() as *const std::ffi::c_char, + line, + col, + ); + } + } +} + +#[inline(always)] +pub(super) fn task_start(task_id: u64, spawned: u8, size: usize, original_size: usize) { + extern "C" { + #[link_name = "__dtrace_probe$tokio$task__start$v1$75696e7436345f74$75696e74385f74$75696e747074725f74$75696e747074725f74"] + fn __task_start(task_id: u64, kind: u8, size: usize, original_size: usize); + } + + unsafe { + __task_start(task_id, spawned, size, original_size); + } +} + +#[inline(always)] +pub(super) fn task_poll_start(task_id: u64) { + extern "C" { + #[link_name = "__dtrace_isenabled$tokio$task__poll__start$v1"] + fn task_poll_start_enabled() -> i32; + + #[link_name = "__dtrace_probe$tokio$task__poll__start$v1$75696e7436345f74"] + fn __task_poll_start(task_id: core::ffi::c_ulonglong); + } + + #[inline(never)] + fn probe_inner(task_id: u64) { + unsafe { __task_poll_start(task_id) } + } + + if unsafe { task_poll_start_enabled() } != 0 { + probe_inner(task_id); + } +} + +#[inline(always)] +pub(super) fn task_poll_end(task_id: u64) { + extern "C" { + #[link_name = "__dtrace_isenabled$tokio$task__poll__end$v1"] + fn task_poll_end_enabled() -> i32; + + #[link_name = "__dtrace_probe$tokio$task__poll__end$v1$75696e7436345f74"] + fn __task_poll_end(task_id: core::ffi::c_ulonglong); + } + + #[inline(never)] + fn probe_inner(task_id: u64) { + unsafe { __task_poll_end(task_id) } + } + + if unsafe { task_poll_end_enabled() } != 0 { + probe_inner(task_id); + } +} + +#[inline(always)] +pub(crate) fn task_terminate(task_id: u64, reason: u8) { + extern "C" { + #[link_name = "__dtrace_isenabled$tokio$task__terminate$v1"] + fn task_terminate_enabled() -> i32; + + #[link_name = "__dtrace_probe$tokio$task__terminate$v1$75696e7436345f74$75696e74385f74"] + fn __task_terminate(task_id: core::ffi::c_ulonglong, reason: core::ffi::c_uchar); + } + + #[inline(never)] + fn probe_inner(task_id: u64, reason: u8) { + unsafe { __task_terminate(task_id, reason) } + } + + if unsafe { task_terminate_enabled() } != 0 { + probe_inner(task_id, reason); + } +} + +#[inline(always)] +pub(crate) fn waker_clone(task_id: u64) { + extern "C" { + #[link_name = "__dtrace_probe$tokio$waker__clone$v1$75696e7436345f74"] + fn __waker_clone(task_id: core::ffi::c_ulonglong); + } + + unsafe { __waker_clone(task_id) } +} + +#[inline(always)] +pub(crate) fn waker_drop(task_id: u64) { + extern "C" { + #[link_name = "__dtrace_probe$tokio$waker__drop$v1$75696e7436345f74"] + fn __waker_drop(task_id: core::ffi::c_ulonglong); + } + + unsafe { __waker_drop(task_id) } +} + +#[inline(always)] +pub(crate) fn waker_wake(task_id: u64) { + extern "C" { + #[link_name = "__dtrace_probe$tokio$waker__wake$v1$75696e7436345f74"] + fn __waker_wake(task_id: core::ffi::c_ulonglong); + } + + unsafe { __waker_wake(task_id) } +} + +unsafe extern "C" { + #[link_name = "__dtrace_stability$tokio$v1$1_1_0_1_1_0_1_1_0_1_1_0_1_1_0"] + fn stability(); + + #[link_name = "__dtrace_typedefs$tokio$v2"] + fn typedefs(); +} + +global_asm!( + ".reference {typedefs} + .reference {stability}", + typedefs = sym typedefs, + stability = sym stability, +); + +pub(crate) fn register_probes() -> std::io::Result<()> { + Ok(()) +} diff --git a/tokio/src/util/usdt/mod.rs b/tokio/src/util/usdt/mod.rs new file mode 100644 index 00000000000..4e876887a4d --- /dev/null +++ b/tokio/src/util/usdt/mod.rs @@ -0,0 +1,146 @@ +use crate::task::Id; +use crate::util::trace::SpawnMeta; + +#[repr(u8)] +pub(crate) enum TaskKind { + #[allow(dead_code)] + BlockOn = 0, + Spawn = 1, + SpawnLocal = 2, +} + +#[repr(u8)] +pub(crate) enum TerminateKind { + Success = 0, + Cancelled = 1, + Panicked = 2, +} + +cfg_usdt! { + #[cfg(all(target_os = "macos", any(target_arch = "x86_64", target_arch = "aarch64")))] + #[path = "macos.rs"] + mod usdt_impl; + + #[cfg(all(target_os = "linux", any(target_arch = "x86_64", target_arch = "aarch64")))] + #[path = "stapsdt.rs"] + mod usdt_impl; + + #[cfg(not(any( + all(target_os = "macos", any(target_arch = "x86_64", target_arch = "aarch64")), + all(target_os = "linux", any(target_arch = "x86_64", target_arch = "aarch64")), + )))] + compile_error!( + "The `usdt` feature is only currently supported on linux/macos, on `aarch64` and `x86_64`." + ); + + use core::{ + pin::Pin, + task::{Context, Poll}, + }; + use pin_project_lite::pin_project; + use std::future::Future; + + + pub(crate) use usdt_impl::{waker_clone, waker_wake, waker_drop, register_probes}; + + #[inline(never)] + pub(crate) fn start_task(kind: TaskKind, meta: SpawnMeta<'_>, id: Id, size: usize) { + usdt_impl::task_start(id.as_u64(), kind as u8, size, meta.original_size); + usdt_impl::task_details( + id.as_u64(), + meta.name.unwrap_or_default(), + meta.spawned_at.0.file(), + meta.spawned_at.0.line(), + meta.spawned_at.0.column(), + ); + } + + #[inline] + pub(crate) fn finish_task(id: Id, reason: TerminateKind) { + usdt_impl::task_terminate(id.as_u64(), reason as u8); + } + + /// Mark a task as polling in USDT traces + pub(crate) struct PollGuard(Id); + + impl PollGuard { + #[inline] + pub(crate) fn new(id: Id) -> Self { + usdt_impl::task_poll_start(id.as_u64()); + PollGuard(id) + } + } + + impl Drop for PollGuard { + #[inline] + fn drop(&mut self) { + usdt_impl::task_poll_end(self.0.as_u64()); + } + } + + #[inline] + pub(crate) fn block_on(task: F, meta: SpawnMeta<'_>, id: Id) -> BlockOn { + start_task(TaskKind::BlockOn, meta, id, std::mem::size_of::()); + BlockOn { inner: task, task_id: id } + } + + pin_project! { + #[derive(Debug, Clone)] + pub(crate) struct BlockOn { + #[pin] + inner: F, + task_id: Id, + } + } + + impl Future for BlockOn { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + pub(crate) struct Guard(Id); + + impl Drop for Guard { + fn drop(&mut self) { + usdt_impl::task_poll_end(self.0.as_u64()); + finish_task(self.0, TerminateKind::Panicked); + } + } + + let this = self.project(); + + usdt_impl::task_poll_start(this.task_id.as_u64()); + let guard = Guard(*this.task_id); + let res = this.inner.poll(cx); + drop(guard); + usdt_impl::task_poll_end(this.task_id.as_u64()); + + if res.is_ready() { + finish_task(*this.task_id, TerminateKind::Success); + } + + res + } + } +} + +cfg_not_usdt! { + #[inline] + pub(crate) fn start_task(_kind: TaskKind, _meta: SpawnMeta<'_>, _id: Id, _size: usize) {} + + #[inline] + pub(crate) fn finish_task(_id: Id, _reason: TerminateKind) {} + + /// Mark a task as polling in USDT traces + pub(crate) struct PollGuard(); + + impl PollGuard { + #[inline] + pub(crate) fn new(_id: Id) -> Self { + PollGuard() + } + } + + pub(crate) fn register_probes() -> std::io::Result<()> { + Ok(()) + } +} diff --git a/tokio/src/util/usdt/provider.d b/tokio/src/util/usdt/provider.d new file mode 100644 index 00000000000..2fcac084cd3 --- /dev/null +++ b/tokio/src/util/usdt/provider.d @@ -0,0 +1,12 @@ +provider tokio { + probe task__details(uint64_t, char*, char*, uint32_t, uint32_t); + probe task__start(uint64_t, uint8_t, uintptr_t, uintptr_t); + probe task__terminate(uint64_t, uint8_t); + + probe task__poll__start(uint64_t); + probe task__poll__end(uint64_t); + + probe task__waker__clone(uint64_t); + probe task__waker__drop(uint64_t); + probe task__waker__wake(uint64_t); +}; diff --git a/tokio/src/util/usdt/stapsdt.rs b/tokio/src/util/usdt/stapsdt.rs new file mode 100644 index 00000000000..cad277c3a29 --- /dev/null +++ b/tokio/src/util/usdt/stapsdt.rs @@ -0,0 +1,300 @@ +//! USDT support on linux, and other platforms that use the SystemTap SDT V3 system. +//! +//! Implementer details: +//! +//! ## Calling probes +//! +//! To update or add new probes, you should be able to copy an existing probe for inspiration. +//! For a guide on the register formats, see: +//! * +//! * +//! +//! There should be no need to modify the macros provided if you just want to add a new probe. +//! +//! ## Semaphores +//! +//! Probes can either have a semaphore (with `semaphore = sym $ident`), +//! or they can have no semaphore (with `semaphore = const 0`). +//! +//! A semaphore lets you know if a program is attached to the given probe. +//! This can help offload any expensive setup, such as in `task_details` which needs to +//! allocate a null-termianted string. +//! +//! If you have some expensive setup needed to call the probe, consider moving it +//! into a seperate function and marking it as `#[cold]` or `#[inline(never)]`. +//! This ensures the hot path (probe is disabled) doesn't need to skip over a large +//! number of instructions, and the branch predictor can make better predictions. +//! +//! ## Avoid monomorphisation +//! +//! We also use semaphores to ensure that probe callsites are **NOT** monomorphised. +//! When a probe is monomorphised in many callsites, it has been observed to +//! produce weird results after linking where the probe addresses are not correct. +//! +//! Be very mindful of the above when you add a probe. + +#[cfg(target_arch = "x86_64")] +macro_rules! call_probe { + ( + $name:literal, + x86_64: $x86_64:literal, + aarch64: $aarch64:literal, + $($args:tt)* + ) => { + // + std::arch::asm!( + "990: nop + .pushsection .note.stapsdt, \"\", \"note\" + .balign 4 + .4byte 992f-991f, 994f-993f, 3 + 991: + .asciz \"stapsdt\" + 992: + .balign 4 + 993: + .8byte 990b + .8byte _.stapsdt.base + .8byte {semaphore} + .asciz \"tokio\"", + concat!(".asciz \"", $name, "\""), + concat!(".asciz \"", $x86_64, "\""), + "994: + .balign 4 + .popsection", + $($args)* + options(att_syntax, readonly, nostack, preserves_flags), + ); + }; +} + +#[cfg(target_arch = "aarch64")] +macro_rules! call_probe { + ( + $name:literal, + x86_64: $x86_64:literal, + aarch64: $aarch64:literal, + $($args:tt)* + ) => { + // + std::arch::asm!( + "990: nop + .pushsection .note.stapsdt, \"\", \"note\" + .balign 4 + .4byte 992f-991f, 994f-993f, 3 + 991: + .asciz \"stapsdt\" + 992: + .balign 4 + 993: + .8byte 990b + .8byte _.stapsdt.base + .8byte {semaphore} + .asciz \"tokio\"", + concat!(".asciz \"", $name, "\""), + concat!(".asciz \"", $aarch64, "\""), + "994: + .balign 4 + .popsection", + $($args)* + options(readonly, nostack, preserves_flags), + ); + }; +} + +macro_rules! declare_semaphore { + ($name:ident) => { + extern "C" { + static mut $name: u16; + } + + std::arch::global_asm!( + ".ifndef {semaphore} + .pushsection .probes, \"aw\", \"progbits\" + .weak {semaphore} + .hidden {semaphore} + {semaphore}: + .zero 2 + .type {semaphore}, @object + .size {semaphore}, 2 + .popsection + .endif", + semaphore = sym $name + ); + }; +} + +declare_semaphore!(__usdt_sema_tokio_task__details); + +// `inline(always)` is ok here since we only inline into `super::start_task` which is `inline(never)` +#[inline(always)] +pub(super) fn task_details(task_id: u64, name: &str, file: &str, line: u32, col: u32) { + #[cold] + fn task_details_inner(task_id: u64, name: &str, file: &str, line: u32, col: u32) { + // add nul bytes + let name0 = [name.as_bytes(), b"\0"].concat(); + let file0 = [file.as_bytes(), b"\0"].concat(); + + unsafe { + call_probe!( + "task-details", + x86_64: "8@{task_id:r} 8@{name} 8@{file} 4@{line:e} 4@{col:e}", + aarch64: "8@{task_id} 8@{name} 8@{file} 4@{line:w} 4@{col:w}", + semaphore = sym __usdt_sema_tokio_task__details, + task_id = in(reg) task_id, + name = in(reg) name0.as_ptr(), + file = in(reg) file0.as_ptr(), + line = in(reg) line, + col = in(reg) col, + ); + } + } + + if unsafe { __usdt_sema_tokio_task__details } != 0 { + task_details_inner(task_id, name, file, line, col); + } +} + +// `inline(always)` is ok here since we only inline into `super::start_task` which is `inline(never)` +#[inline(always)] +pub(super) fn task_start(task_id: u64, kind: u8, size: usize, original_size: usize) { + unsafe { + call_probe!( + "task-start", + x86_64: "8@{task_id:r} 1@{kind:l} 8@{size} 8@{original_size}", + aarch64: "8@{task_id} 1@{kind:w} 8@{size} 8@{original_size}", + semaphore = const 0, + task_id = in(reg) task_id, + kind = in(reg) kind as u32, + size = in(reg) size, + original_size = in(reg) original_size, + ); + } +} + +declare_semaphore!(__usdt_sema_tokio_task__poll__start); + +#[inline(always)] +pub(super) fn task_poll_start(task_id: u64) { + // `inline(never)` since poll_start probes are monomorphised otherwise + #[inline(never)] + fn probe_inner(task_id: u64) { + unsafe { + call_probe!( + "task-poll-start", + x86_64: "8@{task_id:r}", + aarch64: "8@{task_id}", + semaphore = sym __usdt_sema_tokio_task__poll__start, + task_id = in(reg) task_id, + ); + } + } + + if unsafe { __usdt_sema_tokio_task__poll__start } != 0 { + probe_inner(task_id); + } +} + +declare_semaphore!(__usdt_sema_tokio_task__poll__end); + +#[inline(always)] +pub(super) fn task_poll_end(task_id: u64) { + // `inline(never)` since poll_end probes are monomorphised otherwise + #[inline(never)] + fn probe_inner(task_id: u64) { + unsafe { + call_probe!( + "task-poll-end", + x86_64: "8@{task_id:r}", + aarch64: "8@{task_id}", + semaphore = sym __usdt_sema_tokio_task__poll__end, + task_id = in(reg) task_id, + ); + } + } + + if unsafe { __usdt_sema_tokio_task__poll__end } != 0 { + probe_inner(task_id); + } +} + +declare_semaphore!(__usdt_sema_tokio_task__terminate); + +#[inline(always)] +pub(super) fn task_terminate(task_id: u64, reason: u8) { + // `inline(never)` since terminate probes are monomorphised otherwise + #[inline(never)] + fn probe_inner(task_id: u64, reason: u8) { + unsafe { + call_probe!( + "task-terminate", + x86_64: "8@{task_id:r} 1@{reason:l}", + aarch64: "8@{task_id} 1@{reason:w}", + semaphore = sym __usdt_sema_tokio_task__terminate, + task_id = in(reg) task_id, + reason = in(reg) reason as u32, + ); + } + } + + if unsafe { __usdt_sema_tokio_task__terminate } != 0 { + probe_inner(task_id, reason); + } +} + +// `inline(always)` is ok here since wakers are polymorphised. +#[inline(always)] +pub(crate) fn waker_clone(task_id: u64) { + unsafe { + call_probe!( + "waker-clone", + x86_64: "8@{task_id:r}", + aarch64: "8@{task_id}", + semaphore = const 0, + task_id = in(reg) task_id, + ); + } +} + +// `inline(always)` is ok here since wakers are polymorphised. +#[inline(always)] +pub(crate) fn waker_drop(task_id: u64) { + unsafe { + call_probe!( + "waker-drop", + x86_64: "8@{task_id:r}", + aarch64: "8@{task_id}", + semaphore = const 0, + task_id = in(reg) task_id, + ); + } +} + +// `inline(always)` is ok here since wakers are polymorphised. +#[inline(always)] +pub(crate) fn waker_wake(task_id: u64) { + unsafe { + call_probe!( + "waker-wake", + x86_64: "8@{task_id:r}", + aarch64: "8@{task_id}", + semaphore = const 0, + task_id = in(reg) task_id, + ); + } +} + +std::arch::global_asm!( + ".ifndef _.stapsdt.base + .pushsection .stapsdt.base, \"aGR\", \"progbits\", .stapsdt.base, comdat + .weak _.stapsdt.base + .hidden _.stapsdt.base + _.stapsdt.base: + .space 1 + .size _.stapsdt.base, 1 + .popsection + .endif", +); + +pub(crate) fn register_probes() -> std::io::Result<()> { + Ok(()) +}