diff --git a/ractor/src/actor/messages.rs b/ractor/src/actor/messages.rs index 13f05b54..ab50b44e 100644 --- a/ractor/src/actor/messages.rs +++ b/ractor/src/actor/messages.rs @@ -146,7 +146,12 @@ impl std::fmt::Display for SupervisionEvent { write!(f, "Actor panicked {actor:?} - {panic_msg}") } SupervisionEvent::ProcessGroupChanged(change) => { - write!(f, "Process group {} changed", change.get_group()) + write!( + f, + "Process group {} in scope {} changed", + change.get_group(), + change.get_scope() + ) } #[cfg(feature = "cluster")] SupervisionEvent::PidLifecycleEvent(change) => { diff --git a/ractor/src/lib.rs b/ractor/src/lib.rs index b2b1505e..264e3772 100644 --- a/ractor/src/lib.rs +++ b/ractor/src/lib.rs @@ -198,6 +198,9 @@ pub type ActorName = String; /// A process group's name, equivalent to an [Erlang `atom()`](https://www.erlang.org/doc/reference_manual/data_types.html#atom) pub type GroupName = String; +/// A scope's name, equivalent to an [Erlang `atom()`](https://www.erlang.org/doc/reference_manual/data_types.html#atom) +pub type ScopeName = String; + /// Represents the state of an actor. Must be safe /// to send between threads (same bounds as a [Message]) pub trait State: std::any::Any + Send + 'static {} diff --git a/ractor/src/pg/mod.rs b/ractor/src/pg/mod.rs index 1c83bddb..0b205f28 100644 --- a/ractor/src/pg/mod.rs +++ b/ractor/src/pg/mod.rs @@ -26,10 +26,16 @@ use dashmap::DashMap; use once_cell::sync::OnceCell; -use crate::{ActorCell, ActorId, GroupName, SupervisionEvent}; +use crate::{ActorCell, ActorId, GroupName, ScopeName, SupervisionEvent}; -/// Key to monitor all of the groups -pub const ALL_GROUPS_NOTIFICATION: &str = "__world__"; +/// Key to set the default scope +pub const DEFAULT_SCOPE: &str = "__default_scope__"; + +/// Key to monitor all of the scopes +pub const ALL_SCOPES_NOTIFICATION: &str = "__world_scope__"; + +/// Key to monitor all of the groups in a scope +pub const ALL_GROUPS_NOTIFICATION: &str = "__world_group_"; #[cfg(test)] mod tests; @@ -38,24 +44,53 @@ mod tests; #[derive(Clone)] pub enum GroupChangeMessage { /// Some actors joined a group - Join(GroupName, Vec), + Join(ScopeName, GroupName, Vec), /// Some actors left a group - Leave(GroupName, Vec), + Leave(ScopeName, GroupName, Vec), } impl GroupChangeMessage { /// Retrieve the group that changed pub fn get_group(&self) -> GroupName { match self { - Self::Join(name, _) => name.clone(), - Self::Leave(name, _) => name.clone(), + Self::Join(_, name, _) => name.clone(), + Self::Leave(_, name, _) => name.clone(), + } + } + + /// Retrieve the name of the scope in which the group change took place + pub fn get_scope(&self) -> ScopeName { + match self { + Self::Join(scope, _, _) => scope.to_string(), + Self::Leave(scope, _, _) => scope.to_string(), } } } +/// Represents the combination of a `ScopeName` and a `GroupName` +/// that uniquely identifies a specific group in a specific scope +#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct ScopeGroupKey { + /// the `ScopeName` + scope: ScopeName, + /// The `GroupName` + group: GroupName, +} + +impl ScopeGroupKey { + /// Retrieve the struct's scope + pub fn get_scope(&self) -> ScopeName { + self.scope.to_owned() + } + /// Retrieve the struct's group + pub fn get_group(&self) -> GroupName { + self.group.to_owned() + } +} + struct PgState { - map: Arc>>, - listeners: Arc>>, + map: Arc>>, + listeners: Arc>>, } static PG_MONITOR: OnceCell = OnceCell::new(); @@ -67,14 +102,28 @@ fn get_monitor<'a>() -> &'a PgState { }) } -/// Join actors to the group `group` +/// Join actors to the group `group` in the default scope /// -/// * `group` - The statically named group. Will be created if first actors to join +/// * `group` - The named group. Will be created if first actors to join /// * `actors` - The list of [crate::Actor]s to add to the group pub fn join(group: GroupName, actors: Vec) { + join_scoped(DEFAULT_SCOPE.to_owned(), group, actors); +} + +/// Join actors to the group `group` within the scope `scope` +/// +/// * `scope` - the named scope. Will be created if first actors to join +/// * `group` - The named group. Will be created if first actors to join +/// * `actors` - The list of [crate::Actor]s to add to the group +pub fn join_scoped(scope: ScopeName, group: GroupName, actors: Vec) { + let key = ScopeGroupKey { + scope: scope.to_owned(), + group: group.to_owned(), + }; let monitor = get_monitor(); + // insert into the monitor group - match monitor.map.entry(group.clone()) { + match monitor.map.entry(key.to_owned()) { Occupied(mut occupied) => { let oref = occupied.get_mut(); for actor in actors.iter() { @@ -90,53 +139,77 @@ pub fn join(group: GroupName, actors: Vec) { } } // notify supervisors - if let Some(listeners) = monitor.listeners.get(&group) { + if let Some(listeners) = monitor.listeners.get(&key) { for listener in listeners.value() { let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Join(group.clone(), actors.clone()), + GroupChangeMessage::Join(scope.to_owned(), group.clone(), actors.clone()), )); } } // notify the world monitors - if let Some(listeners) = monitor.listeners.get(ALL_GROUPS_NOTIFICATION) { - for listener in listeners.value() { - let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Join(group.clone(), actors.clone()), - )); + let world_monitor_keys = get_world_monitor_keys(); + for key in world_monitor_keys { + if let Some(listeners) = monitor.listeners.get(&key) { + for listener in listeners.value() { + let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( + GroupChangeMessage::Join(scope.to_owned(), group.clone(), actors.clone()), + )); + } } } } -/// Leaves the specified [crate::Actor]s from the PG group +/// Leaves the specified [crate::Actor]s from the PG group in the default scope /// -/// * `group` - The statically named group +/// * `group` - A named group /// * `actors` - The list of actors to remove from the group pub fn leave(group: GroupName, actors: Vec) { + leave_scoped(DEFAULT_SCOPE.to_owned(), group, actors); +} + +/// Leaves the specified [crate::Actor]s from the PG group within the scope `scope` +/// +/// * `scope` - A named scope +/// * `group` - A named group +/// * `actors` - The list of actors to remove from the group +pub fn leave_scoped(scope: ScopeName, group: GroupName, actors: Vec) { + let key = ScopeGroupKey { + scope: scope.to_owned(), + group: group.to_owned(), + }; let monitor = get_monitor(); - match monitor.map.entry(group.clone()) { + match monitor.map.entry(key.to_owned()) { Vacant(_) => {} Occupied(mut occupied) => { let mut_ref = occupied.get_mut(); for actor in actors.iter() { mut_ref.remove(&actor.get_id()); } - // the group is empty, remove it + + // the scope and group tuple is empty, remove it if mut_ref.is_empty() { occupied.remove(); } - if let Some(listeners) = monitor.listeners.get(&group) { + if let Some(listeners) = monitor.listeners.get(&key) { for listener in listeners.value() { let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Leave(group.clone(), actors.clone()), + GroupChangeMessage::Leave(scope.to_owned(), group.clone(), actors.clone()), )); } } // notify the world monitors - if let Some(listeners) = monitor.listeners.get(ALL_GROUPS_NOTIFICATION) { - for listener in listeners.value() { - let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Leave(group.clone(), actors.clone()), - )); + let world_monitor_keys = get_world_monitor_keys(); + for key in world_monitor_keys { + if let Some(listeners) = monitor.listeners.get(&key) { + for listener in listeners.value() { + let _ = listener.send_supervisor_evt( + SupervisionEvent::ProcessGroupChanged(GroupChangeMessage::Leave( + scope.to_owned(), + group.clone(), + actors.clone(), + )), + ); + } } } } @@ -163,19 +236,31 @@ pub(crate) fn leave_all(actor: ActorId) { // notify the listeners let all_listeners = pg_monitor.listeners.clone(); - for (group, cell) in removal_events.into_iter() { - if let Some(this_listeners) = all_listeners.get(&group) { + for (scope_and_group, cell) in removal_events.into_iter() { + if let Some(this_listeners) = all_listeners.get(&scope_and_group) { this_listeners.iter().for_each(|listener| { let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Leave(group.clone(), vec![cell.clone()]), + GroupChangeMessage::Leave( + scope_and_group.scope.clone(), + scope_and_group.group.clone(), + vec![cell.clone()], + ), )); }); } // notify the world monitors - if let Some(listeners) = all_listeners.get(ALL_GROUPS_NOTIFICATION) { + let world_monitor_scoped = ScopeGroupKey { + scope: scope_and_group.scope, + group: ALL_GROUPS_NOTIFICATION.to_owned(), + }; + if let Some(listeners) = all_listeners.get(&world_monitor_scoped) { for listener in listeners.value() { let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Leave(group.clone(), vec![cell.clone()]), + GroupChangeMessage::Leave( + world_monitor_scoped.scope.clone(), + world_monitor_scoped.group.clone(), + vec![cell.clone()], + ), )); } } @@ -187,14 +272,30 @@ pub(crate) fn leave_all(actor: ActorId) { } } -/// Returns all the actors running on the local node in the group `group`. +/// Returns all actors running on the local node in the group `group` +/// in the default scope. /// -/// * `group_name` - Either a statically named group or scope +/// * `group` - A named group /// /// Returns a [`Vec`] representing the members of this paging group -pub fn get_local_members(group_name: &GroupName) -> Vec { +pub fn get_local_members(group: &GroupName) -> Vec { + get_scoped_local_members(&DEFAULT_SCOPE.to_owned(), group) +} + +/// Returns all actors running on the local node in the group `group` +/// in scope `scope` +/// +/// * `scope_name` - A named scope +/// * `group_name` - A named group +/// +/// Returns a [`Vec`] representing the members of this paging group +pub fn get_scoped_local_members(scope: &ScopeName, group: &GroupName) -> Vec { + let key = ScopeGroupKey { + scope: scope.to_owned(), + group: group.to_owned(), + }; let monitor = get_monitor(); - if let Some(actors) = monitor.map.get(group_name) { + if let Some(actors) = monitor.map.get(&key) { actors .value() .values() @@ -206,14 +307,30 @@ pub fn get_local_members(group_name: &GroupName) -> Vec { } } -/// Returns all the actors running on any node in the group `group`. +/// Returns all the actors running on any node in the group `group` +/// in the default scope. /// -/// * `group_name` - Either a statically named group or scope +/// * `group_name` - A named group /// /// Returns a [`Vec`] with the member actors pub fn get_members(group_name: &GroupName) -> Vec { + get_scoped_members(&DEFAULT_SCOPE.to_owned(), group_name) +} + +/// Returns all the actors running on any node in the group `group` +/// in the scope `scope`. +/// +/// * `scope` - A named scope +/// * `group` - A named group +/// +/// Returns a [`Vec`] with the member actors +pub fn get_scoped_members(scope: &ScopeName, group: &GroupName) -> Vec { + let key = ScopeGroupKey { + scope: scope.to_owned(), + group: group.to_owned(), + }; let monitor = get_monitor(); - if let Some(actors) = monitor.map.get(group_name) { + if let Some(actors) = monitor.map.get(&key) { actors.value().values().cloned().collect::>() } else { vec![] @@ -224,6 +341,44 @@ pub fn get_members(group_name: &GroupName) -> Vec { /// /// Returns a [`Vec`] representing all the registered group names pub fn which_groups() -> Vec { + let monitor = get_monitor(); + let mut groups = monitor + .map + .iter() + .map(|kvp| kvp.key().group.to_owned()) + .collect::>(); + groups.sort_unstable(); + groups.dedup(); + groups +} + +/// Returns a list of all known groups in scope `scope` +/// +/// * `scope` - The scope to retrieve the groups from +/// +/// Returns a [`Vec`] representing all the registered group names +/// in `scope` +pub fn which_scoped_groups(scope: &ScopeName) -> Vec { + let monitor = get_monitor(); + monitor + .map + .iter() + .filter_map(|kvp| { + let key = kvp.key(); + if key.scope == *scope { + Some(key.group.to_owned()) + } else { + None + } + }) + .collect::>() +} + +/// Returns a list of all known scope-group combinations. +/// +/// Returns a [`Vec<(ScopGroupKey)>`] representing all the registered +/// combinations that form an identifying tuple +pub fn which_scopes_and_groups() -> Vec { let monitor = get_monitor(); monitor .map @@ -232,13 +387,33 @@ pub fn which_groups() -> Vec { .collect::>() } -/// Subscribes the provided [crate::Actor] to the scope or group for updates +/// Returns a list of all known scopes +/// +/// Returns a [`Vec`] representing all the registered scopes +pub fn which_scopes() -> Vec { + let monitor = get_monitor(); + monitor + .map + .iter() + .map(|kvp| { + let key = kvp.key(); + key.scope.to_owned() + }) + .collect::>() +} + +/// Subscribes the provided [crate::Actor] to the group in the default scope +/// for updates /// -/// * `group_name` - The scope or group to monitor +/// * `group_name` - The group to monitor /// * `actor` - The [ActorCell] representing who will receive updates -pub fn monitor(group_name: GroupName, actor: ActorCell) { +pub fn monitor(group: GroupName, actor: ActorCell) { + let key = ScopeGroupKey { + scope: DEFAULT_SCOPE.to_owned(), + group, + }; let monitor = get_monitor(); - match monitor.listeners.entry(group_name) { + match monitor.listeners.entry(key) { Occupied(mut occupied) => occupied.get_mut().push(actor), Vacant(vacancy) => { vacancy.insert(vec![actor]); @@ -246,13 +421,52 @@ pub fn monitor(group_name: GroupName, actor: ActorCell) { } } -/// Unsubscribes the provided [crate::Actor] from the scope or group for updates +/// Subscribes the provided [crate::Actor] to the scope for updates /// -/// * `group_name` - The scope or group to monitor +/// * `scope` - the scope to monitor /// * `actor` - The [ActorCell] representing who will receive updates +pub fn monitor_scope(scope: ScopeName, actor: ActorCell) { + let key = ScopeGroupKey { + scope: scope.to_owned(), + group: ALL_GROUPS_NOTIFICATION.to_owned(), + }; + let monitor = get_monitor(); + + // Register at world monitor first + match monitor.listeners.entry(key) { + Occupied(mut occupied) => occupied.get_mut().push(actor.clone()), + Vacant(vacancy) => { + vacancy.insert(vec![actor.clone()]); + } + } + + let groups_in_scope = which_scoped_groups(&scope); + for group in groups_in_scope { + let key = ScopeGroupKey { + scope: scope.to_owned(), + group, + }; + match monitor.listeners.entry(key) { + Occupied(mut occupied) => occupied.get_mut().push(actor.clone()), + Vacant(vacancy) => { + vacancy.insert(vec![actor.clone()]); + } + } + } +} + +/// Unsubscribes the provided [crate::Actor] for updates from the group +/// in default scope +/// +/// * `group_name` - The group to demonitor +/// * `actor` - The [ActorCell] representing who will no longer receive updates pub fn demonitor(group_name: GroupName, actor: ActorId) { + let key = ScopeGroupKey { + scope: DEFAULT_SCOPE.to_owned(), + group: group_name, + }; let monitor = get_monitor(); - if let Occupied(mut entry) = monitor.listeners.entry(group_name) { + if let Occupied(mut entry) = monitor.listeners.entry(key) { let mut_ref = entry.get_mut(); mut_ref.retain(|a| a.get_id() != actor); if mut_ref.is_empty() { @@ -261,7 +475,29 @@ pub fn demonitor(group_name: GroupName, actor: ActorId) { } } -/// Remove the specified [ActorId] from monitoring all groups it might be. +/// Unsubscribes the provided [crate::Actor] from the scope for updates +/// +/// * `scope` - The scope to demonitor +/// * `actor` - The [ActorCell] representing who will no longer receive updates +pub fn demonitor_scope(scope: ScopeName, actor: ActorId) { + let monitor = get_monitor(); + let groups_in_scope = which_scoped_groups(&scope); + for group in groups_in_scope { + let key = ScopeGroupKey { + scope: scope.to_owned(), + group, + }; + if let Occupied(mut entry) = monitor.listeners.entry(key) { + let mut_ref = entry.get_mut(); + mut_ref.retain(|a| a.get_id() != actor); + if mut_ref.is_empty() { + entry.remove(); + } + } + } +} + +/// Remove the specified [ActorId] from monitoring all groups it might be in. /// Used only during actor shutdown pub(crate) fn demonitor_all(actor: ActorId) { let monitor = get_monitor(); @@ -280,3 +516,26 @@ pub(crate) fn demonitor_all(actor: ActorId) { monitor.listeners.remove(&empty_group); } } + +/// Gets the keys for the world monitors. +/// +/// Returns a `Vec` represending all registered tuples +/// for which ane of the values is equivalent to one of the world_monitor_keys +fn get_world_monitor_keys() -> Vec { + let monitor = get_monitor(); + let mut world_monitor_keys = monitor + .listeners + .iter() + .filter_map(|kvp| { + let key = kvp.key().clone(); + if key.scope == ALL_SCOPES_NOTIFICATION || key.group == ALL_GROUPS_NOTIFICATION { + Some(key) + } else { + None + } + }) + .collect::>(); + world_monitor_keys.sort_unstable(); + world_monitor_keys.dedup(); + world_monitor_keys +} diff --git a/ractor/src/pg/tests.rs b/ractor/src/pg/tests.rs index 8138c535..6d50a40a 100644 --- a/ractor/src/pg/tests.rs +++ b/ractor/src/pg/tests.rs @@ -10,9 +10,9 @@ use crate::common_test::periodic_check; use crate::concurrency::Duration; use ::function_name::named; -use crate::{Actor, ActorProcessingErr, GroupName, SupervisionEvent}; +use crate::{Actor, ActorProcessingErr, GroupName, ScopeName, SupervisionEvent}; -use crate::pg; +use crate::pg::{self}; struct TestActor; @@ -34,7 +34,7 @@ impl Actor for TestActor { #[named] #[crate::concurrency::test] #[tracing_test::traced_test] -async fn test_basic_group() { +async fn test_basic_group_in_default_scope() { let (actor, handle) = Actor::spawn(None, TestActor, ()) .await .expect("Failed to spawn test actor"); @@ -52,6 +52,66 @@ async fn test_basic_group() { handle.await.expect("Actor cleanup failed"); } +#[named] +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn test_basic_group_in_named_scope() { + let (actor, handle) = Actor::spawn(None, TestActor, ()) + .await + .expect("Failed to spawn test actor"); + + let scope = function_name!().to_string(); + let group = function_name!().to_string(); + + // join the group + pg::join_scoped(scope.clone(), group.clone(), vec![actor.clone().into()]); + + let members = pg::get_scoped_members(&scope, &group); + assert_eq!(1, members.len()); + + // Cleanup + actor.stop(None); + handle.await.expect("Actor cleanup failed"); +} + +// #[named] +// #[crate::concurrency::test] +// #[tracing_test::traced_test] +// async fn test_which_scopes_and_groups() { +// let (actor, handle) = Actor::spawn(None, TestActor, ()) +// .await +// .expect("Failed to spawn test actor"); + +// let scope_a = concat!(function_name!(), "_a").to_string(); +// let scope_b = concat!(function_name!(), "_b").to_string(); +// let group_a = concat!(function_name!(), "_a").to_string(); +// let group_b = concat!(function_name!(), "_b").to_string(); + +// // join all scopes twice with each group +// let scope_group = [ +// (scope_a.clone(), group_a.clone()), +// (scope_a.clone(), group_b.clone()), +// (scope_b.clone(), group_a.clone()), +// (scope_b.clone(), group_b.clone()), +// ]; + +// for (scope, group) in scope_group.iter() { +// pg::join_scoped(scope.clone(), group.clone(), vec![actor.clone().into()]); +// pg::join_scoped(scope.clone(), group.clone(), vec![actor.clone().into()]); +// } + +// let scopes_and_groups = which_scopes_and_groups(); +// println!("Scopes and groups are: {:#?}", scopes_and_groups); +// assert_eq!(4, scopes_and_groups.len()); + +// // Cleanup +// actor.stop(None); +// handle.await.expect("Actor cleanup failed"); + +// let scopes_and_groups = which_scopes_and_groups(); +// assert!(scopes_and_groups.is_empty()); +// } + #[named] #[crate::concurrency::test] #[tracing_test::traced_test] @@ -89,6 +149,84 @@ async fn test_multiple_members_in_group() { } } +#[named] +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn test_multiple_members_in_scoped_group() { + let scope = function_name!().to_string(); + let group = function_name!().to_string(); + + let mut actors = vec![]; + let mut handles = vec![]; + for _ in 0..10 { + let (actor, handle) = Actor::spawn(None, TestActor, ()) + .await + .expect("Failed to spawn test actor"); + actors.push(actor); + handles.push(handle); + } + + // join the group + pg::join_scoped( + scope.clone(), + group.clone(), + actors + .iter() + .map(|aref| aref.clone().get_cell()) + .collect::>(), + ); + + let members = pg::get_scoped_members(&scope, &group); + assert_eq!(10, members.len()); + + // Cleanup + for actor in actors { + actor.stop(None); + } + for handle in handles.into_iter() { + handle.await.expect("Actor cleanup failed"); + } +} + +#[named] +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn test_which_scoped_groups() { + let scope = function_name!().to_string(); + let group = function_name!().to_string(); + + let mut actors = vec![]; + let mut handles = vec![]; + for _ in 0..10 { + let (actor, handle) = Actor::spawn(None, TestActor, ()) + .await + .expect("Failed to spawn test actor"); + actors.push(actor); + handles.push(handle); + } + + // join the group + pg::join_scoped( + scope.clone(), + group.clone(), + actors + .iter() + .map(|aref| aref.clone().get_cell()) + .collect::>(), + ); + + let groups_in_scope = pg::which_scoped_groups(&scope); + assert_eq!(vec![scope.clone()], groups_in_scope); + + // Cleanup + for actor in actors { + actor.stop(None); + } + for handle in handles.into_iter() { + handle.await.expect("Actor cleanup failed"); + } +} + #[named] #[crate::concurrency::test] #[tracing_test::traced_test] @@ -134,6 +272,72 @@ async fn test_multiple_groups() { } } +#[named] +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn test_multiple_groups_in_multiple_scopes() { + let scope_a = concat!(function_name!(), "_b").to_string(); + let scope_b = concat!(function_name!(), "_b").to_string(); + + let group_a = concat!(function_name!(), "_a").to_string(); + let group_b = concat!(function_name!(), "_b").to_string(); + + let mut actors = vec![]; + let mut handles = vec![]; + for _ in 0..10 { + let (actor, handle) = Actor::spawn(None, TestActor, ()) + .await + .expect("Failed to spawn test actor"); + actors.push(actor); + handles.push(handle); + } + + // setup scope_a and scope_b, and group_a and group_b + let these_actors = actors[0..5] + .iter() + .map(|a| a.clone().get_cell()) + .collect::>(); + pg::join_scoped(scope_a.clone(), group_a.clone(), these_actors); + + let these_actors = actors[5..10] + .iter() + .map(|a| a.clone().get_cell()) + .collect::>(); + pg::join_scoped(scope_a.clone(), group_b.clone(), these_actors); + + let these_actors = actors[0..5] + .iter() + .map(|a| a.clone().get_cell()) + .collect::>(); + pg::join_scoped(scope_b.clone(), group_a.clone(), these_actors); + + let these_actors = actors[5..10] + .iter() + .map(|a| a.clone().get_cell()) + .collect::>(); + pg::join_scoped(scope_b.clone(), group_b.clone(), these_actors); + + let members = pg::get_scoped_members(&scope_a, &group_a); + assert_eq!(5, members.len()); + + let members = pg::get_scoped_members(&scope_a, &group_b); + assert_eq!(5, members.len()); + + let members = pg::get_scoped_members(&scope_b, &group_a); + assert_eq!(5, members.len()); + + let members = pg::get_scoped_members(&scope_b, &group_b); + assert_eq!(5, members.len()); + + // Cleanup + for actor in actors { + actor.stop(None); + } + for handle in handles.into_iter() { + handle.await.expect("Actor cleanup failed"); + } +} + #[named] #[crate::concurrency::test] #[tracing_test::traced_test] @@ -159,6 +363,32 @@ async fn test_actor_leaves_pg_group_on_shutdown() { assert_eq!(0, members.len()); } +#[named] +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn test_actor_leaves_scope_on_shupdown() { + let (actor, handle) = Actor::spawn(None, TestActor, ()) + .await + .expect("Failed to spawn test actor"); + + let scope = function_name!().to_string(); + let group = function_name!().to_string(); + + // join the scope and group + pg::join_scoped(scope.clone(), group.clone(), vec![actor.clone().into()]); + + let members = pg::get_scoped_members(&scope, &group); + assert_eq!(1, members.len()); + + // Cleanup + actor.stop(None); + handle.await.expect("Actor cleanup failed"); + drop(actor); + + let members = pg::get_scoped_members(&scope, &group); + assert_eq!(0, members.len()); +} + #[named] #[crate::concurrency::test] #[tracing_test::traced_test] @@ -195,6 +425,51 @@ async fn test_actor_leaves_pg_group_manually() { handle.await.expect("Actor cleanup failed"); } +#[named] +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn test_actor_leaves_scope_manually() { + let scope = function_name!().to_string(); + let group = function_name!().to_string(); + + let (actor, handle) = Actor::spawn(None, TestActor, ()) + .await + .expect("Failed to spawn test actor"); + + // join the group in scope (create on first use) + pg::join_scoped(scope.clone(), group.clone(), vec![actor.clone().into()]); + + // the scope was created and is present + let scopes = pg::which_scopes(); + assert!(scopes.contains(&scope)); + + // the group was created and is present + let groups = pg::which_groups(); + assert!(groups.contains(&group)); + + let members = pg::get_scoped_members(&scope, &group); + assert_eq!(1, members.len()); + + // leave the group + pg::leave_scoped(scope.clone(), group.clone(), vec![actor.clone().into()]); + + // pif-paf-poof the scope is gone! + let scopes = pg::which_scopes(); + assert!(!scopes.contains(&scope)); + + // pif-paf-poof the group is gone! + let groups = pg::which_groups(); + assert!(!groups.contains(&group)); + + // members comes back empty + let members = pg::get_scoped_members(&scope, &group); + assert_eq!(0, members.len()); + + // Cleanup + actor.stop(None); + handle.await.expect("Actor cleanup failed"); +} + #[named] #[crate::concurrency::test] #[tracing_test::traced_test] @@ -251,10 +526,10 @@ async fn test_pg_monitoring() { ) -> Result<(), ActorProcessingErr> { if let SupervisionEvent::ProcessGroupChanged(change) = message { match change { - pg::GroupChangeMessage::Join(_which, who) => { + pg::GroupChangeMessage::Join(_scope, _which, who) => { self.counter.fetch_add(who.len() as u8, Ordering::Relaxed); } - pg::GroupChangeMessage::Leave(_which, who) => { + pg::GroupChangeMessage::Leave(_scope, _which, who) => { self.counter.fetch_sub(who.len() as u8, Ordering::Relaxed); } } @@ -300,6 +575,146 @@ async fn test_pg_monitoring() { monitor_handle.await.expect("Actor cleanup failed"); } +#[named] +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn test_scope_monitoring() { + let scope = function_name!().to_string(); + let group = function_name!().to_string(); + + let counter = Arc::new(AtomicU8::new(0u8)); + + struct AutoJoinActor { + scope: ScopeName, + pg_group: GroupName, + } + + #[async_trait::async_trait] + impl Actor for AutoJoinActor { + type Msg = (); + type Arguments = (); + type State = (); + + async fn pre_start( + &self, + myself: crate::ActorRef, + _: (), + ) -> Result { + pg::join_scoped( + self.scope.clone(), + self.pg_group.clone(), + vec![myself.into()], + ); + Ok(()) + } + } + + struct NotificationMonitor { + scope: ScopeName, + counter: Arc, + } + + #[async_trait::async_trait] + impl Actor for NotificationMonitor { + type Msg = (); + type Arguments = (); + type State = (); + + async fn pre_start( + &self, + myself: crate::ActorRef, + _: (), + ) -> Result { + pg::monitor_scope(self.scope.clone(), myself.into()); + Ok(()) + } + + async fn handle_supervisor_evt( + &self, + _myself: crate::ActorRef, + message: SupervisionEvent, + _state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + if let SupervisionEvent::ProcessGroupChanged(change) = message { + match change { + pg::GroupChangeMessage::Join(scope_name, _which, who) => { + // ensure this test can run concurrently to others + if scope_name == function_name!() { + self.counter.fetch_add(who.len() as u8, Ordering::Relaxed); + } + } + pg::GroupChangeMessage::Leave(scope_name, _which, who) => { + // ensure this test can run concurrently to others + if scope_name == function_name!() { + self.counter.fetch_sub(who.len() as u8, Ordering::Relaxed); + } + } + } + } + Ok(()) + } + } + + let (monitor_actor, monitor_handle) = Actor::spawn( + None, + NotificationMonitor { + scope: scope.clone(), + counter: counter.clone(), + }, + (), + ) + .await + .expect("Failed to start monitor actor"); + + // this actor's startup should notify the "monitor" for scope changes + let (test_actor, test_handle) = Actor::spawn( + None, + AutoJoinActor { + scope: scope.clone(), + pg_group: group.clone(), + }, + (), + ) + .await + .expect("Failed to start test actor"); + + // start a second actor in the same scope to test if we multiply messages exponentially + let (test_actor1, test_handle1) = Actor::spawn( + None, + AutoJoinActor { + scope: scope.clone(), + pg_group: group.clone(), + }, + (), + ) + .await + .expect("Failed to start test actor"); + + // the monitor is notified async, so we need to wait a bit + periodic_check( + || counter.load(Ordering::Relaxed) == 2, + Duration::from_secs(5), + ) + .await; + + // kill the scope members + test_actor.stop(None); + test_handle.await.expect("Actor cleanup failed"); + test_actor1.stop(None); + test_handle1.await.expect("Actor cleanup failed"); + + // it should have notified that it's unsubscribed + periodic_check( + || counter.load(Ordering::Relaxed) == 0, + Duration::from_secs(5), + ) + .await; + + // cleanup + monitor_actor.stop(None); + monitor_handle.await.expect("Actor cleanup failed"); +} + #[named] #[cfg(feature = "cluster")] #[crate::concurrency::test] @@ -369,3 +784,74 @@ async fn local_vs_remote_pg_members() { handle.await.expect("Actor cleanup failed"); } } + +#[named] +#[cfg(feature = "cluster")] +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn local_vs_remote_pg_members_in_named_scopes() { + use crate::ActorRuntime; + + let scope = function_name!().to_string(); + let group = function_name!().to_string(); + + struct TestRemoteActor; + struct TestRemoteActorMessage; + impl crate::Message for TestRemoteActorMessage {} + #[async_trait::async_trait] + impl Actor for TestRemoteActor { + type Msg = TestRemoteActorMessage; + type State = (); + type Arguments = (); + async fn pre_start( + &self, + _this_actor: crate::ActorRef, + _: (), + ) -> Result { + Ok(()) + } + } + + let remote_pid = crate::ActorId::Remote { node_id: 1, pid: 1 }; + + let mut actors: Vec = vec![]; + let mut handles = vec![]; + for _ in 0..10 { + let (actor, handle) = Actor::spawn(None, TestActor, ()) + .await + .expect("Failed to spawn test actor"); + actors.push(actor.into()); + handles.push(handle); + } + let (actor, handle) = ActorRuntime::spawn_linked_remote( + None, + TestRemoteActor, + remote_pid, + (), + actors.first().unwrap().clone(), + ) + .await + .expect("Failed to spawn remote actor"); + println!("Spawned {}", actor.get_id()); + + actors.push(actor.into()); + handles.push(handle); + + // join the group in scope + pg::join_scoped(scope.clone(), group.clone(), actors.to_vec()); + + // assert + let members = pg::get_scoped_local_members(&scope, &group); + assert_eq!(10, members.len()); + + let members = pg::get_scoped_members(&scope, &group); + assert_eq!(11, members.len()); + + // Cleanup + for actor in actors { + actor.stop(None); + } + for handle in handles.into_iter() { + handle.await.expect("Actor cleanup failed"); + } +} diff --git a/ractor_cluster/src/node/node_session/mod.rs b/ractor_cluster/src/node/node_session/mod.rs index 0b13913d..15de5bd3 100644 --- a/ractor_cluster/src/node/node_session/mod.rs +++ b/ractor_cluster/src/node/node_session/mod.rs @@ -12,7 +12,7 @@ use std::convert::TryInto; use std::net::SocketAddr; use ractor::message::SerializedMessage; -use ractor::pg::GroupChangeMessage; +use ractor::pg::{get_scoped_local_members, which_scopes_and_groups, GroupChangeMessage}; use ractor::registry::PidLifecycleEvent; use ractor::rpc::CallResult; use ractor::{Actor, ActorId, ActorProcessingErr, ActorRef, SpawnErr, SupervisionEvent}; @@ -682,16 +682,21 @@ impl NodeSession { state.tcp_send_control(msg); } + // setup scope monitoring + ractor::pg::monitor_scope( + ractor::pg::ALL_SCOPES_NOTIFICATION.to_string(), + myself.get_cell(), + ); // setup PG monitoring ractor::pg::monitor( ractor::pg::ALL_GROUPS_NOTIFICATION.to_string(), myself.get_cell(), ); - // Scan all PG groups + synchronize them - let groups = ractor::pg::which_groups(); - for group in groups { - let local_members = ractor::pg::get_local_members(&group) + // Scan all scopes with their PG groups + synchronize them + let scopes_and_groups = which_scopes_and_groups(); + for key in scopes_and_groups { + let local_members = get_scoped_local_members(&key.get_scope(), &key.get_group()) .into_iter() .filter(|v| v.supports_remoting()) .map(|act| control_protocol::Actor { @@ -703,7 +708,8 @@ impl NodeSession { let control_message = control_protocol::ControlMessage { msg: Some(control_protocol::control_message::Msg::PgJoin( control_protocol::PgJoin { - group, + scope: key.get_scope(), + group: key.get_group(), actors: local_members, }, )), @@ -873,6 +879,10 @@ impl Actor for NodeSession { _state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { // unhook monitoring sessions + ractor::pg::demonitor_scope( + ractor::pg::ALL_SCOPES_NOTIFICATION.to_string(), + myself.get_id(), + ); ractor::pg::demonitor( ractor::pg::ALL_GROUPS_NOTIFICATION.to_string(), myself.get_id(), @@ -998,7 +1008,7 @@ impl Actor for NodeSession { } // ======== Lifecycle event handlers (PG groups + PID registry) ======== // SupervisionEvent::ProcessGroupChanged(change) => match change { - GroupChangeMessage::Join(group, actors) => { + GroupChangeMessage::Join(scope, group, actors) => { let filtered = actors .into_iter() .filter(|act| act.supports_remoting()) @@ -1011,6 +1021,7 @@ impl Actor for NodeSession { let msg = control_protocol::ControlMessage { msg: Some(control_protocol::control_message::Msg::PgJoin( control_protocol::PgJoin { + scope, group, actors: filtered, }, @@ -1019,7 +1030,7 @@ impl Actor for NodeSession { state.tcp_send_control(msg); } } - GroupChangeMessage::Leave(group, actors) => { + GroupChangeMessage::Leave(scope, group, actors) => { let filtered = actors .into_iter() .filter(|act| act.supports_remoting()) @@ -1032,6 +1043,7 @@ impl Actor for NodeSession { let msg = control_protocol::ControlMessage { msg: Some(control_protocol::control_message::Msg::PgLeave( control_protocol::PgLeave { + scope, group, actors: filtered, }, diff --git a/ractor_cluster/src/node/node_session/tests.rs b/ractor_cluster/src/node/node_session/tests.rs index 6217e155..011c7baa 100644 --- a/ractor_cluster/src/node/node_session/tests.rs +++ b/ractor_cluster/src/node/node_session/tests.rs @@ -760,6 +760,7 @@ async fn node_session_handle_control() { .expect("Failed to process control message"); assert_eq!(0, state.remote_actors.len()); + let scope_name = "node_session_test_scope"; let group_name = "node_session_handle_control"; // check pg join spawns + joins to a pg group @@ -769,6 +770,7 @@ async fn node_session_handle_control() { control_protocol::ControlMessage { msg: Some(control_protocol::control_message::Msg::PgJoin( control_protocol::PgJoin { + scope: scope_name.to_string(), group: group_name.to_string(), actors: vec![control_protocol::Actor { name: None, @@ -798,6 +800,7 @@ async fn node_session_handle_control() { control_protocol::ControlMessage { msg: Some(control_protocol::control_message::Msg::PgLeave( control_protocol::PgLeave { + scope: scope_name.to_string(), group: group_name.to_string(), actors: vec![control_protocol::Actor { name: None, diff --git a/ractor_cluster/src/protocol/control.proto b/ractor_cluster/src/protocol/control.proto index eba32eee..dd5804e2 100644 --- a/ractor_cluster/src/protocol/control.proto +++ b/ractor_cluster/src/protocol/control.proto @@ -56,6 +56,8 @@ message PgJoin { string group = 1; // The actors repeated Actor actors = 2; + // the scope + string scope = 3; } // Process group leave occurred @@ -64,6 +66,8 @@ message PgLeave { string group = 1; // The actors repeated Actor actors = 2; + // The scope + string scope = 3; } // A collection of NodeSession endpoints @@ -93,4 +97,4 @@ message ControlMessage { // The list of node sessions on the remote host for transitive connections NodeSessions node_sessions = 8; } -} \ No newline at end of file +}