diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index 639386ce2..0c356416a 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -143,6 +143,10 @@ pub struct ThreadPoolBuilder { /// "depth-first" fashion. If true, they will do a "breadth-first" /// fashion. Depth-first is the default. breadth_first: bool, + + /// If true, dispatching a job to a different rayon [`ThreadPool`] will block. + /// If false, the thread will continue processing other jobs from its pool. + disable_cross_pool_optimization: bool, } /// Contains the rayon thread pool configuration. Use [`ThreadPoolBuilder`] instead. @@ -179,6 +183,7 @@ impl Default for ThreadPoolBuilder { exit_handler: None, spawn_handler: DefaultSpawn, breadth_first: false, + disable_cross_pool_optimization: false, } } } @@ -362,6 +367,7 @@ impl ThreadPoolBuilder { start_handler: self.start_handler, exit_handler: self.exit_handler, breadth_first: self.breadth_first, + disable_cross_pool_optimization: self.disable_cross_pool_optimization, } } @@ -519,6 +525,19 @@ impl ThreadPoolBuilder { self.breadth_first } + /// Disables an optimization that allows a pool thread to continue processing + /// other jobs when it does a "blocking" dispatch to a different [`ThreadPool`]. + /// + /// Note that this could lead to deadlocks if all pool threads block. + pub fn disable_cross_pool_optimization(mut self) -> Self { + self.disable_cross_pool_optimization = true; + self + } + + fn get_disable_cross_pool_optimization(&self) -> bool { + self.disable_cross_pool_optimization + } + /// Takes the current thread start callback, leaving `None`. fn take_start_handler(&mut self) -> Option> { self.start_handler.take() @@ -685,6 +704,7 @@ impl fmt::Debug for ThreadPoolBuilder { ref exit_handler, spawn_handler: _, ref breadth_first, + ref disable_cross_pool_optimization, } = *self; // Just print `Some()` or `None` to the debug @@ -708,6 +728,10 @@ impl fmt::Debug for ThreadPoolBuilder { .field("start_handler", &start_handler) .field("exit_handler", &exit_handler) .field("breadth_first", &breadth_first) + .field( + "disable_cross_pool_optimization", + disable_cross_pool_optimization, + ) .finish() } } diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs index af438a644..b2db4dee2 100644 --- a/rayon-core/src/registry.rs +++ b/rayon-core/src/registry.rs @@ -153,6 +153,8 @@ pub(super) struct Registry { // These are always owned by some other job (e.g., one injected by `ThreadPool::install()`) // and that job will keep the pool alive. terminate_latch: CountLatch, + + disable_cross_pool_optimization: bool, } /// //////////////////////////////////////////////////////////////////////// @@ -241,6 +243,7 @@ impl Registry { panic_handler: builder.take_panic_handler(), start_handler: builder.take_start_handler(), exit_handler: builder.take_exit_handler(), + disable_cross_pool_optimization: builder.get_disable_cross_pool_optimization(), }); // If we return early or panic, make sure to terminate existing threads. @@ -311,6 +314,10 @@ impl Registry { } } + fn disable_cross_pool_optimization(&self) -> bool { + self.disable_cross_pool_optimization + } + pub(super) fn num_threads(&self) -> usize { self.thread_infos.len() } @@ -416,7 +423,11 @@ impl Registry { { unsafe { let worker_thread = WorkerThread::current(); - if worker_thread.is_null() { + if worker_thread.is_null() + || (*worker_thread) + .registry() + .disable_cross_pool_optimization() + { self.in_worker_cold(op) } else if (*worker_thread).registry().id() != self.id() { self.in_worker_cross(&*worker_thread, op) @@ -439,7 +450,12 @@ impl Registry { LOCK_LATCH.with(|l| { // This thread isn't a member of *any* thread pool, so just block. - debug_assert!(WorkerThread::current().is_null()); + debug_assert!( + WorkerThread::current().is_null() + || (*WorkerThread::current()) + .registry() + .disable_cross_pool_optimization() + ); let job = StackJob::new( |injected| { let worker_thread = WorkerThread::current();