Skip to content

Commit e9e0c89

Browse files
committed
fix(agentwire): fix infinite loop when no agents enabled
1 parent 5573093 commit e9e0c89

File tree

2 files changed

+123
-5
lines changed

2 files changed

+123
-5
lines changed

agentwire/macros/src/broker.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,10 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
187187
let handler = format_ident!("handle_{}", ident);
188188
quote! {
189189
if let Some(port) = fut.broker.#ident.enabled() {
190+
any_handler_enabled |= true;
190191
loop {
191192
match ::futures::StreamExt::poll_next_unpin(port, cx) {
193+
// check if message is newer than fence
192194
::std::task::Poll::Ready(Some(output)) if output.source_ts > fence => {
193195
match fut.broker.#handler(fut.plan, output) {
194196
::std::result::Result::Ok(::agentwire::BrokerFlow::Break) => {
@@ -210,9 +212,10 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
210212
}
211213
}
212214
::std::task::Poll::Ready(::std::option::Option::Some(_)) => {
213-
continue;
215+
continue; // skip message because its older than `fence`
214216
}
215217
::std::task::Poll::Ready(::std::option::Option::None) => {
218+
// channel sender is dropped, which means agent terminated
216219
return ::std::task::Poll::Ready(
217220
::std::result::Result::Err(
218221
::agentwire::BrokerError::AgentTerminated(
@@ -222,7 +225,7 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
222225
);
223226
}
224227
::std::task::Poll::Pending => {
225-
break;
228+
break; // No more messages to process
226229
}
227230
}
228231
}
@@ -249,7 +252,7 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
249252
},
250253
);
251254
let run = quote! {
252-
#[allow(missing_docs)]
255+
/// Future for [`#ident::run`].
253256
pub struct #run_fut_name<'a> {
254257
broker: &'a mut #ident,
255258
plan: &'a mut dyn #broker_plan,
@@ -265,20 +268,32 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
265268
) -> ::std::task::Poll<Self::Output> {
266269
let fence = self.fence;
267270
let fut = self.as_mut().get_mut();
271+
let mut any_handler_enabled = false;
268272
'outer: loop {
269273
#(#run_handlers)*
270274
#poll_extra
275+
#[allow(unreachable_code)]
276+
if !any_handler_enabled {
277+
// Prevent infinite loop in edge case where no handlers are
278+
// enabled.
279+
return ::std::task::Poll::Pending;
280+
}
271281
}
282+
272283
}
273284
}
274285

275286
impl #ident {
276-
#[allow(missing_docs)]
287+
/// Equivalent to [`Self::run_with_fence()`] with a fence of `Instant::now()`.
277288
pub fn run<'a>(&'a mut self, plan: &'a mut dyn #broker_plan) -> #run_fut_name<'a> {
278289
Self::run_with_fence(self, plan, ::std::time::Instant::now())
279290
}
280291

281-
#[allow(missing_docs)]
292+
/// Runs the broker, filtering any events to only those with a timestamp
293+
/// newer than `fence`.
294+
///
295+
/// Events are fed the broker's `handle_*` functions, and `plan` is passed
296+
/// there as an argument.
282297
pub fn run_with_fence<'a>(
283298
&'a mut self,
284299
plan: &'a mut dyn #broker_plan,

agentwire/tests/broker.rs

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
use agentwire::{
2+
agent::{Cell, Task},
3+
port::{self, Port},
4+
Agent, Broker, BrokerFlow,
5+
};
6+
use futures::FutureExt;
7+
8+
#[derive(Debug, thiserror::Error)]
9+
#[error("dummy error")]
10+
pub struct Error;
11+
12+
struct DummyAgent;
13+
14+
impl Port for DummyAgent {
15+
type Input = ();
16+
type Output = ();
17+
18+
const INPUT_CAPACITY: usize = 0;
19+
const OUTPUT_CAPACITY: usize = 0;
20+
}
21+
22+
impl Agent for DummyAgent {
23+
const NAME: &'static str = "dummy";
24+
}
25+
26+
impl Task for DummyAgent {
27+
type Error = Error;
28+
29+
async fn run(self, _port: agentwire::port::Inner<Self>) -> Result<(), Self::Error> {
30+
std::future::pending().await
31+
}
32+
}
33+
34+
#[derive(Broker)]
35+
#[broker(plan = PlanT, error = Error)]
36+
struct NoAgents {}
37+
38+
impl NoAgents {
39+
fn new() -> Self {
40+
new_no_agents!()
41+
}
42+
}
43+
44+
#[derive(Broker)]
45+
#[broker(plan = PlanT, error = Error)]
46+
struct OneAgent {
47+
#[agent(task, init)]
48+
dummy: Cell<DummyAgent>,
49+
}
50+
51+
impl OneAgent {
52+
fn new() -> Self {
53+
new_one_agent!()
54+
}
55+
56+
#[expect(dead_code, reason = "agent never enabled")]
57+
fn init_dummy(&mut self) -> DummyAgent {
58+
DummyAgent
59+
}
60+
61+
fn handle_dummy(
62+
&mut self,
63+
_plan: &mut dyn PlanT,
64+
_output: port::Output<DummyAgent>,
65+
) -> Result<BrokerFlow, Error> {
66+
unreachable!("agent never enabled")
67+
}
68+
}
69+
70+
trait PlanT {}
71+
72+
#[derive(Debug)]
73+
struct Plan;
74+
75+
impl PlanT for Plan {}
76+
77+
#[test]
78+
fn test_broker_with_no_agents_never_blocks() {
79+
let waker = futures::task::noop_waker();
80+
let mut cx = std::task::Context::from_waker(&waker);
81+
let mut plan = Plan;
82+
83+
let mut no_agents = NoAgents::new();
84+
let mut run_fut = no_agents.run(&mut plan);
85+
// poll should always immediately return, instead of looping forever.
86+
assert!(run_fut.poll_unpin(&mut cx).is_pending());
87+
assert!(run_fut.poll_unpin(&mut cx).is_pending());
88+
assert!(run_fut.poll_unpin(&mut cx).is_pending());
89+
}
90+
91+
#[test]
92+
fn test_broker_with_one_agent_never_blocks() {
93+
let waker = futures::task::noop_waker();
94+
let mut cx = std::task::Context::from_waker(&waker);
95+
let mut plan = Plan;
96+
97+
let mut one_agent = OneAgent::new();
98+
let mut run_fut = one_agent.run(&mut plan);
99+
// poll should always immediately return, instead of looping forever.
100+
assert!(run_fut.poll_unpin(&mut cx).is_pending());
101+
assert!(run_fut.poll_unpin(&mut cx).is_pending());
102+
assert!(run_fut.poll_unpin(&mut cx).is_pending());
103+
}

0 commit comments

Comments
 (0)