diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index eb9cc232e5bf1..7dc04822bfd25 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -9,14 +9,14 @@ license = "MIT OR Apache-2.0" keywords = ["bevy"] [features] -multi-threaded = ["dep:async-channel", "dep:concurrent-queue"] +multi-threaded = ["dep:event-listener", "dep:concurrent-queue"] [dependencies] futures-lite = "2.0.1" async-executor = "1.11" -async-channel = { version = "2.2.0", optional = true } async-io = { version = "2.0.0", optional = true } concurrent-queue = { version = "2.0.0", optional = true } +event-listener = { version = "5.0", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen-futures = "0.4" diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 74b3045e782ba..890dc9adf0b93 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -9,6 +9,7 @@ use std::{ use async_executor::FallibleTask; use concurrent_queue::ConcurrentQueue; +use event_listener::{Event, listener}; use futures_lite::FutureExt; use crate::{ @@ -111,7 +112,7 @@ pub struct TaskPool { // The inner state of the pool. threads: Vec>, - shutdown_tx: async_channel::Sender<()>, + shutdown: Arc, } impl TaskPool { @@ -131,8 +132,7 @@ impl TaskPool { } fn new_internal(builder: TaskPoolBuilder) -> Self { - let (shutdown_tx, shutdown_rx) = async_channel::unbounded::<()>(); - + let shutdown = Arc::new(Event::new()); let executor = Arc::new(async_executor::Executor::new()); let num_threads = builder @@ -142,7 +142,7 @@ impl TaskPool { let threads = (0..num_threads) .map(|i| { let ex = Arc::clone(&executor); - let shutdown_rx = shutdown_rx.clone(); + let shutdown = shutdown.clone(); let thread_name = if let Some(thread_name) = builder.thread_name.as_deref() { format!("{thread_name} ({i})") @@ -173,11 +173,10 @@ impl TaskPool { local_executor.tick().await; } }; - block_on(ex.run(tick_forever.or(shutdown_rx.recv()))) + listener!(shutdown => shutdown_listener); + block_on(ex.run(tick_forever.or(shutdown_listener))); }); - if let Ok(value) = res { - // Use unwrap_err because we expect a Closed error - value.unwrap_err(); + if res.is_ok() { break; } } @@ -190,7 +189,7 @@ impl TaskPool { Self { executor, threads, - shutdown_tx, + shutdown, } } @@ -581,7 +580,7 @@ impl Default for TaskPool { impl Drop for TaskPool { fn drop(&mut self) { - self.shutdown_tx.close(); + self.shutdown.notify(usize::MAX); let panicking = thread::panicking(); for join_handle in self.threads.drain(..) {