Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions agentwire/macros/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,10 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
let handler = format_ident!("handle_{}", ident);
quote! {
if let Some(port) = fut.broker.#ident.enabled() {
any_handler_enabled |= true;
loop {
match ::futures::StreamExt::poll_next_unpin(port, cx) {
// check if message is newer than fence
::std::task::Poll::Ready(Some(output)) if output.source_ts > fence => {
match fut.broker.#handler(fut.plan, output) {
::std::result::Result::Ok(::agentwire::BrokerFlow::Break) => {
Expand All @@ -210,9 +212,10 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
}
}
::std::task::Poll::Ready(::std::option::Option::Some(_)) => {
continue;
continue; // skip message because its older than `fence`
}
::std::task::Poll::Ready(::std::option::Option::None) => {
// channel sender is dropped, which means agent terminated
return ::std::task::Poll::Ready(
::std::result::Result::Err(
::agentwire::BrokerError::AgentTerminated(
Expand All @@ -222,7 +225,7 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
);
}
::std::task::Poll::Pending => {
break;
break; // No more messages to process
}
}
}
Expand All @@ -249,7 +252,7 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
},
);
let run = quote! {
#[allow(missing_docs)]
/// Future for [`#ident::run`].
pub struct #run_fut_name<'a> {
broker: &'a mut #ident,
plan: &'a mut dyn #broker_plan,
Expand All @@ -265,20 +268,32 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
) -> ::std::task::Poll<Self::Output> {
let fence = self.fence;
let fut = self.as_mut().get_mut();
let mut any_handler_enabled = false;
'outer: loop {
#(#run_handlers)*
#poll_extra
#[allow(unreachable_code)]
if !any_handler_enabled {
// Prevent infinite loop in edge case where no handlers are
// enabled.
return ::std::task::Poll::Pending;
}
}

}
}

impl #ident {
#[allow(missing_docs)]
/// Equivalent to [`Self::run_with_fence()`] with a fence of `Instant::now()`.
pub fn run<'a>(&'a mut self, plan: &'a mut dyn #broker_plan) -> #run_fut_name<'a> {
Self::run_with_fence(self, plan, ::std::time::Instant::now())
}

#[allow(missing_docs)]
/// Runs the broker, filtering any events to only those with a timestamp
/// newer than `fence`.
///
/// Events are fed the broker's `handle_*` functions, and `plan` is passed
/// there as an argument.
pub fn run_with_fence<'a>(
&'a mut self,
plan: &'a mut dyn #broker_plan,
Expand Down
103 changes: 103 additions & 0 deletions agentwire/tests/broker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
use agentwire::{
agent::{Cell, Task},
port::{self, Port},
Agent, Broker, BrokerFlow,
};
use futures::FutureExt;

#[derive(Debug, thiserror::Error)]
#[error("dummy error")]
pub struct Error;

struct DummyAgent;

impl Port for DummyAgent {
type Input = ();
type Output = ();

const INPUT_CAPACITY: usize = 0;
const OUTPUT_CAPACITY: usize = 0;
}

impl Agent for DummyAgent {
const NAME: &'static str = "dummy";
}

impl Task for DummyAgent {
type Error = Error;

async fn run(self, _port: agentwire::port::Inner<Self>) -> Result<(), Self::Error> {
std::future::pending().await
}
}

#[derive(Broker)]
#[broker(plan = PlanT, error = Error)]
struct NoAgents {}

impl NoAgents {
fn new() -> Self {
new_no_agents!()
}
}

#[derive(Broker)]
#[broker(plan = PlanT, error = Error)]
struct OneAgent {
#[agent(task, init)]
dummy: Cell<DummyAgent>,
}

impl OneAgent {
fn new() -> Self {
new_one_agent!()
}

#[expect(dead_code, reason = "agent never enabled")]
fn init_dummy(&mut self) -> DummyAgent {
DummyAgent
}

fn handle_dummy(
&mut self,
_plan: &mut dyn PlanT,
_output: port::Output<DummyAgent>,
) -> Result<BrokerFlow, Error> {
unreachable!("agent never enabled")
}
}

trait PlanT {}

#[derive(Debug)]
struct Plan;

impl PlanT for Plan {}

#[test]
fn test_broker_with_no_agents_never_blocks() {
let waker = futures::task::noop_waker();
let mut cx = std::task::Context::from_waker(&waker);
let mut plan = Plan;

let mut no_agents = NoAgents::new();
let mut run_fut = no_agents.run(&mut plan);
// poll should always immediately return, instead of looping forever.
assert!(run_fut.poll_unpin(&mut cx).is_pending());
Copy link
Collaborator Author

@TheButlah TheButlah Jun 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prior to this PR, the code would have blocked infinitely here.

assert!(run_fut.poll_unpin(&mut cx).is_pending());
assert!(run_fut.poll_unpin(&mut cx).is_pending());
}

#[test]
fn test_broker_with_one_agent_never_blocks() {
let waker = futures::task::noop_waker();
let mut cx = std::task::Context::from_waker(&waker);
let mut plan = Plan;

let mut one_agent = OneAgent::new();
let mut run_fut = one_agent.run(&mut plan);
// poll should always immediately return, instead of looping forever.
assert!(run_fut.poll_unpin(&mut cx).is_pending());
assert!(run_fut.poll_unpin(&mut cx).is_pending());
assert!(run_fut.poll_unpin(&mut cx).is_pending());
}
Loading