From 1da7ab05dc093d3eea3c4569dc5b9ac93cf849a4 Mon Sep 17 00:00:00 2001 From: Leon Qadirie Date: Wed, 11 Oct 2023 21:51:34 +0200 Subject: [PATCH 01/17] Add the concept of scopes Introduces primitives and and adjusts current code to use the `Default` scope, implicitly wherever possible. --- ractor/src/actor/messages.rs | 7 ++- ractor/src/lib.rs | 3 ++ ractor/src/pg/mod.rs | 53 +++++++++++++++---- ractor/src/pg/tests.rs | 4 +- ractor_cluster/src/node/node_session/mod.rs | 10 ++-- ractor_cluster/src/node/node_session/tests.rs | 4 ++ ractor_cluster/src/protocol/control.proto | 12 +++-- 7 files changed, 72 insertions(+), 21 deletions(-) diff --git a/ractor/src/actor/messages.rs b/ractor/src/actor/messages.rs index 13f05b54..ab50b44e 100644 --- a/ractor/src/actor/messages.rs +++ b/ractor/src/actor/messages.rs @@ -146,7 +146,12 @@ impl std::fmt::Display for SupervisionEvent { write!(f, "Actor panicked {actor:?} - {panic_msg}") } SupervisionEvent::ProcessGroupChanged(change) => { - write!(f, "Process group {} changed", change.get_group()) + write!( + f, + "Process group {} in scope {} changed", + change.get_group(), + change.get_scope() + ) } #[cfg(feature = "cluster")] SupervisionEvent::PidLifecycleEvent(change) => { diff --git a/ractor/src/lib.rs b/ractor/src/lib.rs index b2b1505e..264e3772 100644 --- a/ractor/src/lib.rs +++ b/ractor/src/lib.rs @@ -198,6 +198,9 @@ pub type ActorName = String; /// A process group's name, equivalent to an [Erlang `atom()`](https://www.erlang.org/doc/reference_manual/data_types.html#atom) pub type GroupName = String; +/// A scope's name, equivalent to an [Erlang `atom()`](https://www.erlang.org/doc/reference_manual/data_types.html#atom) +pub type ScopeName = String; + /// Represents the state of an actor. Must be safe /// to send between threads (same bounds as a [Message]) pub trait State: std::any::Any + Send + 'static {} diff --git a/ractor/src/pg/mod.rs b/ractor/src/pg/mod.rs index 1c83bddb..f45b3559 100644 --- a/ractor/src/pg/mod.rs +++ b/ractor/src/pg/mod.rs @@ -26,7 +26,7 @@ use dashmap::DashMap; use once_cell::sync::OnceCell; -use crate::{ActorCell, ActorId, GroupName, SupervisionEvent}; +use crate::{ActorCell, ActorId, GroupName, ScopeName, SupervisionEvent}; /// Key to monitor all of the groups pub const ALL_GROUPS_NOTIFICATION: &str = "__world__"; @@ -34,21 +34,52 @@ pub const ALL_GROUPS_NOTIFICATION: &str = "__world__"; #[cfg(test)] mod tests; +/// Scopes are sets of process groups. Each group is in exactly one scope. +/// A process may join any number of groups in any number of scopes. +/// +/// Scopes are designed to decouple single mesh into a set of overlay networks, +/// reducing amount of traffic required to propagate group membership information. +#[derive(Clone)] +pub enum Scope { + /// The default scope + Default, + /// A named scope + Named(ScopeName), +} + +impl Scope { + /// Returns the scope's name as String. `Scope::Default` returns "Default". + pub fn as_str(&self) -> String { + match self { + Scope::Default => "Default".to_string(), + Scope::Named(scope_name) => scope_name.to_string(), + } + } +} + /// Represents a change in a process group's membership #[derive(Clone)] pub enum GroupChangeMessage { /// Some actors joined a group - Join(GroupName, Vec), + Join(Scope, GroupName, Vec), /// Some actors left a group - Leave(GroupName, Vec), + Leave(Scope, GroupName, Vec), } impl GroupChangeMessage { /// Retrieve the group that changed pub fn get_group(&self) -> GroupName { match self { - Self::Join(name, _) => name.clone(), - Self::Leave(name, _) => name.clone(), + Self::Join(_, name, _) => name.clone(), + Self::Leave(_, name, _) => name.clone(), + } + } + + /// Retrieve the name of the scope in which the group change took place + pub fn get_scope(&self) -> ScopeName { + match self { + Self::Join(scope, _, _) => scope.as_str(), + Self::Leave(scope, _, _) => scope.as_str(), } } } @@ -93,7 +124,7 @@ pub fn join(group: GroupName, actors: Vec) { if let Some(listeners) = monitor.listeners.get(&group) { for listener in listeners.value() { let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Join(group.clone(), actors.clone()), + GroupChangeMessage::Join(Scope::Default, group.clone(), actors.clone()), )); } } @@ -101,7 +132,7 @@ pub fn join(group: GroupName, actors: Vec) { if let Some(listeners) = monitor.listeners.get(ALL_GROUPS_NOTIFICATION) { for listener in listeners.value() { let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Join(group.clone(), actors.clone()), + GroupChangeMessage::Join(Scope::Default, group.clone(), actors.clone()), )); } } @@ -127,7 +158,7 @@ pub fn leave(group: GroupName, actors: Vec) { if let Some(listeners) = monitor.listeners.get(&group) { for listener in listeners.value() { let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Leave(group.clone(), actors.clone()), + GroupChangeMessage::Leave(Scope::Default, group.clone(), actors.clone()), )); } } @@ -135,7 +166,7 @@ pub fn leave(group: GroupName, actors: Vec) { if let Some(listeners) = monitor.listeners.get(ALL_GROUPS_NOTIFICATION) { for listener in listeners.value() { let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Leave(group.clone(), actors.clone()), + GroupChangeMessage::Leave(Scope::Default, group.clone(), actors.clone()), )); } } @@ -167,7 +198,7 @@ pub(crate) fn leave_all(actor: ActorId) { if let Some(this_listeners) = all_listeners.get(&group) { this_listeners.iter().for_each(|listener| { let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Leave(group.clone(), vec![cell.clone()]), + GroupChangeMessage::Leave(Scope::Default, group.clone(), vec![cell.clone()]), )); }); } @@ -175,7 +206,7 @@ pub(crate) fn leave_all(actor: ActorId) { if let Some(listeners) = all_listeners.get(ALL_GROUPS_NOTIFICATION) { for listener in listeners.value() { let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Leave(group.clone(), vec![cell.clone()]), + GroupChangeMessage::Leave(Scope::Default, group.clone(), vec![cell.clone()]), )); } } diff --git a/ractor/src/pg/tests.rs b/ractor/src/pg/tests.rs index 8138c535..1c15321d 100644 --- a/ractor/src/pg/tests.rs +++ b/ractor/src/pg/tests.rs @@ -251,10 +251,10 @@ async fn test_pg_monitoring() { ) -> Result<(), ActorProcessingErr> { if let SupervisionEvent::ProcessGroupChanged(change) = message { match change { - pg::GroupChangeMessage::Join(_which, who) => { + pg::GroupChangeMessage::Join(_scope, _which, who) => { self.counter.fetch_add(who.len() as u8, Ordering::Relaxed); } - pg::GroupChangeMessage::Leave(_which, who) => { + pg::GroupChangeMessage::Leave(_scope, _which, who) => { self.counter.fetch_sub(who.len() as u8, Ordering::Relaxed); } } diff --git a/ractor_cluster/src/node/node_session/mod.rs b/ractor_cluster/src/node/node_session/mod.rs index 0b13913d..6fdc9c8c 100644 --- a/ractor_cluster/src/node/node_session/mod.rs +++ b/ractor_cluster/src/node/node_session/mod.rs @@ -12,7 +12,7 @@ use std::convert::TryInto; use std::net::SocketAddr; use ractor::message::SerializedMessage; -use ractor::pg::GroupChangeMessage; +use ractor::pg::{GroupChangeMessage, Scope}; use ractor::registry::PidLifecycleEvent; use ractor::rpc::CallResult; use ractor::{Actor, ActorId, ActorProcessingErr, ActorRef, SpawnErr, SupervisionEvent}; @@ -690,6 +690,7 @@ impl NodeSession { // Scan all PG groups + synchronize them let groups = ractor::pg::which_groups(); + let scope = Scope::Default; for group in groups { let local_members = ractor::pg::get_local_members(&group) .into_iter() @@ -703,6 +704,7 @@ impl NodeSession { let control_message = control_protocol::ControlMessage { msg: Some(control_protocol::control_message::Msg::PgJoin( control_protocol::PgJoin { + scope: scope.as_str(), group, actors: local_members, }, @@ -998,7 +1000,7 @@ impl Actor for NodeSession { } // ======== Lifecycle event handlers (PG groups + PID registry) ======== // SupervisionEvent::ProcessGroupChanged(change) => match change { - GroupChangeMessage::Join(group, actors) => { + GroupChangeMessage::Join(scope, group, actors) => { let filtered = actors .into_iter() .filter(|act| act.supports_remoting()) @@ -1011,6 +1013,7 @@ impl Actor for NodeSession { let msg = control_protocol::ControlMessage { msg: Some(control_protocol::control_message::Msg::PgJoin( control_protocol::PgJoin { + scope: scope.as_str(), group, actors: filtered, }, @@ -1019,7 +1022,7 @@ impl Actor for NodeSession { state.tcp_send_control(msg); } } - GroupChangeMessage::Leave(group, actors) => { + GroupChangeMessage::Leave(scope, group, actors) => { let filtered = actors .into_iter() .filter(|act| act.supports_remoting()) @@ -1032,6 +1035,7 @@ impl Actor for NodeSession { let msg = control_protocol::ControlMessage { msg: Some(control_protocol::control_message::Msg::PgLeave( control_protocol::PgLeave { + scope: scope.as_str(), group, actors: filtered, }, diff --git a/ractor_cluster/src/node/node_session/tests.rs b/ractor_cluster/src/node/node_session/tests.rs index 6217e155..1cf4313e 100644 --- a/ractor_cluster/src/node/node_session/tests.rs +++ b/ractor_cluster/src/node/node_session/tests.rs @@ -11,6 +11,7 @@ use std::sync::{ }; use ractor::concurrency::sleep; +use ractor::pg::Scope; use crate::node::NodeConnectionMode; use crate::NodeSessionMessage; @@ -760,6 +761,7 @@ async fn node_session_handle_control() { .expect("Failed to process control message"); assert_eq!(0, state.remote_actors.len()); + let scope_name = Scope::Named(String::from("node_session_test_scope")); let group_name = "node_session_handle_control"; // check pg join spawns + joins to a pg group @@ -769,6 +771,7 @@ async fn node_session_handle_control() { control_protocol::ControlMessage { msg: Some(control_protocol::control_message::Msg::PgJoin( control_protocol::PgJoin { + scope: scope_name.as_str(), group: group_name.to_string(), actors: vec![control_protocol::Actor { name: None, @@ -798,6 +801,7 @@ async fn node_session_handle_control() { control_protocol::ControlMessage { msg: Some(control_protocol::control_message::Msg::PgLeave( control_protocol::PgLeave { + scope: scope_name.as_str(), group: group_name.to_string(), actors: vec![control_protocol::Actor { name: None, diff --git a/ractor_cluster/src/protocol/control.proto b/ractor_cluster/src/protocol/control.proto index eba32eee..89c058e9 100644 --- a/ractor_cluster/src/protocol/control.proto +++ b/ractor_cluster/src/protocol/control.proto @@ -52,18 +52,22 @@ message Terminate { // Process group join occurred message PgJoin { + // the scope + string scope = 1; // The group - string group = 1; + string group = 2; // The actors - repeated Actor actors = 2; + repeated Actor actors = 3; } // Process group leave occurred message PgLeave { + // the scope + string scope = 1; // The group - string group = 1; + string group = 2; // The actors - repeated Actor actors = 2; + repeated Actor actors = 3; } // A collection of NodeSession endpoints From b7b684364144b8ba35d3e31d95e10e763742a487 Mon Sep 17 00:00:00 2001 From: Leon Qadirie Date: Wed, 11 Oct 2023 22:52:49 +0200 Subject: [PATCH 02/17] Fix comments --- ractor/src/pg/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ractor/src/pg/mod.rs b/ractor/src/pg/mod.rs index f45b3559..bbe1a8a2 100644 --- a/ractor/src/pg/mod.rs +++ b/ractor/src/pg/mod.rs @@ -220,7 +220,7 @@ pub(crate) fn leave_all(actor: ActorId) { /// Returns all the actors running on the local node in the group `group`. /// -/// * `group_name` - Either a statically named group or scope +/// * `group_name` - Either a statically named group /// /// Returns a [`Vec`] representing the members of this paging group pub fn get_local_members(group_name: &GroupName) -> Vec { @@ -265,7 +265,7 @@ pub fn which_groups() -> Vec { /// Subscribes the provided [crate::Actor] to the scope or group for updates /// -/// * `group_name` - The scope or group to monitor +/// * `group_name` - The group to monitor /// * `actor` - The [ActorCell] representing who will receive updates pub fn monitor(group_name: GroupName, actor: ActorCell) { let monitor = get_monitor(); @@ -292,7 +292,7 @@ pub fn demonitor(group_name: GroupName, actor: ActorId) { } } -/// Remove the specified [ActorId] from monitoring all groups it might be. +/// Remove the specified [ActorId] from monitoring all groups it might be in. /// Used only during actor shutdown pub(crate) fn demonitor_all(actor: ActorId) { let monitor = get_monitor(); From b23ddd76328dc9408969c8616a9824ee2798424b Mon Sep 17 00:00:00 2001 From: Leon Qadirie Date: Wed, 11 Oct 2023 22:56:50 +0200 Subject: [PATCH 03/17] Outline the interface and add TODOs --- ractor/src/pg/mod.rs | 103 ++++++++++++++++++-- ractor_cluster/src/node/node_session/mod.rs | 1 + 2 files changed, 97 insertions(+), 7 deletions(-) diff --git a/ractor/src/pg/mod.rs b/ractor/src/pg/mod.rs index bbe1a8a2..22b5a2e1 100644 --- a/ractor/src/pg/mod.rs +++ b/ractor/src/pg/mod.rs @@ -28,12 +28,16 @@ use once_cell::sync::OnceCell; use crate::{ActorCell, ActorId, GroupName, ScopeName, SupervisionEvent}; +// TODO: Check if this is still needed at the end /// Key to monitor all of the groups pub const ALL_GROUPS_NOTIFICATION: &str = "__world__"; #[cfg(test)] mod tests; +// TODO: Research if there is a need to explicitly start a `Scope` analogous +// to [Erlang's `pg` module](https://www.erlang.org/doc/man/pg.html). + /// Scopes are sets of process groups. Each group is in exactly one scope. /// A process may join any number of groups in any number of scopes. /// @@ -84,6 +88,7 @@ impl GroupChangeMessage { } } +// TODO: Add scopes to `PgState` struct PgState { map: Arc>>, listeners: Arc>>, @@ -91,6 +96,7 @@ struct PgState { static PG_MONITOR: OnceCell = OnceCell::new(); +// TODO: Add scopes to `get_monitor` fn get_monitor<'a>() -> &'a PgState { PG_MONITOR.get_or_init(|| PgState { map: Arc::new(DashMap::new()), @@ -98,7 +104,7 @@ fn get_monitor<'a>() -> &'a PgState { }) } -/// Join actors to the group `group` +/// Join actors to the group `group` in the default scope /// /// * `group` - The statically named group. Will be created if first actors to join /// * `actors` - The list of [crate::Actor]s to add to the group @@ -138,7 +144,17 @@ pub fn join(group: GroupName, actors: Vec) { } } -/// Leaves the specified [crate::Actor]s from the PG group +/// Join actors to the group `group` within the scope `scope` +/// +/// * `scope` - the statically named scope. Will be created if first actors join +/// * `group` - The statically named group. Will be created if first actors to join +/// * `actors` - The list of [crate::Actor]s to add to the group +#[allow(unused_variables)] +pub fn join_named_scope(scope: &Scope, group: GroupName, actors: Vec) { + todo!(); +} + +/// Leaves the specified [crate::Actor]s from the PG group in the default scope /// /// * `group` - The statically named group /// * `actors` - The list of actors to remove from the group @@ -174,6 +190,17 @@ pub fn leave(group: GroupName, actors: Vec) { } } +/// Leaves the specified [crate::Actor]s from the PG group within the scope `scope` +/// +/// * `scope` - The statically named scope +/// * `group` - The statically named group +/// * `actors` - The list of actors to remove from the group +#[allow(unused_variables)] +pub fn leave_named_scope(scope: &Scope, group: GroupName, actors: Vec) { + todo!(); +} + +// TODO: Leave all groups in all scopes /// Leave all groups for a specific [ActorId]. /// Used only during actor shutdown pub(crate) fn leave_all(actor: ActorId) { @@ -218,7 +245,8 @@ pub(crate) fn leave_all(actor: ActorId) { } } -/// Returns all the actors running on the local node in the group `group`. +/// Returns all actors running on the local node in the group `group` +/// in the default scope. /// /// * `group_name` - Either a statically named group /// @@ -237,7 +265,20 @@ pub fn get_local_members(group_name: &GroupName) -> Vec { } } -/// Returns all the actors running on any node in the group `group`. +/// Returns all actors running on the local node in the group `group` +/// in scope `scope` +/// +/// * `scope_name` - A statically named scope +/// * `group_name` - Either a statically named group +/// +/// Returns a [`Vec`] representing the members of this paging group +#[allow(unused_variables)] +pub fn get_local_members_in_scope(scope: &Scope, group_name: &GroupName) -> Vec { + todo!(); +} + +/// Returns all the actors running on any node in the group `group` +/// in the default scope. /// /// * `group_name` - Either a statically named group or scope /// @@ -251,6 +292,17 @@ pub fn get_members(group_name: &GroupName) -> Vec { } } +/// Returns all the actors running on any node in the group `group` +/// in the scope `scope`. +/// +/// * `group_name` - Either a statically named group or scope +/// +/// Returns a [`Vec`] with the member actors +#[allow(unused_variables)] +pub fn get_members_in_scope(scope: &Scope, group_name: &GroupName) -> Vec { + todo!(); +} + /// Return a list of all known groups /// /// Returns a [`Vec`] representing all the registered group names @@ -263,7 +315,26 @@ pub fn which_groups() -> Vec { .collect::>() } -/// Subscribes the provided [crate::Actor] to the scope or group for updates +/// Returns a list of all known groups in scope `scope` +/// +/// * `scope` - The scope to retrieve the groups from +/// +/// Returns a [`Vec`] representing all the registered group names +/// in `scope` +#[allow(unused_variables)] +pub fn which_groups_in_scope(scope: &Scope) -> Vec { + todo!(); +} + +/// Returns a list of all known scopes +/// +/// Returns a [`Vec`] representing all the registered scopes +#[allow(unused_variables)] +pub fn which_scopes() -> Vec { + todo!(); +} + +/// Subscribes the provided [crate::Actor] to the group for updates /// /// * `group_name` - The group to monitor /// * `actor` - The [ActorCell] representing who will receive updates @@ -277,10 +348,19 @@ pub fn monitor(group_name: GroupName, actor: ActorCell) { } } -/// Unsubscribes the provided [crate::Actor] from the scope or group for updates +/// Subscribes the provided [crate::Actor] to the scope for updates /// -/// * `group_name` - The scope or group to monitor +/// * `scope` - the scope to monitor /// * `actor` - The [ActorCell] representing who will receive updates +#[allow(unused_variables)] +pub fn monitor_scope(scope: &Scope, actor: ActorCell) { + todo!(); +} + +/// Unsubscribes the provided [crate::Actor] from the group for updates +/// +/// * `group_name` - The group to demonitor +/// * `actor` - The [ActorCell] representing who will no longer receive updates pub fn demonitor(group_name: GroupName, actor: ActorId) { let monitor = get_monitor(); if let Occupied(mut entry) = monitor.listeners.entry(group_name) { @@ -292,6 +372,15 @@ pub fn demonitor(group_name: GroupName, actor: ActorId) { } } +/// Unsubscribes the provided [crate::Actor] from the scope for updates +/// +/// * `scope` - The scope to demonitor +/// * `actor` - The [ActorCell] representing who will no longer receive updates +#[allow(unused_variables)] +pub fn demonitor_scope(scope: &Scope, actor: ActorId) { + todo!(); +} + /// Remove the specified [ActorId] from monitoring all groups it might be in. /// Used only during actor shutdown pub(crate) fn demonitor_all(actor: ActorId) { diff --git a/ractor_cluster/src/node/node_session/mod.rs b/ractor_cluster/src/node/node_session/mod.rs index 6fdc9c8c..886224f3 100644 --- a/ractor_cluster/src/node/node_session/mod.rs +++ b/ractor_cluster/src/node/node_session/mod.rs @@ -690,6 +690,7 @@ impl NodeSession { // Scan all PG groups + synchronize them let groups = ractor::pg::which_groups(); + // TODO: Add this for all scopes! let scope = Scope::Default; for group in groups { let local_members = ractor::pg::get_local_members(&group) From 650948b030a5769e167e0529e43a07fe381a6cab Mon Sep 17 00:00:00 2001 From: Leon Qadirie Date: Fri, 13 Oct 2023 21:37:21 +0200 Subject: [PATCH 04/17] Outline of tests to write or change --- ractor/src/pg/tests.rs | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/ractor/src/pg/tests.rs b/ractor/src/pg/tests.rs index 1c15321d..5ba1a264 100644 --- a/ractor/src/pg/tests.rs +++ b/ractor/src/pg/tests.rs @@ -52,6 +52,13 @@ async fn test_basic_group() { handle.await.expect("Actor cleanup failed"); } +#[crate::concurrency::test] +#[tracing_test::traced_test] +#[allow(unused_variables)] +async fn test_default_scope() { + todo!(); +} + #[named] #[crate::concurrency::test] #[tracing_test::traced_test] @@ -89,6 +96,13 @@ async fn test_multiple_members_in_group() { } } +#[crate::concurrency::test] +#[tracing_test::traced_test] +#[allow(unused_variables)] +async fn test_multiple_members_in_scope() { + todo!(); +} + #[named] #[crate::concurrency::test] #[tracing_test::traced_test] @@ -134,6 +148,13 @@ async fn test_multiple_groups() { } } +#[crate::concurrency::test] +#[tracing_test::traced_test] +#[allow(unused_variables)] +async fn test_multiple_scopes() { + todo!(); +} + #[named] #[crate::concurrency::test] #[tracing_test::traced_test] @@ -159,6 +180,13 @@ async fn test_actor_leaves_pg_group_on_shutdown() { assert_eq!(0, members.len()); } +#[crate::concurrency::test] +#[tracing_test::traced_test] +#[allow(unused_variables)] +async fn test_actor_leaves_scope_on_shupdown() { + todo!(); +} + #[named] #[crate::concurrency::test] #[tracing_test::traced_test] @@ -195,6 +223,13 @@ async fn test_actor_leaves_pg_group_manually() { handle.await.expect("Actor cleanup failed"); } +#[crate::concurrency::test] +#[tracing_test::traced_test] +#[allow(unused_variables)] +async fn test_actor_leaves_scope_manually() { + todo!(); +} + #[named] #[crate::concurrency::test] #[tracing_test::traced_test] @@ -300,6 +335,7 @@ async fn test_pg_monitoring() { monitor_handle.await.expect("Actor cleanup failed"); } +//TODO: Add scopes #[named] #[cfg(feature = "cluster")] #[crate::concurrency::test] From 2f27fb396215ea8b54d9b4a543950586a76baaee Mon Sep 17 00:00:00 2001 From: Leon Qadirie Date: Fri, 13 Oct 2023 23:00:58 +0200 Subject: [PATCH 05/17] Use Strings for `Scope` and add them to monitor --- ractor/src/pg/mod.rs | 140 +++++++++++------- ractor_cluster/src/node/node_session/mod.rs | 10 +- ractor_cluster/src/node/node_session/tests.rs | 7 +- 3 files changed, 91 insertions(+), 66 deletions(-) diff --git a/ractor/src/pg/mod.rs b/ractor/src/pg/mod.rs index 22b5a2e1..b099aed8 100644 --- a/ractor/src/pg/mod.rs +++ b/ractor/src/pg/mod.rs @@ -28,7 +28,10 @@ use once_cell::sync::OnceCell; use crate::{ActorCell, ActorId, GroupName, ScopeName, SupervisionEvent}; -// TODO: Check if this is still needed at the end +/// Key to set and monitor the default scope +pub const DEFAULT_SCOPE: &str = "__default__"; + +// TODO: Check how to handle a call to all groups vs to all in a scope. /// Key to monitor all of the groups pub const ALL_GROUPS_NOTIFICATION: &str = "__world__"; @@ -38,36 +41,13 @@ mod tests; // TODO: Research if there is a need to explicitly start a `Scope` analogous // to [Erlang's `pg` module](https://www.erlang.org/doc/man/pg.html). -/// Scopes are sets of process groups. Each group is in exactly one scope. -/// A process may join any number of groups in any number of scopes. -/// -/// Scopes are designed to decouple single mesh into a set of overlay networks, -/// reducing amount of traffic required to propagate group membership information. -#[derive(Clone)] -pub enum Scope { - /// The default scope - Default, - /// A named scope - Named(ScopeName), -} - -impl Scope { - /// Returns the scope's name as String. `Scope::Default` returns "Default". - pub fn as_str(&self) -> String { - match self { - Scope::Default => "Default".to_string(), - Scope::Named(scope_name) => scope_name.to_string(), - } - } -} - /// Represents a change in a process group's membership #[derive(Clone)] pub enum GroupChangeMessage { /// Some actors joined a group - Join(Scope, GroupName, Vec), + Join(ScopeName, GroupName, Vec), /// Some actors left a group - Leave(Scope, GroupName, Vec), + Leave(ScopeName, GroupName, Vec), } impl GroupChangeMessage { @@ -82,16 +62,16 @@ impl GroupChangeMessage { /// Retrieve the name of the scope in which the group change took place pub fn get_scope(&self) -> ScopeName { match self { - Self::Join(scope, _, _) => scope.as_str(), - Self::Leave(scope, _, _) => scope.as_str(), + Self::Join(scope, _, _) => scope.to_string(), + Self::Leave(scope, _, _) => scope.to_string(), } } } // TODO: Add scopes to `PgState` struct PgState { - map: Arc>>, - listeners: Arc>>, + map: Arc>>, + listeners: Arc>>, } static PG_MONITOR: OnceCell = OnceCell::new(); @@ -111,7 +91,10 @@ fn get_monitor<'a>() -> &'a PgState { pub fn join(group: GroupName, actors: Vec) { let monitor = get_monitor(); // insert into the monitor group - match monitor.map.entry(group.clone()) { + match monitor + .map + .entry((DEFAULT_SCOPE.to_owned().clone(), group.clone())) + { Occupied(mut occupied) => { let oref = occupied.get_mut(); for actor in actors.iter() { @@ -127,18 +110,24 @@ pub fn join(group: GroupName, actors: Vec) { } } // notify supervisors - if let Some(listeners) = monitor.listeners.get(&group) { + if let Some(listeners) = monitor + .listeners + .get(&(DEFAULT_SCOPE.to_owned(), group.clone())) + { for listener in listeners.value() { let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Join(Scope::Default, group.clone(), actors.clone()), + GroupChangeMessage::Join(DEFAULT_SCOPE.to_owned(), group.clone(), actors.clone()), )); } } // notify the world monitors - if let Some(listeners) = monitor.listeners.get(ALL_GROUPS_NOTIFICATION) { + if let Some(listeners) = monitor + .listeners + .get(&(DEFAULT_SCOPE.to_owned(), ALL_GROUPS_NOTIFICATION.to_owned())) + { for listener in listeners.value() { let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Join(Scope::Default, group.clone(), actors.clone()), + GroupChangeMessage::Join(DEFAULT_SCOPE.to_owned(), group.clone(), actors.clone()), )); } } @@ -150,7 +139,7 @@ pub fn join(group: GroupName, actors: Vec) { /// * `group` - The statically named group. Will be created if first actors to join /// * `actors` - The list of [crate::Actor]s to add to the group #[allow(unused_variables)] -pub fn join_named_scope(scope: &Scope, group: GroupName, actors: Vec) { +pub fn join_named_scope(scope: ScopeName, group: GroupName, actors: Vec) { todo!(); } @@ -160,7 +149,7 @@ pub fn join_named_scope(scope: &Scope, group: GroupName, actors: Vec) /// * `actors` - The list of actors to remove from the group pub fn leave(group: GroupName, actors: Vec) { let monitor = get_monitor(); - match monitor.map.entry(group.clone()) { + match monitor.map.entry((DEFAULT_SCOPE.to_owned(), group.clone())) { Vacant(_) => {} Occupied(mut occupied) => { let mut_ref = occupied.get_mut(); @@ -171,18 +160,32 @@ pub fn leave(group: GroupName, actors: Vec) { if mut_ref.is_empty() { occupied.remove(); } - if let Some(listeners) = monitor.listeners.get(&group) { + if let Some(listeners) = monitor + .listeners + .get(&(DEFAULT_SCOPE.to_owned(), group.clone())) + { for listener in listeners.value() { let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Leave(Scope::Default, group.clone(), actors.clone()), + GroupChangeMessage::Leave( + DEFAULT_SCOPE.to_owned(), + group.clone(), + actors.clone(), + ), )); } } // notify the world monitors - if let Some(listeners) = monitor.listeners.get(ALL_GROUPS_NOTIFICATION) { + if let Some(listeners) = monitor + .listeners + .get(&(DEFAULT_SCOPE.to_owned(), ALL_GROUPS_NOTIFICATION.to_owned())) + { for listener in listeners.value() { let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Leave(Scope::Default, group.clone(), actors.clone()), + GroupChangeMessage::Leave( + DEFAULT_SCOPE.to_owned(), + group.clone(), + actors.clone(), + ), )); } } @@ -196,7 +199,7 @@ pub fn leave(group: GroupName, actors: Vec) { /// * `group` - The statically named group /// * `actors` - The list of actors to remove from the group #[allow(unused_variables)] -pub fn leave_named_scope(scope: &Scope, group: GroupName, actors: Vec) { +pub fn leave_named_scope(scope: ScopeName, group: GroupName, actors: Vec) { todo!(); } @@ -221,19 +224,29 @@ pub(crate) fn leave_all(actor: ActorId) { // notify the listeners let all_listeners = pg_monitor.listeners.clone(); - for (group, cell) in removal_events.into_iter() { - if let Some(this_listeners) = all_listeners.get(&group) { + for ((scope_name, group_name), cell) in removal_events.into_iter() { + if let Some(this_listeners) = all_listeners.get(&(scope_name.clone(), group_name.clone())) { this_listeners.iter().for_each(|listener| { let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Leave(Scope::Default, group.clone(), vec![cell.clone()]), + GroupChangeMessage::Leave( + scope_name.clone(), + group_name.clone(), + vec![cell.clone()], + ), )); }); } // notify the world monitors - if let Some(listeners) = all_listeners.get(ALL_GROUPS_NOTIFICATION) { + if let Some(listeners) = + all_listeners.get(&(scope_name.clone(), ALL_GROUPS_NOTIFICATION.to_owned())) + { for listener in listeners.value() { let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Leave(Scope::Default, group.clone(), vec![cell.clone()]), + GroupChangeMessage::Leave( + scope_name.clone(), + group_name.clone(), + vec![cell.clone()], + ), )); } } @@ -253,7 +266,10 @@ pub(crate) fn leave_all(actor: ActorId) { /// Returns a [`Vec`] representing the members of this paging group pub fn get_local_members(group_name: &GroupName) -> Vec { let monitor = get_monitor(); - if let Some(actors) = monitor.map.get(group_name) { + if let Some(actors) = monitor + .map + .get(&(DEFAULT_SCOPE.to_owned(), group_name.to_owned())) + { actors .value() .values() @@ -273,7 +289,7 @@ pub fn get_local_members(group_name: &GroupName) -> Vec { /// /// Returns a [`Vec`] representing the members of this paging group #[allow(unused_variables)] -pub fn get_local_members_in_scope(scope: &Scope, group_name: &GroupName) -> Vec { +pub fn get_local_members_in_scope(scope: ScopeName, group_name: &GroupName) -> Vec { todo!(); } @@ -285,7 +301,10 @@ pub fn get_local_members_in_scope(scope: &Scope, group_name: &GroupName) -> Vec< /// Returns a [`Vec`] with the member actors pub fn get_members(group_name: &GroupName) -> Vec { let monitor = get_monitor(); - if let Some(actors) = monitor.map.get(group_name) { + if let Some(actors) = monitor + .map + .get(&(DEFAULT_SCOPE.to_owned(), group_name.to_owned())) + { actors.value().values().cloned().collect::>() } else { vec![] @@ -299,7 +318,7 @@ pub fn get_members(group_name: &GroupName) -> Vec { /// /// Returns a [`Vec`] with the member actors #[allow(unused_variables)] -pub fn get_members_in_scope(scope: &Scope, group_name: &GroupName) -> Vec { +pub fn get_members_in_scope(scope: &ScopeName, group_name: &GroupName) -> Vec { todo!(); } @@ -312,6 +331,7 @@ pub fn which_groups() -> Vec { .map .iter() .map(|kvp| kvp.key().clone()) + .map(|(_scope, group)| group.clone()) .collect::>() } @@ -322,7 +342,7 @@ pub fn which_groups() -> Vec { /// Returns a [`Vec`] representing all the registered group names /// in `scope` #[allow(unused_variables)] -pub fn which_groups_in_scope(scope: &Scope) -> Vec { +pub fn which_groups_in_scope(scope: ScopeName) -> Vec { todo!(); } @@ -330,7 +350,7 @@ pub fn which_groups_in_scope(scope: &Scope) -> Vec { /// /// Returns a [`Vec`] representing all the registered scopes #[allow(unused_variables)] -pub fn which_scopes() -> Vec { +pub fn which_scopes() -> Vec { todo!(); } @@ -340,7 +360,10 @@ pub fn which_scopes() -> Vec { /// * `actor` - The [ActorCell] representing who will receive updates pub fn monitor(group_name: GroupName, actor: ActorCell) { let monitor = get_monitor(); - match monitor.listeners.entry(group_name) { + match monitor + .listeners + .entry((DEFAULT_SCOPE.to_owned(), group_name)) + { Occupied(mut occupied) => occupied.get_mut().push(actor), Vacant(vacancy) => { vacancy.insert(vec![actor]); @@ -353,7 +376,7 @@ pub fn monitor(group_name: GroupName, actor: ActorCell) { /// * `scope` - the scope to monitor /// * `actor` - The [ActorCell] representing who will receive updates #[allow(unused_variables)] -pub fn monitor_scope(scope: &Scope, actor: ActorCell) { +pub fn monitor_scope(scope: ScopeName, actor: ActorCell) { todo!(); } @@ -363,7 +386,10 @@ pub fn monitor_scope(scope: &Scope, actor: ActorCell) { /// * `actor` - The [ActorCell] representing who will no longer receive updates pub fn demonitor(group_name: GroupName, actor: ActorId) { let monitor = get_monitor(); - if let Occupied(mut entry) = monitor.listeners.entry(group_name) { + if let Occupied(mut entry) = monitor + .listeners + .entry((DEFAULT_SCOPE.to_owned(), group_name)) + { let mut_ref = entry.get_mut(); mut_ref.retain(|a| a.get_id() != actor); if mut_ref.is_empty() { @@ -377,7 +403,7 @@ pub fn demonitor(group_name: GroupName, actor: ActorId) { /// * `scope` - The scope to demonitor /// * `actor` - The [ActorCell] representing who will no longer receive updates #[allow(unused_variables)] -pub fn demonitor_scope(scope: &Scope, actor: ActorId) { +pub fn demonitor_scope(scope: ScopeName, actor: ActorId) { todo!(); } diff --git a/ractor_cluster/src/node/node_session/mod.rs b/ractor_cluster/src/node/node_session/mod.rs index 886224f3..e6c0cf60 100644 --- a/ractor_cluster/src/node/node_session/mod.rs +++ b/ractor_cluster/src/node/node_session/mod.rs @@ -12,7 +12,7 @@ use std::convert::TryInto; use std::net::SocketAddr; use ractor::message::SerializedMessage; -use ractor::pg::{GroupChangeMessage, Scope}; +use ractor::pg::{GroupChangeMessage, DEFAULT_SCOPE}; use ractor::registry::PidLifecycleEvent; use ractor::rpc::CallResult; use ractor::{Actor, ActorId, ActorProcessingErr, ActorRef, SpawnErr, SupervisionEvent}; @@ -691,7 +691,7 @@ impl NodeSession { // Scan all PG groups + synchronize them let groups = ractor::pg::which_groups(); // TODO: Add this for all scopes! - let scope = Scope::Default; + let scope = DEFAULT_SCOPE; for group in groups { let local_members = ractor::pg::get_local_members(&group) .into_iter() @@ -705,7 +705,7 @@ impl NodeSession { let control_message = control_protocol::ControlMessage { msg: Some(control_protocol::control_message::Msg::PgJoin( control_protocol::PgJoin { - scope: scope.as_str(), + scope: scope.to_owned(), group, actors: local_members, }, @@ -1014,7 +1014,7 @@ impl Actor for NodeSession { let msg = control_protocol::ControlMessage { msg: Some(control_protocol::control_message::Msg::PgJoin( control_protocol::PgJoin { - scope: scope.as_str(), + scope, group, actors: filtered, }, @@ -1036,7 +1036,7 @@ impl Actor for NodeSession { let msg = control_protocol::ControlMessage { msg: Some(control_protocol::control_message::Msg::PgLeave( control_protocol::PgLeave { - scope: scope.as_str(), + scope, group, actors: filtered, }, diff --git a/ractor_cluster/src/node/node_session/tests.rs b/ractor_cluster/src/node/node_session/tests.rs index 1cf4313e..011c7baa 100644 --- a/ractor_cluster/src/node/node_session/tests.rs +++ b/ractor_cluster/src/node/node_session/tests.rs @@ -11,7 +11,6 @@ use std::sync::{ }; use ractor::concurrency::sleep; -use ractor::pg::Scope; use crate::node::NodeConnectionMode; use crate::NodeSessionMessage; @@ -761,7 +760,7 @@ async fn node_session_handle_control() { .expect("Failed to process control message"); assert_eq!(0, state.remote_actors.len()); - let scope_name = Scope::Named(String::from("node_session_test_scope")); + let scope_name = "node_session_test_scope"; let group_name = "node_session_handle_control"; // check pg join spawns + joins to a pg group @@ -771,7 +770,7 @@ async fn node_session_handle_control() { control_protocol::ControlMessage { msg: Some(control_protocol::control_message::Msg::PgJoin( control_protocol::PgJoin { - scope: scope_name.as_str(), + scope: scope_name.to_string(), group: group_name.to_string(), actors: vec![control_protocol::Actor { name: None, @@ -801,7 +800,7 @@ async fn node_session_handle_control() { control_protocol::ControlMessage { msg: Some(control_protocol::control_message::Msg::PgLeave( control_protocol::PgLeave { - scope: scope_name.as_str(), + scope: scope_name.to_string(), group: group_name.to_string(), actors: vec![control_protocol::Actor { name: None, From acf39fc26ff1ed57a4c9a8de1b07df5cf36747ce Mon Sep 17 00:00:00 2001 From: Leon Qadirie Date: Sat, 14 Oct 2023 14:35:34 +0200 Subject: [PATCH 06/17] Reworked interfaces of tests to write and change --- ractor/src/pg/tests.rs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/ractor/src/pg/tests.rs b/ractor/src/pg/tests.rs index 5ba1a264..c98e2673 100644 --- a/ractor/src/pg/tests.rs +++ b/ractor/src/pg/tests.rs @@ -34,7 +34,7 @@ impl Actor for TestActor { #[named] #[crate::concurrency::test] #[tracing_test::traced_test] -async fn test_basic_group() { +async fn test_basic_group_in_default_scope() { let (actor, handle) = Actor::spawn(None, TestActor, ()) .await .expect("Failed to spawn test actor"); @@ -335,7 +335,13 @@ async fn test_pg_monitoring() { monitor_handle.await.expect("Actor cleanup failed"); } -//TODO: Add scopes +#[crate::concurrency::test] +#[tracing_test::traced_test] +#[allow(unused_variables)] +async fn test_scope_monitoring() { + todo!() +} + #[named] #[cfg(feature = "cluster")] #[crate::concurrency::test] @@ -405,3 +411,11 @@ async fn local_vs_remote_pg_members() { handle.await.expect("Actor cleanup failed"); } } + +#[cfg(feature = "cluster")] +#[crate::concurrency::test] +#[tracing_test::traced_test] +#[allow(unused_variables)] +async fn local_vs_remote_pg_members_in_named_scopes() { + todo!(); +} From d2168bf78f6c4e63ade266580c75fe709c695230 Mon Sep 17 00:00:00 2001 From: Leon Qadirie Date: Sat, 14 Oct 2023 14:36:21 +0200 Subject: [PATCH 07/17] Added and changed tests to define new pg interface --- ractor/src/pg/tests.rs | 189 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 176 insertions(+), 13 deletions(-) diff --git a/ractor/src/pg/tests.rs b/ractor/src/pg/tests.rs index c98e2673..46621478 100644 --- a/ractor/src/pg/tests.rs +++ b/ractor/src/pg/tests.rs @@ -52,11 +52,26 @@ async fn test_basic_group_in_default_scope() { handle.await.expect("Actor cleanup failed"); } +#[named] #[crate::concurrency::test] #[tracing_test::traced_test] -#[allow(unused_variables)] -async fn test_default_scope() { - todo!(); +async fn test_basic_group_in_named_scope() { + let (actor, handle) = Actor::spawn(None, TestActor, ()) + .await + .expect("Failed to spawn test actor"); + + let scope = function_name!().to_string(); + let group = function_name!().to_string(); + + // join the group + pg::join_with_named_scope(scope.clone(), group.clone(), vec![actor.clone().into()]); + + let members = pg::get_members_with_scope(&scope, &group); + assert_eq!(1, members.len()); + + // Cleanup + actor.stop(None); + handle.await.expect("Actor cleanup failed"); } #[named] @@ -96,11 +111,43 @@ async fn test_multiple_members_in_group() { } } +#[named] #[crate::concurrency::test] #[tracing_test::traced_test] -#[allow(unused_variables)] -async fn test_multiple_members_in_scope() { - todo!(); +async fn test_multiple_members_in_group_in_named_scope() { + let scope = function_name!().to_string(); + let group = function_name!().to_string(); + + let mut actors = vec![]; + let mut handles = vec![]; + for _ in 0..10 { + let (actor, handle) = Actor::spawn(None, TestActor, ()) + .await + .expect("Failed to spawn test actor"); + actors.push(actor); + handles.push(handle); + } + + // join the group + pg::join_with_named_scope( + scope.clone(), + group.clone(), + actors + .iter() + .map(|aref| aref.clone().get_cell()) + .collect::>(), + ); + + let members = pg::get_members_with_scope(&scope, &group); + assert_eq!(10, members.len()); + + // Cleanup + for actor in actors { + actor.stop(None); + } + for handle in handles.into_iter() { + handle.await.expect("Actor cleanup failed"); + } } #[named] @@ -148,11 +195,70 @@ async fn test_multiple_groups() { } } +#[named] #[crate::concurrency::test] #[tracing_test::traced_test] -#[allow(unused_variables)] -async fn test_multiple_scopes() { - todo!(); +async fn test_multiple_groups_in_multiple_scopes() { + let scope_a = concat!(function_name!(), "_b").to_string(); + let scope_b = concat!(function_name!(), "_b").to_string(); + + let group_a = concat!(function_name!(), "_a").to_string(); + let group_b = concat!(function_name!(), "_b").to_string(); + + let mut actors = vec![]; + let mut handles = vec![]; + for _ in 0..10 { + let (actor, handle) = Actor::spawn(None, TestActor, ()) + .await + .expect("Failed to spawn test actor"); + actors.push(actor); + handles.push(handle); + } + + // setup scope_a and scope_b, and group_a and group_b + let these_actors = actors[0..5] + .iter() + .map(|a| a.clone().get_cell()) + .collect::>(); + pg::join_with_named_scope(scope_a.clone(), group_a.clone(), these_actors); + + let these_actors = actors[5..10] + .iter() + .map(|a| a.clone().get_cell()) + .collect::>(); + pg::join_with_named_scope(scope_a.clone(), group_b.clone(), these_actors); + + let these_actors = actors[0..5] + .iter() + .map(|a| a.clone().get_cell()) + .collect::>(); + pg::join_with_named_scope(scope_b.clone(), group_a.clone(), these_actors); + + let these_actors = actors[5..10] + .iter() + .map(|a| a.clone().get_cell()) + .collect::>(); + pg::join_with_named_scope(scope_b.clone(), group_b.clone(), these_actors); + + let members = pg::get_members_with_scope(&scope_a, &group_a); + assert_eq!(5, members.len()); + + let members = pg::get_members_with_scope(&scope_a, &group_b); + assert_eq!(5, members.len()); + + let members = pg::get_members_with_scope(&scope_b, &group_a); + assert_eq!(5, members.len()); + + let members = pg::get_members_with_scope(&scope_b, &group_b); + assert_eq!(5, members.len()); + + // Cleanup + for actor in actors { + actor.stop(None); + } + for handle in handles.into_iter() { + handle.await.expect("Actor cleanup failed"); + } } #[named] @@ -180,11 +286,30 @@ async fn test_actor_leaves_pg_group_on_shutdown() { assert_eq!(0, members.len()); } +#[named] #[crate::concurrency::test] #[tracing_test::traced_test] -#[allow(unused_variables)] async fn test_actor_leaves_scope_on_shupdown() { - todo!(); + let (actor, handle) = Actor::spawn(None, TestActor, ()) + .await + .expect("Failed to spawn test actor"); + + let scope = function_name!().to_string(); + let group = function_name!().to_string(); + + // join the scope and group + pg::join_with_named_scope(scope.clone(), group.clone(), vec![actor.clone().into()]); + + let members = pg::get_members_with_scope(&scope, &group); + assert_eq!(1, members.len()); + + // Cleanup + actor.stop(None); + handle.await.expect("Actor cleanup failed"); + drop(actor); + + let members = pg::get_members_with_scope(&scope, &group); + assert_eq!(0, members.len()); } #[named] @@ -223,11 +348,49 @@ async fn test_actor_leaves_pg_group_manually() { handle.await.expect("Actor cleanup failed"); } +#[named] #[crate::concurrency::test] #[tracing_test::traced_test] -#[allow(unused_variables)] async fn test_actor_leaves_scope_manually() { - todo!(); + let scope = function_name!().to_string(); + let group = function_name!().to_string(); + + let (actor, handle) = Actor::spawn(None, TestActor, ()) + .await + .expect("Failed to spawn test actor"); + + // join the group in scope (create on first use) + pg::join_with_named_scope(scope.clone(), group.clone(), vec![actor.clone().into()]); + + // the scope was created and is present + let scopes = pg::which_scopes(); + assert!(scopes.contains(&scope)); + + // the group was created and is present + let groups = pg::which_groups(); + assert!(groups.contains(&group)); + + let members = pg::get_members_with_scope(&scope, &group); + assert_eq!(1, members.len()); + + // leave the group + pg::leave_with_named_scope(scope.clone(), group.clone(), vec![actor.clone().into()]); + + // pif-paf-poof the scope is gone! + let scopes = pg::which_scopes(); + assert!(!scopes.contains(&scope)); + + // pif-paf-poof the group is gone! + let groups = pg::which_groups(); + assert!(!groups.contains(&group)); + + // members comes back empty + let members = pg::get_members_with_scope(&scope, &group); + assert_eq!(0, members.len()); + + // Cleanup + actor.stop(None); + handle.await.expect("Actor cleanup failed"); } #[named] From ec6fb90128dff10402cefc28e4e23867f8b4a3ba Mon Sep 17 00:00:00 2001 From: Leon Qadirie Date: Sat, 14 Oct 2023 14:37:18 +0200 Subject: [PATCH 08/17] Added logic to new interface --- ractor/src/pg/mod.rs | 175 ++++++++++++++++---- ractor_cluster/src/node/node_session/mod.rs | 12 +- 2 files changed, 150 insertions(+), 37 deletions(-) diff --git a/ractor/src/pg/mod.rs b/ractor/src/pg/mod.rs index b099aed8..1189f3bf 100644 --- a/ractor/src/pg/mod.rs +++ b/ractor/src/pg/mod.rs @@ -38,9 +38,6 @@ pub const ALL_GROUPS_NOTIFICATION: &str = "__world__"; #[cfg(test)] mod tests; -// TODO: Research if there is a need to explicitly start a `Scope` analogous -// to [Erlang's `pg` module](https://www.erlang.org/doc/man/pg.html). - /// Represents a change in a process group's membership #[derive(Clone)] pub enum GroupChangeMessage { @@ -68,7 +65,6 @@ impl GroupChangeMessage { } } -// TODO: Add scopes to `PgState` struct PgState { map: Arc>>, listeners: Arc>>, @@ -76,7 +72,6 @@ struct PgState { static PG_MONITOR: OnceCell = OnceCell::new(); -// TODO: Add scopes to `get_monitor` fn get_monitor<'a>() -> &'a PgState { PG_MONITOR.get_or_init(|| PgState { map: Arc::new(DashMap::new()), @@ -138,9 +133,43 @@ pub fn join(group: GroupName, actors: Vec) { /// * `scope` - the statically named scope. Will be created if first actors join /// * `group` - The statically named group. Will be created if first actors to join /// * `actors` - The list of [crate::Actor]s to add to the group -#[allow(unused_variables)] -pub fn join_named_scope(scope: ScopeName, group: GroupName, actors: Vec) { - todo!(); +pub fn join_with_named_scope(scope: ScopeName, group: GroupName, actors: Vec) { + let monitor = get_monitor(); + // insert into the monitor group + match monitor.map.entry((scope.to_owned().clone(), group.clone())) { + Occupied(mut occupied) => { + let oref = occupied.get_mut(); + for actor in actors.iter() { + oref.insert(actor.get_id(), actor.clone()); + } + } + Vacant(vacancy) => { + let map = actors + .iter() + .map(|a| (a.get_id(), a.clone())) + .collect::>(); + vacancy.insert(map); + } + } + // notify supervisors + if let Some(listeners) = monitor.listeners.get(&(scope.to_owned(), group.clone())) { + for listener in listeners.value() { + let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( + GroupChangeMessage::Join(scope.to_owned(), group.clone(), actors.clone()), + )); + } + } + // notify the world monitors + if let Some(listeners) = monitor + .listeners + .get(&(scope.to_owned(), ALL_GROUPS_NOTIFICATION.to_owned())) + { + for listener in listeners.value() { + let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( + GroupChangeMessage::Join(scope.to_owned(), group.clone(), actors.clone()), + )); + } + } } /// Leaves the specified [crate::Actor]s from the PG group in the default scope @@ -198,12 +227,42 @@ pub fn leave(group: GroupName, actors: Vec) { /// * `scope` - The statically named scope /// * `group` - The statically named group /// * `actors` - The list of actors to remove from the group -#[allow(unused_variables)] -pub fn leave_named_scope(scope: ScopeName, group: GroupName, actors: Vec) { - todo!(); +pub fn leave_with_named_scope(scope: ScopeName, group: GroupName, actors: Vec) { + let monitor = get_monitor(); + match monitor.map.entry((scope.to_owned(), group.clone())) { + Vacant(_) => {} + Occupied(mut occupied) => { + let mut_ref = occupied.get_mut(); + for actor in actors.iter() { + mut_ref.remove(&actor.get_id()); + } + + // the scope and group tuple is empty, remove it + if mut_ref.is_empty() { + occupied.remove(); + } + if let Some(listeners) = monitor.listeners.get(&(scope.to_owned(), group.clone())) { + for listener in listeners.value() { + let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( + GroupChangeMessage::Leave(scope.to_owned(), group.clone(), actors.clone()), + )); + } + } + // notify the world monitors + if let Some(listeners) = monitor + .listeners + .get(&(scope.to_owned(), ALL_GROUPS_NOTIFICATION.to_owned())) + { + for listener in listeners.value() { + let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( + GroupChangeMessage::Leave(scope.to_owned(), group.clone(), actors.clone()), + )); + } + } + } + } } -// TODO: Leave all groups in all scopes /// Leave all groups for a specific [ActorId]. /// Used only during actor shutdown pub(crate) fn leave_all(actor: ActorId) { @@ -288,9 +347,18 @@ pub fn get_local_members(group_name: &GroupName) -> Vec { /// * `group_name` - Either a statically named group /// /// Returns a [`Vec`] representing the members of this paging group -#[allow(unused_variables)] -pub fn get_local_members_in_scope(scope: ScopeName, group_name: &GroupName) -> Vec { - todo!(); +pub fn get_local_members_with_scope(scope: &ScopeName, group_name: &GroupName) -> Vec { + let monitor = get_monitor(); + if let Some(actors) = monitor.map.get(&(scope.to_owned(), group_name.to_owned())) { + actors + .value() + .values() + .filter(|a| a.get_id().is_local()) + .cloned() + .collect::>() + } else { + vec![] + } } /// Returns all the actors running on any node in the group `group` @@ -317,9 +385,13 @@ pub fn get_members(group_name: &GroupName) -> Vec { /// * `group_name` - Either a statically named group or scope /// /// Returns a [`Vec`] with the member actors -#[allow(unused_variables)] -pub fn get_members_in_scope(scope: &ScopeName, group_name: &GroupName) -> Vec { - todo!(); +pub fn get_members_with_scope(scope: &ScopeName, group_name: &GroupName) -> Vec { + let monitor = get_monitor(); + if let Some(actors) = monitor.map.get(&(scope.to_owned(), group_name.to_owned())) { + actors.value().values().cloned().collect::>() + } else { + vec![] + } } /// Return a list of all known groups @@ -341,20 +413,45 @@ pub fn which_groups() -> Vec { /// /// Returns a [`Vec`] representing all the registered group names /// in `scope` -#[allow(unused_variables)] -pub fn which_groups_in_scope(scope: ScopeName) -> Vec { - todo!(); +pub fn which_groups_in_named_scope(scope: &ScopeName) -> Vec { + let monitor = get_monitor(); + monitor + .map + .iter() + .map(|kvp| kvp.key().clone()) + .filter(|(scope_name, _group)| scope == scope_name) + .map(|(_scope_name, group)| group.clone()) + .collect::>() +} + +/// Returns a list of all known scope-group combinations. +/// +/// Returns a [`Vec<(ScopeName,GroupName)>`] representing all the registered +/// combinations that form an identifying tuple +pub fn which_scopes_and_groups() -> Vec<(ScopeName, GroupName)> { + let monitor = get_monitor(); + monitor + .map + .iter() + .map(|kvp| kvp.key().clone()) + .collect::>() } /// Returns a list of all known scopes /// -/// Returns a [`Vec`] representing all the registered scopes -#[allow(unused_variables)] +/// Returns a [`Vec`] representing all the registered scopes pub fn which_scopes() -> Vec { - todo!(); + let monitor = get_monitor(); + monitor + .map + .iter() + .map(|kvp| kvp.key().clone()) + .map(|(scope, _group)| scope.clone()) + .collect::>() } -/// Subscribes the provided [crate::Actor] to the group for updates +/// Subscribes the provided [crate::Actor] to the group in the default scope +/// for updates /// /// * `group_name` - The group to monitor /// * `actor` - The [ActorCell] representing who will receive updates @@ -375,12 +472,21 @@ pub fn monitor(group_name: GroupName, actor: ActorCell) { /// /// * `scope` - the scope to monitor /// * `actor` - The [ActorCell] representing who will receive updates -#[allow(unused_variables)] pub fn monitor_scope(scope: ScopeName, actor: ActorCell) { - todo!(); + let monitor = get_monitor(); + let groups_in_scope = which_groups_in_named_scope(&scope); + for group in groups_in_scope { + match monitor.listeners.entry((scope.to_owned(), group)) { + Occupied(mut occupied) => occupied.get_mut().push(actor.clone()), + Vacant(vacancy) => { + vacancy.insert(vec![actor.clone()]); + } + } + } } -/// Unsubscribes the provided [crate::Actor] from the group for updates +/// Unsubscribes the provided [crate::Actor] for updates from the group +/// in default scope /// /// * `group_name` - The group to demonitor /// * `actor` - The [ActorCell] representing who will no longer receive updates @@ -402,9 +508,18 @@ pub fn demonitor(group_name: GroupName, actor: ActorId) { /// /// * `scope` - The scope to demonitor /// * `actor` - The [ActorCell] representing who will no longer receive updates -#[allow(unused_variables)] pub fn demonitor_scope(scope: ScopeName, actor: ActorId) { - todo!(); + let monitor = get_monitor(); + let groups_in_scope = which_groups_in_named_scope(&scope); + for group in groups_in_scope { + if let Occupied(mut entry) = monitor.listeners.entry((scope.to_owned(), group)) { + let mut_ref = entry.get_mut(); + mut_ref.retain(|a| a.get_id() != actor); + if mut_ref.is_empty() { + entry.remove(); + } + } + } } /// Remove the specified [ActorId] from monitoring all groups it might be in. diff --git a/ractor_cluster/src/node/node_session/mod.rs b/ractor_cluster/src/node/node_session/mod.rs index e6c0cf60..34c2f9fa 100644 --- a/ractor_cluster/src/node/node_session/mod.rs +++ b/ractor_cluster/src/node/node_session/mod.rs @@ -12,7 +12,7 @@ use std::convert::TryInto; use std::net::SocketAddr; use ractor::message::SerializedMessage; -use ractor::pg::{GroupChangeMessage, DEFAULT_SCOPE}; +use ractor::pg::{which_scopes_and_groups, GroupChangeMessage}; use ractor::registry::PidLifecycleEvent; use ractor::rpc::CallResult; use ractor::{Actor, ActorId, ActorProcessingErr, ActorRef, SpawnErr, SupervisionEvent}; @@ -688,12 +688,10 @@ impl NodeSession { myself.get_cell(), ); - // Scan all PG groups + synchronize them - let groups = ractor::pg::which_groups(); - // TODO: Add this for all scopes! - let scope = DEFAULT_SCOPE; - for group in groups { - let local_members = ractor::pg::get_local_members(&group) + // Scan all scopes with their PG groups + synchronize them + let scopes_and_groups = which_scopes_and_groups(); + for (scope, group) in scopes_and_groups { + let local_members = ractor::pg::get_local_members_with_scope(&scope, &group) .into_iter() .filter(|v| v.supports_remoting()) .map(|act| control_protocol::Actor { From a353e225d87a4b4eef5e538e20f86fafc8d9c78f Mon Sep 17 00:00:00 2001 From: Leon Qadirie Date: Sun, 15 Oct 2023 00:42:28 +0200 Subject: [PATCH 09/17] Rework global monitoring --- ractor/src/pg/mod.rs | 100 ++++++++++++++------ ractor_cluster/src/node/node_session/mod.rs | 9 ++ 2 files changed, 80 insertions(+), 29 deletions(-) diff --git a/ractor/src/pg/mod.rs b/ractor/src/pg/mod.rs index 1189f3bf..df55e483 100644 --- a/ractor/src/pg/mod.rs +++ b/ractor/src/pg/mod.rs @@ -28,12 +28,14 @@ use once_cell::sync::OnceCell; use crate::{ActorCell, ActorId, GroupName, ScopeName, SupervisionEvent}; -/// Key to set and monitor the default scope -pub const DEFAULT_SCOPE: &str = "__default__"; +/// Key to set the default scope +pub const DEFAULT_SCOPE: &str = "__default_scope__"; -// TODO: Check how to handle a call to all groups vs to all in a scope. -/// Key to monitor all of the groups -pub const ALL_GROUPS_NOTIFICATION: &str = "__world__"; +/// Key to monitor all of the scopes +pub const ALL_SCOPES_NOTIFICATION: &str = "__world_scope__"; + +/// Key to monitor all of the groups in a scope +pub const ALL_GROUPS_NOTIFICATION: &str = "__world_group_"; #[cfg(test)] mod tests; @@ -115,15 +117,20 @@ pub fn join(group: GroupName, actors: Vec) { )); } } + // notify the world monitors - if let Some(listeners) = monitor - .listeners - .get(&(DEFAULT_SCOPE.to_owned(), ALL_GROUPS_NOTIFICATION.to_owned())) - { - for listener in listeners.value() { - let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Join(DEFAULT_SCOPE.to_owned(), group.clone(), actors.clone()), - )); + let world_monitor_keys = get_world_monitor_keys(); + for key in world_monitor_keys { + if let Some(listeners) = monitor.listeners.get(&key) { + for listener in listeners.value() { + let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( + GroupChangeMessage::Join( + DEFAULT_SCOPE.to_owned(), + group.clone(), + actors.clone(), + ), + )); + } } } } @@ -160,14 +167,14 @@ pub fn join_with_named_scope(scope: ScopeName, group: GroupName, actors: Vec occupied.get_mut().push(actor.clone()), + Vacant(vacancy) => { + vacancy.insert(vec![actor.clone()]); + } + } + let groups_in_scope = which_groups_in_named_scope(&scope); for group in groups_in_scope { match monitor.listeners.entry((scope.to_owned(), group)) { @@ -541,3 +564,22 @@ pub(crate) fn demonitor_all(actor: ActorId) { monitor.listeners.remove(&empty_group); } } + +/// Gets the keys for the world monitors. +/// +/// Returns a `Vec` represending all registered tuples +/// for which ane of the values is equivalent to one of the world_monitor_keys +fn get_world_monitor_keys() -> Vec<(ScopeName, GroupName)> { + let monitor = get_monitor(); + let mut world_monitor_keys = monitor + .listeners + .iter() + .map(|kvp| kvp.key().clone()) + .filter(|(scope_name, group_name)| { + scope_name == ALL_SCOPES_NOTIFICATION || group_name == ALL_GROUPS_NOTIFICATION + }) + .collect::>(); + world_monitor_keys.sort_unstable(); + world_monitor_keys.dedup(); + world_monitor_keys +} diff --git a/ractor_cluster/src/node/node_session/mod.rs b/ractor_cluster/src/node/node_session/mod.rs index 34c2f9fa..8a32a7a9 100644 --- a/ractor_cluster/src/node/node_session/mod.rs +++ b/ractor_cluster/src/node/node_session/mod.rs @@ -682,6 +682,11 @@ impl NodeSession { state.tcp_send_control(msg); } + // setup scope monitoring + ractor::pg::monitor_scope( + ractor::pg::ALL_SCOPES_NOTIFICATION.to_string(), + myself.get_cell(), + ); // setup PG monitoring ractor::pg::monitor( ractor::pg::ALL_GROUPS_NOTIFICATION.to_string(), @@ -874,6 +879,10 @@ impl Actor for NodeSession { _state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { // unhook monitoring sessions + ractor::pg::demonitor_scope( + ractor::pg::ALL_SCOPES_NOTIFICATION.to_string(), + myself.get_id(), + ); ractor::pg::demonitor( ractor::pg::ALL_GROUPS_NOTIFICATION.to_string(), myself.get_id(), From 81d546f9a46c417158e101a12ac120e6bcc9e1a2 Mon Sep 17 00:00:00 2001 From: Leon Qadirie Date: Sun, 15 Oct 2023 00:46:43 +0200 Subject: [PATCH 10/17] Add missing tests --- ractor/src/pg/tests.rs | 247 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 241 insertions(+), 6 deletions(-) diff --git a/ractor/src/pg/tests.rs b/ractor/src/pg/tests.rs index 46621478..e018524d 100644 --- a/ractor/src/pg/tests.rs +++ b/ractor/src/pg/tests.rs @@ -10,9 +10,9 @@ use crate::common_test::periodic_check; use crate::concurrency::Duration; use ::function_name::named; -use crate::{Actor, ActorProcessingErr, GroupName, SupervisionEvent}; +use crate::{Actor, ActorProcessingErr, GroupName, ScopeName, SupervisionEvent}; -use crate::pg; +use crate::pg::{self}; struct TestActor; @@ -150,6 +150,45 @@ async fn test_multiple_members_in_group_in_named_scope() { } } +#[named] +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn test_which_groups_in_named_scope() { + let scope = function_name!().to_string(); + let group = function_name!().to_string(); + + let mut actors = vec![]; + let mut handles = vec![]; + for _ in 0..10 { + let (actor, handle) = Actor::spawn(None, TestActor, ()) + .await + .expect("Failed to spawn test actor"); + actors.push(actor); + handles.push(handle); + } + + // join the group + pg::join_with_named_scope( + scope.clone(), + group.clone(), + actors + .iter() + .map(|aref| aref.clone().get_cell()) + .collect::>(), + ); + + let groups_in_scope = pg::which_groups_in_named_scope(&scope); + assert_eq!(vec![scope.clone()], groups_in_scope); + + // Cleanup + for actor in actors { + actor.stop(None); + } + for handle in handles.into_iter() { + handle.await.expect("Actor cleanup failed"); + } +} + #[named] #[crate::concurrency::test] #[tracing_test::traced_test] @@ -498,11 +537,144 @@ async fn test_pg_monitoring() { monitor_handle.await.expect("Actor cleanup failed"); } +#[named] #[crate::concurrency::test] #[tracing_test::traced_test] -#[allow(unused_variables)] async fn test_scope_monitoring() { - todo!() + let scope = function_name!().to_string(); + let group = function_name!().to_string(); + + let counter = Arc::new(AtomicU8::new(0u8)); + + struct AutoJoinActor { + scope: ScopeName, + pg_group: GroupName, + } + + #[async_trait::async_trait] + impl Actor for AutoJoinActor { + type Msg = (); + type Arguments = (); + type State = (); + + async fn pre_start( + &self, + myself: crate::ActorRef, + _: (), + ) -> Result { + pg::join_with_named_scope( + self.scope.clone(), + self.pg_group.clone(), + vec![myself.into()], + ); + Ok(()) + } + } + + struct NotificationMonitor { + scope: ScopeName, + counter: Arc, + } + + #[async_trait::async_trait] + impl Actor for NotificationMonitor { + type Msg = (); + type Arguments = (); + type State = (); + + async fn pre_start( + &self, + myself: crate::ActorRef, + _: (), + ) -> Result { + pg::monitor_scope(self.scope.clone(), myself.into()); + Ok(()) + } + + async fn handle_supervisor_evt( + &self, + _myself: crate::ActorRef, + message: SupervisionEvent, + _state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + if let SupervisionEvent::ProcessGroupChanged(change) = message { + match change { + pg::GroupChangeMessage::Join(scope_name, _which, who) => { + // ensure this test can run concurrently to others + if scope_name == function_name!() { + self.counter.fetch_add(who.len() as u8, Ordering::Relaxed); + } + } + pg::GroupChangeMessage::Leave(scope_name, _which, who) => { + // ensure this test can run concurrently to others + if scope_name == function_name!() { + self.counter.fetch_sub(who.len() as u8, Ordering::Relaxed); + } + } + } + } + Ok(()) + } + } + + let (monitor_actor, monitor_handle) = Actor::spawn( + None, + NotificationMonitor { + scope: scope.clone(), + counter: counter.clone(), + }, + (), + ) + .await + .expect("Failed to start monitor actor"); + + // this actor's startup should notify the "monitor" for scope changes + let (test_actor, test_handle) = Actor::spawn( + None, + AutoJoinActor { + scope: scope.clone(), + pg_group: group.clone(), + }, + (), + ) + .await + .expect("Failed to start test actor"); + + // start a second actor in the same scope to test if we multiply messages exponentially + let (test_actor1, test_handle1) = Actor::spawn( + None, + AutoJoinActor { + scope: scope.clone(), + pg_group: group.clone(), + }, + (), + ) + .await + .expect("Failed to start test actor"); + + // the monitor is notified async, so we need to wait a bit + periodic_check( + || counter.load(Ordering::Relaxed) == 2, + Duration::from_secs(5), + ) + .await; + + // kill the scope members + test_actor.stop(None); + test_handle.await.expect("Actor cleanup failed"); + test_actor1.stop(None); + test_handle1.await.expect("Actor cleanup failed"); + + // it should have notified that it's unsubscribed + periodic_check( + || counter.load(Ordering::Relaxed) == 0, + Duration::from_secs(5), + ) + .await; + + // cleanup + monitor_actor.stop(None); + monitor_handle.await.expect("Actor cleanup failed"); } #[named] @@ -575,10 +747,73 @@ async fn local_vs_remote_pg_members() { } } +#[named] #[cfg(feature = "cluster")] #[crate::concurrency::test] #[tracing_test::traced_test] -#[allow(unused_variables)] async fn local_vs_remote_pg_members_in_named_scopes() { - todo!(); + use crate::ActorRuntime; + + let scope = function_name!().to_string(); + let group = function_name!().to_string(); + + struct TestRemoteActor; + struct TestRemoteActorMessage; + impl crate::Message for TestRemoteActorMessage {} + #[async_trait::async_trait] + impl Actor for TestRemoteActor { + type Msg = TestRemoteActorMessage; + type State = (); + type Arguments = (); + async fn pre_start( + &self, + _this_actor: crate::ActorRef, + _: (), + ) -> Result { + Ok(()) + } + } + + let remote_pid = crate::ActorId::Remote { node_id: 1, pid: 1 }; + + let mut actors: Vec = vec![]; + let mut handles = vec![]; + for _ in 0..10 { + let (actor, handle) = Actor::spawn(None, TestActor, ()) + .await + .expect("Failed to spawn test actor"); + actors.push(actor.into()); + handles.push(handle); + } + let (actor, handle) = ActorRuntime::spawn_linked_remote( + None, + TestRemoteActor, + remote_pid, + (), + actors.first().unwrap().clone(), + ) + .await + .expect("Failed to spawn remote actor"); + println!("Spawned {}", actor.get_id()); + + actors.push(actor.into()); + handles.push(handle); + + // join the group in scope + pg::join_with_named_scope(scope.clone(), group.clone(), actors.to_vec()); + + // assert + let members = pg::get_local_members_with_scope(&scope, &group); + assert_eq!(10, members.len()); + + let members = pg::get_members_with_scope(&scope, &group); + assert_eq!(11, members.len()); + + // Cleanup + for actor in actors { + actor.stop(None); + } + for handle in handles.into_iter() { + handle.await.expect("Actor cleanup failed"); + } } From 9ce257780d390e7ef8b3a8976f8b489e62547f9a Mon Sep 17 00:00:00 2001 From: Leon Qadirie Date: Sun, 15 Oct 2023 01:11:12 +0200 Subject: [PATCH 11/17] Renamed functions for more consistent interface --- ractor/src/pg/mod.rs | 7 ++++-- ractor/src/pg/tests.rs | 24 ++++++++++----------- ractor_cluster/src/node/node_session/mod.rs | 2 +- 3 files changed, 18 insertions(+), 15 deletions(-) diff --git a/ractor/src/pg/mod.rs b/ractor/src/pg/mod.rs index df55e483..3d585c90 100644 --- a/ractor/src/pg/mod.rs +++ b/ractor/src/pg/mod.rs @@ -358,7 +358,10 @@ pub fn get_local_members(group_name: &GroupName) -> Vec { /// * `group_name` - Either a statically named group /// /// Returns a [`Vec`] representing the members of this paging group -pub fn get_local_members_with_scope(scope: &ScopeName, group_name: &GroupName) -> Vec { +pub fn get_local_members_with_named_scope( + scope: &ScopeName, + group_name: &GroupName, +) -> Vec { let monitor = get_monitor(); if let Some(actors) = monitor.map.get(&(scope.to_owned(), group_name.to_owned())) { actors @@ -396,7 +399,7 @@ pub fn get_members(group_name: &GroupName) -> Vec { /// * `group_name` - Either a statically named group or scope /// /// Returns a [`Vec`] with the member actors -pub fn get_members_with_scope(scope: &ScopeName, group_name: &GroupName) -> Vec { +pub fn get_members_with_named_scope(scope: &ScopeName, group_name: &GroupName) -> Vec { let monitor = get_monitor(); if let Some(actors) = monitor.map.get(&(scope.to_owned(), group_name.to_owned())) { actors.value().values().cloned().collect::>() diff --git a/ractor/src/pg/tests.rs b/ractor/src/pg/tests.rs index e018524d..427d2927 100644 --- a/ractor/src/pg/tests.rs +++ b/ractor/src/pg/tests.rs @@ -66,7 +66,7 @@ async fn test_basic_group_in_named_scope() { // join the group pg::join_with_named_scope(scope.clone(), group.clone(), vec![actor.clone().into()]); - let members = pg::get_members_with_scope(&scope, &group); + let members = pg::get_members_with_named_scope(&scope, &group); assert_eq!(1, members.len()); // Cleanup @@ -138,7 +138,7 @@ async fn test_multiple_members_in_group_in_named_scope() { .collect::>(), ); - let members = pg::get_members_with_scope(&scope, &group); + let members = pg::get_members_with_named_scope(&scope, &group); assert_eq!(10, members.len()); // Cleanup @@ -279,16 +279,16 @@ async fn test_multiple_groups_in_multiple_scopes() { .collect::>(); pg::join_with_named_scope(scope_b.clone(), group_b.clone(), these_actors); - let members = pg::get_members_with_scope(&scope_a, &group_a); + let members = pg::get_members_with_named_scope(&scope_a, &group_a); assert_eq!(5, members.len()); - let members = pg::get_members_with_scope(&scope_a, &group_b); + let members = pg::get_members_with_named_scope(&scope_a, &group_b); assert_eq!(5, members.len()); - let members = pg::get_members_with_scope(&scope_b, &group_a); + let members = pg::get_members_with_named_scope(&scope_b, &group_a); assert_eq!(5, members.len()); - let members = pg::get_members_with_scope(&scope_b, &group_b); + let members = pg::get_members_with_named_scope(&scope_b, &group_b); assert_eq!(5, members.len()); // Cleanup @@ -339,7 +339,7 @@ async fn test_actor_leaves_scope_on_shupdown() { // join the scope and group pg::join_with_named_scope(scope.clone(), group.clone(), vec![actor.clone().into()]); - let members = pg::get_members_with_scope(&scope, &group); + let members = pg::get_members_with_named_scope(&scope, &group); assert_eq!(1, members.len()); // Cleanup @@ -347,7 +347,7 @@ async fn test_actor_leaves_scope_on_shupdown() { handle.await.expect("Actor cleanup failed"); drop(actor); - let members = pg::get_members_with_scope(&scope, &group); + let members = pg::get_members_with_named_scope(&scope, &group); assert_eq!(0, members.len()); } @@ -409,7 +409,7 @@ async fn test_actor_leaves_scope_manually() { let groups = pg::which_groups(); assert!(groups.contains(&group)); - let members = pg::get_members_with_scope(&scope, &group); + let members = pg::get_members_with_named_scope(&scope, &group); assert_eq!(1, members.len()); // leave the group @@ -424,7 +424,7 @@ async fn test_actor_leaves_scope_manually() { assert!(!groups.contains(&group)); // members comes back empty - let members = pg::get_members_with_scope(&scope, &group); + let members = pg::get_members_with_named_scope(&scope, &group); assert_eq!(0, members.len()); // Cleanup @@ -803,10 +803,10 @@ async fn local_vs_remote_pg_members_in_named_scopes() { pg::join_with_named_scope(scope.clone(), group.clone(), actors.to_vec()); // assert - let members = pg::get_local_members_with_scope(&scope, &group); + let members = pg::get_local_members_with_named_scope(&scope, &group); assert_eq!(10, members.len()); - let members = pg::get_members_with_scope(&scope, &group); + let members = pg::get_members_with_named_scope(&scope, &group); assert_eq!(11, members.len()); // Cleanup diff --git a/ractor_cluster/src/node/node_session/mod.rs b/ractor_cluster/src/node/node_session/mod.rs index 8a32a7a9..f6fad004 100644 --- a/ractor_cluster/src/node/node_session/mod.rs +++ b/ractor_cluster/src/node/node_session/mod.rs @@ -696,7 +696,7 @@ impl NodeSession { // Scan all scopes with their PG groups + synchronize them let scopes_and_groups = which_scopes_and_groups(); for (scope, group) in scopes_and_groups { - let local_members = ractor::pg::get_local_members_with_scope(&scope, &group) + let local_members = ractor::pg::get_local_members_with_named_scope(&scope, &group) .into_iter() .filter(|v| v.supports_remoting()) .map(|act| control_protocol::Actor { From 149d870bb76574edb3ece24702fe78cd4681d25d Mon Sep 17 00:00:00 2001 From: Leon Qadirie Date: Thu, 19 Oct 2023 20:07:07 +0200 Subject: [PATCH 12/17] Rename functions and deduplicate code Fix CI tests --- ractor/src/pg/mod.rs | 134 ++------------------ ractor/src/pg/tests.rs | 92 ++++++++++---- ractor_cluster/src/node/node_session/mod.rs | 4 +- 3 files changed, 78 insertions(+), 152 deletions(-) diff --git a/ractor/src/pg/mod.rs b/ractor/src/pg/mod.rs index 3d585c90..a0bf6e13 100644 --- a/ractor/src/pg/mod.rs +++ b/ractor/src/pg/mod.rs @@ -86,53 +86,7 @@ fn get_monitor<'a>() -> &'a PgState { /// * `group` - The statically named group. Will be created if first actors to join /// * `actors` - The list of [crate::Actor]s to add to the group pub fn join(group: GroupName, actors: Vec) { - let monitor = get_monitor(); - // insert into the monitor group - match monitor - .map - .entry((DEFAULT_SCOPE.to_owned().clone(), group.clone())) - { - Occupied(mut occupied) => { - let oref = occupied.get_mut(); - for actor in actors.iter() { - oref.insert(actor.get_id(), actor.clone()); - } - } - Vacant(vacancy) => { - let map = actors - .iter() - .map(|a| (a.get_id(), a.clone())) - .collect::>(); - vacancy.insert(map); - } - } - // notify supervisors - if let Some(listeners) = monitor - .listeners - .get(&(DEFAULT_SCOPE.to_owned(), group.clone())) - { - for listener in listeners.value() { - let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Join(DEFAULT_SCOPE.to_owned(), group.clone(), actors.clone()), - )); - } - } - - // notify the world monitors - let world_monitor_keys = get_world_monitor_keys(); - for key in world_monitor_keys { - if let Some(listeners) = monitor.listeners.get(&key) { - for listener in listeners.value() { - let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Join( - DEFAULT_SCOPE.to_owned(), - group.clone(), - actors.clone(), - ), - )); - } - } - } + join_scoped(DEFAULT_SCOPE.to_owned(), group, actors); } /// Join actors to the group `group` within the scope `scope` @@ -140,7 +94,7 @@ pub fn join(group: GroupName, actors: Vec) { /// * `scope` - the statically named scope. Will be created if first actors join /// * `group` - The statically named group. Will be created if first actors to join /// * `actors` - The list of [crate::Actor]s to add to the group -pub fn join_with_named_scope(scope: ScopeName, group: GroupName, actors: Vec) { +pub fn join_scoped(scope: ScopeName, group: GroupName, actors: Vec) { let monitor = get_monitor(); // insert into the monitor group match monitor.map.entry((scope.to_owned().clone(), group.clone())) { @@ -184,49 +138,7 @@ pub fn join_with_named_scope(scope: ScopeName, group: GroupName, actors: Vec) { - let monitor = get_monitor(); - match monitor.map.entry((DEFAULT_SCOPE.to_owned(), group.clone())) { - Vacant(_) => {} - Occupied(mut occupied) => { - let mut_ref = occupied.get_mut(); - for actor in actors.iter() { - mut_ref.remove(&actor.get_id()); - } - // the group is empty, remove it - if mut_ref.is_empty() { - occupied.remove(); - } - if let Some(listeners) = monitor - .listeners - .get(&(DEFAULT_SCOPE.to_owned(), group.clone())) - { - for listener in listeners.value() { - let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Leave( - DEFAULT_SCOPE.to_owned(), - group.clone(), - actors.clone(), - ), - )); - } - } - // notify the world monitors - if let Some(listeners) = monitor - .listeners - .get(&(DEFAULT_SCOPE.to_owned(), ALL_GROUPS_NOTIFICATION.to_owned())) - { - for listener in listeners.value() { - let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( - GroupChangeMessage::Leave( - DEFAULT_SCOPE.to_owned(), - group.clone(), - actors.clone(), - ), - )); - } - } - } - } + leave_scoped(DEFAULT_SCOPE.to_owned(), group, actors); } /// Leaves the specified [crate::Actor]s from the PG group within the scope `scope` @@ -234,7 +146,7 @@ pub fn leave(group: GroupName, actors: Vec) { /// * `scope` - The statically named scope /// * `group` - The statically named group /// * `actors` - The list of actors to remove from the group -pub fn leave_with_named_scope(scope: ScopeName, group: GroupName, actors: Vec) { +pub fn leave_scoped(scope: ScopeName, group: GroupName, actors: Vec) { let monitor = get_monitor(); match monitor.map.entry((scope.to_owned(), group.clone())) { Vacant(_) => {} @@ -335,20 +247,7 @@ pub(crate) fn leave_all(actor: ActorId) { /// /// Returns a [`Vec`] representing the members of this paging group pub fn get_local_members(group_name: &GroupName) -> Vec { - let monitor = get_monitor(); - if let Some(actors) = monitor - .map - .get(&(DEFAULT_SCOPE.to_owned(), group_name.to_owned())) - { - actors - .value() - .values() - .filter(|a| a.get_id().is_local()) - .cloned() - .collect::>() - } else { - vec![] - } + get_scoped_local_members(&DEFAULT_SCOPE.to_owned(), group_name) } /// Returns all actors running on the local node in the group `group` @@ -358,10 +257,7 @@ pub fn get_local_members(group_name: &GroupName) -> Vec { /// * `group_name` - Either a statically named group /// /// Returns a [`Vec`] representing the members of this paging group -pub fn get_local_members_with_named_scope( - scope: &ScopeName, - group_name: &GroupName, -) -> Vec { +pub fn get_scoped_local_members(scope: &ScopeName, group_name: &GroupName) -> Vec { let monitor = get_monitor(); if let Some(actors) = monitor.map.get(&(scope.to_owned(), group_name.to_owned())) { actors @@ -382,15 +278,7 @@ pub fn get_local_members_with_named_scope( /// /// Returns a [`Vec`] with the member actors pub fn get_members(group_name: &GroupName) -> Vec { - let monitor = get_monitor(); - if let Some(actors) = monitor - .map - .get(&(DEFAULT_SCOPE.to_owned(), group_name.to_owned())) - { - actors.value().values().cloned().collect::>() - } else { - vec![] - } + get_scoped_members(&DEFAULT_SCOPE.to_owned(), group_name) } /// Returns all the actors running on any node in the group `group` @@ -399,7 +287,7 @@ pub fn get_members(group_name: &GroupName) -> Vec { /// * `group_name` - Either a statically named group or scope /// /// Returns a [`Vec`] with the member actors -pub fn get_members_with_named_scope(scope: &ScopeName, group_name: &GroupName) -> Vec { +pub fn get_scoped_members(scope: &ScopeName, group_name: &GroupName) -> Vec { let monitor = get_monitor(); if let Some(actors) = monitor.map.get(&(scope.to_owned(), group_name.to_owned())) { actors.value().values().cloned().collect::>() @@ -427,7 +315,7 @@ pub fn which_groups() -> Vec { /// /// Returns a [`Vec`] representing all the registered group names /// in `scope` -pub fn which_groups_in_named_scope(scope: &ScopeName) -> Vec { +pub fn which_scoped_groups(scope: &ScopeName) -> Vec { let monitor = get_monitor(); monitor .map @@ -500,7 +388,7 @@ pub fn monitor_scope(scope: ScopeName, actor: ActorCell) { } } - let groups_in_scope = which_groups_in_named_scope(&scope); + let groups_in_scope = which_scoped_groups(&scope); for group in groups_in_scope { match monitor.listeners.entry((scope.to_owned(), group)) { Occupied(mut occupied) => occupied.get_mut().push(actor.clone()), @@ -536,7 +424,7 @@ pub fn demonitor(group_name: GroupName, actor: ActorId) { /// * `actor` - The [ActorCell] representing who will no longer receive updates pub fn demonitor_scope(scope: ScopeName, actor: ActorId) { let monitor = get_monitor(); - let groups_in_scope = which_groups_in_named_scope(&scope); + let groups_in_scope = which_scoped_groups(&scope); for group in groups_in_scope { if let Occupied(mut entry) = monitor.listeners.entry((scope.to_owned(), group)) { let mut_ref = entry.get_mut(); diff --git a/ractor/src/pg/tests.rs b/ractor/src/pg/tests.rs index 427d2927..6d50a40a 100644 --- a/ractor/src/pg/tests.rs +++ b/ractor/src/pg/tests.rs @@ -64,9 +64,9 @@ async fn test_basic_group_in_named_scope() { let group = function_name!().to_string(); // join the group - pg::join_with_named_scope(scope.clone(), group.clone(), vec![actor.clone().into()]); + pg::join_scoped(scope.clone(), group.clone(), vec![actor.clone().into()]); - let members = pg::get_members_with_named_scope(&scope, &group); + let members = pg::get_scoped_members(&scope, &group); assert_eq!(1, members.len()); // Cleanup @@ -74,6 +74,44 @@ async fn test_basic_group_in_named_scope() { handle.await.expect("Actor cleanup failed"); } +// #[named] +// #[crate::concurrency::test] +// #[tracing_test::traced_test] +// async fn test_which_scopes_and_groups() { +// let (actor, handle) = Actor::spawn(None, TestActor, ()) +// .await +// .expect("Failed to spawn test actor"); + +// let scope_a = concat!(function_name!(), "_a").to_string(); +// let scope_b = concat!(function_name!(), "_b").to_string(); +// let group_a = concat!(function_name!(), "_a").to_string(); +// let group_b = concat!(function_name!(), "_b").to_string(); + +// // join all scopes twice with each group +// let scope_group = [ +// (scope_a.clone(), group_a.clone()), +// (scope_a.clone(), group_b.clone()), +// (scope_b.clone(), group_a.clone()), +// (scope_b.clone(), group_b.clone()), +// ]; + +// for (scope, group) in scope_group.iter() { +// pg::join_scoped(scope.clone(), group.clone(), vec![actor.clone().into()]); +// pg::join_scoped(scope.clone(), group.clone(), vec![actor.clone().into()]); +// } + +// let scopes_and_groups = which_scopes_and_groups(); +// println!("Scopes and groups are: {:#?}", scopes_and_groups); +// assert_eq!(4, scopes_and_groups.len()); + +// // Cleanup +// actor.stop(None); +// handle.await.expect("Actor cleanup failed"); + +// let scopes_and_groups = which_scopes_and_groups(); +// assert!(scopes_and_groups.is_empty()); +// } + #[named] #[crate::concurrency::test] #[tracing_test::traced_test] @@ -114,7 +152,7 @@ async fn test_multiple_members_in_group() { #[named] #[crate::concurrency::test] #[tracing_test::traced_test] -async fn test_multiple_members_in_group_in_named_scope() { +async fn test_multiple_members_in_scoped_group() { let scope = function_name!().to_string(); let group = function_name!().to_string(); @@ -129,7 +167,7 @@ async fn test_multiple_members_in_group_in_named_scope() { } // join the group - pg::join_with_named_scope( + pg::join_scoped( scope.clone(), group.clone(), actors @@ -138,7 +176,7 @@ async fn test_multiple_members_in_group_in_named_scope() { .collect::>(), ); - let members = pg::get_members_with_named_scope(&scope, &group); + let members = pg::get_scoped_members(&scope, &group); assert_eq!(10, members.len()); // Cleanup @@ -153,7 +191,7 @@ async fn test_multiple_members_in_group_in_named_scope() { #[named] #[crate::concurrency::test] #[tracing_test::traced_test] -async fn test_which_groups_in_named_scope() { +async fn test_which_scoped_groups() { let scope = function_name!().to_string(); let group = function_name!().to_string(); @@ -168,7 +206,7 @@ async fn test_which_groups_in_named_scope() { } // join the group - pg::join_with_named_scope( + pg::join_scoped( scope.clone(), group.clone(), actors @@ -177,7 +215,7 @@ async fn test_which_groups_in_named_scope() { .collect::>(), ); - let groups_in_scope = pg::which_groups_in_named_scope(&scope); + let groups_in_scope = pg::which_scoped_groups(&scope); assert_eq!(vec![scope.clone()], groups_in_scope); // Cleanup @@ -259,36 +297,36 @@ async fn test_multiple_groups_in_multiple_scopes() { .iter() .map(|a| a.clone().get_cell()) .collect::>(); - pg::join_with_named_scope(scope_a.clone(), group_a.clone(), these_actors); + pg::join_scoped(scope_a.clone(), group_a.clone(), these_actors); let these_actors = actors[5..10] .iter() .map(|a| a.clone().get_cell()) .collect::>(); - pg::join_with_named_scope(scope_a.clone(), group_b.clone(), these_actors); + pg::join_scoped(scope_a.clone(), group_b.clone(), these_actors); let these_actors = actors[0..5] .iter() .map(|a| a.clone().get_cell()) .collect::>(); - pg::join_with_named_scope(scope_b.clone(), group_a.clone(), these_actors); + pg::join_scoped(scope_b.clone(), group_a.clone(), these_actors); let these_actors = actors[5..10] .iter() .map(|a| a.clone().get_cell()) .collect::>(); - pg::join_with_named_scope(scope_b.clone(), group_b.clone(), these_actors); + pg::join_scoped(scope_b.clone(), group_b.clone(), these_actors); - let members = pg::get_members_with_named_scope(&scope_a, &group_a); + let members = pg::get_scoped_members(&scope_a, &group_a); assert_eq!(5, members.len()); - let members = pg::get_members_with_named_scope(&scope_a, &group_b); + let members = pg::get_scoped_members(&scope_a, &group_b); assert_eq!(5, members.len()); - let members = pg::get_members_with_named_scope(&scope_b, &group_a); + let members = pg::get_scoped_members(&scope_b, &group_a); assert_eq!(5, members.len()); - let members = pg::get_members_with_named_scope(&scope_b, &group_b); + let members = pg::get_scoped_members(&scope_b, &group_b); assert_eq!(5, members.len()); // Cleanup @@ -337,9 +375,9 @@ async fn test_actor_leaves_scope_on_shupdown() { let group = function_name!().to_string(); // join the scope and group - pg::join_with_named_scope(scope.clone(), group.clone(), vec![actor.clone().into()]); + pg::join_scoped(scope.clone(), group.clone(), vec![actor.clone().into()]); - let members = pg::get_members_with_named_scope(&scope, &group); + let members = pg::get_scoped_members(&scope, &group); assert_eq!(1, members.len()); // Cleanup @@ -347,7 +385,7 @@ async fn test_actor_leaves_scope_on_shupdown() { handle.await.expect("Actor cleanup failed"); drop(actor); - let members = pg::get_members_with_named_scope(&scope, &group); + let members = pg::get_scoped_members(&scope, &group); assert_eq!(0, members.len()); } @@ -399,7 +437,7 @@ async fn test_actor_leaves_scope_manually() { .expect("Failed to spawn test actor"); // join the group in scope (create on first use) - pg::join_with_named_scope(scope.clone(), group.clone(), vec![actor.clone().into()]); + pg::join_scoped(scope.clone(), group.clone(), vec![actor.clone().into()]); // the scope was created and is present let scopes = pg::which_scopes(); @@ -409,11 +447,11 @@ async fn test_actor_leaves_scope_manually() { let groups = pg::which_groups(); assert!(groups.contains(&group)); - let members = pg::get_members_with_named_scope(&scope, &group); + let members = pg::get_scoped_members(&scope, &group); assert_eq!(1, members.len()); // leave the group - pg::leave_with_named_scope(scope.clone(), group.clone(), vec![actor.clone().into()]); + pg::leave_scoped(scope.clone(), group.clone(), vec![actor.clone().into()]); // pif-paf-poof the scope is gone! let scopes = pg::which_scopes(); @@ -424,7 +462,7 @@ async fn test_actor_leaves_scope_manually() { assert!(!groups.contains(&group)); // members comes back empty - let members = pg::get_members_with_named_scope(&scope, &group); + let members = pg::get_scoped_members(&scope, &group); assert_eq!(0, members.len()); // Cleanup @@ -562,7 +600,7 @@ async fn test_scope_monitoring() { myself: crate::ActorRef, _: (), ) -> Result { - pg::join_with_named_scope( + pg::join_scoped( self.scope.clone(), self.pg_group.clone(), vec![myself.into()], @@ -800,13 +838,13 @@ async fn local_vs_remote_pg_members_in_named_scopes() { handles.push(handle); // join the group in scope - pg::join_with_named_scope(scope.clone(), group.clone(), actors.to_vec()); + pg::join_scoped(scope.clone(), group.clone(), actors.to_vec()); // assert - let members = pg::get_local_members_with_named_scope(&scope, &group); + let members = pg::get_scoped_local_members(&scope, &group); assert_eq!(10, members.len()); - let members = pg::get_members_with_named_scope(&scope, &group); + let members = pg::get_scoped_members(&scope, &group); assert_eq!(11, members.len()); // Cleanup diff --git a/ractor_cluster/src/node/node_session/mod.rs b/ractor_cluster/src/node/node_session/mod.rs index f6fad004..2e7e1a34 100644 --- a/ractor_cluster/src/node/node_session/mod.rs +++ b/ractor_cluster/src/node/node_session/mod.rs @@ -12,7 +12,7 @@ use std::convert::TryInto; use std::net::SocketAddr; use ractor::message::SerializedMessage; -use ractor::pg::{which_scopes_and_groups, GroupChangeMessage}; +use ractor::pg::{get_scoped_local_members, which_scopes_and_groups, GroupChangeMessage}; use ractor::registry::PidLifecycleEvent; use ractor::rpc::CallResult; use ractor::{Actor, ActorId, ActorProcessingErr, ActorRef, SpawnErr, SupervisionEvent}; @@ -696,7 +696,7 @@ impl NodeSession { // Scan all scopes with their PG groups + synchronize them let scopes_and_groups = which_scopes_and_groups(); for (scope, group) in scopes_and_groups { - let local_members = ractor::pg::get_local_members_with_named_scope(&scope, &group) + let local_members = get_scoped_local_members(&scope, &group) .into_iter() .filter(|v| v.supports_remoting()) .map(|act| control_protocol::Actor { From ee3a2c8862db55c97be6d40235e4866c52fd8c30 Mon Sep 17 00:00:00 2001 From: Leon Qadirie Date: Thu, 19 Oct 2023 20:07:49 +0200 Subject: [PATCH 13/17] Deduplicate groups in `which_groups()` --- ractor/src/pg/mod.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/ractor/src/pg/mod.rs b/ractor/src/pg/mod.rs index a0bf6e13..4d561677 100644 --- a/ractor/src/pg/mod.rs +++ b/ractor/src/pg/mod.rs @@ -301,12 +301,15 @@ pub fn get_scoped_members(scope: &ScopeName, group_name: &GroupName) -> Vec`] representing all the registered group names pub fn which_groups() -> Vec { let monitor = get_monitor(); - monitor + let mut groups = monitor .map .iter() .map(|kvp| kvp.key().clone()) .map(|(_scope, group)| group.clone()) - .collect::>() + .collect::>(); + groups.sort_unstable(); + groups.dedup(); + groups } /// Returns a list of all known groups in scope `scope` From c29efc1184b1904406813f214c18c7bf389c743d Mon Sep 17 00:00:00 2001 From: Leon Qadirie Date: Thu, 19 Oct 2023 20:09:48 +0200 Subject: [PATCH 14/17] Keep the protobuf spec backwards compatible --- ractor_cluster/src/protocol/control.proto | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/ractor_cluster/src/protocol/control.proto b/ractor_cluster/src/protocol/control.proto index 89c058e9..dd5804e2 100644 --- a/ractor_cluster/src/protocol/control.proto +++ b/ractor_cluster/src/protocol/control.proto @@ -52,22 +52,22 @@ message Terminate { // Process group join occurred message PgJoin { - // the scope - string scope = 1; // The group - string group = 2; + string group = 1; // The actors - repeated Actor actors = 3; + repeated Actor actors = 2; + // the scope + string scope = 3; } // Process group leave occurred message PgLeave { - // the scope - string scope = 1; // The group - string group = 2; + string group = 1; // The actors - repeated Actor actors = 3; + repeated Actor actors = 2; + // The scope + string scope = 3; } // A collection of NodeSession endpoints @@ -97,4 +97,4 @@ message ControlMessage { // The list of node sessions on the remote host for transitive connections NodeSessions node_sessions = 8; } -} \ No newline at end of file +} From 75f37756cdb2f78171c7b4b66291bcb51d27be25 Mon Sep 17 00:00:00 2001 From: Leon Qadirie Date: Thu, 19 Oct 2023 22:28:33 +0200 Subject: [PATCH 15/17] Change identifying tuple to proper type --- ractor/src/pg/mod.rs | 141 +++++++++++++------- ractor_cluster/src/node/node_session/mod.rs | 8 +- 2 files changed, 99 insertions(+), 50 deletions(-) diff --git a/ractor/src/pg/mod.rs b/ractor/src/pg/mod.rs index 4d561677..9b35ebfc 100644 --- a/ractor/src/pg/mod.rs +++ b/ractor/src/pg/mod.rs @@ -67,9 +67,30 @@ impl GroupChangeMessage { } } +/// Represents the combination of a `ScopeName` and a `GroupName` +/// that uniquely identifies a specific group in a specific scope +#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct ScopeGroupKey { + /// the `ScopeName` + scope: ScopeName, + /// The `GroupName` + group: GroupName, +} + +impl ScopeGroupKey { + /// Retrieve the struct's scope + pub fn get_scope(&self) -> ScopeName { + self.scope.to_owned() + } + /// Retrieve the struct's group + pub fn get_group(&self) -> GroupName { + self.group.to_owned() + } +} + struct PgState { - map: Arc>>, - listeners: Arc>>, + map: Arc>>, + listeners: Arc>>, } static PG_MONITOR: OnceCell = OnceCell::new(); @@ -95,9 +116,14 @@ pub fn join(group: GroupName, actors: Vec) { /// * `group` - The statically named group. Will be created if first actors to join /// * `actors` - The list of [crate::Actor]s to add to the group pub fn join_scoped(scope: ScopeName, group: GroupName, actors: Vec) { + let key = ScopeGroupKey { + scope: scope.to_owned(), + group: group.to_owned(), + }; let monitor = get_monitor(); + // insert into the monitor group - match monitor.map.entry((scope.to_owned().clone(), group.clone())) { + match monitor.map.entry(key.to_owned()) { Occupied(mut occupied) => { let oref = occupied.get_mut(); for actor in actors.iter() { @@ -113,7 +139,7 @@ pub fn join_scoped(scope: ScopeName, group: GroupName, actors: Vec) { } } // notify supervisors - if let Some(listeners) = monitor.listeners.get(&(scope.to_owned(), group.clone())) { + if let Some(listeners) = monitor.listeners.get(&key) { for listener in listeners.value() { let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( GroupChangeMessage::Join(scope.to_owned(), group.clone(), actors.clone()), @@ -147,8 +173,12 @@ pub fn leave(group: GroupName, actors: Vec) { /// * `group` - The statically named group /// * `actors` - The list of actors to remove from the group pub fn leave_scoped(scope: ScopeName, group: GroupName, actors: Vec) { + let key = ScopeGroupKey { + scope: scope.to_owned(), + group: group.to_owned(), + }; let monitor = get_monitor(); - match monitor.map.entry((scope.to_owned(), group.clone())) { + match monitor.map.entry(key.to_owned()) { Vacant(_) => {} Occupied(mut occupied) => { let mut_ref = occupied.get_mut(); @@ -160,7 +190,7 @@ pub fn leave_scoped(scope: ScopeName, group: GroupName, actors: Vec) if mut_ref.is_empty() { occupied.remove(); } - if let Some(listeners) = monitor.listeners.get(&(scope.to_owned(), group.clone())) { + if let Some(listeners) = monitor.listeners.get(&key) { for listener in listeners.value() { let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( GroupChangeMessage::Leave(scope.to_owned(), group.clone(), actors.clone()), @@ -206,27 +236,29 @@ pub(crate) fn leave_all(actor: ActorId) { // notify the listeners let all_listeners = pg_monitor.listeners.clone(); - for ((scope_name, group_name), cell) in removal_events.into_iter() { - if let Some(this_listeners) = all_listeners.get(&(scope_name.clone(), group_name.clone())) { + for (scope_and_group, cell) in removal_events.into_iter() { + if let Some(this_listeners) = all_listeners.get(&scope_and_group) { this_listeners.iter().for_each(|listener| { let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( GroupChangeMessage::Leave( - scope_name.clone(), - group_name.clone(), + scope_and_group.scope.clone(), + scope_and_group.group.clone(), vec![cell.clone()], ), )); }); } // notify the world monitors - if let Some(listeners) = - all_listeners.get(&(scope_name.clone(), ALL_GROUPS_NOTIFICATION.to_owned())) - { + let world_monitor_scoped = ScopeGroupKey { + scope: scope_and_group.scope, + group: ALL_GROUPS_NOTIFICATION.to_owned(), + }; + if let Some(listeners) = all_listeners.get(&world_monitor_scoped) { for listener in listeners.value() { let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged( GroupChangeMessage::Leave( - scope_name.clone(), - group_name.clone(), + world_monitor_scoped.scope.clone(), + world_monitor_scoped.group.clone(), vec![cell.clone()], ), )); @@ -246,8 +278,8 @@ pub(crate) fn leave_all(actor: ActorId) { /// * `group_name` - Either a statically named group /// /// Returns a [`Vec`] representing the members of this paging group -pub fn get_local_members(group_name: &GroupName) -> Vec { - get_scoped_local_members(&DEFAULT_SCOPE.to_owned(), group_name) +pub fn get_local_members(group: &GroupName) -> Vec { + get_scoped_local_members(&DEFAULT_SCOPE.to_owned(), group) } /// Returns all actors running on the local node in the group `group` @@ -257,9 +289,13 @@ pub fn get_local_members(group_name: &GroupName) -> Vec { /// * `group_name` - Either a statically named group /// /// Returns a [`Vec`] representing the members of this paging group -pub fn get_scoped_local_members(scope: &ScopeName, group_name: &GroupName) -> Vec { +pub fn get_scoped_local_members(scope: &ScopeName, group: &GroupName) -> Vec { + let key = ScopeGroupKey { + scope: scope.to_owned(), + group: group.to_owned(), + }; let monitor = get_monitor(); - if let Some(actors) = monitor.map.get(&(scope.to_owned(), group_name.to_owned())) { + if let Some(actors) = monitor.map.get(&key) { actors .value() .values() @@ -287,9 +323,13 @@ pub fn get_members(group_name: &GroupName) -> Vec { /// * `group_name` - Either a statically named group or scope /// /// Returns a [`Vec`] with the member actors -pub fn get_scoped_members(scope: &ScopeName, group_name: &GroupName) -> Vec { +pub fn get_scoped_members(scope: &ScopeName, group: &GroupName) -> Vec { + let key = ScopeGroupKey { + scope: scope.to_owned(), + group: group.to_owned(), + }; let monitor = get_monitor(); - if let Some(actors) = monitor.map.get(&(scope.to_owned(), group_name.to_owned())) { + if let Some(actors) = monitor.map.get(&key) { actors.value().values().cloned().collect::>() } else { vec![] @@ -305,7 +345,7 @@ pub fn which_groups() -> Vec { .map .iter() .map(|kvp| kvp.key().clone()) - .map(|(_scope, group)| group.clone()) + .map(|kvp| kvp.group.clone()) .collect::>(); groups.sort_unstable(); groups.dedup(); @@ -324,8 +364,8 @@ pub fn which_scoped_groups(scope: &ScopeName) -> Vec { .map .iter() .map(|kvp| kvp.key().clone()) - .filter(|(scope_name, _group)| scope == scope_name) - .map(|(_scope_name, group)| group.clone()) + .filter(|kvp| kvp.scope == *scope) + .map(|kvp| kvp.group.clone()) .collect::>() } @@ -333,7 +373,7 @@ pub fn which_scoped_groups(scope: &ScopeName) -> Vec { /// /// Returns a [`Vec<(ScopeName,GroupName)>`] representing all the registered /// combinations that form an identifying tuple -pub fn which_scopes_and_groups() -> Vec<(ScopeName, GroupName)> { +pub fn which_scopes_and_groups() -> Vec { let monitor = get_monitor(); monitor .map @@ -351,7 +391,7 @@ pub fn which_scopes() -> Vec { .map .iter() .map(|kvp| kvp.key().clone()) - .map(|(scope, _group)| scope.clone()) + .map(|kvp| kvp.scope.clone()) .collect::>() } @@ -360,12 +400,13 @@ pub fn which_scopes() -> Vec { /// /// * `group_name` - The group to monitor /// * `actor` - The [ActorCell] representing who will receive updates -pub fn monitor(group_name: GroupName, actor: ActorCell) { +pub fn monitor(group: GroupName, actor: ActorCell) { + let key = ScopeGroupKey { + scope: DEFAULT_SCOPE.to_owned(), + group, + }; let monitor = get_monitor(); - match monitor - .listeners - .entry((DEFAULT_SCOPE.to_owned(), group_name)) - { + match monitor.listeners.entry(key) { Occupied(mut occupied) => occupied.get_mut().push(actor), Vacant(vacancy) => { vacancy.insert(vec![actor]); @@ -378,13 +419,14 @@ pub fn monitor(group_name: GroupName, actor: ActorCell) { /// * `scope` - the scope to monitor /// * `actor` - The [ActorCell] representing who will receive updates pub fn monitor_scope(scope: ScopeName, actor: ActorCell) { + let key = ScopeGroupKey { + scope: scope.to_owned(), + group: ALL_GROUPS_NOTIFICATION.to_owned(), + }; let monitor = get_monitor(); // Register at world monitor first - match monitor - .listeners - .entry((scope.clone(), ALL_GROUPS_NOTIFICATION.to_owned())) - { + match monitor.listeners.entry(key) { Occupied(mut occupied) => occupied.get_mut().push(actor.clone()), Vacant(vacancy) => { vacancy.insert(vec![actor.clone()]); @@ -393,7 +435,11 @@ pub fn monitor_scope(scope: ScopeName, actor: ActorCell) { let groups_in_scope = which_scoped_groups(&scope); for group in groups_in_scope { - match monitor.listeners.entry((scope.to_owned(), group)) { + let key = ScopeGroupKey { + scope: scope.to_owned(), + group, + }; + match monitor.listeners.entry(key) { Occupied(mut occupied) => occupied.get_mut().push(actor.clone()), Vacant(vacancy) => { vacancy.insert(vec![actor.clone()]); @@ -408,11 +454,12 @@ pub fn monitor_scope(scope: ScopeName, actor: ActorCell) { /// * `group_name` - The group to demonitor /// * `actor` - The [ActorCell] representing who will no longer receive updates pub fn demonitor(group_name: GroupName, actor: ActorId) { + let key = ScopeGroupKey { + scope: DEFAULT_SCOPE.to_owned(), + group: group_name, + }; let monitor = get_monitor(); - if let Occupied(mut entry) = monitor - .listeners - .entry((DEFAULT_SCOPE.to_owned(), group_name)) - { + if let Occupied(mut entry) = monitor.listeners.entry(key) { let mut_ref = entry.get_mut(); mut_ref.retain(|a| a.get_id() != actor); if mut_ref.is_empty() { @@ -429,7 +476,11 @@ pub fn demonitor_scope(scope: ScopeName, actor: ActorId) { let monitor = get_monitor(); let groups_in_scope = which_scoped_groups(&scope); for group in groups_in_scope { - if let Occupied(mut entry) = monitor.listeners.entry((scope.to_owned(), group)) { + let key = ScopeGroupKey { + scope: scope.to_owned(), + group, + }; + if let Occupied(mut entry) = monitor.listeners.entry(key) { let mut_ref = entry.get_mut(); mut_ref.retain(|a| a.get_id() != actor); if mut_ref.is_empty() { @@ -463,16 +514,14 @@ pub(crate) fn demonitor_all(actor: ActorId) { /// /// Returns a `Vec` represending all registered tuples /// for which ane of the values is equivalent to one of the world_monitor_keys -fn get_world_monitor_keys() -> Vec<(ScopeName, GroupName)> { +fn get_world_monitor_keys() -> Vec { let monitor = get_monitor(); let mut world_monitor_keys = monitor .listeners .iter() .map(|kvp| kvp.key().clone()) - .filter(|(scope_name, group_name)| { - scope_name == ALL_SCOPES_NOTIFICATION || group_name == ALL_GROUPS_NOTIFICATION - }) - .collect::>(); + .filter(|kvp| kvp.scope == ALL_SCOPES_NOTIFICATION || kvp.group == ALL_GROUPS_NOTIFICATION) + .collect::>(); world_monitor_keys.sort_unstable(); world_monitor_keys.dedup(); world_monitor_keys diff --git a/ractor_cluster/src/node/node_session/mod.rs b/ractor_cluster/src/node/node_session/mod.rs index 2e7e1a34..15de5bd3 100644 --- a/ractor_cluster/src/node/node_session/mod.rs +++ b/ractor_cluster/src/node/node_session/mod.rs @@ -695,8 +695,8 @@ impl NodeSession { // Scan all scopes with their PG groups + synchronize them let scopes_and_groups = which_scopes_and_groups(); - for (scope, group) in scopes_and_groups { - let local_members = get_scoped_local_members(&scope, &group) + for key in scopes_and_groups { + let local_members = get_scoped_local_members(&key.get_scope(), &key.get_group()) .into_iter() .filter(|v| v.supports_remoting()) .map(|act| control_protocol::Actor { @@ -708,8 +708,8 @@ impl NodeSession { let control_message = control_protocol::ControlMessage { msg: Some(control_protocol::control_message::Msg::PgJoin( control_protocol::PgJoin { - scope: scope.to_owned(), - group, + scope: key.get_scope(), + group: key.get_group(), actors: local_members, }, )), From 0abef7c867b51024fe670f84fb84290da0344ec5 Mon Sep 17 00:00:00 2001 From: Leon Qadirie Date: Sat, 4 Nov 2023 13:07:00 +0100 Subject: [PATCH 16/17] More idiomatic use of `map`s and `filter_map`s --- ractor/src/pg/mod.rs | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/ractor/src/pg/mod.rs b/ractor/src/pg/mod.rs index 9b35ebfc..fdfd579e 100644 --- a/ractor/src/pg/mod.rs +++ b/ractor/src/pg/mod.rs @@ -344,8 +344,7 @@ pub fn which_groups() -> Vec { let mut groups = monitor .map .iter() - .map(|kvp| kvp.key().clone()) - .map(|kvp| kvp.group.clone()) + .map(|kvp| kvp.key().group.to_owned()) .collect::>(); groups.sort_unstable(); groups.dedup(); @@ -363,9 +362,14 @@ pub fn which_scoped_groups(scope: &ScopeName) -> Vec { monitor .map .iter() - .map(|kvp| kvp.key().clone()) - .filter(|kvp| kvp.scope == *scope) - .map(|kvp| kvp.group.clone()) + .filter_map(|kvp| { + let key = kvp.key(); + if key.scope == *scope { + Some(key.group.to_owned()) + } else { + None + } + }) .collect::>() } @@ -390,8 +394,10 @@ pub fn which_scopes() -> Vec { monitor .map .iter() - .map(|kvp| kvp.key().clone()) - .map(|kvp| kvp.scope.clone()) + .map(|kvp| { + let key = kvp.key(); + key.scope.to_owned() + }) .collect::>() } @@ -519,8 +525,14 @@ fn get_world_monitor_keys() -> Vec { let mut world_monitor_keys = monitor .listeners .iter() - .map(|kvp| kvp.key().clone()) - .filter(|kvp| kvp.scope == ALL_SCOPES_NOTIFICATION || kvp.group == ALL_GROUPS_NOTIFICATION) + .filter_map(|kvp| { + let key = kvp.key().clone(); + if key.scope == ALL_SCOPES_NOTIFICATION || key.group == ALL_GROUPS_NOTIFICATION { + Some(key) + } else { + None + } + }) .collect::>(); world_monitor_keys.sort_unstable(); world_monitor_keys.dedup(); From 7b1ee20ef2c2ff369f44a8a495a9945bffb294df Mon Sep 17 00:00:00 2001 From: Leon Qadirie Date: Sat, 4 Nov 2023 13:17:40 +0100 Subject: [PATCH 17/17] Fixed and improved documentation --- ractor/src/pg/mod.rs | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/ractor/src/pg/mod.rs b/ractor/src/pg/mod.rs index fdfd579e..0b205f28 100644 --- a/ractor/src/pg/mod.rs +++ b/ractor/src/pg/mod.rs @@ -104,7 +104,7 @@ fn get_monitor<'a>() -> &'a PgState { /// Join actors to the group `group` in the default scope /// -/// * `group` - The statically named group. Will be created if first actors to join +/// * `group` - The named group. Will be created if first actors to join /// * `actors` - The list of [crate::Actor]s to add to the group pub fn join(group: GroupName, actors: Vec) { join_scoped(DEFAULT_SCOPE.to_owned(), group, actors); @@ -112,8 +112,8 @@ pub fn join(group: GroupName, actors: Vec) { /// Join actors to the group `group` within the scope `scope` /// -/// * `scope` - the statically named scope. Will be created if first actors join -/// * `group` - The statically named group. Will be created if first actors to join +/// * `scope` - the named scope. Will be created if first actors to join +/// * `group` - The named group. Will be created if first actors to join /// * `actors` - The list of [crate::Actor]s to add to the group pub fn join_scoped(scope: ScopeName, group: GroupName, actors: Vec) { let key = ScopeGroupKey { @@ -161,7 +161,7 @@ pub fn join_scoped(scope: ScopeName, group: GroupName, actors: Vec) { /// Leaves the specified [crate::Actor]s from the PG group in the default scope /// -/// * `group` - The statically named group +/// * `group` - A named group /// * `actors` - The list of actors to remove from the group pub fn leave(group: GroupName, actors: Vec) { leave_scoped(DEFAULT_SCOPE.to_owned(), group, actors); @@ -169,8 +169,8 @@ pub fn leave(group: GroupName, actors: Vec) { /// Leaves the specified [crate::Actor]s from the PG group within the scope `scope` /// -/// * `scope` - The statically named scope -/// * `group` - The statically named group +/// * `scope` - A named scope +/// * `group` - A named group /// * `actors` - The list of actors to remove from the group pub fn leave_scoped(scope: ScopeName, group: GroupName, actors: Vec) { let key = ScopeGroupKey { @@ -275,7 +275,7 @@ pub(crate) fn leave_all(actor: ActorId) { /// Returns all actors running on the local node in the group `group` /// in the default scope. /// -/// * `group_name` - Either a statically named group +/// * `group` - A named group /// /// Returns a [`Vec`] representing the members of this paging group pub fn get_local_members(group: &GroupName) -> Vec { @@ -285,8 +285,8 @@ pub fn get_local_members(group: &GroupName) -> Vec { /// Returns all actors running on the local node in the group `group` /// in scope `scope` /// -/// * `scope_name` - A statically named scope -/// * `group_name` - Either a statically named group +/// * `scope_name` - A named scope +/// * `group_name` - A named group /// /// Returns a [`Vec`] representing the members of this paging group pub fn get_scoped_local_members(scope: &ScopeName, group: &GroupName) -> Vec { @@ -310,7 +310,7 @@ pub fn get_scoped_local_members(scope: &ScopeName, group: &GroupName) -> Vec`] with the member actors pub fn get_members(group_name: &GroupName) -> Vec { @@ -320,7 +320,8 @@ pub fn get_members(group_name: &GroupName) -> Vec { /// Returns all the actors running on any node in the group `group` /// in the scope `scope`. /// -/// * `group_name` - Either a statically named group or scope +/// * `scope` - A named scope +/// * `group` - A named group /// /// Returns a [`Vec`] with the member actors pub fn get_scoped_members(scope: &ScopeName, group: &GroupName) -> Vec { @@ -375,7 +376,7 @@ pub fn which_scoped_groups(scope: &ScopeName) -> Vec { /// Returns a list of all known scope-group combinations. /// -/// Returns a [`Vec<(ScopeName,GroupName)>`] representing all the registered +/// Returns a [`Vec<(ScopGroupKey)>`] representing all the registered /// combinations that form an identifying tuple pub fn which_scopes_and_groups() -> Vec { let monitor = get_monitor(); @@ -518,7 +519,7 @@ pub(crate) fn demonitor_all(actor: ActorId) { /// Gets the keys for the world monitors. /// -/// Returns a `Vec` represending all registered tuples +/// Returns a `Vec` represending all registered tuples /// for which ane of the values is equivalent to one of the world_monitor_keys fn get_world_monitor_keys() -> Vec { let monitor = get_monitor();