diff --git a/agents/s08_background_tasks.py b/agents/s08_background_tasks.py index 77a992eaf..03881693c 100644 --- a/agents/s08_background_tasks.py +++ b/agents/s08_background_tasks.py @@ -51,6 +51,7 @@ def __init__(self): self.tasks = {} # task_id -> {status, result, command} self._notification_queue = [] # completed task results self._lock = threading.Lock() + self._condition = threading.Condition(self._lock) def run(self, command: str) -> str: """Start a background thread, return task_id immediately.""" @@ -79,13 +80,14 @@ def _execute(self, task_id: str, command: str): status = "error" self.tasks[task_id]["status"] = status self.tasks[task_id]["result"] = output or "(no output)" - with self._lock: + with self._condition: self._notification_queue.append({ "task_id": task_id, "status": status, "command": command[:80], "result": (output or "(no output)")[:500], }) + self._condition.notify_all() def check(self, task_id: str = None) -> str: """Check status of one task or list all.""" @@ -101,7 +103,23 @@ def check(self, task_id: str = None) -> str: def drain_notifications(self) -> list: """Return and clear all pending completion notifications.""" - with self._lock: + with self._condition: + notifs = list(self._notification_queue) + self._notification_queue.clear() + return notifs + + def _has_running_tasks_locked(self) -> bool: + return any(task["status"] == "running" for task in self.tasks.values()) + + def has_running_tasks(self) -> bool: + with self._condition: + return self._has_running_tasks_locked() + + def wait_for_notifications(self) -> list: + """Block until at least one background notification is available or all tasks are done.""" + with self._condition: + while not self._notification_queue and self._has_running_tasks_locked(): + self._condition.wait() notifs = list(self._notification_queue) self._notification_queue.clear() return notifs @@ -110,6 +128,17 @@ def drain_notifications(self) -> list: BG = BackgroundManager() +def inject_background_results(messages: list, notifs: list) -> bool: + if notifs and messages: + notif_text = "\n".join( + f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs + ) + messages.append({"role": "user", "content": f"\n{notif_text}\n"}) + messages.append({"role": "assistant", "content": "Noted background results."}) + return True + return False + + # -- Tool implementations -- def safe_path(p: str) -> Path: path = (WORKDIR / p).resolve() @@ -187,19 +216,15 @@ def run_edit(path: str, old_text: str, new_text: str) -> str: def agent_loop(messages: list): while True: # Drain background notifications and inject as system message before LLM call - notifs = BG.drain_notifications() - if notifs and messages: - notif_text = "\n".join( - f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs - ) - messages.append({"role": "user", "content": f"\n{notif_text}\n"}) - messages.append({"role": "assistant", "content": "Noted background results."}) + inject_background_results(messages, BG.drain_notifications()) response = client.messages.create( model=MODEL, system=SYSTEM, messages=messages, tools=TOOLS, max_tokens=8000, ) messages.append({"role": "assistant", "content": response.content}) if response.stop_reason != "tool_use": + if BG.has_running_tasks() and inject_background_results(messages, BG.wait_for_notifications()): + continue return results = [] for block in response.content: diff --git a/agents/s_full.py b/agents/s_full.py index d4dcfd3c6..72e96ff3e 100644 --- a/agents/s_full.py +++ b/agents/s_full.py @@ -359,6 +359,16 @@ def drain(self) -> list: notifs.append(self.notifications.get_nowait()) return notifs + def has_running_tasks(self) -> bool: + return any(task["status"] == "running" for task in self.tasks.values()) + + def wait_for_notifications(self) -> list: + first = self.notifications.get() + notifs = [first] + while not self.notifications.empty(): + notifs.append(self.notifications.get_nowait()) + return notifs + # === SECTION: messaging (s09) === class MessageBus: @@ -651,6 +661,15 @@ def handle_plan_review(request_id: str, approve: bool, feedback: str = "") -> st # === SECTION: agent_loop === +def inject_background_results(messages: list, notifs: list) -> bool: + if notifs: + txt = "\n".join(f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs) + messages.append({"role": "user", "content": f"\n{txt}\n"}) + messages.append({"role": "assistant", "content": "Noted background results."}) + return True + return False + + def agent_loop(messages: list): rounds_without_todo = 0 while True: @@ -660,11 +679,7 @@ def agent_loop(messages: list): print("[auto-compact triggered]") messages[:] = auto_compact(messages) # s08: drain background notifications - notifs = BG.drain() - if notifs: - txt = "\n".join(f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs) - messages.append({"role": "user", "content": f"\n{txt}\n"}) - messages.append({"role": "assistant", "content": "Noted background results."}) + inject_background_results(messages, BG.drain()) # s10: check lead inbox inbox = BUS.read_inbox("lead") if inbox: @@ -677,6 +692,8 @@ def agent_loop(messages: list): ) messages.append({"role": "assistant", "content": response.content}) if response.stop_reason != "tool_use": + if BG.has_running_tasks() and inject_background_results(messages, BG.wait_for_notifications()): + continue return # Tool execution results = [] diff --git a/tests/test_background_notifications.py b/tests/test_background_notifications.py new file mode 100644 index 000000000..1a763f908 --- /dev/null +++ b/tests/test_background_notifications.py @@ -0,0 +1,135 @@ +import os +import sys +import types +import unittest +from pathlib import Path +from types import SimpleNamespace + + +REPO_ROOT = Path(__file__).resolve().parents[1] +if str(REPO_ROOT) not in sys.path: + sys.path.insert(0, str(REPO_ROOT)) + +os.environ.setdefault("MODEL_ID", "test-model") + +fake_anthropic = types.ModuleType("anthropic") + + +class FakeAnthropic: + def __init__(self, *args, **kwargs): + self.messages = SimpleNamespace(create=None) + + +setattr(fake_anthropic, "Anthropic", FakeAnthropic) +sys.modules.setdefault("anthropic", fake_anthropic) + +fake_dotenv = types.ModuleType("dotenv") +setattr(fake_dotenv, "load_dotenv", lambda *args, **kwargs: None) +sys.modules.setdefault("dotenv", fake_dotenv) + +import agents.s08_background_tasks as s08_background_tasks +import agents.s_full as s_full + + +class FakeMessagesAPI: + def __init__(self, responses): + self._responses = iter(responses) + self.call_count = 0 + + def create(self, **kwargs): + self.call_count += 1 + return next(self._responses) + + +class FakeBackgroundManager: + def __init__(self): + self._running = True + self.wait_called = False + + def drain_notifications(self): + return [] + + def drain(self): + return [] + + def has_running_tasks(self): + return self._running + + def wait_for_notifications(self): + self.wait_called = True + self._running = False + return [{"task_id": "bg-1", "status": "completed", "result": "done"}] + + +class BackgroundNotificationTests(unittest.TestCase): + def test_s08_agent_loop_waits_for_background_results_after_end_turn(self): + messages = [{"role": "user", "content": "Run tests in the background"}] + fake_bg = FakeBackgroundManager() + fake_api = FakeMessagesAPI( + [ + SimpleNamespace( + stop_reason="end_turn", content="Started background work." + ), + SimpleNamespace( + stop_reason="end_turn", content="Background work completed." + ), + ] + ) + original_bg = s08_background_tasks.BG + original_client = s08_background_tasks.client + try: + s08_background_tasks.BG = fake_bg + s08_background_tasks.client = SimpleNamespace(messages=fake_api) + s08_background_tasks.agent_loop(messages) + finally: + s08_background_tasks.BG = original_bg + s08_background_tasks.client = original_client + + self.assertTrue(fake_bg.wait_called) + self.assertEqual(fake_api.call_count, 2) + self.assertTrue( + any( + message["role"] == "user" + and isinstance(message["content"], str) + and "" in message["content"] + for message in messages + ) + ) + + def test_s_full_agent_loop_waits_for_background_results_after_end_turn(self): + messages = [{"role": "user", "content": "Run tests in the background"}] + fake_bg = FakeBackgroundManager() + fake_api = FakeMessagesAPI( + [ + SimpleNamespace( + stop_reason="end_turn", content="Started background work." + ), + SimpleNamespace( + stop_reason="end_turn", content="Background work completed." + ), + ] + ) + original_bg = s_full.BG + original_client = s_full.client + try: + s_full.BG = fake_bg + s_full.client = SimpleNamespace(messages=fake_api) + s_full.agent_loop(messages) + finally: + s_full.BG = original_bg + s_full.client = original_client + + self.assertTrue(fake_bg.wait_called) + self.assertEqual(fake_api.call_count, 2) + self.assertTrue( + any( + message["role"] == "user" + and isinstance(message["content"], str) + and "" in message["content"] + for message in messages + ) + ) + + +if __name__ == "__main__": + unittest.main()