Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pg: Add ScopeName -> Vec<GroupName> mapping #190

Merged
merged 10 commits into from
Feb 8, 2024
92 changes: 66 additions & 26 deletions ractor/src/pg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@

struct PgState {
map: Arc<DashMap<ScopeGroupKey, HashMap<ActorId, ActorCell>>>,
index: Arc<DashMap<ScopeName, Vec<GroupName>>>,
listeners: Arc<DashMap<ScopeGroupKey, Vec<ActorCell>>>,
}

Expand All @@ -98,6 +99,7 @@
fn get_monitor<'a>() -> &'a PgState {
PG_MONITOR.get_or_init(|| PgState {
map: Arc::new(DashMap::new()),
index: Arc::new(DashMap::new()),
listeners: Arc::new(DashMap::new()),
})
}
Expand All @@ -112,7 +114,7 @@

/// Join actors to the group `group` within the scope `scope`
///
/// * `scope` - the named scope. Will be created if first actors to join
/// * `scope` - The named scope. Will be created if first actors to join
/// * `group` - The named group. Will be created if first actors to join
/// * `actors` - The list of [crate::Actor]s to add to the group
pub fn join_scoped(scope: ScopeName, group: GroupName, actors: Vec<ActorCell>) {
Expand All @@ -122,22 +124,49 @@
};
let monitor = get_monitor();

// lock the `PgState`'s `map` and `index` DashMaps.
let monitor_map = monitor.map.entry(key.to_owned());
let monitor_idx = monitor.index.entry(scope.to_owned());

// insert into the monitor group
match monitor.map.entry(key.to_owned()) {
Occupied(mut occupied) => {
let oref = occupied.get_mut();
match monitor_map {
Occupied(mut occupied_map) => {
let oref = occupied_map.get_mut();
for actor in actors.iter() {
oref.insert(actor.get_id(), actor.clone());
}
match monitor_idx {
Occupied(mut occupied_idx) => {
let oref = occupied_idx.get_mut();
if !oref.contains(&group) {
oref.push(group.to_owned());

Check warning on line 142 in ractor/src/pg/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/pg/mod.rs#L142

Added line #L142 was not covered by tests
}
}
Vacant(vacancy) => {
vacancy.insert(vec![group.to_owned()]);
}

Check warning on line 147 in ractor/src/pg/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/pg/mod.rs#L145-L147

Added lines #L145 - L147 were not covered by tests
}
}
Vacant(vacancy) => {
let map = actors
.iter()
.map(|a| (a.get_id(), a.clone()))
.collect::<HashMap<_, _>>();
vacancy.insert(map);
match monitor_idx {
Occupied(mut occupied_idx) => {
let oref = occupied_idx.get_mut();
if !oref.contains(&group) {
oref.push(group.to_owned());
}
}
Vacant(vacancy) => {
vacancy.insert(vec![group.to_owned()]);
}
}
}
}

// notify supervisors
if let Some(listeners) = monitor.listeners.get(&key) {
for listener in listeners.value() {
Expand Down Expand Up @@ -178,18 +207,34 @@
group: group.to_owned(),
};
let monitor = get_monitor();
match monitor.map.entry(key.to_owned()) {

// lock the `PgState`'s `map` and `index` DashMaps.
let monitor_map = monitor.map.entry(key.to_owned());
let monitor_idx = monitor.index.get_mut(&scope);

match monitor_map {
Vacant(_) => {}
Occupied(mut occupied) => {
let mut_ref = occupied.get_mut();
Occupied(mut occupied_map) => {
let mut_ref = occupied_map.get_mut();
for actor in actors.iter() {
mut_ref.remove(&actor.get_id());
}

// the scope and group tuple is empty, remove it
// if the scope and group tuple is empty, remove it
if mut_ref.is_empty() {
occupied.remove();
occupied_map.remove();
}

// remove the group and possibly the scope from the monitor's index
if let Some(mut groups_in_scope) = monitor_idx {
groups_in_scope.retain(|group_name| group_name != &group);
if groups_in_scope.is_empty() {
// drop the `RefMut` to prevent a `DashMap` deadlock
drop(groups_in_scope);
monitor.index.remove(&scope);
}
}

if let Some(listeners) = monitor.listeners.get(&key) {
for listener in listeners.value() {
let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged(
Expand Down Expand Up @@ -222,15 +267,15 @@
let pg_monitor = get_monitor();
let map = pg_monitor.map.clone();

let mut empty_groups = vec![];
let mut empty_scope_group_keys = vec![];
let mut removal_events = HashMap::new();

for mut kv in map.iter_mut() {
if let Some(actor_cell) = kv.value_mut().remove(&actor) {
removal_events.insert(kv.key().clone(), actor_cell);
}
if kv.value().is_empty() {
empty_groups.push(kv.key().clone());
empty_scope_group_keys.push(kv.key().clone());
}
}

Expand Down Expand Up @@ -267,8 +312,11 @@
}

// Cleanup empty groups
for group in empty_groups {
map.remove(&group);
for scope_group_key in empty_scope_group_keys {
map.remove(&scope_group_key);
if let Some(mut groups_in_scope) = pg_monitor.index.get_mut(&scope_group_key.scope) {
groups_in_scope.retain(|group| group != &scope_group_key.group);
}
}
}

Expand Down Expand Up @@ -360,23 +408,15 @@
/// in `scope`
pub fn which_scoped_groups(scope: &ScopeName) -> Vec<GroupName> {
let monitor = get_monitor();
monitor
.map
.iter()
.filter_map(|kvp| {
let key = kvp.key();
if key.scope == *scope {
Some(key.group.to_owned())
} else {
None
}
})
.collect::<Vec<_>>()
match monitor.index.get(scope) {
Some(groups) => groups.to_owned(),
None => vec![],
}
}

/// Returns a list of all known scope-group combinations.
///
/// Returns a [`Vec<(ScopGroupKey)>`] representing all the registered
/// Returns a [`Vec<ScopeGroupKey>`] representing all the registered
/// combinations that form an identifying tuple
pub fn which_scopes_and_groups() -> Vec<ScopeGroupKey> {
let monitor = get_monitor();
Expand Down
10 changes: 9 additions & 1 deletion ractor/src/pg/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ async fn test_which_scoped_groups() {
);

let groups_in_scope = pg::which_scoped_groups(&scope);
assert_eq!(vec![scope.clone()], groups_in_scope);
assert_eq!(vec![group.clone()], groups_in_scope);

// Cleanup
for actor in actors {
Expand Down Expand Up @@ -418,6 +418,10 @@ async fn test_actor_leaves_pg_group_manually() {
let groups = pg::which_groups();
assert!(!groups.contains(&group));

// pif-paf-poof the group is gone from the monitor's index!
let scoped_groups = pg::which_scoped_groups(&group);
assert!(!scoped_groups.contains(&group));

// members comes back empty
let members = pg::get_members(&group);
assert_eq!(0, members.len());
Expand Down Expand Up @@ -463,6 +467,10 @@ async fn test_actor_leaves_scope_manually() {
let groups = pg::which_groups();
assert!(!groups.contains(&group));

// pif-paf-poof the group is gone from the monitor's index!
let scoped_groups = pg::which_scoped_groups(&group);
assert!(!scoped_groups.contains(&group));

// members comes back empty
let members = pg::get_scoped_members(&scope, &group);
assert_eq!(0, members.len());
Expand Down
Loading