Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support async_std as a runtime #173

Merged
merged 1 commit into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ jobs:
- name: Run the default tests
package: ractor
# flags:
- name: Test ractor in async_std
package: ractor
flags: --features async-std
- name: Test ractor with the `cluster` feature
package: ractor
flags: -F cluster
Expand Down
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"rust-analyzer.showUnlinkedFileNotification": false
}
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ members = [
"ractor_cluster_integration_tests",
"xtask"
]
resolver = "2"
12 changes: 5 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ The minimum supported Rust version (MSRV) of `ractor` is `1.64`

## Features

`ractor` exposes a single feature currently, namely:
`ractor` exposes the following features:

1. `cluster`, which exposes various functionality required for `ractor_cluster` to set up and manage a cluster of actors over a network link. This is work-in-progress and is being tracked in [#16](https://github.com/slawlor/ractor/issues/16).
2. `async-std`, which enables usage of `async-std`'s asynchronous runtime instead of the `tokio` runtime. **However** `tokio` remains a dependency because we utilize the messaging synchronization primatives from `tokio` regardless of runtime as they are not specific to the `tokio` runtime. This work is tracked in [#173](https://github.com/slawlor/ractor/pull/173).

## Working with Actors

Expand Down Expand Up @@ -183,11 +184,8 @@ will be supported by `ractor`. There are 4 concurrent message types, which are l

1. Signals: Signals are the highest-priority of all and will interrupt the actor wherever processing currently is (this includes terminating async work). There
is only 1 signal today, which is `Signal::Kill`, and it immediately terminates all work. This includes message processing or supervision event processing.
2. Stop: There is also a pre-defined stop signal. You can give a "stop reason" if you want, but it's optional. Stop is a graceful exit, meaning currently executing async
work will complete, and on the next message processing iteration Stop will take priority over future supervision events or regular messages. It will **not** terminate
currently executing work, regardless of the provided reason.
3. SupervisionEvent: Supervision events are messages from child actors to their supervisors in the event of their startup, death, and/or unhandled panic. Supervision events
are how an actor's supervisor(s) are notified of events of their children and can handle lifetime events for them.
2. Stop: There is also the pre-defined stop signal. You can give a "stop reason" if you want, but it's optional. Stop is a graceful exit, meaning currently executing async work will complete, and on the next message processing iteration Stop will take priority over future supervision events or regular messages. It will **not** terminate currently executing work, regardless of the provided reason.
3. SupervisionEvent: Supervision events are messages from child actors to their supervisors in the event of their startup, death, and/or unhandled panic. Supervision events are how an actor's supervisor(parent) or peer monitors are notified of events of their children/peers and can handle lifetime events for them.
4. Messages: Regular, user-defined, messages are the last channel of communication to actors. They are the lowest priority of the 4 message types and denote general actor work. The first
3 messages types (signals, stop, supervision) are generally quiet unless it's a lifecycle event for the actor, but this channel is the "work" channel doing what your actor wants to do!

Expand Down Expand Up @@ -255,7 +253,7 @@ enum MyBasicMessageType {
}
```

which adds a significant amount of underlying boilerplate (take a look yourself with `cargo expand`!) for the implementation. But the short answer is, each enum variant needs to serialize to a byte array of arguments, a variant name, and if it's an RPC give a port that receives a byte array and de-serialize the reply back. Each of the types inside of either the arguments or reply type need to implement the ```ractor_cluster::BytesConvertable``` trait which just says this value can be written to a byte array and decoded from a byte array. If you're using `prost` for your message type definitions (protobuf), we have a macro to auto-implement this for your types.
which adds a significant amount of underlying boilerplate (take a look yourself with `cargo expand`) for the implementation. But the short answer is, each enum variant needs to serialize to a byte array of arguments, a variant name, and if it's an RPC give a port that receives a byte array and de-serialize the reply back. Each of the types inside of either the arguments or reply type need to implement the ```ractor_cluster::BytesConvertable``` trait which just says this value can be written to a byte array and decoded from a byte array. If you're using `prost` for your message type definitions (protobuf), we have a macro to auto-implement this for your types.

```rust
ractor_cluster::derive_serialization_for_prost_type! {MyProtobufType}
Expand Down
14 changes: 6 additions & 8 deletions ractor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor"
version = "0.9.2"
version = "0.9.3"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "A actor framework for Rust"
documentation = "https://docs.rs/ractor"
Expand All @@ -14,14 +14,10 @@ categories = ["actor", "erlang"]
rust-version = "1.64"

[features]
# WIP
# tokio_runtime = ["tokio/time"]
# async_std_runtime = ["async-std"]

# default = ["tokio_runtime"]
# default = ["async_std_runtime"]

### Other features
cluster = []

# default = ["async-std"]
default = []

[dependencies]
Expand All @@ -31,8 +27,10 @@ dashmap = "5"
futures = "0.3"
once_cell = "1"
rand = "0.8"

# Tracing feature requires --cfg=tokio_unstable
tokio = { version = "1", features = ["sync", "time", "rt", "macros", "tracing"] }
async-std = { version = "1", features = ["attributes"], optional = true}
tracing = { version = "0.1", features = ["attributes"] }

[dev-dependencies]
Expand Down
225 changes: 173 additions & 52 deletions ractor/benches/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,42 +50,80 @@ fn create_actors(c: &mut Criterion) {
let large = 10000;

let id = format!("Creation of {small} actors");
#[cfg(not(feature = "async-std"))]
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
#[cfg(feature = "async-std")]
let _ = async_std::task::block_on(async {});
c.bench_function(&id, move |b| {
b.iter_batched(
|| {},
|()| {
runtime.block_on(async move {
let mut handles = vec![];
for _ in 0..small {
let (_, handler) = Actor::spawn(None, BenchActor, ())
.await
.expect("Failed to create test agent");
handles.push(handler);
}
handles
})
#[cfg(not(feature = "async-std"))]
{
runtime.block_on(async move {
let mut handles = vec![];
for _ in 0..small {
let (_, handler) = Actor::spawn(None, BenchActor, ())
.await
.expect("Failed to create test agent");
handles.push(handler);
}
handles
})
}
#[cfg(feature = "async-std")]
{
async_std::task::block_on(async move {
let mut handles = vec![];
for _ in 0..small {
let (_, handler) = Actor::spawn(None, BenchActor, ())
.await
.expect("Failed to create test agent");
handles.push(handler);
}
handles
})
}
},
BatchSize::PerIteration,
);
});

let id = format!("Creation of {large} actors");
#[cfg(not(feature = "async-std"))]
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
#[cfg(feature = "async-std")]
let _ = async_std::task::block_on(async {});
c.bench_function(&id, move |b| {
b.iter_batched(
|| {},
|()| {
runtime.block_on(async move {
let mut handles = vec![];
for _ in 0..large {
let (_, handler) = Actor::spawn(None, BenchActor, ())
.await
.expect("Failed to create test agent");
handles.push(handler);
}
handles
})
#[cfg(not(feature = "async-std"))]
{
runtime.block_on(async move {
let mut handles = vec![];
for _ in 0..large {
let (_, handler) = Actor::spawn(None, BenchActor, ())
.await
.expect("Failed to create test agent");
handles.push(handler);
}
handles
})
}
#[cfg(feature = "async-std")]
{
async_std::task::block_on(async move {
let mut handles = vec![];
for _ in 0..large {
let (_, handler) = Actor::spawn(None, BenchActor, ())
.await
.expect("Failed to create test agent");
handles.push(handler);
}
handles
})
}
},
BatchSize::PerIteration,
);
Expand All @@ -97,47 +135,104 @@ fn schedule_work(c: &mut Criterion) {
let large = 1000;

let id = format!("Waiting on {small} actors to process first message");
#[cfg(not(feature = "async-std"))]
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
#[cfg(feature = "async-std")]
let _ = async_std::task::block_on(async {});
c.bench_function(&id, move |b| {
b.iter_batched(
|| {
runtime.block_on(async move {
let mut join_set = tokio::task::JoinSet::new();

for _ in 0..small {
let (_, handler) = Actor::spawn(None, BenchActor, ())
.await
.expect("Failed to create test agent");
join_set.spawn(handler);
}
join_set
})
#[cfg(not(feature = "async-std"))]
{
runtime.block_on(async move {
let mut join_set = ractor::concurrency::JoinSet::new();

for _ in 0..small {
let (_, handler) = Actor::spawn(None, BenchActor, ())
.await
.expect("Failed to create test agent");
join_set.spawn(handler);
}
join_set
})
}
#[cfg(feature = "async-std")]
{
async_std::task::block_on(async move {
let mut join_set = ractor::concurrency::JoinSet::new();

for _ in 0..small {
let (_, handler) = Actor::spawn(None, BenchActor, ())
.await
.expect("Failed to create test agent");
join_set.spawn(handler);
}
join_set
})
}
},
|mut handles| {
runtime.block_on(async move { while handles.join_next().await.is_some() {} })
#[cfg(not(feature = "async-std"))]
{
runtime.block_on(async move { while handles.join_next().await.is_some() {} })
}
#[cfg(feature = "async-std")]
{
async_std::task::block_on(async move {
while handles.join_next().await.is_some() {}
})
}
},
BatchSize::PerIteration,
);
});

let id = format!("Waiting on {large} actors to process first message");
#[cfg(not(feature = "async-std"))]
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
#[cfg(feature = "async-std")]
let _ = async_std::task::block_on(async {});
c.bench_function(&id, move |b| {
b.iter_batched(
|| {
runtime.block_on(async move {
let mut join_set = tokio::task::JoinSet::new();
for _ in 0..large {
let (_, handler) = Actor::spawn(None, BenchActor, ())
.await
.expect("Failed to create test agent");
join_set.spawn(handler);
}
join_set
})
#[cfg(not(feature = "async-std"))]
{
runtime.block_on(async move {
let mut join_set = ractor::concurrency::JoinSet::new();
for _ in 0..large {
let (_, handler) = Actor::spawn(None, BenchActor, ())
.await
.expect("Failed to create test agent");
join_set.spawn(handler);
}
join_set
})
}
#[cfg(feature = "async-std")]
{
async_std::task::block_on(async move {
let mut join_set = ractor::concurrency::JoinSet::new();
for _ in 0..large {
let (_, handler) = Actor::spawn(None, BenchActor, ())
.await
.expect("Failed to create test agent");
join_set.spawn(handler);
}
join_set
})
}
},
|mut handles| {
runtime.block_on(async move { while handles.join_next().await.is_some() {} })
#[cfg(not(feature = "async-std"))]
{
runtime.block_on(async move { while handles.join_next().await.is_some() {} })
}
#[cfg(feature = "async-std")]
{
async_std::task::block_on(async move {
while handles.join_next().await.is_some() {}
})
}
},
BatchSize::PerIteration,
);
Expand Down Expand Up @@ -186,21 +281,47 @@ fn process_messages(c: &mut Criterion) {
}

let id = format!("Waiting on {NUM_MSGS} messages to be processed");
#[cfg(not(feature = "async-std"))]
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
#[cfg(feature = "async-std")]
let _ = async_std::task::block_on(async {});
c.bench_function(&id, move |b| {
b.iter_batched(
|| {
runtime.block_on(async move {
let (_, handle) = Actor::spawn(None, MessagingActor { num_msgs: NUM_MSGS }, ())
.await
.expect("Failed to create test actor");
handle
})
#[cfg(not(feature = "async-std"))]
{
runtime.block_on(async move {
let (_, handle) =
Actor::spawn(None, MessagingActor { num_msgs: NUM_MSGS }, ())
.await
.expect("Failed to create test actor");
handle
})
}
#[cfg(feature = "async-std")]
{
async_std::task::block_on(async move {
let (_, handle) =
Actor::spawn(None, MessagingActor { num_msgs: NUM_MSGS }, ())
.await
.expect("Failed to create test actor");
handle
})
}
},
|handle| {
runtime.block_on(async move {
let _ = handle.await;
})
#[cfg(not(feature = "async-std"))]
{
runtime.block_on(async move {
let _ = handle.await;
})
}
#[cfg(feature = "async-std")]
{
async_std::task::block_on(async move {
let _ = handle.await;
})
}
},
BatchSize::PerIteration,
);
Expand Down
2 changes: 1 addition & 1 deletion ractor/examples/philosophers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ async fn main() {
let time_slice = Duration::from_millis(10);
let run_time = Duration::from_secs(5);

let philosopher_names = vec![
let philosopher_names = [
"Confucius",
"Descartes",
"Benjamin Franklin",
Expand Down
Loading
Loading