From 4271c43b4a840c4b7ebba0ed78cffdbcd37c9456 Mon Sep 17 00:00:00 2001 From: Sean Lawlor Date: Fri, 13 Oct 2023 09:45:29 -0400 Subject: [PATCH] Some early prototyping to get wasm support for Ractor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Present status: We have most of the plubming in-place, however the runtime is getting a panic when issuing a `sleep` operation. Reproduce with 1. Installing Node.js 2. Running: `$ wasm-pack test --node -- -- test_pg_monitoring` Output: ``` [INFO]: 🎯 Checking for the Wasm target... Compiling ractor v0.9.3 (/Users/seanlawlor/src/slawlor_ractor/ractor) Finished dev [unoptimized + debuginfo] target(s) in 1.70s [INFO]: ⬇️ Installing wasm-bindgen... Compiling ractor v0.9.3 (/Users/seanlawlor/src/slawlor_ractor/ractor) Finished test [unoptimized + debuginfo] target(s) in 1.59s Running unittests src/lib.rs (/Users/seanlawlor/src/slawlor_ractor/target/wasm32-unknown-unknown/debug/deps/ractor-44740482816e2f08.wasm) Set timeout to 20 seconds... running 44 tests ERROR ractor/src/common_test.rs:24 PRE_SLEEP panicked at 'called `Result::unwrap()` on an `Err` value: JsValue(Error: expected a number argument Error: expected a number argument at _assertNum (/Users/seanlawlor/src/slawlor_ractor/target/wasm32-unknown-unknown/wbg-tmp-ractor-44740482816e2f08.wasm/wasm-bindgen-test.js:238:39) at /Users/seanlawlor/src/slawlor_ractor/target/wasm32-unknown-unknown/wbg-tmp-ractor-44740482816e2f08.wasm/wasm-bindgen-test.js:482:5 at handleError (/Users/seanlawlor/src/slawlor_ractor/target/wasm32-unknown-unknown/wbg-tmp-ractor-44740482816e2f08.wasm/wasm-bindgen-test.js:261:18) at module.exports.__wbg_setTimeout_fba1b48a90e30862 (/Users/seanlawlor/src/slawlor_ractor/target/wasm32-unknown-unknown/wbg-tmp-ractor-44740482816e2f08.wasm/wasm-bindgen-test.js:480:72) at wasmtimer::js::GlobalScope::set_timeout_with_callback_and_timeout_and_arguments_0::hb00afdc7d7fa7e9b (wasm://wasm/02af6976:wasm-function[6527]:0x60b69f) at wasmtimer::js::set_timeout::hb4e14bbfc2d09317 (wasm://wasm/02af6976:wasm-function[14118]:0x73a46e) at wasmtimer::timer::global::schedule_callback::ha21e55b3c0d863d7 (wasm://wasm/02af6976:wasm-function[5487]:0x5c8603) at wasmtimer::timer::global::run::h0b7b9c4227a88003 (wasm://wasm/02af6976:wasm-function[3886]:0x5451a9) at ::default::hdd4fb80ebee06da7 (wasm://wasm/02af6976:wasm-function[3042]:0x4eb6d5) at wasmtimer::tokio::sleep::Sleep::new_at::h944492fbfbde8a7a (wasm://wasm/02af6976:wasm-function[14322]:0x73f43a))', /Users/seanlawlor/.cargo/registry/src/index.crates.io-6f17d22bba15001f/wasmtimer-0.2.0/src/timer/global.rs:85:6 Stack: Error at /Users/seanlawlor/src/slawlor_ractor/target/wasm32-unknown-unknown/wbg-tmp-ractor-44740482816e2f08.wasm/wasm-bindgen-test.js:664:17 at logError (/Users/seanlawlor/src/slawlor_ractor/target/wasm32-unknown-unknown/wbg-tmp-ractor-44740482816e2f08.wasm/wasm-bindgen-test.js:223:18) at module.exports.__wbg_new_abda76e883ba8a5f (/Users/seanlawlor/src/slawlor_ractor/target/wasm32-unknown-unknown/wbg-tmp-ractor-44740482816e2f08.wasm/wasm-bindgen-test.js:663:65) at console_error_panic_hook::Error::new::h7964f208bc5ab187 (wasm://wasm/02af6976:wasm-function[20146]:0x7afa1b) at console_error_panic_hook::hook_impl::haa20ccab0a21c777 (wasm://wasm/02af6976:wasm-function[3835]:0x540001) at console_error_panic_hook::hook::h27c28657be0a81ea (wasm://wasm/02af6976:wasm-function[22361]:0x7cd06e) at core::ops::function::Fn::call::h0fdeed15ae9d43e6 (wasm://wasm/02af6976:wasm-function[19268]:0x7a1f54) at std::panicking::rust_panic_with_hook::he017c20114c82dea (wasm://wasm/02af6976:wasm-function[7919]:0x65af0f) at std::panicking::begin_panic_handler::{{closure}}::h5e23c6f065badef5 (wasm://wasm/02af6976:wasm-function[10127]:0x6bf1b7) at std::sys_common::backtrace::__rust_end_short_backtrace::hff4b03aa315e3d86 (wasm://wasm/02af6976:wasm-function[26263]:0x7eb8ac) test ractor::pg::tests::test_pg_monitoring ... FAIL failures: ---- ractor::pg::tests::test_pg_monitoring output ---- log output: %cERROR%c ractor/src/common_test.rs:24%c PRE_SLEEP color: red; background: #444 color: gray; font-style: italic color: inherit error output: panicked at 'called `Result::unwrap()` on an `Err` value: JsValue(Error: expected a number argument Error: expected a number argument at _assertNum (/Users/seanlawlor/src/slawlor_ractor/target/wasm32-unknown-unknown/wbg-tmp-ractor-44740482816e2f08.wasm/wasm-bindgen-test.js:238:39) at /Users/seanlawlor/src/slawlor_ractor/target/wasm32-unknown-unknown/wbg-tmp-ractor-44740482816e2f08.wasm/wasm-bindgen-test.js:482:5 at handleError (/Users/seanlawlor/src/slawlor_ractor/target/wasm32-unknown-unknown/wbg-tmp-ractor-44740482816e2f08.wasm/wasm-bindgen-test.js:261:18) at module.exports.__wbg_setTimeout_fba1b48a90e30862 (/Users/seanlawlor/src/slawlor_ractor/target/wasm32-unknown-unknown/wbg-tmp-ractor-44740482816e2f08.wasm/wasm-bindgen-test.js:480:72) at wasmtimer::js::GlobalScope::set_timeout_with_callback_and_timeout_and_arguments_0::hb00afdc7d7fa7e9b (wasm://wasm/02af6976:wasm-function[6527]:0x60b69f) at wasmtimer::js::set_timeout::hb4e14bbfc2d09317 (wasm://wasm/02af6976:wasm-function[14118]:0x73a46e) at wasmtimer::timer::global::schedule_callback::ha21e55b3c0d863d7 (wasm://wasm/02af6976:wasm-function[5487]:0x5c8603) at wasmtimer::timer::global::run::h0b7b9c4227a88003 (wasm://wasm/02af6976:wasm-function[3886]:0x5451a9) at ::default::hdd4fb80ebee06da7 (wasm://wasm/02af6976:wasm-function[3042]:0x4eb6d5) at wasmtimer::tokio::sleep::Sleep::new_at::h944492fbfbde8a7a (wasm://wasm/02af6976:wasm-function[14322]:0x73f43a))', /Users/seanlawlor/.cargo/registry/src/index.crates.io-6f17d22bba15001f/wasmtimer-0.2.0/src/timer/global.rs:85:6 Stack: Error at /Users/seanlawlor/src/slawlor_ractor/target/wasm32-unknown-unknown/wbg-tmp-ractor-44740482816e2f08.wasm/wasm-bindgen-test.js:664:17 at logError (/Users/seanlawlor/src/slawlor_ractor/target/wasm32-unknown-unknown/wbg-tmp-ractor-44740482816e2f08.wasm/wasm-bindgen-test.js:223:18) at module.exports.__wbg_new_abda76e883ba8a5f (/Users/seanlawlor/src/slawlor_ractor/target/wasm32-unknown-unknown/wbg-tmp-ractor-44740482816e2f08.wasm/wasm-bindgen-test.js:663:65) at console_error_panic_hook::Error::new::h7964f208bc5ab187 (wasm://wasm/02af6976:wasm-function[20146]:0x7afa1b) at console_error_panic_hook::hook_impl::haa20ccab0a21c777 (wasm://wasm/02af6976:wasm-function[3835]:0x540001) at console_error_panic_hook::hook::h27c28657be0a81ea (wasm://wasm/02af6976:wasm-function[22361]:0x7cd06e) at core::ops::function::Fn::call::h0fdeed15ae9d43e6 (wasm://wasm/02af6976:wasm-function[19268]:0x7a1f54) at std::panicking::rust_panic_with_hook::he017c20114c82dea (wasm://wasm/02af6976:wasm-function[7919]:0x65af0f) at std::panicking::begin_panic_handler::{{closure}}::h5e23c6f065badef5 (wasm://wasm/02af6976:wasm-function[10127]:0x6bf1b7) at std::sys_common::backtrace::__rust_end_short_backtrace::hff4b03aa315e3d86 (wasm://wasm/02af6976:wasm-function[26263]:0x7eb8ac) JS exception that was thrown: RuntimeError: unreachable at __rust_start_panic (wasm://wasm/02af6976:wasm-function[26312]:0x7eb9ca) at rust_panic (wasm://wasm/02af6976:wasm-function[26139]:0x7eb2bd) at std::panicking::rust_panic_with_hook::he017c20114c82dea (wasm://wasm/02af6976:wasm-function[7919]:0x65af3c) at std::panicking::begin_panic_handler::{{closure}}::h5e23c6f065badef5 (wasm://wasm/02af6976:wasm-function[10127]:0x6bf1b7) at std::sys_common::backtrace::__rust_end_short_backtrace::hff4b03aa315e3d86 (wasm://wasm/02af6976:wasm-function[26263]:0x7eb8ac) at rust_begin_unwind (wasm://wasm/02af6976:wasm-function[15991]:0x763dce) at core::panicking::panic_fmt::h7859ddfa497c0e61 (wasm://wasm/02af6976:wasm-function[20866]:0x7b9ce8) at core::result::unwrap_failed::h99b8d970e94e9f11 (wasm://wasm/02af6976:wasm-function[11508]:0x6ef54b) at core::result::Result::unwrap::h0dd97648d8128ba5 (wasm://wasm/02af6976:wasm-function[11740]:0x6f6d0e) at wasmtimer::timer::global::schedule_callback::ha21e55b3c0d863d7 (wasm://wasm/02af6976:wasm-function[5487]:0x5c8620) failures: ractor::pg::tests::test_pg_monitoring test result: FAILED. 0 passed; 1 failed; 43 ignored error: test failed, to rerun pass `--lib` Caused by: process didn't exit successfully: `/Users/seanlawlor/Library/Caches/.wasm-pack/wasm-bindgen-cargo-install-0.2.87/wasm-bindgen-test-runner /Users/seanlawlor/src/slawlor_ractor/target/wasm32-unknown-unknown/debug/deps/ractor-44740482816e2f08.wasm test_pg_monitoring` (exit status: 1) Error: Running Wasm tests with wasm-bindgen-test failed Caused by: Running Wasm tests with wasm-bindgen-test failed Caused by: failed to execute `cargo test`: exited with exit status: 1 full command: cd "/Users/seanlawlor/src/slawlor_ractor/ractor" && CARGO_TARGET_WASM32_UNKNOWN_UNKNOWN_RUNNER="/Users/seanlawlor/Library/Caches/.wasm-pack/wasm-bindgen-cargo-install-0.2.87/wasm-bindgen-test-runner" WASM_BINDGEN_TEST_ONLY_NODE="1" "cargo" "test" "--target" "wasm32-unknown-unknown" "--" "test_pg_monitoring" ``` Related to Issue #124 - Adding WASM support --- ractor/Cargo.toml | 29 +++- ractor/benches/actor.rs | 10 ++ ractor/examples/counter.rs | 3 +- ractor/examples/monte_carlo.rs | 3 +- ractor/examples/output_port.rs | 3 +- ractor/examples/philosophers.rs | 3 +- ractor/examples/ping_pong.rs | 3 +- ractor/examples/supervisor.rs | 3 +- ractor/src/actor/actor_cell.rs | 10 +- ractor/src/actor/tests/mod.rs | 49 +++--- ractor/src/actor/tests/supervisor.rs | 39 +++-- ractor/src/common_test.rs | 37 ++++- .../src/concurrency/async_std_primatives.rs | 31 ++-- ractor/src/concurrency/mod.rs | 21 ++- ractor/src/concurrency/wasm_primatives.rs | 142 ++++++++++++++++++ ractor/src/factory/job.rs | 4 +- ractor/src/factory/tests/mod.rs | 32 +--- ractor/src/factory/tests/worker_lifecycle.rs | 2 +- ractor/src/lib.rs | 13 +- ractor/src/pg/tests.rs | 49 +++--- ractor/src/port/output/tests.rs | 6 +- ractor/src/registry/tests.rs | 21 ++- ractor/src/rpc/tests.rs | 12 +- ractor/src/serialization.rs | 6 + ractor/src/tests.rs | 16 +- ractor/src/time/tests.rs | 12 +- 26 files changed, 425 insertions(+), 134 deletions(-) create mode 100644 ractor/src/concurrency/wasm_primatives.rs diff --git a/ractor/Cargo.toml b/ractor/Cargo.toml index f2db9f8c..43707d7f 100644 --- a/ractor/Cargo.toml +++ b/ractor/Cargo.toml @@ -17,7 +17,6 @@ rust-version = "1.64" ### Other features cluster = [] -# default = ["async-std"] default = [] [dependencies] @@ -26,23 +25,45 @@ async-trait = "0.1" dashmap = "5" futures = "0.3" once_cell = "1" -rand = "0.8" # Tracing feature requires --cfg=tokio_unstable tokio = { version = "1", features = ["sync", "time", "rt", "macros", "tracing"] } async-std = { version = "1", features = ["attributes"], optional = true} tracing = { version = "0.1", features = ["attributes"] } +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +# TODO #124 - this is due to the temporary disabling of the "factory" module +rand = "0.8" + +[target.'cfg(target_arch = "wasm32")'.dependencies] +# instant = { version = "0.1", features = ["wasm-bindgen"]} +getrandom = { version = "0.2", features = ["js"] } +parking_lot = { version = "0.11", features = ["wasm-bindgen"] } +wasm-timer = { version = "0.2" } +wasm-bindgen-futures = { version = "0.4"} +wasm-bindgen-test = "0.3" +wasmtimer = { version = "0.2", features = ["tokio"] } + [dev-dependencies] -criterion = "0.5" +backtrace = "0.3" function_name = "0.3" paste = "1" rand = "0.8" -tokio = { version = "1", features = ["rt", "time", "sync", "macros", "rt-multi-thread", "tracing"] } tracing-glog = "0.3" tracing-subscriber = { version = "0.3", features = ["env-filter"]} + +[target.'cfg(target_arch = "wasm32")'.dev-dependencies] +console_error_panic_hook = "0.1" +tokio = { version = "1", features = ["rt", "time", "sync", "macros"] } +tracing-wasm = "0.2" +wasm-bindgen = "0.2" + +[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] +criterion = "0.5" +tokio = { version = "1", features = ["rt", "time", "sync", "macros", "rt-multi-thread", "tracing"] } tracing-test = "0.2" + [[bench]] name = "actor" harness = false diff --git a/ractor/benches/actor.rs b/ractor/benches/actor.rs index 1860a42f..c130b451 100644 --- a/ractor/benches/actor.rs +++ b/ractor/benches/actor.rs @@ -3,9 +3,11 @@ // This source code is licensed under both the MIT license found in the // LICENSE-MIT file in the root directory of this source tree. +#[cfg(not(target_arch = "wasm32"))] #[macro_use] extern crate criterion; +#[cfg(not(target_arch = "wasm32"))] use criterion::{BatchSize, Criterion}; #[cfg(feature = "cluster")] use ractor::Message; @@ -45,6 +47,7 @@ impl Actor for BenchActor { } } +#[cfg(not(target_arch = "wasm32"))] fn create_actors(c: &mut Criterion) { let small = 100; let large = 10000; @@ -130,6 +133,7 @@ fn create_actors(c: &mut Criterion) { }); } +#[cfg(not(target_arch = "wasm32"))] fn schedule_work(c: &mut Criterion) { let small = 100; let large = 1000; @@ -239,6 +243,7 @@ fn schedule_work(c: &mut Criterion) { }); } +#[cfg(not(target_arch = "wasm32"))] #[allow(clippy::async_yields_async)] fn process_messages(c: &mut Criterion) { const NUM_MSGS: u64 = 100000; @@ -328,5 +333,10 @@ fn process_messages(c: &mut Criterion) { }); } +#[cfg(not(target_arch = "wasm32"))] criterion_group!(actors, create_actors, schedule_work, process_messages); +#[cfg(not(target_arch = "wasm32"))] criterion_main!(actors); + +#[cfg(target_arch = "wasm32")] +fn main() {} diff --git a/ractor/examples/counter.rs b/ractor/examples/counter.rs index c8803aa2..fa3c37e1 100644 --- a/ractor/examples/counter.rs +++ b/ractor/examples/counter.rs @@ -99,7 +99,8 @@ fn init_logging() { tracing::subscriber::set_global_default(subscriber).expect("to set global subscriber"); } -#[tokio::main] +#[cfg_attr(not(target_arch = "wasm32"), tokio::main)] +#[cfg_attr(target_arch = "wasm32", tokio::main(flavor = "current_thread"))] async fn main() { init_logging(); diff --git a/ractor/examples/monte_carlo.rs b/ractor/examples/monte_carlo.rs index f8578f34..f33a1514 100644 --- a/ractor/examples/monte_carlo.rs +++ b/ractor/examples/monte_carlo.rs @@ -228,7 +228,8 @@ fn init_logging() { tracing::subscriber::set_global_default(subscriber).expect("to set global subscriber"); } -#[tokio::main] +#[cfg_attr(not(target_arch = "wasm32"), tokio::main)] +#[cfg_attr(target_arch = "wasm32", tokio::main(flavor = "current_thread"))] async fn main() { init_logging(); diff --git a/ractor/examples/output_port.rs b/ractor/examples/output_port.rs index 9df8b999..45715ff5 100644 --- a/ractor/examples/output_port.rs +++ b/ractor/examples/output_port.rs @@ -127,7 +127,8 @@ fn init_logging() { tracing::subscriber::set_global_default(subscriber).expect("to set global subscriber"); } -#[tokio::main] +#[cfg_attr(not(target_arch = "wasm32"), tokio::main)] +#[cfg_attr(target_arch = "wasm32", tokio::main(flavor = "current_thread"))] async fn main() { init_logging(); diff --git a/ractor/examples/philosophers.rs b/ractor/examples/philosophers.rs index 6da054f2..830ed0c9 100644 --- a/ractor/examples/philosophers.rs +++ b/ractor/examples/philosophers.rs @@ -487,7 +487,8 @@ fn init_logging() { tracing::subscriber::set_global_default(subscriber).expect("to set global subscriber"); } -#[tokio::main] +#[cfg_attr(not(target_arch = "wasm32"), tokio::main)] +#[cfg_attr(target_arch = "wasm32", tokio::main(flavor = "current_thread"))] async fn main() { init_logging(); diff --git a/ractor/examples/ping_pong.rs b/ractor/examples/ping_pong.rs index c4c5e9a9..f2fd0238 100644 --- a/ractor/examples/ping_pong.rs +++ b/ractor/examples/ping_pong.rs @@ -105,7 +105,8 @@ fn init_logging() { tracing::subscriber::set_global_default(subscriber).expect("to set global subscriber"); } -#[tokio::main] +#[cfg_attr(not(target_arch = "wasm32"), tokio::main)] +#[cfg_attr(target_arch = "wasm32", tokio::main(flavor = "current_thread"))] async fn main() { init_logging(); diff --git a/ractor/examples/supervisor.rs b/ractor/examples/supervisor.rs index fdc9e557..f14af10c 100644 --- a/ractor/examples/supervisor.rs +++ b/ractor/examples/supervisor.rs @@ -44,7 +44,8 @@ fn init_logging() { tracing::subscriber::set_global_default(subscriber).expect("to set global subscriber"); } -#[tokio::main] +#[cfg_attr(not(target_arch = "wasm32"), tokio::main)] +#[cfg_attr(target_arch = "wasm32", tokio::main(flavor = "current_thread"))] async fn main() { init_logging(); diff --git a/ractor/src/actor/actor_cell.rs b/ractor/src/actor/actor_cell.rs index f78b75be..20dd02da 100644 --- a/ractor/src/actor/actor_cell.rs +++ b/ractor/src/actor/actor_cell.rs @@ -12,7 +12,7 @@ use std::any::TypeId; use std::sync::Arc; -#[cfg(feature = "async-std")] +#[cfg(any(feature = "async-std", target_arch = "wasm32"))] use futures::FutureExt; use super::messages::{Signal, StopMessage}; @@ -110,7 +110,7 @@ impl ActorPortSet { where TState: crate::State, { - #[cfg(feature = "async-std")] + #[cfg(any(feature = "async-std", target_arch = "wasm32"))] { crate::concurrency::select! { // supervision or message processing work @@ -124,7 +124,7 @@ impl ActorPortSet { } } } - #[cfg(not(feature = "async-std"))] + #[cfg(not(any(feature = "async-std", target_arch = "wasm32")))] { crate::concurrency::select! { // supervision or message processing work @@ -149,7 +149,7 @@ impl ActorPortSet { /// Returns [Ok(ActorPortMessage)] on a successful message reception, [MessagingErr] /// in the event any of the channels is closed. pub async fn listen_in_priority(&mut self) -> Result> { - #[cfg(feature = "async-std")] + #[cfg(any(feature = "async-std", target_arch = "wasm32"))] { crate::concurrency::select! { signal = self.signal_rx.recv().fuse() => { @@ -166,7 +166,7 @@ impl ActorPortSet { } } } - #[cfg(not(feature = "async-std"))] + #[cfg(not(any(feature = "async-std", target_arch = "wasm32")))] { crate::concurrency::select! { signal = self.signal_rx.recv() => { diff --git a/ractor/src/actor/tests/mod.rs b/ractor/src/actor/tests/mod.rs index cf735cae..73409276 100644 --- a/ractor/src/actor/tests/mod.rs +++ b/ractor/src/actor/tests/mod.rs @@ -16,15 +16,16 @@ use crate::{ Actor, ActorCell, ActorProcessingErr, ActorRef, ActorStatus, SpawnErr, SupervisionEvent, }; -mod supervisor; +pub mod supervisor; struct EmptyMessage; #[cfg(feature = "cluster")] impl crate::Message for EmptyMessage {} #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_panic_on_start_captured() { + crate::common_test::setup(); struct TestActor; #[async_trait::async_trait] @@ -47,8 +48,9 @@ async fn test_panic_on_start_captured() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_error_on_start_captured() { + crate::common_test::setup(); struct TestActor; #[async_trait::async_trait] @@ -71,8 +73,9 @@ async fn test_error_on_start_captured() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_stop_higher_priority_over_messages() { + crate::common_test::setup(); let message_counter = Arc::new(AtomicU8::new(0u8)); struct TestActor { @@ -150,8 +153,9 @@ async fn test_stop_higher_priority_over_messages() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_kill_terminates_work() { + crate::common_test::setup(); struct TestActor; #[async_trait::async_trait] @@ -196,8 +200,9 @@ async fn test_kill_terminates_work() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_stop_does_not_terminate_async_work() { + crate::common_test::setup(); struct TestActor; #[async_trait::async_trait] @@ -251,8 +256,9 @@ async fn test_stop_does_not_terminate_async_work() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_kill_terminates_supervision_work() { + crate::common_test::setup(); struct TestActor; #[async_trait::async_trait] @@ -299,8 +305,9 @@ async fn test_kill_terminates_supervision_work() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_sending_message_to_invalid_actor_type() { + crate::common_test::setup(); struct TestActor1; struct TestMessage1; #[cfg(feature = "cluster")] @@ -357,8 +364,9 @@ async fn test_sending_message_to_invalid_actor_type() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_sending_message_to_dead_actor() { + crate::common_test::setup(); struct TestActor; #[async_trait::async_trait] @@ -391,8 +399,9 @@ async fn test_sending_message_to_dead_actor() { #[cfg(feature = "cluster")] #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_serialized_cast() { + crate::common_test::setup(); use crate::message::{BoxedDowncastErr, SerializedMessage}; use crate::Message; @@ -506,8 +515,9 @@ where #[cfg(feature = "cluster")] #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_serialized_rpc() { + crate::common_test::setup(); use crate::message::{BoxedDowncastErr, SerializedMessage}; use crate::{Message, RpcReplyPort}; @@ -616,8 +626,9 @@ async fn test_serialized_rpc() { #[cfg(feature = "cluster")] #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_remote_actor() { + crate::common_test::setup(); use crate::message::{BoxedDowncastErr, SerializedMessage}; use crate::{ActorId, ActorRuntime, Message}; @@ -725,8 +736,9 @@ async fn test_remote_actor() { #[cfg(feature = "cluster")] #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn spawning_local_actor_as_remote_fails() { + crate::common_test::setup(); use crate::ActorProcessingErr; struct RemoteActor; @@ -782,8 +794,9 @@ async fn spawning_local_actor_as_remote_fails() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn instant_spawns() { + crate::common_test::setup(); let counter = Arc::new(AtomicU8::new(0)); struct EmptyActor; @@ -843,7 +856,7 @@ async fn instant_spawns() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn stop_and_wait() { struct SlowActor; #[async_trait::async_trait] @@ -872,8 +885,9 @@ async fn stop_and_wait() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn kill_and_wait() { + crate::common_test::setup(); struct SlowActor; #[async_trait::async_trait] impl Actor for SlowActor { @@ -901,8 +915,9 @@ async fn kill_and_wait() { } #[test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] fn test_err_map() { + crate::common_test::setup(); let err: RactorErr = RactorErr::Messaging(MessagingErr::SendErr(123)); let _: RactorErr<()> = err.map(|_| ()); diff --git a/ractor/src/actor/tests/supervisor.rs b/ractor/src/actor/tests/supervisor.rs index d9e56a5a..7e57101c 100644 --- a/ractor/src/actor/tests/supervisor.rs +++ b/ractor/src/actor/tests/supervisor.rs @@ -18,8 +18,9 @@ use crate::{ use crate::{Actor, ActorCell, ActorRef, ActorStatus, SupervisionEvent}; #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_supervision_panic_in_post_startup() { + crate::common_test::setup(); struct Child; struct Supervisor { flag: Arc, @@ -106,8 +107,9 @@ async fn test_supervision_panic_in_post_startup() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_supervision_error_in_post_startup() { + crate::common_test::setup(); struct Child; struct Supervisor { flag: Arc, @@ -194,8 +196,9 @@ async fn test_supervision_error_in_post_startup() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_supervision_panic_in_handle() { + crate::common_test::setup(); struct Child; struct Supervisor { flag: Arc, @@ -291,8 +294,9 @@ async fn test_supervision_panic_in_handle() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_supervision_error_in_handle() { + crate::common_test::setup(); struct Child; struct Supervisor { flag: Arc, @@ -388,8 +392,9 @@ async fn test_supervision_error_in_handle() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_supervision_panic_in_post_stop() { + crate::common_test::setup(); struct Child; struct Supervisor { flag: Arc, @@ -469,8 +474,9 @@ async fn test_supervision_panic_in_post_stop() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_supervision_error_in_post_stop() { + crate::common_test::setup(); struct Child; struct Supervisor { flag: Arc, @@ -552,8 +558,9 @@ async fn test_supervision_error_in_post_stop() { /// Test that a panic in the supervisor's handling propagates to /// the supervisor's supervisor #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_supervision_panic_in_supervisor_handle() { + crate::common_test::setup(); struct Child; struct Midpoint; struct Supervisor { @@ -694,8 +701,9 @@ async fn test_supervision_panic_in_supervisor_handle() { /// Test that a panic in the supervisor's handling propagates to /// the supervisor's supervisor #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_supervision_error_in_supervisor_handle() { + crate::common_test::setup(); struct Child; struct Midpoint; struct Supervisor { @@ -834,8 +842,9 @@ async fn test_supervision_error_in_supervisor_handle() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_killing_a_supervisor_terminates_children() { + crate::common_test::setup(); struct Child; struct Supervisor; @@ -907,8 +916,9 @@ async fn test_killing_a_supervisor_terminates_children() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn instant_supervised_spawns() { + crate::common_test::setup(); let counter = Arc::new(AtomicU8::new(0)); struct EmptySupervisor; @@ -1001,8 +1011,9 @@ async fn instant_supervised_spawns() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_supervisor_captures_dead_childs_state() { + crate::common_test::setup(); struct Child; struct Supervisor { flag: Arc, @@ -1106,8 +1117,9 @@ async fn test_supervisor_captures_dead_childs_state() { // 1. terminate_children_after() #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_supervisor_double_link() { + crate::common_test::setup(); struct Who; #[crate::async_trait] @@ -1148,8 +1160,9 @@ async fn test_supervisor_double_link() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_simple_monitor() { + crate::common_test::setup(); struct Peer; struct Monitor { counter: Arc, diff --git a/ractor/src/common_test.rs b/ractor/src/common_test.rs index 559e0471..5d7e212b 100644 --- a/ractor/src/common_test.rs +++ b/ractor/src/common_test.rs @@ -3,6 +3,9 @@ // This source code is licensed under both the MIT license found in the // LICENSE-MIT file in the root directory of this source tree. +// TODO #124 (slawlor): Redesign this without usage of core time primatives (i.e. +// use concurrency instants) +#[cfg(not(target_arch = "wasm32"))] use std::future::Future; use crate::concurrency::sleep; @@ -20,9 +23,13 @@ where } sleep(Duration::from_millis(50)).await; } - assert!(check()); + + let backtrace = backtrace::Backtrace::new(); + assert!(check(), "Periodic check failed.\n{:?}", backtrace); } +// TODO #124 reenable once factories ready +#[cfg(not(target_arch = "wasm32"))] pub async fn periodic_async_check(check: F, timeout: Duration) where F: Fn() -> Fut, @@ -35,5 +42,31 @@ where } sleep(Duration::from_millis(50)).await; } - assert!(check().await); + + let backtrace = backtrace::Backtrace::new(); + assert!( + check().await, + "Async periodic check failed.\n{:?}", + backtrace + ); +} + +#[cfg(target_arch = "wasm32")] +#[wasm_bindgen::prelude::wasm_bindgen(start)] +/// Setup a common test with proper tracing support (whether WASM or regular runtime) +pub fn setup() { + extern crate console_error_panic_hook; + + // print pretty errors in wasm https://github.com/rustwasm/console_error_panic_hook + // This is not needed for tracing_wasm to work, but it is a common tool for getting proper error line numbers for panics. + console_error_panic_hook::set_once(); + + // Add this line: + let _ = tracing_wasm::try_set_as_global_default(); + + wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); } + +/// Setup a common test with proper tracing support (whether WASM or regular runtime) +#[cfg(not(target_arch = "wasm32"))] +pub fn setup() {} diff --git a/ractor/src/concurrency/async_std_primatives.rs b/ractor/src/concurrency/async_std_primatives.rs index e5c77cb8..fbecd96c 100644 --- a/ractor/src/concurrency/async_std_primatives.rs +++ b/ractor/src/concurrency/async_std_primatives.rs @@ -4,6 +4,9 @@ // LICENSE-MIT file in the root directory of this source tree. //! Concurrency primitaves based on the `async-std` crate +//! +//! We still rely on tokio for some core executor-independent parts +//! such as channels (see: https://github.com/tokio-rs/tokio/issues/4232#issuecomment-968329443). use std::{ future::Future, @@ -147,19 +150,7 @@ where F: Future + Send + 'static, F::Output: Send + 'static, { - let signal = Arc::new(AtomicBool::new(false)); - let inner_signal = signal.clone(); - - let jh = async_std::task::spawn(async move { - let r = future.await; - inner_signal.fetch_or(true, Ordering::Relaxed); - r - }); - - JoinHandle { - handle: Some(jh), - is_done: signal, - } + spawn_named(None, future) } /// Spawn a (possibly) named task on the executor runtime @@ -186,7 +177,19 @@ where is_done: signal, } } else { - spawn(future) + let signal = Arc::new(AtomicBool::new(false)); + let inner_signal = signal.clone(); + + let jh = async_std::task::spawn(async move { + let r = future.await; + inner_signal.fetch_or(true, Ordering::Relaxed); + r + }); + + JoinHandle { + handle: Some(jh), + is_done: signal, + } } } diff --git a/ractor/src/concurrency/mod.rs b/ractor/src/concurrency/mod.rs index 73c9da24..aacb5173 100644 --- a/ractor/src/concurrency/mod.rs +++ b/ractor/src/concurrency/mod.rs @@ -5,9 +5,15 @@ //! Shared concurrency primitives utilized within the library for different frameworks (tokio, async-std, etc) -/// A timoeout error +/// A timeout error #[derive(Debug)] pub struct Timeout; +impl std::error::Error for Timeout {} +impl std::fmt::Display for Timeout { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Timeout") + } +} /// A one-use sender pub type OneshotSender = tokio::sync::oneshot::Sender; @@ -49,12 +55,17 @@ pub fn broadcast(buffer: usize) -> (BroadcastSender, BroadcastRecei tokio::sync::broadcast::channel(buffer) } -#[cfg(not(feature = "async-std"))] +#[cfg(not(any(feature = "async-std", target_arch = "wasm32")))] pub mod tokio_primatives; -#[cfg(not(feature = "async-std"))] +#[cfg(not(any(feature = "async-std", target_arch = "wasm32")))] pub use self::tokio_primatives::*; -#[cfg(feature = "async-std")] +#[cfg(all(feature = "async-std", not(target_arch = "wasm32")))] pub mod async_std_primatives; -#[cfg(feature = "async-std")] +#[cfg(all(feature = "async-std", not(target_arch = "wasm32")))] pub use self::async_std_primatives::*; + +#[cfg(target_arch = "wasm32")] +pub mod wasm_primatives; +#[cfg(target_arch = "wasm32")] +pub use self::wasm_primatives::*; diff --git a/ractor/src/concurrency/wasm_primatives.rs b/ractor/src/concurrency/wasm_primatives.rs new file mode 100644 index 00000000..a67e8b38 --- /dev/null +++ b/ractor/src/concurrency/wasm_primatives.rs @@ -0,0 +1,142 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! Concurrency primitaves based on a WASM runtime. They however are not completely +//! functional and there are other core problems with Ractor in a WASM runtime, specifically +//! that panic's abort instantly. https://github.com/rust-lang/rust/issues/58874 +//! +//! ## Testing +//! +//! Test this configuration with +//! +//! ```text +//! wasm-pack test --headless --safari -r +//! ``` + +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures::FutureExt; + +/// Represents a task JoinHandle +pub struct JoinHandle { + handle: Option>, +} + +impl Drop for JoinHandle { + fn drop(&mut self) { + if let Some(h) = self.handle.take() { + h.forget(); + } + } +} + +impl JoinHandle { + /// Determine if the handle is currently finished + pub fn is_finished(&self) -> bool { + self.handle.is_none() + } + + /// Abort the handle + pub fn abort(&mut self) { + // For a remote handle, being dropped will wake the remote future + // to be dropped by the executor + // See: https://docs.rs/futures/latest/futures/prelude/future/struct.RemoteHandle.html + drop(self.handle.take()); + } +} + +impl Future for JoinHandle { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // a little black-magic to poll the inner future, but return a Result instead of a unit + let mutself = self.get_mut(); + let inner_polled_value = if let Some(inner) = mutself.handle.as_mut() { + inner.poll_unpin(cx) + } else { + return Poll::Ready(Err(())); + }; + + match inner_polled_value { + Poll::Pending => Poll::Pending, + Poll::Ready(v) => { + mutself.abort(); + Poll::Ready(Ok(v)) + } + } + } +} + +/// Spawn a task on the executor runtime +pub fn spawn(future: F) -> JoinHandle +where + F: Future + 'static, + F::Output: Send + 'static, +{ + spawn_named(None, future) +} + +/// Spawn a (possibly) named task on the executor runtime +pub fn spawn_named(name: Option<&str>, future: F) -> JoinHandle +where + F: Future + 'static, + F::Output: Send + 'static, +{ + let _ = name; + let (remote, remote_handle) = future.remote_handle(); + wasm_bindgen_futures::spawn_local(remote); + JoinHandle { + handle: Some(remote_handle), + } +} + +/// A duration of time +pub type Duration = std::time::Duration; + +/// An instant measured on system time +pub type Instant = wasm_timer::Instant; + +/// Sleep the task for a duration of time +pub async fn sleep(dur: Duration) { + wasmtimer::tokio::sleep(dur).await; + // let _ = wasm_timer::Delay::new(dur).await; +} + +/// An asynchronous interval calculation which waits until +/// a checkpoint time to tick +pub type Interval = wasmtimer::tokio::Interval; + +/// Build a new interval at the given duration starting at now +/// +/// Ticks 1 time immediately +pub fn interval(dur: Duration) -> Interval { + wasmtimer::tokio::interval(dur) +} + +/// A set of futures to join on, in an unordered fashion +/// (first-completed, first-served) +pub type JoinSet = tokio::task::JoinSet; + +/// Execute the future up to a timeout +/// +/// * `dur`: The duration of time to allow the future to execute for +/// * `future`: The future to execute +/// +/// Returns [Ok(_)] if the future succeeded before the timeout, [Err(Timeout)] otherwise +pub async fn timeout(dur: Duration, future: F) -> Result +where + F: Future, +{ + wasmtimer::tokio::timeout(dur, future) + .await + .map_err(|_| super::Timeout) +} + +pub use futures::select_biased as select; + +// test macro +pub use wasm_bindgen_test::wasm_bindgen_test as test; diff --git a/ractor/src/factory/job.rs b/ractor/src/factory/job.rs index 7d15bb9f..69c9a758 100644 --- a/ractor/src/factory/job.rs +++ b/ractor/src/factory/job.rs @@ -334,7 +334,7 @@ mod tests { type TheJob = Job; #[test] - #[tracing_test::traced_test] + #[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] fn test_job_serialization() { // Check Cast variant let job_a = TheJob { @@ -396,7 +396,7 @@ mod tests { } #[test] - #[tracing_test::traced_test] + #[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] fn test_factory_message_serialization() { let job_a = TheJob { key: TestKey { item: 123 }, diff --git a/ractor/src/factory/tests/mod.rs b/ractor/src/factory/tests/mod.rs index acf586b3..7e635cf0 100644 --- a/ractor/src/factory/tests/mod.rs +++ b/ractor/src/factory/tests/mod.rs @@ -142,7 +142,7 @@ impl super::WorkerBuilder for InsanelySlowWorkerBuilder { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_dispatch_key_persistent() { let worker_counters: [_; NUM_TEST_WORKERS] = [ Arc::new(AtomicU16::new(0)), @@ -246,7 +246,7 @@ async fn test_dispatch_queuer() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_dispatch_round_robin() { let worker_counters: [_; NUM_TEST_WORKERS] = [ Arc::new(AtomicU16::new(0)), @@ -299,7 +299,7 @@ async fn test_dispatch_round_robin() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_dispatch_random() { let worker_counters: [_; NUM_TEST_WORKERS] = [ Arc::new(AtomicU16::new(0)), @@ -354,7 +354,7 @@ async fn test_dispatch_random() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_dispatch_custom_hashing() { struct MyHasher where @@ -424,7 +424,7 @@ async fn test_dispatch_custom_hashing() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_dispatch_sticky_queueing() { let worker_counters: [_; NUM_TEST_WORKERS] = [ Arc::new(AtomicU16::new(0)), @@ -479,7 +479,7 @@ async fn test_dispatch_sticky_queueing() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_discards_on_queuer() { let worker_counters: [_; NUM_TEST_WORKERS] = [ Arc::new(AtomicU16::new(0)), @@ -608,7 +608,7 @@ impl Actor for StuckWorker { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_stuck_workers() { let worker_counters: [_; NUM_TEST_WORKERS] = [ Arc::new(AtomicU16::new(0)), @@ -682,7 +682,7 @@ async fn test_stuck_workers() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_worker_pings() { let worker_counters: [_; NUM_TEST_WORKERS] = [ Arc::new(AtomicU16::new(0)), @@ -726,20 +726,4 @@ async fn test_worker_pings() { factory.stop(None); factory_handle.await.unwrap(); - - tracing::info!( - "Counters: [{}] [{}] [{}]", - worker_counters[0].load(Ordering::Relaxed), - worker_counters[1].load(Ordering::Relaxed), - worker_counters[2].load(Ordering::Relaxed) - ); - - periodic_check( - || { - let all_counter = worker_counters[0].load(Ordering::Relaxed); - all_counter == 999 - }, - Duration::from_secs(10), - ) - .await; } diff --git a/ractor/src/factory/tests/worker_lifecycle.rs b/ractor/src/factory/tests/worker_lifecycle.rs index 19601fb2..e225a92b 100644 --- a/ractor/src/factory/tests/worker_lifecycle.rs +++ b/ractor/src/factory/tests/worker_lifecycle.rs @@ -94,7 +94,7 @@ impl WorkerBuilder for MyWorkerBuilder { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_worker_death_restarts_and_gets_next_message() { let counter = Arc::new(AtomicU16::new(0)); let worker_builder = MyWorkerBuilder { diff --git a/ractor/src/lib.rs b/ractor/src/lib.rs index b2b1505e..ed3fb3ee 100644 --- a/ractor/src/lib.rs +++ b/ractor/src/lib.rs @@ -150,6 +150,9 @@ pub mod actor; pub(crate) mod common_test; pub mod concurrency; pub mod errors; +// TODO #124 (slawlor): Redesign this without usage of core time primatives (i.e. +// use concurrency instants) +#[cfg(not(target_arch = "wasm32"))] pub mod factory; pub mod macros; pub mod message; @@ -166,8 +169,6 @@ pub mod time; #[cfg(test)] mod tests; #[cfg(test)] -use criterion as _; -#[cfg(test)] use paste as _; #[cfg(test)] use rand as _; @@ -176,6 +177,14 @@ use tracing_glog as _; #[cfg(test)] use tracing_subscriber as _; +#[cfg(all(test, not(target_arch = "wasm32")))] +use criterion as _; + +#[cfg(target_arch = "wasm32")] +use getrandom as _; +#[cfg(target_arch = "wasm32")] +use parking_lot as _; + // ======================== Re-exports ======================== // pub use actor::actor_cell::{ActorCell, ActorStatus, ACTIVE_STATES}; diff --git a/ractor/src/pg/tests.rs b/ractor/src/pg/tests.rs index 8138c535..2ef2687a 100644 --- a/ractor/src/pg/tests.rs +++ b/ractor/src/pg/tests.rs @@ -3,6 +3,8 @@ // This source code is licensed under both the MIT license found in the // LICENSE-MIT file in the root directory of this source tree. +#![allow(unused)] + use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::Arc; @@ -33,8 +35,9 @@ impl Actor for TestActor { #[named] #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_basic_group() { + crate::common_test::setup(); let (actor, handle) = Actor::spawn(None, TestActor, ()) .await .expect("Failed to spawn test actor"); @@ -54,8 +57,9 @@ async fn test_basic_group() { #[named] #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_multiple_members_in_group() { + crate::common_test::setup(); let group = function_name!().to_string(); let mut actors = vec![]; @@ -91,8 +95,9 @@ async fn test_multiple_members_in_group() { #[named] #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_multiple_groups() { + crate::common_test::setup(); let group_a = concat!(function_name!(), "_a").to_string(); let group_b = concat!(function_name!(), "_b").to_string(); @@ -136,8 +141,9 @@ async fn test_multiple_groups() { #[named] #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_actor_leaves_pg_group_on_shutdown() { + crate::common_test::setup(); let (actor, handle) = Actor::spawn(None, TestActor, ()) .await .expect("Failed to spawn test actor"); @@ -161,8 +167,9 @@ async fn test_actor_leaves_pg_group_on_shutdown() { #[named] #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_actor_leaves_pg_group_manually() { + crate::common_test::setup(); let group = function_name!().to_string(); let (actor, handle) = Actor::spawn(None, TestActor, ()) @@ -197,8 +204,9 @@ async fn test_actor_leaves_pg_group_manually() { #[named] #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_pg_monitoring() { + crate::common_test::setup(); let group = function_name!().to_string(); let counter = Arc::new(AtomicU8::new(0u8)); @@ -285,26 +293,27 @@ async fn test_pg_monitoring() { ) .await; - // kill the pg member - test_actor.stop(None); - test_handle.await.expect("Actor cleanup failed"); - // it should have notified that it's unsubscribed - periodic_check( - || counter.load(Ordering::Relaxed) == 0, - Duration::from_secs(5), - ) - .await; - - // cleanup - monitor_actor.stop(None); - monitor_handle.await.expect("Actor cleanup failed"); + // // kill the pg member + // test_actor.stop(None); + // test_handle.await.expect("Actor cleanup failed"); + // // it should have notified that it's unsubscribed + // periodic_check( + // || counter.load(Ordering::Relaxed) == 0, + // Duration::from_secs(5), + // ) + // .await; + + // // cleanup + // monitor_actor.stop(None); + // monitor_handle.await.expect("Actor cleanup failed"); } #[named] #[cfg(feature = "cluster")] #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn local_vs_remote_pg_members() { + crate::common_test::setup(); use crate::ActorRuntime; let group = function_name!().to_string(); diff --git a/ractor/src/port/output/tests.rs b/ractor/src/port/output/tests.rs index a35198d3..5d2c0c99 100644 --- a/ractor/src/port/output/tests.rs +++ b/ractor/src/port/output/tests.rs @@ -15,8 +15,9 @@ use crate::{Actor, ActorRef}; use super::*; #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_single_forward() { + crate::common_test::setup(); struct TestActor; enum TestActorMessage { Stop, @@ -79,8 +80,9 @@ async fn test_single_forward() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_50_receivers() { + crate::common_test::setup(); struct TestActor; enum TestActorMessage { Stop, diff --git a/ractor/src/registry/tests.rs b/ractor/src/registry/tests.rs index e8558419..5dd41ae0 100644 --- a/ractor/src/registry/tests.rs +++ b/ractor/src/registry/tests.rs @@ -10,8 +10,9 @@ use crate::concurrency::Duration; use crate::{Actor, ActorProcessingErr, SpawnErr}; #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_basic_registation() { + crate::common_test::setup(); struct EmptyActor; #[async_trait::async_trait] @@ -43,8 +44,9 @@ async fn test_basic_registation() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_duplicate_registration() { + crate::common_test::setup(); struct EmptyActor; #[async_trait::async_trait] @@ -91,8 +93,9 @@ async fn test_duplicate_registration() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_actor_registry_unenrollment() { + crate::common_test::setup(); struct EmptyActor; #[async_trait::async_trait] @@ -160,8 +163,9 @@ mod pid_registry_tests { } #[crate::concurrency::test] - #[tracing_test::traced_test] + #[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn try_enroll_remote_actor() { + crate::common_test::setup(); struct EmptyActor; #[async_trait::async_trait] impl Actor for EmptyActor { @@ -206,8 +210,9 @@ mod pid_registry_tests { } #[crate::concurrency::test] - #[tracing_test::traced_test] + #[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_basic_registation() { + crate::common_test::setup(); struct EmptyActor; #[async_trait::async_trait] @@ -240,8 +245,9 @@ mod pid_registry_tests { } #[crate::concurrency::test] - #[tracing_test::traced_test] + #[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_actor_registry_unenrollment() { + crate::common_test::setup(); struct EmptyActor; #[async_trait::async_trait] @@ -282,8 +288,9 @@ mod pid_registry_tests { } #[crate::concurrency::test] - #[tracing_test::traced_test] + #[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_pid_lifecycle_monitoring() { + crate::common_test::setup(); let counter = Arc::new(DashMap::new()); struct AutoJoinActor; diff --git a/ractor/src/rpc/tests.rs b/ractor/src/rpc/tests.rs index 7e2add54..1c8902bf 100644 --- a/ractor/src/rpc/tests.rs +++ b/ractor/src/rpc/tests.rs @@ -16,8 +16,9 @@ use crate::{cast, forward, Actor, ActorRef}; use crate::{rpc, ActorProcessingErr}; #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_rpc_cast() { + crate::common_test::setup(); let counter = Arc::new(AtomicU8::new(0u8)); struct TestActor { @@ -75,8 +76,9 @@ async fn test_rpc_cast() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_rpc_call() { + crate::common_test::setup(); struct TestActor; enum MessageFormat { Rpc(rpc::RpcReplyPort), @@ -169,8 +171,9 @@ async fn test_rpc_call() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_rpc_call_forwarding() { + crate::common_test::setup(); struct Worker; enum WorkerMessage { @@ -327,8 +330,9 @@ async fn test_rpc_call_forwarding() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_multi_call() { + crate::common_test::setup(); struct TestActor; enum MessageFormat { Rpc(rpc::RpcReplyPort), diff --git a/ractor/src/serialization.rs b/ractor/src/serialization.rs index a4b6c35a..fe1ecd3e 100644 --- a/ractor/src/serialization.rs +++ b/ractor/src/serialization.rs @@ -199,6 +199,8 @@ mod tests { paste::item! { #[test] fn [< test_bytes_conversion_ $ty >] () { + crate::common_test::setup(); + let test_data: $ty = rand::thread_rng().gen(); let bytes = test_data.clone().into_bytes(); let back = <$ty as BytesConvertable>::from_bytes(bytes); @@ -217,6 +219,8 @@ mod tests { paste::item! { #[test] fn [< test_bytes_conversion_vec_ $ty >] () { + crate::common_test::setup(); + let mut rng = rand::thread_rng(); let num_pts: usize = rng.gen_range(10..50); let test_data = (0..num_pts).into_iter().map(|_| rng.gen()).collect::>(); @@ -252,6 +256,7 @@ mod tests { #[test] #[allow(non_snake_case)] fn test_bytes_conversion_String() { + crate::common_test::setup(); let test_data: String = random_string(); let bytes = test_data.clone().into_bytes(); let back = ::from_bytes(bytes); @@ -275,6 +280,7 @@ mod tests { #[test] fn test_boxed_downcast_error() { + crate::common_test::setup(); let err = BoxedDowncastErr; println!("{err}"); println!("{err:?}"); diff --git a/ractor/src/tests.rs b/ractor/src/tests.rs index d56e3a7e..8c60db58 100644 --- a/ractor/src/tests.rs +++ b/ractor/src/tests.rs @@ -5,6 +5,7 @@ //! Basic tests of errors, error conversions, etc +use crate::concurrency::Duration; use crate::Actor; use crate::ActorCell; use crate::ActorProcessingErr; @@ -12,8 +13,10 @@ use crate::ActorRef; use crate::RactorErr; #[test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] fn test_error_conversions() { + crate::common_test::setup(); + let spawn = crate::SpawnErr::StartupCancelled; let ractor_err = RactorErr::<()>::from(crate::SpawnErr::StartupCancelled); assert_eq!(spawn.to_string(), ractor_err.to_string()); @@ -39,8 +42,10 @@ fn test_error_conversions() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_error_message_extraction() { + crate::common_test::setup(); + struct TestActor; #[async_trait::async_trait] @@ -76,3 +81,10 @@ async fn test_error_message_extraction() { assert!(!err.has_message()); assert!(err.try_get_message().is_none()); } + +#[crate::concurrency::test] +async fn test_platform_sleep_works() { + crate::common_test::setup(); + crate::concurrency::sleep(Duration::from_millis(100)).await; + assert!(true); +} diff --git a/ractor/src/time/tests.rs b/ractor/src/time/tests.rs index 22bbe809..d16cc31c 100644 --- a/ractor/src/time/tests.rs +++ b/ractor/src/time/tests.rs @@ -15,8 +15,9 @@ use crate::{common_test::periodic_check, concurrency::Duration, ActorProcessingE use crate::{Actor, ActorRef}; #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_intervals() { + crate::common_test::setup(); let counter = Arc::new(AtomicU8::new(0u8)); struct TestActor { @@ -79,8 +80,9 @@ async fn test_intervals() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_send_after() { + crate::common_test::setup(); let counter = Arc::new(AtomicU8::new(0u8)); struct TestActor { @@ -143,8 +145,9 @@ async fn test_send_after() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_exit_after() { + crate::common_test::setup(); struct TestActor; #[async_trait::async_trait] @@ -175,8 +178,9 @@ async fn test_exit_after() { } #[crate::concurrency::test] -#[tracing_test::traced_test] +#[cfg_attr(not(target_arch = "wasm32"), tracing_test::traced_test)] async fn test_kill_after() { + crate::common_test::setup(); struct TestActor; #[async_trait::async_trait]