From 3c94f0f2e26179575a96df28d71b019c532ba0bd Mon Sep 17 00:00:00 2001 From: Sean Lawlor Date: Thu, 19 Dec 2024 21:30:34 -0500 Subject: [PATCH] Add a `wait()` operator for an actor (#304) This allows someone to wait for the actor's exit without waiting on the JoinHandle --- ractor/Cargo.toml | 2 +- ractor/src/actor/actor_cell.rs | 19 +++++++++++++ ractor/src/actor/actor_properties.rs | 10 +++++++ ractor/src/actor/tests/mod.rs | 41 ++++++++++++++++++++++++++++ ractor_cluster/Cargo.toml | 2 +- ractor_cluster_derive/Cargo.toml | 2 +- 6 files changed, 73 insertions(+), 3 deletions(-) diff --git a/ractor/Cargo.toml b/ractor/Cargo.toml index 6d484eb..2201188 100644 --- a/ractor/Cargo.toml +++ b/ractor/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ractor" -version = "0.14.1" +version = "0.14.2" authors = ["Sean Lawlor", "Evan Au", "Dillon George"] description = "A actor framework for Rust" documentation = "https://docs.rs/ractor" diff --git a/ractor/src/actor/actor_cell.rs b/ractor/src/actor/actor_cell.rs index 2106978..565e58c 100644 --- a/ractor/src/actor/actor_cell.rs +++ b/ractor/src/actor/actor_cell.rs @@ -451,6 +451,25 @@ impl ActorCell { } } + /// Wait for the actor to exit, optionally within a timeout + /// + /// * `timeout`: If supplied, the amount of time to wait before + /// returning an error and cancelling the wait future. + /// + /// IMPORTANT: If the timeout is hit, the actor is still running. + /// You should wait again for its exit. + pub async fn wait( + &self, + timeout: Option, + ) -> Result<(), crate::concurrency::Timeout> { + if let Some(to) = timeout { + crate::concurrency::timeout(to, self.inner.wait()).await + } else { + self.inner.wait().await; + Ok(()) + } + } + /// Send a supervisor event to the supervisory port /// /// * `message` - The [SupervisionEvent] to send to the supervisory port diff --git a/ractor/src/actor/actor_properties.rs b/ractor/src/actor/actor_properties.rs index 2b33eb7..af8e650 100644 --- a/ractor/src/actor/actor_properties.rs +++ b/ractor/src/actor/actor_properties.rs @@ -225,6 +225,12 @@ impl ActorProperties { Ok(()) } + /// Wait for the actor to exit + pub(crate) async fn wait(&self) { + let rx = self.wait_handler.notified(); + rx.await; + } + /// Send the kill signal, threading in a OneShot sender which notifies when the shutdown is completed pub(crate) async fn send_signal_and_wait( &self, @@ -239,5 +245,9 @@ impl ActorProperties { pub(crate) fn notify_stop_listener(&self) { self.wait_handler.notify_waiters(); + // make sure that any future caller immediately returns by pre-storing + // a notify permit (i.e. the actor stops, but you are only start waiting + // after the actor has already notified it's dead.) + self.wait_handler.notify_one(); } } diff --git a/ractor/src/actor/tests/mod.rs b/ractor/src/actor/tests/mod.rs index 5be425c..f97e094 100644 --- a/ractor/src/actor/tests/mod.rs +++ b/ractor/src/actor/tests/mod.rs @@ -1156,3 +1156,44 @@ async fn runtime_message_typing() { actor.stop(None); handle.await.unwrap(); } + +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn wait_for_death() { + struct TestActor; + + #[cfg_attr(feature = "async-trait", crate::async_trait)] + impl Actor for TestActor { + type Msg = EmptyMessage; + type Arguments = (); + type State = (); + + async fn pre_start( + &self, + _this_actor: crate::ActorRef, + _: (), + ) -> Result { + Ok(()) + } + + async fn post_stop( + &self, + _myself: ActorRef, + _: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + crate::concurrency::sleep(Duration::from_millis(10)).await; + Ok(()) + } + } + + let (actor, handle) = Actor::spawn(None, TestActor, ()) + .await + .expect("Failed to start test actor"); + + actor.stop(None); + assert!(actor.wait(Some(Duration::from_millis(100))).await.is_ok()); + + // cleanup + actor.stop(None); + handle.await.unwrap(); +} diff --git a/ractor_cluster/Cargo.toml b/ractor_cluster/Cargo.toml index c727b3b..f69d588 100644 --- a/ractor_cluster/Cargo.toml +++ b/ractor_cluster/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ractor_cluster" -version = "0.14.1" +version = "0.14.2" authors = ["Sean Lawlor "] description = "Distributed cluster environment of Ractor actors" documentation = "https://docs.rs/ractor" diff --git a/ractor_cluster_derive/Cargo.toml b/ractor_cluster_derive/Cargo.toml index ea0775e..3bc24fd 100644 --- a/ractor_cluster_derive/Cargo.toml +++ b/ractor_cluster_derive/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ractor_cluster_derive" -version = "0.14.1" +version = "0.14.2" authors = ["Sean Lawlor "] description = "Derives for ractor_cluster" license = "MIT"