From 798c9717df56f3e4eceb35520c19533ec04fb461 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 18 Feb 2022 15:27:52 -0800 Subject: [PATCH] rt: unhandled panic config for current thread rt Allows the user to configure the runtime's behavior when a spawned task panics. Currently, the panic is propagated to the JoinHandle and the runtime continues running. This patch lets the user configure the runtime to shut down on an unhandled panic. So far, this is only implemented for the current-thread runtime. Refs: #4516 --- tokio/src/runtime/basic_scheduler.rs | 88 ++++++++++++++++++++++------ tokio/src/runtime/builder.rs | 33 ++++++++++- tokio/src/runtime/mod.rs | 3 + tokio/src/runtime/task/harness.rs | 19 ++++-- tokio/src/runtime/task/mod.rs | 5 ++ tokio/tests/rt_basic.rs | 85 +++++++++++++++++++++++++++ 6 files changed, 210 insertions(+), 23 deletions(-) diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 401f55b3f2f..94988643966 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -57,6 +57,10 @@ struct Core { /// Metrics batch metrics: MetricsBatch, + + /// True if a task panicked without being handled and the runtime is + /// configured to shutdown on unhandled panic. + unhandled_panic: bool, } #[derive(Clone)] @@ -64,6 +68,19 @@ pub(crate) struct Spawner { shared: Arc, } +/// Configuration settings passed in from the runtime builder. +pub(crate) struct Config { + /// Callback for a worker parking itself + pub(crate) before_park: Option, + + /// Callback for a worker unparking itself + pub(crate) after_unpark: Option, + + #[cfg(tokio_unstable)] + /// How to respond to unhandled task panics. + pub(crate) unhandled_panic: crate::runtime::UnhandledPanic, +} + /// Scheduler state shared between threads. struct Shared { /// Remote run queue. None if the `Runtime` has been dropped. @@ -78,11 +95,8 @@ struct Shared { /// Indicates whether the blocked on thread was woken. 
woken: AtomicBool, - /// Callback for a worker parking itself - before_park: Option, - - /// Callback for a worker unparking itself - after_unpark: Option, + /// Scheduler configuration options + config: Config, /// Keeps track of various runtime metrics. scheduler_metrics: SchedulerMetrics, @@ -117,11 +131,7 @@ const REMOTE_FIRST_INTERVAL: u8 = 31; scoped_thread_local!(static CURRENT: Context); impl BasicScheduler { - pub(crate) fn new( - driver: Driver, - before_park: Option, - after_unpark: Option, - ) -> BasicScheduler { + pub(crate) fn new(driver: Driver, config: Config) -> BasicScheduler { let unpark = driver.unpark(); let spawner = Spawner { @@ -130,8 +140,7 @@ impl BasicScheduler { owned: OwnedTasks::new(), unpark, woken: AtomicBool::new(false), - before_park, - after_unpark, + config, scheduler_metrics: SchedulerMetrics::new(), worker_metrics: WorkerMetrics::new(), }), @@ -143,6 +152,7 @@ impl BasicScheduler { tick: 0, driver: Some(driver), metrics: MetricsBatch::new(), + unhandled_panic: false, }))); BasicScheduler { @@ -157,6 +167,7 @@ impl BasicScheduler { &self.spawner } + #[track_caller] pub(crate) fn block_on(&self, future: F) -> F::Output { pin!(future); @@ -296,7 +307,7 @@ impl Context { fn park(&self, mut core: Box) -> Box { let mut driver = core.driver.take().expect("driver missing"); - if let Some(f) = &self.spawner.shared.before_park { + if let Some(f) = &self.spawner.shared.config.before_park { // Incorrect lint, the closures are actually different types so `f` // cannot be passed as an argument to `enter`. #[allow(clippy::redundant_closure)] @@ -319,7 +330,7 @@ impl Context { core.metrics.returned_from_park(); } - if let Some(f) = &self.spawner.shared.after_unpark { + if let Some(f) = &self.spawner.shared.config.after_unpark { // Incorrect lint, the closures are actually different types so `f` // cannot be passed as an argument to `enter`. 
#[allow(clippy::redundant_closure)] @@ -460,6 +471,35 @@ impl Schedule for Arc { } }); } + + cfg_unstable! { + fn unhandled_panic(&self) { + use crate::runtime::UnhandledPanic; + + match self.config.unhandled_panic { + UnhandledPanic::Ignore => { + // Do nothing + } + UnhandledPanic::ShutdownRuntime => { + // This hook is only called from within the runtime, so + // `CURRENT` should match with `&self`, i.e. there is no + // opportunity for a nested scheduler to be called. + CURRENT.with(|maybe_cx| match maybe_cx { + Some(cx) if Arc::ptr_eq(self, &cx.spawner.shared) => { + let mut core = cx.core.borrow_mut(); + + // If `None`, the runtime is shutting down, so there is no need to signal shutdown + if let Some(core) = core.as_mut() { + core.unhandled_panic = true; + self.owned.close_and_shutdown_all(); + } + } + _ => panic!("runtime core not set in CURRENT thread-local"), + }) + } + } + } + } } impl Wake for Shared { @@ -484,8 +524,9 @@ struct CoreGuard<'a> { } impl CoreGuard<'_> { + #[track_caller] fn block_on(self, future: F) -> F::Output { - self.enter(|mut core, context| { + let ret = self.enter(|mut core, context| { let _enter = crate::runtime::enter(false); let waker = context.spawner.waker_ref(); let mut cx = std::task::Context::from_waker(&waker); @@ -501,11 +542,16 @@ impl CoreGuard<'_> { core = c; if let Ready(v) = res { - return (core, v); + return (core, Some(v)); } } for _ in 0..MAX_TASKS_PER_TICK { + // Make sure we didn't hit an unhandled_panic + if core.unhandled_panic { + return (core, None); + } + // Get and increment the current tick let tick = core.tick; core.tick = core.tick.wrapping_add(1); @@ -539,7 +585,15 @@ impl CoreGuard<'_> { // pending I/O events. core = context.park_yield(core); } - }) + }); + + match ret { + Some(ret) => ret, + None => { + // `block_on` panicked. + panic!("a spawned task panicked and the runtime is configured to shutdown on unhandled panic"); + } + } } /// Enters the scheduler context. 
This sets the queue and other necessary diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 91c365fd516..b040fc50458 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -78,6 +78,17 @@ pub struct Builder { /// Customizable keep alive timeout for BlockingPool pub(super) keep_alive: Option, + + #[cfg(tokio_unstable)] + pub(super) unhandled_panic: UnhandledPanic, +} + +cfg_unstable! { + #[derive(Debug, Clone)] + pub enum UnhandledPanic { + Ignore, + ShutdownRuntime, + } } pub(crate) type ThreadNameFn = std::sync::Arc String + Send + Sync + 'static>; @@ -145,6 +156,9 @@ impl Builder { after_unpark: None, keep_alive: None, + + #[cfg(tokio_unstable)] + unhandled_panic: UnhandledPanic::Ignore, } } @@ -554,7 +568,15 @@ impl Builder { self } + cfg_unstable! { + pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self { + self.unhandled_panic = behavior; + self + } + } + fn build_basic_runtime(&mut self) -> io::Result { + use crate::runtime::basic_scheduler::Config; use crate::runtime::{BasicScheduler, Kind}; let (driver, resources) = driver::Driver::new(self.get_cfg())?; @@ -563,8 +585,15 @@ impl Builder { // there are no futures ready to do something, it'll let the timer or // the reactor to generate some new stimuli for the futures to continue // in their life. - let scheduler = - BasicScheduler::new(driver, self.before_park.clone(), self.after_unpark.clone()); + let scheduler = BasicScheduler::new( + driver, + Config { + before_park: self.before_park.clone(), + after_unpark: self.after_unpark.clone(), + #[cfg(tokio_unstable)] + unhandled_panic: self.unhandled_panic.clone(), + }, + ); let spawner = Spawner::Basic(scheduler.spawner().clone()); // Blocking pool diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 7c381b0bbd0..c2986d31e92 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -212,6 +212,9 @@ cfg_rt! 
{ mod builder; pub use self::builder::Builder; + cfg_unstable! { + pub use self::builder::UnhandledPanic; + } pub(crate) mod context; pub(crate) mod driver; diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index 261dccea415..850be08ca20 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -100,7 +100,7 @@ where let header_ptr = self.header_ptr(); let waker_ref = waker_ref::(&header_ptr); let cx = Context::from_waker(&*waker_ref); - let res = poll_future(&self.core().stage, cx); + let res = poll_future(&self.core().stage, &self.core().scheduler, cx); if res == Poll::Ready(()) { // The future completed. Move on to complete the task. @@ -450,7 +450,11 @@ fn cancel_task(stage: &CoreStage) { /// Polls the future. If the future completes, the output is written to the /// stage field. -fn poll_future(core: &CoreStage, cx: Context<'_>) -> Poll<()> { +fn poll_future( + core: &CoreStage, + scheduler: &S, + cx: Context<'_>, +) -> Poll<()> { // Poll the future. let output = panic::catch_unwind(panic::AssertUnwindSafe(|| { struct Guard<'a, T: Future> { @@ -473,13 +477,20 @@ fn poll_future(core: &CoreStage, cx: Context<'_>) -> Poll<()> { let output = match output { Ok(Poll::Pending) => return Poll::Pending, Ok(Poll::Ready(output)) => Ok(output), - Err(panic) => Err(JoinError::panic(panic)), + Err(panic) => { + scheduler.unhandled_panic(); + Err(JoinError::panic(panic)) + } }; // Catch and ignore panics if the future panics on drop. 
- let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| { + let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { core.store_output(output); })); + if res.is_err() { + scheduler.unhandled_panic(); + } + Poll::Ready(()) } diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 2a492dc985d..e06df9cd181 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -234,6 +234,11 @@ pub(crate) trait Schedule: Sync + Sized + 'static { fn yield_now(&self, task: Notified) { self.schedule(task); } + + /// Polling the task resulted in a panic. Should the runtime shutdown? + fn unhandled_panic(&self) { + // By default, do nothing. This maintains the 1.0 behavior. + } } cfg_rt! { diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs index cc6ac677280..4bc0ba444c4 100644 --- a/tokio/tests/rt_basic.rs +++ b/tokio/tests/rt_basic.rs @@ -288,6 +288,91 @@ fn timeout_panics_when_no_time_handle() { }); } +#[cfg(tokio_unstable)] +mod unstable { + use super::*; + use tokio::runtime::{Builder, UnhandledPanic}; + + #[test] + #[should_panic( + expected = "a spawned task panicked and the runtime is configured to shutdown on unhandled panic" + )] + fn shutdown_on_panic() { + let rt = Builder::new_current_thread() + .unhandled_panic(UnhandledPanic::ShutdownRuntime) + .build() + .unwrap(); + + rt.block_on(async { + tokio::spawn(async { + panic!("boom"); + }); + + futures::future::pending::<()>().await; + }) + } + + #[test] + fn spawns_do_nothing() { + use std::sync::Arc; + + let rt = Builder::new_current_thread() + .unhandled_panic(UnhandledPanic::ShutdownRuntime) + .build() + .unwrap(); + + let rt1 = Arc::new(rt); + let rt2 = rt1.clone(); + + let _ = std::thread::spawn(move || { + rt2.block_on(async { + tokio::spawn(async { + panic!("boom"); + }); + + futures::future::pending::<()>().await; + }) + }) + .join(); + + let task = rt1.spawn(async {}); + let res = futures::executor::block_on(task); + assert!(res.is_err()); + } + + #[test] 
+ fn shutdown_all_concurrent_block_on() { + use std::sync::Arc; + + let rt = Builder::new_current_thread() + .unhandled_panic(UnhandledPanic::ShutdownRuntime) + .build() + .unwrap(); + + let rt = Arc::new(rt); + let mut ths = vec![]; + + for _ in 0..1 { + let rt = rt.clone(); + ths.push(std::thread::spawn(move || { + rt.block_on(async { + futures::future::pending::<()>().await; + }); + })); + } + + std::thread::sleep(std::time::Duration::from_millis(50)); + + rt.spawn(async { + panic!("boom"); + }); + + for th in ths { + assert!(th.join().is_err()); + } + } +} + fn rt() -> Runtime { tokio::runtime::Builder::new_current_thread() .enable_all()