Skip to content

Commit

Permalink
Add proper span handling to factories through extension trait
Browse files Browse the repository at this point in the history
  • Loading branch information
slawlor committed Dec 17, 2024
1 parent 6a08fe2 commit 8cf16ae
Show file tree
Hide file tree
Showing 8 changed files with 528 additions and 98 deletions.
2 changes: 1 addition & 1 deletion ractor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor"
version = "0.13.5"
version = "0.14.0"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "A actor framework for Rust"
documentation = "https://docs.rs/ractor"
Expand Down
16 changes: 14 additions & 2 deletions ractor/src/factory/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::sync::Arc;
use std::{hash::Hash, time::SystemTime};

use bon::Builder;
use tracing::Span;

use crate::{concurrency::Duration, Message};
use crate::{ActorRef, RpcReplyPort};
Expand Down Expand Up @@ -40,7 +41,7 @@ pub trait JobKey: Debug + Hash + Send + Sync + Clone + Eq + PartialEq + 'static
impl<T: Debug + Hash + Send + Sync + Clone + Eq + PartialEq + 'static> JobKey for T {}

/// Represents options for the specified job
#[derive(Debug, Eq, PartialEq, Clone)]
#[derive(Debug, PartialEq, Clone)]
pub struct JobOptions {
/// Time job was submitted from the client
pub submit_time: SystemTime,
Expand All @@ -50,6 +51,9 @@ pub struct JobOptions {
pub worker_time: SystemTime,
/// Time-to-live for the job
pub ttl: Option<Duration>,
/// The parent span we want to propagate to the worker.
/// Spans don't propagate over the wire in networks
pub span: Option<Span>,
}

impl Default for JobOptions {
Expand All @@ -59,6 +63,10 @@ impl Default for JobOptions {
factory_time: SystemTime::now(),
worker_time: SystemTime::now(),
ttl: None,
#[cfg(feature = "message_span_propogation")]
span: Some(Span::current()),
#[cfg(not(feature = "message_span_propogation"))]
span: None,
}
}
}
Expand Down Expand Up @@ -86,7 +94,10 @@ impl BytesConvertable for JobOptions {

fn from_bytes(mut data: Vec<u8>) -> Self {
if data.len() != 16 {
Self::default()
Self {
span: None,
..Default::default()
}

Check warning on line 100 in ractor/src/factory/job.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/factory/job.rs#L97-L100

Added lines #L97 - L100 were not covered by tests
} else {
let ttl_bytes = data.split_off(8);

Expand All @@ -100,6 +111,7 @@ impl BytesConvertable for JobOptions {
} else {
None
},
span: None,
..Default::default()
}
}
Expand Down
57 changes: 21 additions & 36 deletions ractor/src/factory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,53 +66,38 @@
//! /// the business logic for each message that will be done in parallel.
//! struct ExampleWorker;
//! #[cfg_attr(feature = "async-trait", ractor::async_trait)]
//! impl Actor for ExampleWorker {
//! type Msg = WorkerMessage<(), ExampleMessage>;
//! type State = WorkerStartContext<(), ExampleMessage, ()>;
//! type Arguments = WorkerStartContext<(), ExampleMessage, ()>;
//! impl Worker for ExampleWorker {
//! type Key = ();
//! type Message = ExampleMessage;
//! type State = ();
//! type Arguments = ();
//! async fn pre_start(
//! &self,
//! _myself: ActorRef<Self::Msg>,
//! wid: WorkerId,
//! factory: &ActorRef<FactoryMessage<(), ExampleMessage>>,
//! startup_context: Self::Arguments,
//! ) -> Result<Self::State, ActorProcessingErr> {
//! Ok(startup_context)
//! }
//! async fn handle(
//! &self,
//! _myself: ActorRef<Self::Msg>,
//! message: Self::Msg,
//! state: &mut Self::State,
//! wid: WorkerId,
//! factory: &ActorRef<FactoryMessage<(), ExampleMessage>>,
//! Job {msg, key, ..}: Job<(), ExampleMessage>,
//! _state: &mut Self::State,
//! ) -> Result<(), ActorProcessingErr> {
//! match message {
//! WorkerMessage::FactoryPing(time) => {
//! // This is a message which all factory workers **must**
//! // adhere to. It is a background processing message from the
//! // factory which is used for (a) metrics and (b) detecting
//! // stuck workers, i.e. workers which aren't making progress
//! // processing their messages
//! state
//! .factory
//! .cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?;
//! // Actual business logic that we want to parallelize
//! tracing::trace!("Worker {} received {:?}", wid, msg);
//! match msg {
//! ExampleMessage::PrintValue(value) => {
//! tracing::info!("Worker {} printing value {value}", wid);
//! }
//! WorkerMessage::Dispatch(job) => {
//! // Actual business logic that we want to parallelize
//! tracing::trace!("Worker {} received {:?}", state.wid, job.msg);
//! match job.msg {
//! ExampleMessage::PrintValue(value) => {
//! tracing::info!("Worker {} printing value {value}", state.wid);
//! }
//! ExampleMessage::EchoValue(value, reply) => {
//! tracing::info!("Worker {} echoing value {value}", state.wid);
//! let _ = reply.send(value);
//! }
//! }
//! // job finished, on success or err we report back to the factory
//! state
//! .factory
//! .cast(FactoryMessage::Finished(state.wid, job.key))?;
//! ExampleMessage::EchoValue(value, reply) => {
//! tracing::info!("Worker {} echoing value {value}", wid);
//! let _ = reply.send(value);
//! }
//! }
//! Ok(())
//! Ok(key)
//! }
//! }
//! /// Used by the factory to build new [ExampleWorker]s.
Expand Down Expand Up @@ -200,7 +185,7 @@ pub use factoryimpl::{Factory, FactoryArguments, FactoryArgumentsBuilder};
pub use job::{Job, JobKey, JobOptions, MessageRetryStrategy, RetriableMessage};
pub use lifecycle::FactoryLifecycleHooks;
pub use worker::{
DeadMansSwitchConfiguration, WorkerBuilder, WorkerCapacityController, WorkerMessage,
DeadMansSwitchConfiguration, Worker, WorkerBuilder, WorkerCapacityController, WorkerMessage,
WorkerProperties, WorkerStartContext,
};

Expand Down
46 changes: 18 additions & 28 deletions ractor/src/factory/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,47 +55,37 @@ struct TestWorker {
}

#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestWorker {
type Msg = WorkerMessage<TestKey, TestMessage>;
type State = Self::Arguments;
type Arguments = WorkerStartContext<TestKey, TestMessage, ()>;
impl Worker for TestWorker {
type Key = TestKey;
type Message = TestMessage;
type State = ();
type Arguments = ();

async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
_wid: WorkerId,
_factory: &ActorRef<FactoryMessage<TestKey, TestMessage>>,
startup_context: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
Ok(startup_context)
}

async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
WorkerMessage::FactoryPing(time) => {
state
.factory
.cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?;
}
WorkerMessage::Dispatch(job) => {
tracing::debug!("Worker received {:?}", job.msg);
_wid: WorkerId,
_factory: &ActorRef<FactoryMessage<TestKey, TestMessage>>,
Job { msg, key, .. }: Job<Self::Key, Self::Message>,
_state: &mut Self::State,
) -> Result<TestKey, ActorProcessingErr> {
tracing::debug!("Worker received {:?}", msg);

self.counter.fetch_add(1, Ordering::Relaxed);

if let Some(timeout_ms) = self.slow {
crate::concurrency::sleep(Duration::from_millis(timeout_ms)).await;
}
self.counter.fetch_add(1, Ordering::Relaxed);

// job finished, on success or err we report back to the factory
state
.factory
.cast(FactoryMessage::Finished(state.wid, job.key))?;
}
if let Some(timeout_ms) = self.slow {
crate::concurrency::sleep(Duration::from_millis(timeout_ms)).await;
}
Ok(())

Ok(key)
}
}

Expand Down
41 changes: 15 additions & 26 deletions ractor/src/factory/tests/dynamic_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,43 +48,32 @@ struct TestWorker {
impl crate::Message for TestMessage {}

#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestWorker {
type Msg = WorkerMessage<TestKey, TestMessage>;
impl Worker for TestWorker {
type Key = TestKey;
type Message = TestMessage;
type State = Self::Arguments;
type Arguments = WorkerStartContext<TestKey, TestMessage, ()>;
type Arguments = ();

async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
_wid: WorkerId,
_factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
startup_context: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
Ok(startup_context)
}

async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
WorkerMessage::FactoryPing(time) => {
state
.factory
.cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?;
}
WorkerMessage::Dispatch(job) => {
tracing::debug!("Worker received {:?}", job.msg);

self.id_map.insert(state.wid);

// job finished, on success or err we report back to the factory
state
.factory
.cast(FactoryMessage::Finished(state.wid, job.key))?;
}
}
Ok(())
wid: WorkerId,
_factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
Job { msg, key, .. }: Job<Self::Key, Self::Message>,
_state: &mut Self::State,
) -> Result<Self::Key, ActorProcessingErr> {
tracing::debug!("Worker received {:?}", msg);

self.id_map.insert(wid);
Ok(key)
}
}

Expand Down
Loading

0 comments on commit 8cf16ae

Please sign in to comment.