Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 34 additions & 9 deletions agents/s08_background_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(self):
self.tasks = {} # task_id -> {status, result, command}
self._notification_queue = [] # completed task results
self._lock = threading.Lock()
self._condition = threading.Condition(self._lock)

def run(self, command: str) -> str:
"""Start a background thread, return task_id immediately."""
Expand Down Expand Up @@ -79,13 +80,14 @@ def _execute(self, task_id: str, command: str):
status = "error"
self.tasks[task_id]["status"] = status
self.tasks[task_id]["result"] = output or "(no output)"
with self._lock:
with self._condition:
self._notification_queue.append({
"task_id": task_id,
"status": status,
"command": command[:80],
"result": (output or "(no output)")[:500],
})
self._condition.notify_all()

def check(self, task_id: str = None) -> str:
"""Check status of one task or list all."""
Expand All @@ -101,7 +103,23 @@ def check(self, task_id: str = None) -> str:

def drain_notifications(self) -> list:
"""Return and clear all pending completion notifications."""
with self._lock:
with self._condition:
notifs = list(self._notification_queue)
self._notification_queue.clear()
return notifs

def _has_running_tasks_locked(self) -> bool:
return any(task["status"] == "running" for task in self.tasks.values())

def has_running_tasks(self) -> bool:
with self._condition:
return self._has_running_tasks_locked()

def wait_for_notifications(self) -> list:
"""Block until at least one background notification is available or all tasks are done."""
with self._condition:
while not self._notification_queue and self._has_running_tasks_locked():
self._condition.wait()
notifs = list(self._notification_queue)
self._notification_queue.clear()
return notifs
Expand All @@ -110,6 +128,17 @@ def drain_notifications(self) -> list:
BG = BackgroundManager()


def inject_background_results(messages: list, notifs: list) -> bool:
if notifs and messages:
notif_text = "\n".join(
f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs
)
messages.append({"role": "user", "content": f"<background-results>\n{notif_text}\n</background-results>"})
messages.append({"role": "assistant", "content": "Noted background results."})
return True
return False


# -- Tool implementations --
def safe_path(p: str) -> Path:
path = (WORKDIR / p).resolve()
Expand Down Expand Up @@ -187,19 +216,15 @@ def run_edit(path: str, old_text: str, new_text: str) -> str:
def agent_loop(messages: list):
while True:
# Drain background notifications and inject as system message before LLM call
notifs = BG.drain_notifications()
if notifs and messages:
notif_text = "\n".join(
f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs
)
messages.append({"role": "user", "content": f"<background-results>\n{notif_text}\n</background-results>"})
messages.append({"role": "assistant", "content": "Noted background results."})
inject_background_results(messages, BG.drain_notifications())
response = client.messages.create(
model=MODEL, system=SYSTEM, messages=messages,
tools=TOOLS, max_tokens=8000,
)
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
if BG.has_running_tasks() and inject_background_results(messages, BG.wait_for_notifications()):
continue
return
results = []
for block in response.content:
Expand Down
27 changes: 22 additions & 5 deletions agents/s_full.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,16 @@ def drain(self) -> list:
notifs.append(self.notifications.get_nowait())
return notifs

def has_running_tasks(self) -> bool:
return any(task["status"] == "running" for task in self.tasks.values())

def wait_for_notifications(self) -> list:
first = self.notifications.get()
notifs = [first]
while not self.notifications.empty():
notifs.append(self.notifications.get_nowait())
return notifs


# === SECTION: messaging (s09) ===
class MessageBus:
Expand Down Expand Up @@ -651,6 +661,15 @@ def handle_plan_review(request_id: str, approve: bool, feedback: str = "") -> st


# === SECTION: agent_loop ===
def inject_background_results(messages: list, notifs: list) -> bool:
if notifs:
txt = "\n".join(f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs)
messages.append({"role": "user", "content": f"<background-results>\n{txt}\n</background-results>"})
messages.append({"role": "assistant", "content": "Noted background results."})
return True
return False


def agent_loop(messages: list):
rounds_without_todo = 0
while True:
Expand All @@ -660,11 +679,7 @@ def agent_loop(messages: list):
print("[auto-compact triggered]")
messages[:] = auto_compact(messages)
# s08: drain background notifications
notifs = BG.drain()
if notifs:
txt = "\n".join(f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs)
messages.append({"role": "user", "content": f"<background-results>\n{txt}\n</background-results>"})
messages.append({"role": "assistant", "content": "Noted background results."})
inject_background_results(messages, BG.drain())
# s10: check lead inbox
inbox = BUS.read_inbox("lead")
if inbox:
Expand All @@ -677,6 +692,8 @@ def agent_loop(messages: list):
)
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
if BG.has_running_tasks() and inject_background_results(messages, BG.wait_for_notifications()):
continue
return
# Tool execution
results = []
Expand Down
135 changes: 135 additions & 0 deletions tests/test_background_notifications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import os
import sys
import types
import unittest
from pathlib import Path
from types import SimpleNamespace


REPO_ROOT = Path(__file__).resolve().parents[1]
if str(REPO_ROOT) not in sys.path:
sys.path.insert(0, str(REPO_ROOT))

os.environ.setdefault("MODEL_ID", "test-model")

fake_anthropic = types.ModuleType("anthropic")


class FakeAnthropic:
def __init__(self, *args, **kwargs):
self.messages = SimpleNamespace(create=None)


setattr(fake_anthropic, "Anthropic", FakeAnthropic)
sys.modules.setdefault("anthropic", fake_anthropic)

fake_dotenv = types.ModuleType("dotenv")
setattr(fake_dotenv, "load_dotenv", lambda *args, **kwargs: None)
sys.modules.setdefault("dotenv", fake_dotenv)

import agents.s08_background_tasks as s08_background_tasks
import agents.s_full as s_full


class FakeMessagesAPI:
def __init__(self, responses):
self._responses = iter(responses)
self.call_count = 0

def create(self, **kwargs):
self.call_count += 1
return next(self._responses)


class FakeBackgroundManager:
def __init__(self):
self._running = True
self.wait_called = False

def drain_notifications(self):
return []

def drain(self):
return []

def has_running_tasks(self):
return self._running

def wait_for_notifications(self):
self.wait_called = True
self._running = False
return [{"task_id": "bg-1", "status": "completed", "result": "done"}]


class BackgroundNotificationTests(unittest.TestCase):
def test_s08_agent_loop_waits_for_background_results_after_end_turn(self):
messages = [{"role": "user", "content": "Run tests in the background"}]
fake_bg = FakeBackgroundManager()
fake_api = FakeMessagesAPI(
[
SimpleNamespace(
stop_reason="end_turn", content="Started background work."
),
SimpleNamespace(
stop_reason="end_turn", content="Background work completed."
),
]
)
original_bg = s08_background_tasks.BG
original_client = s08_background_tasks.client
try:
s08_background_tasks.BG = fake_bg
s08_background_tasks.client = SimpleNamespace(messages=fake_api)
s08_background_tasks.agent_loop(messages)
finally:
s08_background_tasks.BG = original_bg
s08_background_tasks.client = original_client

self.assertTrue(fake_bg.wait_called)
self.assertEqual(fake_api.call_count, 2)
self.assertTrue(
any(
message["role"] == "user"
and isinstance(message["content"], str)
and "<background-results>" in message["content"]
for message in messages
)
)

def test_s_full_agent_loop_waits_for_background_results_after_end_turn(self):
messages = [{"role": "user", "content": "Run tests in the background"}]
fake_bg = FakeBackgroundManager()
fake_api = FakeMessagesAPI(
[
SimpleNamespace(
stop_reason="end_turn", content="Started background work."
),
SimpleNamespace(
stop_reason="end_turn", content="Background work completed."
),
]
)
original_bg = s_full.BG
original_client = s_full.client
try:
s_full.BG = fake_bg
s_full.client = SimpleNamespace(messages=fake_api)
s_full.agent_loop(messages)
finally:
s_full.BG = original_bg
s_full.client = original_client

self.assertTrue(fake_bg.wait_called)
self.assertEqual(fake_api.call_count, 2)
self.assertTrue(
any(
message["role"] == "user"
and isinstance(message["content"], str)
and "<background-results>" in message["content"]
for message in messages
)
)


if __name__ == "__main__":
unittest.main()