Skip to content

Commit

Permalink
Add monitoring of actors from peers, without full linking support. (#170
Browse files Browse the repository at this point in the history
)

Adds tests of new functionality
  • Loading branch information
slawlor authored Sep 25, 2023
1 parent bb33d4f commit 7382cb2
Show file tree
Hide file tree
Showing 8 changed files with 211 additions and 12 deletions.
35 changes: 32 additions & 3 deletions ractor/src/actor/actor_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,32 @@ impl ActorCell {
self.inner.tree.clear_supervisor();
}

/// Start monitoring `who` for supervision events.
///
/// An actor in `ractor` has at most one supervisor (established through `link`),
/// but it may be observed by any number of monitors. Monitors receive copies of
/// the [SupervisionEvent]s with the non-cloneable information removed.
///
/// * `who`: The actor to monitor
pub fn monitor(&self, who: ActorCell) {
    // Register ourselves on the target first, then record the target locally
    // so that it can later be released through `unmonitor`/`clear_monitors`.
    let watcher = self.clone();
    who.inner.tree.set_monitor(watcher);
    self.inner.tree.mark_monitored(who);
}

/// Stop monitoring `who` for supervision events.
///
/// Drops `who` from the set of actors this actor monitors, then removes this
/// actor from `who`'s registered monitors.
///
/// * `who`: The actor to stop monitoring
pub fn unmonitor(&self, who: ActorCell) {
    let target_id = who.get_id();
    self.inner.tree.unmark_monitored(target_id);

    let watcher_id = self.get_id();
    who.inner.tree.remove_monitor(watcher_id);
}

/// Stop monitoring every [super::Actor] currently monitored by this [super::Actor]
pub fn clear_monitors(&self) {
    // `monitored_actors` hands back an owned snapshot of cells, so each one
    // can be passed straight to `unmonitor`.
    self.inner
        .tree
        .monitored_actors()
        .into_iter()
        .for_each(|monitored| self.unmonitor(monitored));
}

/// Kill this [super::Actor] forcefully (terminates async work)
pub fn kill(&self) {
let _ = self.inner.send_signal(Signal::Kill);
Expand Down Expand Up @@ -433,11 +459,14 @@ impl ActorCell {
self.inner.send_serialized(message)
}

/// Notify the supervisors that a supervision event occurred
/// Notify the supervisor and all monitors that a supervision event occurred.
/// Monitors receive a reduced copy of the supervision event which won't contain
/// the [crate::actor::BoxedState] and collapses the [crate::ActorProcessingErr]
/// exception to a [String]
///
/// * `evt` - The event to send to this [super::Actor]'s supervisors
pub fn notify_supervisor(&self, evt: SupervisionEvent) {
self.inner.tree.notify_supervisor(evt)
pub fn notify_supervisor_and_monitors(&self, evt: SupervisionEvent) {
self.inner.tree.notify_supervisor_and_monitors(evt)
}

pub(crate) fn get_type_id(&self) -> TypeId {
Expand Down
9 changes: 6 additions & 3 deletions ractor/src/actor/actor_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,14 @@ impl<TMessage> ActorRef<TMessage> {
self.inner.clone()
}

/// Notify the supervisors that a supervision event occurred
/// Notify the supervisor and all monitors that a supervision event occurred.
/// Monitors receive a reduced copy of the supervision event which won't contain
/// the [crate::actor::BoxedState] and collapses the [crate::ActorProcessingErr]
/// exception to a [String]
///
/// * `evt` - The event to send to this [crate::Actor]'s supervisors
pub fn notify_supervisor(&self, evt: SupervisionEvent) {
self.inner.notify_supervisor(evt)
pub fn notify_supervisor_and_monitors(&self, evt: SupervisionEvent) {
self.inner.notify_supervisor_and_monitors(evt)
}
}

Expand Down
22 changes: 22 additions & 0 deletions ractor/src/actor/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,28 @@ pub enum SupervisionEvent {
PidLifecycleEvent(crate::registry::PidLifecycleEvent),
}

impl SupervisionEvent {
    /// Produce a copy of this supervision event without requiring the inner
    /// data to be cloneable.
    ///
    /// The actor error (if present) is rendered to a string and carried over in
    /// that form, and the state captured upon termination is not propagated
    /// (the copy holds `None`). Propagating the state would require it to be
    /// cloneable, which is an overly restrictive bound, so it is dropped instead.
    pub(crate) fn clone_no_data(&self) -> Self {
        match self {
            Self::ActorStarted(who) => Self::ActorStarted(who.clone()),
            Self::ActorTerminated(who, _state, msg) => {
                // The boxed state is intentionally left behind.
                Self::ActorTerminated(who.clone(), None, msg.clone())
            }
            Self::ActorPanicked(who, what) => {
                // Collapse the error into its display text.
                let rendered = what.to_string();
                Self::ActorPanicked(who.clone(), rendered.into())
            }
            Self::ProcessGroupChanged(what) => Self::ProcessGroupChanged(what.clone()),
            #[cfg(feature = "cluster")]
            Self::PidLifecycleEvent(evt) => Self::PidLifecycleEvent(evt.clone()),
        }
    }
}

impl Debug for SupervisionEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Supervision event: {self}")
Expand Down
7 changes: 5 additions & 2 deletions ractor/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,10 @@ where
myself.terminate();

// notify supervisors of the actor's death
myself.notify_supervisor(evt);
myself.notify_supervisor_and_monitors(evt);

// clear any monitor actors
myself.clear_monitors();

// unlink supervisors
if let Some(sup) = supervisor {
Expand Down Expand Up @@ -565,7 +568,7 @@ where
.map_err(ActorErr::Panic)?;

myself.set_status(ActorStatus::Running);
myself.notify_supervisor(SupervisionEvent::ActorStarted(myself.get_cell()));
myself.notify_supervisor_and_monitors(SupervisionEvent::ActorStarted(myself.get_cell()));

let myself_clone = myself.clone();

Expand Down
37 changes: 35 additions & 2 deletions ractor/src/actor/supervision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use crate::ActorId;
/// Per-actor bookkeeping of supervision and monitoring relationships.
pub struct SupervisionTree {
/// Supervised children keyed by actor id.
/// NOTE(review): the `u64` is presumably the child's start order (see `start_order`) — confirm.
children: DashMap<ActorId, (u64, ActorCell)>,
/// The single supervisor ("parent") of this actor, if any; it receives the
/// original, un-reduced supervision events.
supervisor: Arc<RwLock<Option<ActorCell>>>,
/// Actors monitoring this actor, keyed by the monitor's id. Each one is sent a
/// reduced copy (`clone_no_data`) of every supervision event.
monitors: DashMap<ActorId, ActorCell>,
/// Actors this actor is monitoring, keyed by the monitored actor's id.
monitored: DashMap<ActorId, ActorCell>,
/// NOTE(review): presumably a counter used to order child starts — confirm against its users.
start_order: AtomicU64,
}

Expand Down Expand Up @@ -60,6 +62,31 @@ impl SupervisionTree {
*(self.supervisor.write().unwrap()) = None;
}

/// Register `monitor` as a monitor of this supervision tree, keyed by the
/// monitor's actor id
pub fn set_monitor(&self, monitor: ActorCell) {
    let monitor_id = monitor.get_id();
    self.monitors.insert(monitor_id, monitor);
}

/// Record that this actor is now monitoring `who`, keyed by `who`'s actor id
pub fn mark_monitored(&self, who: ActorCell) {
    let monitored_id = who.get_id();
    self.monitored.insert(monitored_id, who);
}

/// Record that this actor is no longer monitoring the actor identified by `who`
pub fn unmark_monitored(&self, who: ActorId) {
    // The removed entry (if there was one) is not needed.
    let _ = self.monitored.remove(&who);
}

/// Remove the monitor identified by `monitor` from this supervision tree
pub fn remove_monitor(&self, monitor: ActorId) {
    // The removed entry (if there was one) is not needed.
    let _ = self.monitors.remove(&monitor);
}

/// Collect the [ActorCell]s of every actor this actor is currently monitoring
pub fn monitored_actors(&self) -> Vec<ActorCell> {
    let mut cells = Vec::new();
    for entry in self.monitored.iter() {
        cells.push(entry.value().clone());
    }
    cells
}

/// Terminate all your supervised children and unlink them
/// from the supervision tree since the supervisor is shutting down
/// and can't deal with supervision events anyway
Expand Down Expand Up @@ -87,8 +114,14 @@ impl SupervisionTree {
}
}

/// Send a notification to all supervisors
pub fn notify_supervisor(&self, evt: SupervisionEvent) {
/// Send a notification to the supervisor and monitors.
///
/// CAVEAT: Monitors get notified first, in order to save an unnecessary
/// clone if there are no monitors.
pub fn notify_supervisor_and_monitors(&self, evt: SupervisionEvent) {
for monitor in self.monitors.iter() {
let _ = monitor.value().send_supervisor_evt(evt.clone_no_data());
}
if let Some(parent) = &*(self.supervisor.read().unwrap()) {
let _ = parent.send_supervisor_evt(evt);
}
Expand Down
106 changes: 106 additions & 0 deletions ractor/src/actor/tests/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1146,3 +1146,109 @@ async fn test_supervisor_double_link() {
ah.await.unwrap();
bh.await.unwrap();
}

/// Verifies that a monitor is notified when a monitored peer terminates, and
/// that it is no longer notified once it has called `unmonitor` on the peer.
#[crate::concurrency::test]
#[tracing_test::traced_test]
async fn test_simple_monitor() {
    /// Actor which stops itself (with reason "oh no!") on the first message.
    struct Peer;
    /// Actor which counts the "oh no!" terminations it observes.
    struct Monitor {
        counter: Arc<AtomicU8>,
    }

    #[crate::async_trait]
    impl Actor for Peer {
        type Msg = ();
        type State = ();
        type Arguments = ();

        async fn pre_start(
            &self,
            _: ActorRef<Self::Msg>,
            _: Self::Arguments,
        ) -> Result<Self::State, ActorProcessingErr> {
            Ok(())
        }

        async fn handle(
            &self,
            myself: ActorRef<Self::Msg>,
            _: Self::Msg,
            _: &mut Self::State,
        ) -> Result<(), ActorProcessingErr> {
            myself.stop(Some("oh no!".to_string()));
            Ok(())
        }
    }

    #[crate::async_trait]
    impl Actor for Monitor {
        type Msg = ();
        type State = ();
        type Arguments = ();

        async fn pre_start(
            &self,
            _: ActorRef<Self::Msg>,
            _: Self::Arguments,
        ) -> Result<Self::State, ActorProcessingErr> {
            Ok(())
        }

        async fn handle_supervisor_evt(
            &self,
            _: ActorRef<Self::Msg>,
            evt: SupervisionEvent,
            _: &mut Self::State,
        ) -> Result<(), ActorProcessingErr> {
            // Only count terminations carrying the peer's expected exit reason.
            if let SupervisionEvent::ActorTerminated(_who, _state, Some(msg)) = evt {
                if msg.as_str() == "oh no!" {
                    self.counter.fetch_add(1, Ordering::Relaxed);
                }
            }
            Ok(())
        }
    }

    let count = Arc::new(AtomicU8::new(0));

    let (p, ph) = Actor::spawn(None, Peer, ())
        .await
        .expect("Failed to start peer");
    let (m, mh) = Actor::spawn(
        None,
        Monitor {
            counter: count.clone(),
        },
        (),
    )
    .await
    .expect("Failed to start monitor");

    m.monitor(p.get_cell());

    // stopping the peer should notify the monitor, which observes the
    // termination event along with its exit reason
    p.cast(()).expect("Failed to contact peer");
    periodic_check(
        || count.load(Ordering::Relaxed) == 1,
        Duration::from_secs(1),
    )
    .await;
    ph.await.unwrap();

    // A second peer is monitored and then immediately un-monitored, so its
    // termination must not be reported to the monitor.
    let (p, ph) = Actor::spawn(None, Peer, ())
        .await
        .expect("Failed to start peer");
    m.monitor(p.get_cell());
    m.unmonitor(p.get_cell());

    p.cast(()).expect("Failed to contact peer");
    ph.await.unwrap();

    // The count doesn't increment when the peer exits (we give some time
    // to schedule the supervision evt)
    crate::concurrency::sleep(Duration::from_millis(100)).await;
    assert_eq!(1, count.load(Ordering::Relaxed));

    m.stop(None);
    mh.await.unwrap();
}
2 changes: 1 addition & 1 deletion ractor/src/pg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub const ALL_GROUPS_NOTIFICATION: &str = "__world__";
#[cfg(test)]
mod tests;

/// Represents a change in group or scope membership
/// Represents a change in a process group's membership
#[derive(Clone)]
pub enum GroupChangeMessage {
/// Some actors joined a group
Expand Down
5 changes: 4 additions & 1 deletion ractor/src/registry/pid_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ use once_cell::sync::OnceCell;

use crate::{ActorCell, ActorId, SupervisionEvent};

/// Represents a change in group or scope membership
/// Represents a change occurring to some actor in the global process registry. Only relevant in
/// cluster-enabled functionality.
///
/// It represents actors spawning and exiting, irrespective of process groups.
#[derive(Clone)]
pub enum PidLifecycleEvent {
/// Some actors joined a group
Expand Down

0 comments on commit 7382cb2

Please sign in to comment.