Skip to content

Commit

Permalink
pg: Add ScopeName -> Vec<GroupName> mapping (#190)
Browse files Browse the repository at this point in the history
* Fix documentation and comments

* Make test logic more consistent

* Add `index` field to `PgState`

* Add comment for `drop`

* Remove stale comment

* Draft of simultaneous locks for both `DashMap`s

* Rename variables to avoid rebindings in the scope

* Reduce race condition potential of `leave_*` functions

---------

Co-authored-by: Leon Qadirie <qadirie.leon@outlook>
  • Loading branch information
leonqadirie and Leon Qadirie authored Feb 8, 2024
1 parent ce6d394 commit bd1f69b
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 27 deletions.
92 changes: 66 additions & 26 deletions ractor/src/pg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ impl ScopeGroupKey {

struct PgState {
map: Arc<DashMap<ScopeGroupKey, HashMap<ActorId, ActorCell>>>,
index: Arc<DashMap<ScopeName, Vec<GroupName>>>,
listeners: Arc<DashMap<ScopeGroupKey, Vec<ActorCell>>>,
}

Expand All @@ -98,6 +99,7 @@ static PG_MONITOR: OnceCell<PgState> = OnceCell::new();
fn get_monitor<'a>() -> &'a PgState {
PG_MONITOR.get_or_init(|| PgState {
map: Arc::new(DashMap::new()),
index: Arc::new(DashMap::new()),
listeners: Arc::new(DashMap::new()),
})
}
Expand All @@ -112,7 +114,7 @@ pub fn join(group: GroupName, actors: Vec<ActorCell>) {

/// Join actors to the group `group` within the scope `scope`
///
/// * `scope` - the named scope. Will be created if first actors to join
/// * `scope` - The named scope. Will be created if first actors to join
/// * `group` - The named group. Will be created if first actors to join
/// * `actors` - The list of [crate::Actor]s to add to the group
pub fn join_scoped(scope: ScopeName, group: GroupName, actors: Vec<ActorCell>) {
Expand All @@ -122,22 +124,49 @@ pub fn join_scoped(scope: ScopeName, group: GroupName, actors: Vec<ActorCell>) {
};
let monitor = get_monitor();

// lock the `PgState`'s `map` and `index` DashMaps.
let monitor_map = monitor.map.entry(key.to_owned());
let monitor_idx = monitor.index.entry(scope.to_owned());

// insert into the monitor group
match monitor.map.entry(key.to_owned()) {
Occupied(mut occupied) => {
let oref = occupied.get_mut();
match monitor_map {
Occupied(mut occupied_map) => {
let oref = occupied_map.get_mut();
for actor in actors.iter() {
oref.insert(actor.get_id(), actor.clone());
}
match monitor_idx {
Occupied(mut occupied_idx) => {
let oref = occupied_idx.get_mut();
if !oref.contains(&group) {
oref.push(group.to_owned());
}
}
Vacant(vacancy) => {
vacancy.insert(vec![group.to_owned()]);
}
}
}
Vacant(vacancy) => {
let map = actors
.iter()
.map(|a| (a.get_id(), a.clone()))
.collect::<HashMap<_, _>>();
vacancy.insert(map);
match monitor_idx {
Occupied(mut occupied_idx) => {
let oref = occupied_idx.get_mut();
if !oref.contains(&group) {
oref.push(group.to_owned());
}
}
Vacant(vacancy) => {
vacancy.insert(vec![group.to_owned()]);
}
}
}
}

// notify supervisors
if let Some(listeners) = monitor.listeners.get(&key) {
for listener in listeners.value() {
Expand Down Expand Up @@ -178,18 +207,34 @@ pub fn leave_scoped(scope: ScopeName, group: GroupName, actors: Vec<ActorCell>)
group: group.to_owned(),
};
let monitor = get_monitor();
match monitor.map.entry(key.to_owned()) {

// lock the `PgState`'s `map` and `index` DashMaps.
let monitor_map = monitor.map.entry(key.to_owned());
let monitor_idx = monitor.index.get_mut(&scope);

match monitor_map {
Vacant(_) => {}
Occupied(mut occupied) => {
let mut_ref = occupied.get_mut();
Occupied(mut occupied_map) => {
let mut_ref = occupied_map.get_mut();
for actor in actors.iter() {
mut_ref.remove(&actor.get_id());
}

// the scope and group tuple is empty, remove it
// if the scope and group tuple is empty, remove it
if mut_ref.is_empty() {
occupied.remove();
occupied_map.remove();
}

// remove the group and possibly the scope from the monitor's index
if let Some(mut groups_in_scope) = monitor_idx {
groups_in_scope.retain(|group_name| group_name != &group);
if groups_in_scope.is_empty() {
// drop the `RefMut` to prevent a `DashMap` deadlock
drop(groups_in_scope);
monitor.index.remove(&scope);
}
}

if let Some(listeners) = monitor.listeners.get(&key) {
for listener in listeners.value() {
let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged(
Expand Down Expand Up @@ -222,15 +267,15 @@ pub(crate) fn leave_all(actor: ActorId) {
let pg_monitor = get_monitor();
let map = pg_monitor.map.clone();

let mut empty_groups = vec![];
let mut empty_scope_group_keys = vec![];
let mut removal_events = HashMap::new();

for mut kv in map.iter_mut() {
if let Some(actor_cell) = kv.value_mut().remove(&actor) {
removal_events.insert(kv.key().clone(), actor_cell);
}
if kv.value().is_empty() {
empty_groups.push(kv.key().clone());
empty_scope_group_keys.push(kv.key().clone());
}
}

Expand Down Expand Up @@ -267,8 +312,11 @@ pub(crate) fn leave_all(actor: ActorId) {
}

// Cleanup empty groups
for group in empty_groups {
map.remove(&group);
for scope_group_key in empty_scope_group_keys {
map.remove(&scope_group_key);
if let Some(mut groups_in_scope) = pg_monitor.index.get_mut(&scope_group_key.scope) {
groups_in_scope.retain(|group| group != &scope_group_key.group);
}
}
}

Expand Down Expand Up @@ -360,23 +408,15 @@ pub fn which_groups() -> Vec<GroupName> {
/// in `scope`
pub fn which_scoped_groups(scope: &ScopeName) -> Vec<GroupName> {
let monitor = get_monitor();
monitor
.map
.iter()
.filter_map(|kvp| {
let key = kvp.key();
if key.scope == *scope {
Some(key.group.to_owned())
} else {
None
}
})
.collect::<Vec<_>>()
match monitor.index.get(scope) {
Some(groups) => groups.to_owned(),
None => vec![],
}
}

/// Returns a list of all known scope-group combinations.
///
/// Returns a [`Vec<(ScopGroupKey)>`] representing all the registered
/// Returns a [`Vec<ScopeGroupKey>`] representing all the registered
/// combinations that form an identifying tuple
pub fn which_scopes_and_groups() -> Vec<ScopeGroupKey> {
let monitor = get_monitor();
Expand Down
10 changes: 9 additions & 1 deletion ractor/src/pg/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ async fn test_which_scoped_groups() {
);

let groups_in_scope = pg::which_scoped_groups(&scope);
assert_eq!(vec![scope.clone()], groups_in_scope);
assert_eq!(vec![group.clone()], groups_in_scope);

// Cleanup
for actor in actors {
Expand Down Expand Up @@ -418,6 +418,10 @@ async fn test_actor_leaves_pg_group_manually() {
let groups = pg::which_groups();
assert!(!groups.contains(&group));

// pif-paf-poof the group is gone from the monitor's index!
let scoped_groups = pg::which_scoped_groups(&group);
assert!(!scoped_groups.contains(&group));

// members comes back empty
let members = pg::get_members(&group);
assert_eq!(0, members.len());
Expand Down Expand Up @@ -463,6 +467,10 @@ async fn test_actor_leaves_scope_manually() {
let groups = pg::which_groups();
assert!(!groups.contains(&group));

// pif-paf-poof the group is gone from the monitor's index!
let scoped_groups = pg::which_scoped_groups(&group);
assert!(!scoped_groups.contains(&group));

// members comes back empty
let members = pg::get_scoped_members(&scope, &group);
assert_eq!(0, members.len());
Expand Down

0 comments on commit bd1f69b

Please sign in to comment.