Skip to content

Commit 21e6d16

Browse files
committed
fix(agentwire): fix infinite loop when no agents enabled
1 parent 69197c7 commit 21e6d16

File tree

2 files changed

+121
-5
lines changed

2 files changed

+121
-5
lines changed

agentwire/macros/src/broker.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,10 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
171171
let handler = format_ident!("handle_{}", ident);
172172
quote! {
173173
if let Some(port) = fut.broker.#ident.enabled() {
174+
any_handler_enabled |= true;
174175
loop {
175176
match ::futures::StreamExt::poll_next_unpin(port, cx) {
177+
// check if message is newer than fence
176178
::std::task::Poll::Ready(Some(output)) if output.source_ts > fence => {
177179
match fut.broker.#handler(fut.plan, output) {
178180
::std::result::Result::Ok(::agentwire::BrokerFlow::Break) => {
@@ -194,9 +196,10 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
194196
}
195197
}
196198
::std::task::Poll::Ready(::std::option::Option::Some(_)) => {
197-
continue;
199+
continue; // skip message because its older than `fence`
198200
}
199201
::std::task::Poll::Ready(::std::option::Option::None) => {
202+
// channel sender is dropped, which means agent terminated
200203
return ::std::task::Poll::Ready(
201204
::std::result::Result::Err(
202205
::agentwire::BrokerError::AgentTerminated(
@@ -206,7 +209,7 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
206209
);
207210
}
208211
::std::task::Poll::Pending => {
209-
break;
212+
break; // No more messages to process
210213
}
211214
}
212215
}
@@ -231,7 +234,7 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
231234
}
232235
});
233236
let run = quote! {
234-
#[allow(missing_docs)]
237+
/// Future for [`#ident::run`].
235238
pub struct #run_fut_name<'a> {
236239
broker: &'a mut #ident,
237240
plan: &'a mut dyn #broker_plan,
@@ -247,20 +250,31 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream {
247250
) -> ::std::task::Poll<Self::Output> {
248251
let fence = self.fence;
249252
let fut = self.as_mut().get_mut();
253+
let mut any_handler_enabled = false;
250254
'outer: loop {
251255
#(#run_handlers)*
252256
#poll_extra
257+
if !any_handler_enabled {
258+
// Prevent infinite loop in edge case where no handlers are
259+
// enabled.
260+
return ::std::task::Poll::Pending;
261+
}
253262
}
263+
254264
}
255265
}
256266

257267
impl #ident {
258-
#[allow(missing_docs)]
268+
/// Equivalent to [`Self::run_with_fence()`] with a fence of `Instant::now()`.
259269
pub fn run<'a>(&'a mut self, plan: &'a mut dyn #broker_plan) -> #run_fut_name<'a> {
260270
Self::run_with_fence(self, plan, ::std::time::Instant::now())
261271
}
262272

263-
#[allow(missing_docs)]
273+
/// Runs the broker, filtering any events to only those with a timestamp
274+
/// newer than `fence`.
275+
///
276+
/// Events are fed the broker's `handle_*` functions, and `plan` is passed
277+
/// there as an argument.
264278
pub fn run_with_fence<'a>(
265279
&'a mut self,
266280
plan: &'a mut dyn #broker_plan,

agentwire/tests/broker.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
use agentwire::{
2+
agent::{Cell, Task},
3+
port::{self, Port},
4+
Agent, Broker, BrokerFlow,
5+
};
6+
use futures::FutureExt;
7+
8+
#[derive(Debug, thiserror::Error)]
9+
#[error("dummy error")]
10+
pub struct Error;
11+
12+
struct DummyAgent;
13+
14+
impl Port for DummyAgent {
15+
type Input = ();
16+
type Output = ();
17+
18+
const INPUT_CAPACITY: usize = 0;
19+
const OUTPUT_CAPACITY: usize = 0;
20+
}
21+
22+
impl Agent for DummyAgent {
23+
const NAME: &'static str = "dummy";
24+
}
25+
26+
impl Task for DummyAgent {
27+
type Error = Error;
28+
29+
async fn run(self, _port: agentwire::port::Inner<Self>) -> Result<(), Self::Error> {
30+
std::future::pending().await
31+
}
32+
}
33+
34+
#[derive(Broker)]
35+
#[broker(plan = PlanT, error = Error)]
36+
struct NoAgents {}
37+
38+
impl NoAgents {
39+
fn new() -> Self {
40+
new_no_agents!()
41+
}
42+
}
43+
44+
#[derive(Broker)]
45+
#[broker(plan = PlanT, error = Error)]
46+
struct OneAgent {
47+
#[agent(task, init)]
48+
dummy: Cell<DummyAgent>,
49+
}
50+
51+
impl OneAgent {
52+
fn new() -> Self {
53+
new_one_agent!()
54+
}
55+
56+
#[expect(dead_code, reason = "agent never enabled")]
57+
fn init_dummy(&mut self) -> DummyAgent {
58+
DummyAgent
59+
}
60+
61+
fn handle_dummy(
62+
&mut self,
63+
_plan: &mut dyn PlanT,
64+
_output: port::Output<DummyAgent>,
65+
) -> Result<BrokerFlow, Error> {
66+
unreachable!("agent never enabled")
67+
}
68+
}
69+
trait PlanT {}
70+
71+
#[derive(Debug)]
72+
struct Plan;
73+
74+
impl PlanT for Plan {}
75+
76+
#[test]
77+
fn test_broker_with_no_agents_never_blocks() {
78+
let waker = futures::task::noop_waker();
79+
let mut cx = std::task::Context::from_waker(&waker);
80+
let mut plan = Plan;
81+
82+
let mut no_agents = NoAgents::new();
83+
let mut run_fut = no_agents.run(&mut plan);
84+
// poll should always immediately return, instead of looping forever.
85+
assert!(run_fut.poll_unpin(&mut cx).is_pending());
86+
assert!(run_fut.poll_unpin(&mut cx).is_pending());
87+
assert!(run_fut.poll_unpin(&mut cx).is_pending());
88+
}
89+
90+
#[test]
91+
fn test_broker_with_one_agent_never_blocks() {
92+
let waker = futures::task::noop_waker();
93+
let mut cx = std::task::Context::from_waker(&waker);
94+
let mut plan = Plan;
95+
96+
let mut one_agent = OneAgent::new();
97+
let mut run_fut = one_agent.run(&mut plan);
98+
// poll should always immediately return, instead of looping forever.
99+
assert!(run_fut.poll_unpin(&mut cx).is_pending());
100+
assert!(run_fut.poll_unpin(&mut cx).is_pending());
101+
assert!(run_fut.poll_unpin(&mut cx).is_pending());
102+
}

0 commit comments

Comments
 (0)