
pg: Add ScopeName -> Vec<GroupName> mapping #190

Merged · 10 commits · Feb 8, 2024
56 changes: 38 additions & 18 deletions ractor/src/pg/mod.rs
@@ -90,6 +90,7 @@

struct PgState {
map: Arc<DashMap<ScopeGroupKey, HashMap<ActorId, ActorCell>>>,
index: Arc<DashMap<ScopeName, Vec<GroupName>>>,
listeners: Arc<DashMap<ScopeGroupKey, Vec<ActorCell>>>,
}

@@ -98,6 +99,7 @@
fn get_monitor<'a>() -> &'a PgState {
PG_MONITOR.get_or_init(|| PgState {
map: Arc::new(DashMap::new()),
index: Arc::new(DashMap::new()),
listeners: Arc::new(DashMap::new()),
})
}
@@ -138,6 +140,18 @@
vacancy.insert(map);
}
}
match monitor.index.entry(scope.to_owned()) {
Owner:
So the problem with this approach (which is why it wasn't done this way in the beginning) is that it technically introduces a subtle race condition. We will probably need a global lock around both of the hash maps or something, but then there's going to be the benchmarking of the performance hit for taking a global lock.

Dashmaps are sharded maps so they can do concurrent updates from multiple threads, but what we'd be doing is locking around both maps with a singleton global lock.

Unless we can acquire both sharded locks, do the updates, and then only release the locks simultaneously.

i.e.

lock(a)
lock(b)
update(a)
update(b)
release(a)
release(b)

It will be a little ugly with the scopes, but it's the best option for concurrent updates.
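The acquire-both, update-both, release-together ordering sketched above can be illustrated with plain `std::sync::Mutex`es standing in for the two maps. This is a minimal sketch under stated assumptions: the `Pg` struct, its field types, and `join` here are illustrative stand-ins, not the actual ractor types.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Illustrative stand-ins for the two pg maps; not the real ractor types.
pub struct Pg {
    // (scope, group) -> member actor ids
    pub map: Mutex<HashMap<(String, String), Vec<u64>>>,
    // scope -> groups in that scope
    pub index: Mutex<HashMap<String, Vec<String>>>,
}

impl Pg {
    pub fn join(&self, scope: &str, group: &str, actor: u64) {
        // lock(a); lock(b): both guards are held across the whole update,
        // so no reader can observe `map` and `index` out of sync.
        let mut map = self.map.lock().unwrap();
        let mut index = self.index.lock().unwrap();

        map.entry((scope.to_owned(), group.to_owned()))
            .or_default()
            .push(actor);
        let groups = index.entry(scope.to_owned()).or_default();
        if !groups.iter().any(|g| g == group) {
            groups.push(group.to_owned());
        }
        // release(a); release(b): both guards drop when this scope ends.
    }
}
```

A fixed acquisition order (always `map` before `index`, in every code path) is what keeps concurrent writers from deadlocking against each other.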

Contributor Author:

You're right, of course.
I hope to find the time to experiment and benchmark; I can't currently promise a timeline, though :)

Contributor Author (@leonqadirie, Feb 3, 2024):

Then again, as far as I understand, we are only talking about the map and index fields, which are only mutated in the join* and leave* functions.

It duplicates a bit more code but I think just locking both fields and then nesting their updates (so the locks get dropped simultaneously when leaving scope) might work.

Occupied(mut occupied) => {
let oref = occupied.get_mut();
if !oref.contains(&group) {
oref.push(group.to_owned());
}
}
Vacant(vacancy) => {
vacancy.insert(vec![group.to_owned()]);
}
}

// notify supervisors
if let Some(listeners) = monitor.listeners.get(&key) {
for listener in listeners.value() {
@@ -186,10 +200,21 @@
mut_ref.remove(&actor.get_id());
}

// the scope and group tuple is empty, remove it
// if the scope and group tuple is empty, remove it
if mut_ref.is_empty() {
occupied.remove();
Owner:

So, to the point above, I think we'd move this before the removal of the index? I still need to think a bit about whether there's a race that could occur here even with the drop operations releasing the locks sequentially.
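The guard-drop issue the diff works around below (dropping the `RefMut` before calling `remove` on the same `DashMap`) has a compile-time analogue with a plain `Mutex<HashMap>`: the inner borrow of the `Vec` must end before the map itself can be mutated again. A minimal sketch with illustrative names (not the actual ractor code):

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// scope -> groups, an illustrative stand-in for the pg index.
pub fn remove_group(index: &Mutex<HashMap<String, Vec<String>>>, scope: &str, group: &str) {
    let mut idx = index.lock().unwrap();
    // End the inner borrow of the Vec before removing the map entry.
    // With DashMap the equivalent step is `drop(ref_mut)` before `remove`;
    // otherwise the shard lock is still held and `remove` deadlocks.
    let now_empty = match idx.get_mut(scope) {
        Some(groups) => {
            groups.retain(|g| g != group);
            groups.is_empty()
        }
        None => false,
    };
    if now_empty {
        idx.remove(scope);
    }
}
```

With `Mutex` the borrow checker rejects removing while `groups` is alive; `DashMap` can't enforce that statically, which is why the explicit `drop` in the diff is needed.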


// remove the group and possibly the scope from the monitor's index
if let Some(mut groups_in_scope) = monitor.index.get_mut(&scope) {
groups_in_scope.retain(|group_name| group_name != &group);
if groups_in_scope.is_empty() {
// drop the `RefMut` to prevent a `DashMap` deadlock
drop(groups_in_scope);
monitor.index.remove(&scope);
}
}

Codecov warning on ractor/src/pg/mod.rs#L215: added line was not covered by tests.
}

if let Some(listeners) = monitor.listeners.get(&key) {
for listener in listeners.value() {
let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged(
@@ -222,15 +247,15 @@
let pg_monitor = get_monitor();
let map = pg_monitor.map.clone();

let mut empty_groups = vec![];
let mut empty_scope_group_keys = vec![];
let mut removal_events = HashMap::new();

for mut kv in map.iter_mut() {
if let Some(actor_cell) = kv.value_mut().remove(&actor) {
removal_events.insert(kv.key().clone(), actor_cell);
}
if kv.value().is_empty() {
empty_groups.push(kv.key().clone());
empty_scope_group_keys.push(kv.key().clone());
}
}

@@ -267,8 +292,11 @@
}

// Cleanup empty groups
for group in empty_groups {
map.remove(&group);
for scope_group_key in empty_scope_group_keys {
map.remove(&scope_group_key);
if let Some(mut groups_in_scope) = pg_monitor.index.get_mut(&scope_group_key.scope) {
groups_in_scope.retain(|group| group != &scope_group_key.group);
}
}
}

@@ -360,23 +388,15 @@
/// in `scope`
pub fn which_scoped_groups(scope: &ScopeName) -> Vec<GroupName> {
let monitor = get_monitor();
monitor
.map
.iter()
.filter_map(|kvp| {
let key = kvp.key();
if key.scope == *scope {
Some(key.group.to_owned())
} else {
None
}
})
.collect::<Vec<_>>()
match monitor.index.get(scope) {
Some(groups) => groups.to_owned(),
None => vec![],
}
}

/// Returns a list of all known scope-group combinations.
///
/// Returns a [`Vec<(ScopGroupKey)>`] representing all the registered
/// Returns a [`Vec<ScopeGroupKey>`] representing all the registered
/// combinations that form an identifying tuple
pub fn which_scopes_and_groups() -> Vec<ScopeGroupKey> {
let monitor = get_monitor();
10 changes: 9 additions & 1 deletion ractor/src/pg/tests.rs
@@ -218,7 +218,7 @@ async fn test_which_scoped_groups() {
);

let groups_in_scope = pg::which_scoped_groups(&scope);
assert_eq!(vec![scope.clone()], groups_in_scope);
assert_eq!(vec![group.clone()], groups_in_scope);

// Cleanup
for actor in actors {
@@ -418,6 +418,10 @@ async fn test_actor_leaves_pg_group_manually() {
let groups = pg::which_groups();
assert!(!groups.contains(&group));

// pif-paf-poof the group is gone from the monitor's index!
let scoped_groups = pg::which_scoped_groups(&group);
assert!(!scoped_groups.contains(&group));

// members comes back empty
let members = pg::get_members(&group);
assert_eq!(0, members.len());
@@ -463,6 +467,10 @@ async fn test_actor_leaves_scope_manually() {
let groups = pg::which_groups();
assert!(!groups.contains(&group));

// pif-paf-poof the group is gone from the monitor's index!
let scoped_groups = pg::which_scoped_groups(&scope);
assert!(!scoped_groups.contains(&group));

// members comes back empty
let members = pg::get_scoped_members(&scope, &group);
assert_eq!(0, members.len());