diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 4fb8da3b..888fbbce 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -17,6 +17,9 @@ jobs: - name: Run the default tests package: ractor # flags: + - name: Test ractor in async_std + package: ractor + flags: --features async-std - name: Test ractor with the `cluster` feature package: ractor flags: -F cluster diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..4d9636b5 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "rust-analyzer.showUnlinkedFileNotification": false +} \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 0af4824d..82d0f6e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,3 +7,4 @@ members = [ "ractor_cluster_integration_tests", "xtask" ] +resolver = "2" \ No newline at end of file diff --git a/README.md b/README.md index 745b82eb..858540c6 100644 --- a/README.md +++ b/README.md @@ -69,9 +69,10 @@ The minimum supported Rust version (MSRV) of `ractor` is `1.64` ## Features -`ractor` exposes a single feature currently, namely: +`ractor` exposes the following features: 1. `cluster`, which exposes various functionality required for `ractor_cluster` to set up and manage a cluster of actors over a network link. This is work-in-progress and is being tracked in [#16](https://github.com/slawlor/ractor/issues/16). +2. `async-std`, which enables usage of `async-std`'s asynchronous runtime instead of the `tokio` runtime. **However** `tokio` remains a dependency because we utilize the messaging synchronization primatives from `tokio` regardless of runtime as they are not specific to the `tokio` runtime. This work is tracked in [#173](https://github.com/slawlor/ractor/pull/173). ## Working with Actors @@ -183,11 +184,8 @@ will be supported by `ractor`. There are 4 concurrent message types, which are l 1. Signals: Signals are the highest-priority of all and will interrupt the actor wherever processing currently is (this includes terminating async work). There is only 1 signal today, which is `Signal::Kill`, and it immediately terminates all work. This includes message processing or supervision event processing. -2. Stop: There is also a pre-defined stop signal. You can give a "stop reason" if you want, but it's optional. Stop is a graceful exit, meaning currently executing async -work will complete, and on the next message processing iteration Stop will take priority over future supervision events or regular messages. It will **not** terminate -currently executing work, regardless of the provided reason. -3. SupervisionEvent: Supervision events are messages from child actors to their supervisors in the event of their startup, death, and/or unhandled panic. Supervision events -are how an actor's supervisor(s) are notified of events of their children and can handle lifetime events for them. +2. Stop: There is also the pre-defined stop signal. You can give a "stop reason" if you want, but it's optional. Stop is a graceful exit, meaning currently executing async work will complete, and on the next message processing iteration Stop will take priority over future supervision events or regular messages. It will **not** terminate currently executing work, regardless of the provided reason. +3. SupervisionEvent: Supervision events are messages from child actors to their supervisors in the event of their startup, death, and/or unhandled panic. Supervision events are how an actor's supervisor(parent) or peer monitors are notified of events of their children/peers and can handle lifetime events for them. 4. Messages: Regular, user-defined, messages are the last channel of communication to actors. They are the lowest priority of the 4 message types and denote general actor work. The first 3 messages types (signals, stop, supervision) are generally quiet unless it's a lifecycle event for the actor, but this channel is the "work" channel doing what your actor wants to do! @@ -255,7 +253,7 @@ enum MyBasicMessageType { } ``` -which adds a significant amount of underlying boilerplate (take a look yourself with `cargo expand`!) for the implementation. But the short answer is, each enum variant needs to serialize to a byte array of arguments, a variant name, and if it's an RPC give a port that receives a byte array and de-serialize the reply back. Each of the types inside of either the arguments or reply type need to implement the ```ractor_cluster::BytesConvertable``` trait which just says this value can be written to a byte array and decoded from a byte array. If you're using `prost` for your message type definitions (protobuf), we have a macro to auto-implement this for your types. +which adds a significant amount of underlying boilerplate (take a look yourself with `cargo expand`) for the implementation. But the short answer is, each enum variant needs to serialize to a byte array of arguments, a variant name, and if it's an RPC give a port that receives a byte array and de-serialize the reply back. Each of the types inside of either the arguments or reply type need to implement the ```ractor_cluster::BytesConvertable``` trait which just says this value can be written to a byte array and decoded from a byte array. If you're using `prost` for your message type definitions (protobuf), we have a macro to auto-implement this for your types. ```rust ractor_cluster::derive_serialization_for_prost_type! {MyProtobufType} diff --git a/ractor/Cargo.toml b/ractor/Cargo.toml index 17028350..f2db9f8c 100644 --- a/ractor/Cargo.toml +++ b/ractor/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ractor" -version = "0.9.2" +version = "0.9.3" authors = ["Sean Lawlor", "Evan Au", "Dillon George"] description = "A actor framework for Rust" documentation = "https://docs.rs/ractor" @@ -14,14 +14,10 @@ categories = ["actor", "erlang"] rust-version = "1.64" [features] -# WIP -# tokio_runtime = ["tokio/time"] -# async_std_runtime = ["async-std"] - -# default = ["tokio_runtime"] -# default = ["async_std_runtime"] - +### Other features cluster = [] + +# default = ["async-std"] default = [] [dependencies] @@ -31,8 +27,10 @@ dashmap = "5" futures = "0.3" once_cell = "1" rand = "0.8" + # Tracing feature requires --cfg=tokio_unstable tokio = { version = "1", features = ["sync", "time", "rt", "macros", "tracing"] } +async-std = { version = "1", features = ["attributes"], optional = true} tracing = { version = "0.1", features = ["attributes"] } [dev-dependencies] diff --git a/ractor/benches/actor.rs b/ractor/benches/actor.rs index 2565bddc..1860a42f 100644 --- a/ractor/benches/actor.rs +++ b/ractor/benches/actor.rs @@ -50,42 +50,80 @@ fn create_actors(c: &mut Criterion) { let large = 10000; let id = format!("Creation of {small} actors"); + #[cfg(not(feature = "async-std"))] let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); + #[cfg(feature = "async-std")] + let _ = async_std::task::block_on(async {}); c.bench_function(&id, move |b| { b.iter_batched( || {}, |()| { - runtime.block_on(async move { - let mut handles = vec![]; - for _ in 0..small { - let (_, handler) = Actor::spawn(None, BenchActor, ()) - .await - .expect("Failed to create test agent"); - handles.push(handler); - } - handles - }) + #[cfg(not(feature = "async-std"))] + { + runtime.block_on(async move { + let mut handles = vec![]; + for _ in 0..small { + let (_, handler) = Actor::spawn(None, BenchActor, ()) + .await + .expect("Failed to create test agent"); + handles.push(handler); + } + handles + }) + } + #[cfg(feature = "async-std")] + { + async_std::task::block_on(async move { + let mut handles = vec![]; + for _ in 0..small { + let (_, handler) = Actor::spawn(None, BenchActor, ()) + .await + .expect("Failed to create test agent"); + handles.push(handler); + } + handles + }) + } }, BatchSize::PerIteration, ); }); let id = format!("Creation of {large} actors"); + #[cfg(not(feature = "async-std"))] let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); + #[cfg(feature = "async-std")] + let _ = async_std::task::block_on(async {}); c.bench_function(&id, move |b| { b.iter_batched( || {}, |()| { - runtime.block_on(async move { - let mut handles = vec![]; - for _ in 0..large { - let (_, handler) = Actor::spawn(None, BenchActor, ()) - .await - .expect("Failed to create test agent"); - handles.push(handler); - } - handles - }) + #[cfg(not(feature = "async-std"))] + { + runtime.block_on(async move { + let mut handles = vec![]; + for _ in 0..large { + let (_, handler) = Actor::spawn(None, BenchActor, ()) + .await + .expect("Failed to create test agent"); + handles.push(handler); + } + handles + }) + } + #[cfg(feature = "async-std")] + { + async_std::task::block_on(async move { + let mut handles = vec![]; + for _ in 0..large { + let (_, handler) = Actor::spawn(None, BenchActor, ()) + .await + .expect("Failed to create test agent"); + handles.push(handler); + } + handles + }) + } }, BatchSize::PerIteration, ); @@ -97,47 +135,104 @@ fn schedule_work(c: &mut Criterion) { let large = 1000; let id = format!("Waiting on {small} actors to process first message"); + #[cfg(not(feature = "async-std"))] let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); + #[cfg(feature = "async-std")] + let _ = async_std::task::block_on(async {}); c.bench_function(&id, move |b| { b.iter_batched( || { - runtime.block_on(async move { - let mut join_set = tokio::task::JoinSet::new(); - - for _ in 0..small { - let (_, handler) = Actor::spawn(None, BenchActor, ()) - .await - .expect("Failed to create test agent"); - join_set.spawn(handler); - } - join_set - }) + #[cfg(not(feature = "async-std"))] + { + runtime.block_on(async move { + let mut join_set = ractor::concurrency::JoinSet::new(); + + for _ in 0..small { + let (_, handler) = Actor::spawn(None, BenchActor, ()) + .await + .expect("Failed to create test agent"); + join_set.spawn(handler); + } + join_set + }) + } + #[cfg(feature = "async-std")] + { + async_std::task::block_on(async move { + let mut join_set = ractor::concurrency::JoinSet::new(); + + for _ in 0..small { + let (_, handler) = Actor::spawn(None, BenchActor, ()) + .await + .expect("Failed to create test agent"); + join_set.spawn(handler); + } + join_set + }) + } }, |mut handles| { - runtime.block_on(async move { while handles.join_next().await.is_some() {} }) + #[cfg(not(feature = "async-std"))] + { + runtime.block_on(async move { while handles.join_next().await.is_some() {} }) + } + #[cfg(feature = "async-std")] + { + async_std::task::block_on(async move { + while handles.join_next().await.is_some() {} + }) + } }, BatchSize::PerIteration, ); }); let id = format!("Waiting on {large} actors to process first message"); + #[cfg(not(feature = "async-std"))] let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); + #[cfg(feature = "async-std")] + let _ = async_std::task::block_on(async {}); c.bench_function(&id, move |b| { b.iter_batched( || { - runtime.block_on(async move { - let mut join_set = tokio::task::JoinSet::new(); - for _ in 0..large { - let (_, handler) = Actor::spawn(None, BenchActor, ()) - .await - .expect("Failed to create test agent"); - join_set.spawn(handler); - } - join_set - }) + #[cfg(not(feature = "async-std"))] + { + runtime.block_on(async move { + let mut join_set = ractor::concurrency::JoinSet::new(); + for _ in 0..large { + let (_, handler) = Actor::spawn(None, BenchActor, ()) + .await + .expect("Failed to create test agent"); + join_set.spawn(handler); + } + join_set + }) + } + #[cfg(feature = "async-std")] + { + async_std::task::block_on(async move { + let mut join_set = ractor::concurrency::JoinSet::new(); + for _ in 0..large { + let (_, handler) = Actor::spawn(None, BenchActor, ()) + .await + .expect("Failed to create test agent"); + join_set.spawn(handler); + } + join_set + }) + } }, |mut handles| { - runtime.block_on(async move { while handles.join_next().await.is_some() {} }) + #[cfg(not(feature = "async-std"))] + { + runtime.block_on(async move { while handles.join_next().await.is_some() {} }) + } + #[cfg(feature = "async-std")] + { + async_std::task::block_on(async move { + while handles.join_next().await.is_some() {} + }) + } }, BatchSize::PerIteration, ); @@ -186,21 +281,47 @@ fn process_messages(c: &mut Criterion) { } let id = format!("Waiting on {NUM_MSGS} messages to be processed"); + #[cfg(not(feature = "async-std"))] let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); + #[cfg(feature = "async-std")] + let _ = async_std::task::block_on(async {}); c.bench_function(&id, move |b| { b.iter_batched( || { - runtime.block_on(async move { - let (_, handle) = Actor::spawn(None, MessagingActor { num_msgs: NUM_MSGS }, ()) - .await - .expect("Failed to create test actor"); - handle - }) + #[cfg(not(feature = "async-std"))] + { + runtime.block_on(async move { + let (_, handle) = + Actor::spawn(None, MessagingActor { num_msgs: NUM_MSGS }, ()) + .await + .expect("Failed to create test actor"); + handle + }) + } + #[cfg(feature = "async-std")] + { + async_std::task::block_on(async move { + let (_, handle) = + Actor::spawn(None, MessagingActor { num_msgs: NUM_MSGS }, ()) + .await + .expect("Failed to create test actor"); + handle + }) + } }, |handle| { - runtime.block_on(async move { - let _ = handle.await; - }) + #[cfg(not(feature = "async-std"))] + { + runtime.block_on(async move { + let _ = handle.await; + }) + } + #[cfg(feature = "async-std")] + { + async_std::task::block_on(async move { + let _ = handle.await; + }) + } }, BatchSize::PerIteration, ); diff --git a/ractor/examples/philosophers.rs b/ractor/examples/philosophers.rs index 9f4896c9..6da054f2 100644 --- a/ractor/examples/philosophers.rs +++ b/ractor/examples/philosophers.rs @@ -495,7 +495,7 @@ async fn main() { let time_slice = Duration::from_millis(10); let run_time = Duration::from_secs(5); - let philosopher_names = vec![ + let philosopher_names = [ "Confucius", "Descartes", "Benjamin Franklin", diff --git a/ractor/src/actor/actor_cell.rs b/ractor/src/actor/actor_cell.rs index 5ce54144..f78b75be 100644 --- a/ractor/src/actor/actor_cell.rs +++ b/ractor/src/actor/actor_cell.rs @@ -12,6 +12,9 @@ use std::any::TypeId; use std::sync::Arc; +#[cfg(feature = "async-std")] +use futures::FutureExt; + use super::messages::{Signal, StopMessage}; use super::SupervisionEvent; use crate::actor::actor_properties::ActorProperties; @@ -107,15 +110,32 @@ impl ActorPortSet { where TState: crate::State, { - crate::concurrency::select! { - // supervision or message processing work - // can be interrupted by the signal port receiving - // a kill signal - signal = self.signal_rx.recv() => { - Err(signal.unwrap_or(Signal::Kill)) + #[cfg(feature = "async-std")] + { + crate::concurrency::select! { + // supervision or message processing work + // can be interrupted by the signal port receiving + // a kill signal + signal = self.signal_rx.recv().fuse() => { + Err(signal.unwrap_or(Signal::Kill)) + } + new_state = future.fuse() => { + Ok(new_state) + } } - new_state = future => { - Ok(new_state) + } + #[cfg(not(feature = "async-std"))] + { + crate::concurrency::select! { + // supervision or message processing work + // can be interrupted by the signal port receiving + // a kill signal + signal = self.signal_rx.recv() => { + Err(signal.unwrap_or(Signal::Kill)) + } + new_state = future => { + Ok(new_state) + } } } } @@ -129,18 +149,38 @@ impl ActorPortSet { /// Returns [Ok(ActorPortMessage)] on a successful message reception, [MessagingErr] /// in the event any of the channels is closed. pub async fn listen_in_priority(&mut self) -> Result> { - crate::concurrency::select! { - signal = self.signal_rx.recv() => { - signal.map(ActorPortMessage::Signal).ok_or(MessagingErr::ChannelClosed) - } - stop = self.stop_rx.recv() => { - stop.map(ActorPortMessage::Stop).ok_or(MessagingErr::ChannelClosed) - } - supervision = self.supervisor_rx.recv() => { - supervision.map(ActorPortMessage::Supervision).ok_or(MessagingErr::ChannelClosed) + #[cfg(feature = "async-std")] + { + crate::concurrency::select! { + signal = self.signal_rx.recv().fuse() => { + signal.map(ActorPortMessage::Signal).ok_or(MessagingErr::ChannelClosed) + } + stop = self.stop_rx.recv().fuse() => { + stop.map(ActorPortMessage::Stop).ok_or(MessagingErr::ChannelClosed) + } + supervision = self.supervisor_rx.recv().fuse() => { + supervision.map(ActorPortMessage::Supervision).ok_or(MessagingErr::ChannelClosed) + } + message = self.message_rx.recv().fuse() => { + message.map(ActorPortMessage::Message).ok_or(MessagingErr::ChannelClosed) + } } - message = self.message_rx.recv() => { - message.map(ActorPortMessage::Message).ok_or(MessagingErr::ChannelClosed) + } + #[cfg(not(feature = "async-std"))] + { + crate::concurrency::select! { + signal = self.signal_rx.recv() => { + signal.map(ActorPortMessage::Signal).ok_or(MessagingErr::ChannelClosed) + } + stop = self.stop_rx.recv() => { + stop.map(ActorPortMessage::Stop).ok_or(MessagingErr::ChannelClosed) + } + supervision = self.supervisor_rx.recv() => { + supervision.map(ActorPortMessage::Supervision).ok_or(MessagingErr::ChannelClosed) + } + message = self.message_rx.recv() => { + message.map(ActorPortMessage::Message).ok_or(MessagingErr::ChannelClosed) + } } } } diff --git a/ractor/src/actor/tests/mod.rs b/ractor/src/actor/tests/mod.rs index 8c9157ce..cf735cae 100644 --- a/ractor/src/actor/tests/mod.rs +++ b/ractor/src/actor/tests/mod.rs @@ -867,8 +867,8 @@ async fn stop_and_wait() { .stop_and_wait(None, None) .await .expect("Failed to wait for actor death"); - // the handle should be done and completed - assert!(handle.is_finished()); + // the handle should be done and completed (after some brief scheduling delay) + periodic_check(|| handle.is_finished(), Duration::from_millis(500)).await; } #[crate::concurrency::test] @@ -896,8 +896,8 @@ async fn kill_and_wait() { .kill_and_wait(None) .await .expect("Failed to wait for actor death"); - // the handle should be done and completed - assert!(handle.is_finished()); + // the handle should be done and completed (after some brief scheduling delay) + periodic_check(|| handle.is_finished(), Duration::from_millis(500)).await; } #[test] diff --git a/ractor/src/concurrency.rs b/ractor/src/concurrency.rs deleted file mode 100644 index 0c8908ff..00000000 --- a/ractor/src/concurrency.rs +++ /dev/null @@ -1,199 +0,0 @@ -// Copyright (c) Sean Lawlor -// -// This source code is licensed under both the MIT license found in the -// LICENSE-MIT file in the root directory of this source tree. - -//! Shared concurrency primitives utilized within the library for different frameworks (tokio, async-std, etc) - -/// A timoeout error -#[derive(Debug)] -pub struct Timeout; - -/// A one-use sender -pub type OneshotSender = tokio::sync::oneshot::Sender; -/// A one-use receiver -pub type OneshotReceiver = tokio::sync::oneshot::Receiver; - -/// A bounded MP;SC sender -pub type MpscSender = tokio::sync::mpsc::Sender; -/// A bounded MP;SC receiver -pub type MpscReceiver = tokio::sync::mpsc::Receiver; - -/// A bounded MP;SC sender -pub type MpscUnboundedSender = tokio::sync::mpsc::UnboundedSender; -/// A bounded MP;SC receiver -pub type MpscUnboundedReceiver = tokio::sync::mpsc::UnboundedReceiver; - -/// A bounded broadcast sender -pub type BroadcastSender = tokio::sync::broadcast::Sender; -/// A bounded broadcast receiver -pub type BroadcastReceiver = tokio::sync::broadcast::Receiver; - -/// MPSC bounded channel -pub fn mpsc_bounded(buffer: usize) -> (MpscSender, MpscReceiver) { - tokio::sync::mpsc::channel(buffer) -} - -/// MPSC unbounded channel -pub fn mpsc_unbounded() -> (MpscUnboundedSender, MpscUnboundedReceiver) { - tokio::sync::mpsc::unbounded_channel() -} - -/// Oneshot channel -pub fn oneshot() -> (OneshotSender, OneshotReceiver) { - tokio::sync::oneshot::channel() -} - -/// Broadcast channel -pub fn broadcast(buffer: usize) -> (BroadcastSender, BroadcastReceiver) { - tokio::sync::broadcast::channel(buffer) -} - -// =============== TOKIO =============== // - -/// Tokio-based primitives -// #[cfg(feature = "tokio_runtime")] -pub mod tokio_primatives { - use std::future::Future; - - /// Represents a task JoinHandle - pub type JoinHandle = tokio::task::JoinHandle; - - /// A duration of time - pub type Duration = tokio::time::Duration; - - /// An instant measured on system time - pub type Instant = tokio::time::Instant; - - /// Sleep the task for a duration of time - pub async fn sleep(dur: super::Duration) { - tokio::time::sleep(dur).await; - } - - /// Spawn a task on the executor runtime - pub fn spawn(future: F) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - spawn_named(None, future) - } - - /// Spawn a (possibly) named task on the executor runtime - pub fn spawn_named(name: Option<&str>, future: F) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - #[cfg(feature = "tracing")] - { - let mut builder = tokio::task::Builder::new(); - if let Some(name) = name { - builder = builder.name(name); - } - builder.spawn(future).expect("Tokio task spawn failed") - } - - #[cfg(not(feature = "tracing"))] - { - let _ = name; - tokio::task::spawn(future) - } - } - - /// Execute the future up to a timeout - /// - /// * `dur`: The duration of time to allow the future to execute for - /// * `future`: The future to execute - /// - /// Returns [Ok(_)] if the future succeeded before the timeout, [Err(super::Timeout)] otherwise - pub async fn timeout(dur: super::Duration, future: F) -> Result - where - F: Future, - { - tokio::time::timeout(dur, future) - .await - .map_err(|_| super::Timeout) - } - - macro_rules! select { - ($($tokens:tt)*) => {{ - tokio::select! { - // Biased ensures that we poll the ports in the order they appear, giving - // priority to our message reception operations. See: - // https://docs.rs/tokio/latest/tokio/macro.select.html#fairness - // for more information - biased; - - $( $tokens )* - } - }} - } - - pub(crate) use select; - - // test macro - pub use tokio::test; -} -// #[cfg(feature = "tokio_runtime")] -pub use tokio_primatives::*; - -// =============== ASYNC-STD =============== // - -/// Tokio-based primitives -#[cfg(feature = "async_std_runtime")] -pub mod async_std_primatives { - use std::future::Future; - - /// Represents a task JoinHandle - pub type JoinHandle = async_std::task::JoinHandle; - - /// A duration of time - pub type Duration = std::time::Duration; - - /// Sleep the task for a duration of time - pub async fn sleep(dur: super::Duration) { - async_std::task::sleep(dur).await; - } - - /// Spawn a task on the executor runtime - pub fn spawn(future: F) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - async_std::task::spawn(future) - } - - /// Spawn a (possibly) named task on the executor runtime - pub fn spawn_named(name: Option<&str>, future: F) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - let _ = name; - spawn(future) - } - - /// Execute the future up to a timeout - /// - /// * `dur`: The duration of time to allow the future to execute for - /// * `future`: The future to execute - /// - /// Returns [Ok(_)] if the future succeeded before the timeout, [Err(super::Timeout)] otherwise - pub async fn timeout(dur: super::Duration, future: F) -> Result - where - F: Future, - { - async_std::future::timeout(dur, future) - .await - .map_err(|_| super::Timeout) - } - - pub use futures::select_biased as select; - // test macro - #[cfg(test)] - pub(crate) use tokio::test; -} -#[cfg(feature = "async_std_runtime")] -pub use async_std_primatives::*; diff --git a/ractor/src/concurrency/async_std_primatives.rs b/ractor/src/concurrency/async_std_primatives.rs new file mode 100644 index 00000000..e5c77cb8 --- /dev/null +++ b/ractor/src/concurrency/async_std_primatives.rs @@ -0,0 +1,246 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Concurrency primitaves based on the `async-std` crate + +use std::{ + future::Future, + pin::Pin, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + task::{Context, Poll}, +}; + +use futures::stream::FuturesUnordered; +use futures::{future::BoxFuture, FutureExt, StreamExt}; + +/// Represents a [JoinHandle] on a spawned task. +/// Adds some syntactic wrapping to support a JoinHandle +/// similar to `tokio`'s. +pub struct JoinHandle { + handle: Option>, + is_done: Arc, +} +impl JoinHandle { + /// Determine if the handle is currently finished + pub fn is_finished(&self) -> bool { + self.handle.is_none() || self.is_done.load(Ordering::Relaxed) + } + + /// Abort the handle + pub fn abort(&mut self) { + if let Some(handle) = self.handle.take() { + let f = handle.cancel(); + drop(f); + } + } +} + +impl async_std::future::Future for JoinHandle { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // a little black-magic to poll the inner future, but return a Result instead of a unit + let mutself = self.get_mut(); + let inner_polled_value = if let Some(inner) = mutself.handle.as_mut() { + inner.poll_unpin(cx) + } else { + return Poll::Ready(Err(())); + }; + + match inner_polled_value { + Poll::Pending => Poll::Pending, + Poll::Ready(v) => { + mutself.handle = None; + Poll::Ready(Ok(v)) + } + } + } +} + +/// A duration of time +pub type Duration = std::time::Duration; + +/// A system-agnostic point-in-time +pub type Instant = std::time::Instant; + +/// An asynchronous interval calculation which waits until +/// a checkpoint time to tick. This is a replication of the +/// basic functionality from `tokio`'s `Interval`. +pub struct Interval { + dur: Duration, + next_tick: Instant, +} + +impl Interval { + /// Wait until the next tick time has elapsed, regardless of computation time + pub async fn tick(&mut self) { + let now = Instant::now(); + // if the next tick time is in the future, wait until it's time + if self.next_tick > now { + sleep(self.next_tick - now).await; + } + // set the next tick time + self.next_tick += self.dur; + } +} + +/// Build a new interval at the given duration starting at now +/// +/// Ticks 1 time immediately +pub fn interval(dur: Duration) -> Interval { + Interval { + dur, + next_tick: Instant::now(), + } +} + +/// A set of futures to join on, in an unordered fashion +/// (first-completed, first-served). This is a wrapper +/// to match the signature of `tokio`'s `JoinSet` +#[derive(Default)] +pub struct JoinSet { + set: FuturesUnordered>, +} + +impl JoinSet { + /// Creates a new [JoinSet] + pub fn new() -> JoinSet { + Self { + set: FuturesUnordered::new(), + } + } + + /// Spawn a new future into the join set + pub fn spawn + Send + 'static>(&mut self, f: F) { + self.set.push(f.boxed()); + } + + /// Join the next future + pub async fn join_next(&mut self) -> Option> { + self.set.next().await.map(|item| Ok(item)) + } + + /// Get the number of futures in the [JoinSet] + pub fn len(&self) -> usize { + self.set.len() + } + + /// Determine if the [JoinSet] has any futures in it + pub fn is_empty(&self) -> bool { + self.set.is_empty() + } +} + +/// Sleep the task for a duration of time +pub async fn sleep(dur: super::Duration) { + async_std::task::sleep(dur).await; +} + +/// Spawn a task on the executor runtime +pub fn spawn(future: F) -> JoinHandle +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + let signal = Arc::new(AtomicBool::new(false)); + let inner_signal = signal.clone(); + + let jh = async_std::task::spawn(async move { + let r = future.await; + inner_signal.fetch_or(true, Ordering::Relaxed); + r + }); + + JoinHandle { + handle: Some(jh), + is_done: signal, + } +} + +/// Spawn a (possibly) named task on the executor runtime +pub fn spawn_named(name: Option<&str>, future: F) -> JoinHandle +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + if let Some(name) = name { + let signal = Arc::new(AtomicBool::new(false)); + let inner_signal = signal.clone(); + + let jh = async_std::task::Builder::new() + .name(name.to_string()) + .spawn(async move { + let r = future.await; + inner_signal.fetch_or(true, Ordering::Relaxed); + r + }) + .unwrap(); + + JoinHandle { + handle: Some(jh), + is_done: signal, + } + } else { + spawn(future) + } +} + +/// Execute the future up to a timeout +/// +/// * `dur`: The duration of time to allow the future to execute for +/// * `future`: The future to execute +/// +/// Returns [Ok(_)] if the future succeeded before the timeout, [Err(super::Timeout)] otherwise +pub async fn timeout(dur: super::Duration, future: F) -> Result +where + F: Future, +{ + async_std::future::timeout(dur, future) + .await + .map_err(|_| super::Timeout) +} + +/// test macro +pub use async_std::test; + +pub use futures::select_biased as select; + +#[cfg(test)] +mod async_std_primitive_tests { + + use super::*; + use crate::common_test::periodic_check; + + #[super::test] + async fn join_handle_aborts() { + let mut jh = spawn(async { + sleep(Duration::from_millis(1000)).await; + }); + jh.abort(); + assert!(jh.is_finished()); + } + + #[super::test] + async fn join_handle_finishes() { + let jh = spawn(async { + sleep(Duration::from_millis(5)).await; + println!("done."); + }); + + periodic_check(|| jh.is_finished(), Duration::from_millis(1000)).await; + } + + #[super::test] + async fn test_spawn_named() { + let jh = spawn_named(Some("something"), async { + sleep(Duration::from_millis(5)).await; + println!("done."); + }); + periodic_check(|| jh.is_finished(), Duration::from_millis(1000)).await; + } +} diff --git a/ractor/src/concurrency/mod.rs b/ractor/src/concurrency/mod.rs new file mode 100644 index 00000000..73c9da24 --- /dev/null +++ b/ractor/src/concurrency/mod.rs @@ -0,0 +1,60 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Shared concurrency primitives utilized within the library for different frameworks (tokio, async-std, etc) + +/// A timoeout error +#[derive(Debug)] +pub struct Timeout; + +/// A one-use sender +pub type OneshotSender = tokio::sync::oneshot::Sender; +/// A one-use receiver +pub type OneshotReceiver = tokio::sync::oneshot::Receiver; + +/// A bounded MP;SC sender +pub type MpscSender = tokio::sync::mpsc::Sender; +/// A bounded MP;SC receiver +pub type MpscReceiver = tokio::sync::mpsc::Receiver; + +/// A bounded MP;SC sender +pub type MpscUnboundedSender = tokio::sync::mpsc::UnboundedSender; +/// A bounded MP;SC receiver +pub type MpscUnboundedReceiver = tokio::sync::mpsc::UnboundedReceiver; + +/// A bounded broadcast sender +pub type BroadcastSender = tokio::sync::broadcast::Sender; +/// A bounded broadcast receiver +pub type BroadcastReceiver = tokio::sync::broadcast::Receiver; + +/// MPSC bounded channel +pub fn mpsc_bounded(buffer: usize) -> (MpscSender, MpscReceiver) { + tokio::sync::mpsc::channel(buffer) +} + +/// MPSC unbounded channel +pub fn mpsc_unbounded() -> (MpscUnboundedSender, MpscUnboundedReceiver) { + tokio::sync::mpsc::unbounded_channel() +} + +/// Oneshot channel +pub fn oneshot() -> (OneshotSender, OneshotReceiver) { + tokio::sync::oneshot::channel() +} + +/// Broadcast channel +pub fn broadcast(buffer: usize) -> (BroadcastSender, BroadcastReceiver) { + tokio::sync::broadcast::channel(buffer) +} + +#[cfg(not(feature = "async-std"))] +pub mod tokio_primatives; +#[cfg(not(feature = "async-std"))] +pub use self::tokio_primatives::*; + +#[cfg(feature = "async-std")] +pub mod async_std_primatives; +#[cfg(feature = "async-std")] +pub use self::async_std_primatives::*; diff --git a/ractor/src/concurrency/tokio_primatives.rs b/ractor/src/concurrency/tokio_primatives.rs new file mode 100644 index 00000000..59a6e89b --- /dev/null +++ b/ractor/src/concurrency/tokio_primatives.rs @@ -0,0 +1,102 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Concurrency primitaves based on `tokio` + +use std::future::Future; + +/// Represents a task JoinHandle +pub type JoinHandle = tokio::task::JoinHandle; + +/// A duration of time +pub type Duration = tokio::time::Duration; + +/// An instant measured on system time +pub type Instant = tokio::time::Instant; + +/// Sleep the task for a duration of time +pub async fn sleep(dur: Duration) { + tokio::time::sleep(dur).await; +} + +/// An asynchronous interval calculation which waits until +/// a checkpoint time to tick +pub type Interval = tokio::time::Interval; + +/// Build a new interval at the given duration starting at now +/// +/// Ticks 1 time immediately +pub fn interval(dur: Duration) -> Interval { + tokio::time::interval(dur) +} + +/// A set of futures to join on, in an unordered fashion +/// (first-completed, first-served) +pub type JoinSet = tokio::task::JoinSet; + +/// Spawn a task on the executor runtime +pub fn spawn(future: F) -> JoinHandle +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + spawn_named(None, future) +} + +/// Spawn a (possibly) named task on the executor runtime +pub fn spawn_named(name: Option<&str>, future: F) -> JoinHandle +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + #[cfg(feature = "tracing")] + { + let mut builder = tokio::task::Builder::new(); + if let Some(name) = name { + builder = builder.name(name); + } + builder.spawn(future).expect("Tokio task spawn failed") + } + + #[cfg(not(feature = "tracing"))] + { + let _ = name; + tokio::task::spawn(future) + } +} + +/// Execute the future up to a timeout +/// +/// * `dur`: The duration of time to allow the future to execute for +/// * `future`: The future to execute +/// +/// Returns [Ok(_)] if the future succeeded before the timeout, [Err(super::Timeout)] otherwise +pub async fn timeout(dur: super::Duration, future: F) -> Result +where + F: Future, +{ + tokio::time::timeout(dur, future) + .await + .map_err(|_| super::Timeout) +} + +macro_rules! select { + ($($tokens:tt)*) => {{ + tokio::select! { + // Biased ensures that we poll the ports in the order they appear, giving + // priority to our message reception operations. See: + // https://docs.rs/tokio/latest/tokio/macro.select.html#fairness + // for more information + biased; + + $( $tokens )* + } + }} + } + +pub(crate) use select; + +// test macro +pub use tokio::test; diff --git a/ractor/src/factory/tests/mod.rs b/ractor/src/factory/tests/mod.rs index aea9ca32..acf586b3 100644 --- a/ractor/src/factory/tests/mod.rs +++ b/ractor/src/factory/tests/mod.rs @@ -720,7 +720,7 @@ async fn test_worker_pings() { .expect("Failed to get statistics"); stats.ping_count > 0 }, - Duration::from_secs(5), + Duration::from_secs(10), ) .await; @@ -734,6 +734,12 @@ async fn test_worker_pings() { worker_counters[2].load(Ordering::Relaxed) ); - let all_counter = worker_counters[0].load(Ordering::Relaxed); - assert_eq!(999, all_counter); + periodic_check( + || { + let all_counter = worker_counters[0].load(Ordering::Relaxed); + all_counter == 999 + }, + Duration::from_secs(10), + ) + .await; } diff --git a/ractor/src/pg/tests.rs b/ractor/src/pg/tests.rs index 32feb085..8138c535 100644 --- a/ractor/src/pg/tests.rs +++ b/ractor/src/pg/tests.rs @@ -273,15 +273,15 @@ async fn test_pg_monitoring() { .await .expect("Failed to start monitor actor"); - // this actor's startup should "monitor" for PG changes + // this actor's startup should notify the "monitor" for PG changes let (test_actor, test_handle) = Actor::spawn(None, AutoJoinActor { pg_group: group }, ()) .await .expect("Failed to start test actor"); - // the monitor is notified async, so we need to wait a tiny bit + // the monitor is notified async, so we need to wait a bit periodic_check( || counter.load(Ordering::Relaxed) == 1, - Duration::from_millis(500), + Duration::from_secs(5), ) .await; @@ -289,7 +289,11 @@ async fn test_pg_monitoring() { test_actor.stop(None); test_handle.await.expect("Actor cleanup failed"); // it should have notified that it's unsubscribed - assert_eq!(0, counter.load(Ordering::Relaxed)); + periodic_check( + || counter.load(Ordering::Relaxed) == 0, + Duration::from_secs(5), + ) + .await; // cleanup monitor_actor.stop(None); diff --git a/ractor/src/port/output/mod.rs b/ractor/src/port/output/mod.rs index db2ed292..a4e443bc 100644 --- a/ractor/src/port/output/mod.rs +++ b/ractor/src/port/output/mod.rs @@ -100,7 +100,7 @@ where { fn drop(&mut self) { let mut subs = self.subscriptions.write().unwrap(); - for sub in subs.iter() { + for sub in subs.iter_mut() { sub.stop(); } subs.clear(); @@ -123,7 +123,7 @@ impl OutputPortSubscription { } /// Stop the subscription, by aborting the underlying [JoinHandle] - pub fn stop(&self) { + pub fn stop(&mut self) { self.handle.abort(); } diff --git a/ractor/src/port/output/tests.rs b/ractor/src/port/output/tests.rs index a04151d8..a35198d3 100644 --- a/ractor/src/port/output/tests.rs +++ b/ractor/src/port/output/tests.rs @@ -67,7 +67,7 @@ async fn test_single_forward() { for _ in 0..4 { output.send(()); } - tokio::time::sleep(Duration::from_millis(50)).await; + crate::concurrency::sleep(Duration::from_millis(50)).await; assert!(!handle.is_finished()); // last send should trigger the exit condition diff --git a/ractor/src/rpc/mod.rs b/ractor/src/rpc/mod.rs index 31cc006a..da166211 100644 --- a/ractor/src/rpc/mod.rs +++ b/ractor/src/rpc/mod.rs @@ -107,13 +107,14 @@ where rx_ports.push(rx); } - let mut join_set = tokio::task::JoinSet::new(); + let mut results = Vec::new(); + let mut join_set = crate::concurrency::JoinSet::new(); for (i, rx) in rx_ports.into_iter().enumerate() { if let Some(duration) = timeout_option { join_set.spawn(async move { ( i, - match tokio::time::timeout(duration, rx).await { + match crate::concurrency::timeout(duration, rx).await { Ok(Ok(result)) => CallResult::Success(result), Ok(Err(_send_err)) => CallResult::SenderError, Err(_) => CallResult::Timeout, @@ -135,7 +136,6 @@ where // we threaded the index in order to maintain ordering from the originally called // actors. - let mut results = Vec::new(); results.resize_with(join_set.len(), || CallResult::Timeout); while let Some(result) = join_set.join_next().await { match result { diff --git a/ractor/src/rpc/tests.rs b/ractor/src/rpc/tests.rs index 1fd272d1..7e2add54 100644 --- a/ractor/src/rpc/tests.rs +++ b/ractor/src/rpc/tests.rs @@ -312,7 +312,11 @@ async fn test_rpc_call_forwarding() { .expect("Failed to forward message"); // make sure the counter was bumped to say the message was forwarded - assert_eq!(4, counter.load(Ordering::Relaxed)); + periodic_check( + || counter.load(Ordering::Relaxed) == 4, + Duration::from_secs(5), + ) + .await; // cleanup forwarder_ref.stop(None); diff --git a/ractor/src/time/mod.rs b/ractor/src/time/mod.rs index f5677cf3..4ddeaf98 100644 --- a/ractor/src/time/mod.rs +++ b/ractor/src/time/mod.rs @@ -39,14 +39,11 @@ where F: Fn() -> TMessage + Send + 'static, { // As per #57, the traditional sleep operation is subject to drift over long periods. - // Tokio providers an interval timer which accounts for execution time to send a message - // and changes in polling to wake the task to assure that the period doesn't drift over - // long runtimes. - // - // TODO (slawlor): Add support for this in a generic operation in the `concurrency` module - // so it can eventually be abstracted with other async runtimes other than Tokio + // Tokio and our internal version for `async_std` provide an interval timer which + // accounts for execution time to send a message and changes in polling to wake + // the task to assure that the period doesn't drift over long runtimes. crate::concurrency::spawn(async move { - let mut timer = tokio::time::interval(period); + let mut timer = crate::concurrency::interval(period); // timer tick's immediately the first time timer.tick().await; while ACTIVE_STATES.contains(&actor.get_status()) { diff --git a/ractor_cluster/Cargo.toml b/ractor_cluster/Cargo.toml index a566d34e..e019b252 100644 --- a/ractor_cluster/Cargo.toml +++ b/ractor_cluster/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ractor_cluster" -version = "0.9.2" +version = "0.9.3" authors = ["Sean Lawlor", "Evan Au", "Dillon George"] description = "Distributed cluster environment of Ractor actors" documentation = "https://docs.rs/ractor" diff --git a/ractor_cluster_derive/Cargo.toml b/ractor_cluster_derive/Cargo.toml index cdc3e3ba..542a5b00 100644 --- a/ractor_cluster_derive/Cargo.toml +++ b/ractor_cluster_derive/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ractor_cluster_derive" -version = "0.9.2" +version = "0.9.3" authors = ["Sean Lawlor "] description = "Derives for ractor_cluster" license = "MIT" diff --git a/ractor_cluster_integration_tests/src/ractor_forward_port_tests.rs b/ractor_cluster_integration_tests/src/ractor_forward_port_tests.rs index 794c1cf0..e77cf2b9 100644 --- a/ractor_cluster_integration_tests/src/ractor_forward_port_tests.rs +++ b/ractor_cluster_integration_tests/src/ractor_forward_port_tests.rs @@ -78,7 +78,7 @@ async fn timeout_rpc() { } let result = rx.await; - assert!(matches!(result, Err(_))); + assert!(result.is_err()); } #[ractor::concurrency::test]