Skip to content

Commit

Permalink
Make spawn builder harder to misuse
Browse files Browse the repository at this point in the history
  • Loading branch information
mbernat committed May 20, 2024
1 parent 6b31343 commit d958b5e
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 54 deletions.
4 changes: 2 additions & 2 deletions benches/pub_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::time::{Duration, Instant};
use tonari_actor::{Actor, Context, Event, Recipient, System};

#[derive(Debug, Clone)]
struct StringEvent(String);
struct StringEvent(());

impl Event for StringEvent {}

Expand Down Expand Up @@ -45,7 +45,7 @@ impl Actor for PublisherActor {
PublisherMessage::PublishEvents => {
let start = Instant::now();
for _i in 0..self.iterations {
context.system_handle.publish(StringEvent("hello".to_string()))?;
context.system_handle.publish(StringEvent(()))?;
}
let elapsed = start.elapsed();

Expand Down
2 changes: 1 addition & 1 deletion examples/actor_wrapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ fn main() -> Result<(), Error> {
let mut system = System::new("Actor Wrapping Example");

let actor = LoggingAdapter { inner: TestActor {} };
system.prepare(actor).run_and_block()?;
system.prepare(actor).with_default_capacity().run_and_block()?;

Ok(())
}
6 changes: 3 additions & 3 deletions examples/pub_sub_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ fn main() -> Result<(), Error> {
let mut system = System::new("Example PubSub System");

let publisher_actor = PublisherActor::new();
let _ = system.prepare(publisher_actor).spawn()?;
let _ = system.prepare(SubscriberActor1).spawn()?;
let _ = system.prepare(SubscriberActor2).spawn()?;
let _ = system.prepare(publisher_actor).with_default_capacity().spawn()?;
let _ = system.prepare(SubscriberActor1).with_default_capacity().spawn()?;
let _ = system.prepare(SubscriberActor2).with_default_capacity().spawn()?;

system.publish(StringEvent("Hello from the main thread!".to_string()))?;

Expand Down
2 changes: 1 addition & 1 deletion examples/simple_timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ fn main() -> Result<(), Error> {
let mut system = System::new("Example Timer System");

let timer_actor = TimerExampleActor::new();
system.prepare(timer_actor).run_and_block()?;
system.prepare(timer_actor).with_default_capacity().run_and_block()?;

Ok(())
}
99 changes: 52 additions & 47 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,59 +319,68 @@ impl From<usize> for Capacity {
}
}

/// A builder for specifying how to spawn an [`Actor`].
/// You can specify your own [`Addr`] for the Actor,
/// the capacity of the Actor's inbox, and you can specify
/// whether to spawn the Actor into its own thread or block
/// on the current calling thread.
#[must_use = "You must call .spawn() or .block_on() to run this actor"]
pub struct SpawnBuilder<'a, A: Actor, F: FnOnce() -> A> {
/// A builder for configuring [`Actor`] spawning.
/// You can specify your own [`Addr`] for the Actor, or let the system create
/// a new address either provided or default capacity.
#[must_use = "You must call .with_addr(), .with_capacity(), or .with_default_capacity() to configure this builder"]
pub struct SpawnBuilderWithoutAddress<'a, A: Actor, F: FnOnce() -> A> {
system: &'a mut System,
capacity: Capacity,
addr: Option<Addr<A>>,
factory: F,
}

impl<'a, A: 'static + Actor<Context = Context<<A as Actor>::Message>>, F: FnOnce() -> A>
SpawnBuilder<'a, A, F>
SpawnBuilderWithoutAddress<'a, A, F>
{
/// Specify an existing [`Addr`] to use with this Actor.
pub fn with_addr(self, addr: Addr<A>) -> SpawnBuilderWithAddress<'a, A, F> {
SpawnBuilderWithAddress { spawn_builder: self, addr }
}

/// Specify a capacity for the actor's receiving channel. Accepts [`Capacity`] or [`usize`].
///
/// Ignored when `.with_addr()` is used at the same time.
pub fn with_capacity(self, capacity: impl Into<Capacity>) -> Self {
Self { capacity: capacity.into(), ..self }
pub fn with_capacity(self, capacity: impl Into<Capacity>) -> SpawnBuilderWithAddress<'a, A, F> {
let addr = Addr::with_capacity(capacity);
SpawnBuilderWithAddress { spawn_builder: self, addr }
}

/// Specify an existing [`Addr`] to use with this Actor.
pub fn with_addr(self, addr: Addr<A>) -> Self {
Self { addr: Some(addr), ..self }
// Use the default capacity for the actor's receiving channel.
pub fn with_default_capacity(self) -> SpawnBuilderWithAddress<'a, A, F> {
let addr = Addr::with_capacity(Capacity::default());
SpawnBuilderWithAddress { spawn_builder: self, addr }
}
}

#[must_use = "You must call .spawn() or .run_and_block() to create an actor"]
/// After having configured the builder with an address
/// it is possible to create and run the actor either on a new thread with `spawn()`
/// or on the current thread with `run_and_block()`.
pub struct SpawnBuilderWithAddress<'a, A: Actor, F: FnOnce() -> A> {
spawn_builder: SpawnBuilderWithoutAddress<'a, A, F>,
addr: Addr<A>,
}

impl<'a, A: 'static + Actor<Context = Context<<A as Actor>::Message>>, F: FnOnce() -> A>
SpawnBuilderWithAddress<'a, A, F>
{
/// Run this Actor on the current calling thread. This is a
/// blocking call. This function will exit when the Actor
/// blocking call. This function will return when the Actor
/// has stopped.
pub fn run_and_block(self) -> Result<(), ActorError> {
let factory = self.factory;
let capacity = self.capacity;
let addr = self.addr.unwrap_or_else(|| Addr::with_capacity(capacity));

self.system.block_on(factory(), addr)
let factory = self.spawn_builder.factory;
self.spawn_builder.system.block_on(factory(), self.addr)
}
}

impl<
'a,
A: 'static + Actor<Context = Context<<A as Actor>::Message>>,
F: FnOnce() -> A + Send + 'static,
> SpawnBuilder<'a, A, F>
> SpawnBuilderWithAddress<'a, A, F>
{
/// Spawn this Actor into a new thread managed by the [`System`].
pub fn spawn(self) -> Result<Addr<A>, ActorError> {
let factory = self.factory;
let capacity = self.capacity;
let addr = self.addr.unwrap_or_else(|| Addr::with_capacity(capacity));

self.system.spawn_fn_with_addr(factory, addr.clone()).map(move |_| addr)
let builder = self.spawn_builder;
builder.system.spawn_fn_with_addr(builder.factory, self.addr.clone())?;
Ok(self.addr)
}
}

Expand All @@ -391,39 +400,36 @@ impl System {
}
}

/// Prepare an actor to be spawned. Returns a [`SpawnBuilder`]
/// which can be used to customize the spawning of the actor.
pub fn prepare<A>(&mut self, actor: A) -> SpawnBuilder<A, impl FnOnce() -> A>
/// Prepare an actor to be spawned. Returns a [`SpawnBuilderWithoutAddress`]
/// which has to be further configured before spawning the actor.
pub fn prepare<A>(&mut self, actor: A) -> SpawnBuilderWithoutAddress<A, impl FnOnce() -> A>
where
A: Actor + 'static,
{
SpawnBuilder {
system: self,
capacity: Default::default(),
addr: None,
factory: move || actor,
}
SpawnBuilderWithoutAddress { system: self, factory: move || actor }
}

/// Similar to `prepare`, but an actor factory is passed instead
/// of an [`Actor`] itself. This is used when an actor needs to be
/// created on its own thread instead of the calling thread.
/// Returns a [`SpawnBuilder`] which can be used to customize the
/// spawning of the actor.
pub fn prepare_fn<A, F>(&mut self, factory: F) -> SpawnBuilder<A, F>
/// Returns a [`SpawnBuilderWithoutAddress`] which has to be further
/// configured before spawning the actor.
pub fn prepare_fn<A, F>(&mut self, factory: F) -> SpawnBuilderWithoutAddress<A, F>
where
A: Actor + 'static,
F: FnOnce() -> A + Send + 'static,
{
SpawnBuilder { system: self, capacity: Default::default(), addr: None, factory }
SpawnBuilderWithoutAddress { system: self, factory }
}

/// Spawn a normal [`Actor`] in the system, returning its address when successful.
/// This address is created by the system and uses a default capacity.
/// If you need to customize the address see [`prepare`] or [`prepare_fn`] above.
pub fn spawn<A>(&mut self, actor: A) -> Result<Addr<A>, ActorError>
where
A: Actor<Context = Context<<A as Actor>::Message>> + Send + 'static,
{
self.prepare(actor).spawn()
self.prepare(actor).with_default_capacity().spawn()
}

/// Spawn a normal Actor in the system, using a factory that produces an [`Actor`],
Expand Down Expand Up @@ -1029,7 +1035,6 @@ impl<M: Into<N>, N> SenderTrait<M> for Arc<dyn SenderTrait<N>> {
#[cfg(test)]
mod tests {
use std::{
rc::Rc,
sync::atomic::{AtomicU32, Ordering},
time::Duration,
};
Expand Down Expand Up @@ -1095,7 +1100,7 @@ mod tests {
#[test]
fn send_constraints() {
#[derive(Default)]
struct LocalActor(Rc<()>);
struct LocalActor(());
impl Actor for LocalActor {
type Context = Context<Self::Message>;
type Error = ();
Expand All @@ -1118,10 +1123,10 @@ mod tests {
let mut system = System::new("main");

// Allowable, as the struct will be created on the new thread.
let _ = system.prepare_fn(LocalActor::default).spawn().unwrap();
let _ = system.prepare_fn(LocalActor::default).with_default_capacity().spawn().unwrap();

// Allowable, as the struct will be run on the current thread.
system.prepare(LocalActor::default()).run_and_block().unwrap();
system.prepare(LocalActor::default()).with_default_capacity().run_and_block().unwrap();

system.shutdown().unwrap();
}
Expand Down

0 comments on commit d958b5e

Please sign in to comment.