From d4622aea152445aa02a27b43c2f6f25fbbfadd48 Mon Sep 17 00:00:00 2001 From: Jen Tak <goodhoko@gmail.com> Date: Tue, 6 Aug 2024 19:48:41 +0200 Subject: [PATCH] Cache last published event let subscribers receive it upon subscription of they wish --- src/lib.rs | 79 +++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 76 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 1d24d1d..726ee00 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -226,11 +226,12 @@ enum SystemState { /// A marker trait for types which participate in the publish-subscribe system /// of the actor framework. -pub trait Event: Clone + std::any::Any + 'static {} +pub trait Event: Clone + std::any::Any + 'static + Send + Sync {} #[derive(Default)] struct EventSubscribers { events: HashMap<TypeId, Vec<EventCallback>>, + last_value_cache: HashMap<TypeId, Arc<dyn std::any::Any + Send + Sync>>, } /// Contains the "metadata" of the system, including information about the registry @@ -290,6 +291,13 @@ impl<M> Context<M> { { self.system_handle.subscribe_recipient::<M, E>(self.myself.clone()); } + + pub fn subscribe_and_receive_latest<E: Event + Into<M>>(&self) -> Result<(), SendError> + where + M: 'static, + { + self.system_handle.subscribe_and_receive_latest::<M, E>(self.myself.clone()) + } } /// Capacity of actor's normal- and high-priority inboxes. @@ -723,13 +731,36 @@ impl SystemHandle { })); } + /// Subscribe given `recipient` to events of type `E`. See [`Context::subscribe()`]. + pub fn subscribe_and_receive_latest<M: 'static, E: Event + Into<M>>( + &self, + recipient: Recipient<M>, + ) -> Result<(), SendError> { + // Send the last cached value (if there is one) + if let Some(last_cached_value) = + self.event_subscribers.read().last_value_cache.get(&TypeId::of::<E>()) + { + if let Some(msg) = last_cached_value.downcast_ref::<E>() { + recipient.send(msg.clone().into())?; + } + } + + self.subscribe_recipient::<M, E>(recipient); + + Ok(()) + } + /// Publish an event. All actors that have previously subscribed to the type will receive it. /// /// When sending to some subscriber fails, others are still tried and vec of errors is returned. /// For direct, non-[`Clone`] or high-throughput messages please use [`Addr`] or [`Recipient`]. pub fn publish<E: Event>(&self, event: E) -> Result<(), PublishError> { - let event_subscribers = self.event_subscribers.read(); - if let Some(subs) = event_subscribers.events.get(&TypeId::of::<E>()) { + let mut event_subscribers = self.event_subscribers.write(); + let type_id = TypeId::of::<E>(); + + event_subscribers.last_value_cache.insert(type_id, Arc::new(event.clone())); + + if let Some(subs) = event_subscribers.events.get(&type_id) { let errors: Vec<SendError> = subs .iter() .filter_map(|subscriber_callback| subscriber_callback(&event).err()) @@ -1287,4 +1318,46 @@ mod tests { [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9] ); } + + #[test] + fn last_cached_event() { + impl Event for () {} + + struct Subscriber; + impl Actor for Subscriber { + type Message = (); + type Context = Context<Self::Message>; + type Error = (); + + fn started(&mut self, context: &mut Self::Context) { + context + .subscribe_and_receive_latest::<Self::Message>() + .expect("can receive last cached value"); + } + + fn handle( + &mut self, + context: &mut Self::Context, + _: Self::Message, + ) -> Result<(), Self::Error> { + println!("Event received!"); + context.system_handle.shutdown().unwrap(); + Ok(()) + } + + fn name() -> &'static str { + "recipient" + } + } + + let mut system = System::new("last cached event"); + system.publish(()).expect("can publish event"); + + // This test will block indefinitely if the event isn't delivered. + system + .prepare(Subscriber) + .with_addr(Addr::with_capacity(1)) + .run_and_block() + .expect("actor finishes successfully"); + } }