diff --git a/agent.py b/agent.py index 50d47e4c3b..3e851aab31 100644 --- a/agent.py +++ b/agent.py @@ -1,4 +1,4 @@ -import asyncio, random, string +import asyncio, random, string, time import nest_asyncio nest_asyncio.apply() @@ -82,7 +82,10 @@ def __init__( self.log.context = self self.paused = paused self.streaming_agent = streaming_agent + self.last_active_agent: "Agent | None" = None # persists for nudge recovery + self.last_stream_time: float = 0.0 # timestamp of last LLM stream chunk self.task: DeferredTask | None = None + self._watchdog_task: asyncio.Task | None = None # auto-nudge watchdog self.created_at = created_at or datetime.now(timezone.utc) self.type = type AgentContext._counter += 1 @@ -222,13 +225,44 @@ def reset(self): self.paused = False def nudge(self): + self._stop_watchdog() # stop current watchdog before restarting self.kill_process() self.paused = False self.task = self.run_task(self.get_agent().monologue) return self.task def get_agent(self): - return self.streaming_agent or self.agent0 + return self.streaming_agent or self.last_active_agent or self.agent0 + + async def _auto_nudge_watchdog(self): + """Background task that monitors for stuck LLM streaming and triggers auto-nudge.""" + try: + while self.task and self.task.is_alive(): + await asyncio.sleep(5) # check every 5 seconds + if not self.config.auto_nudge_enabled: + continue + if self.last_stream_time == 0: + continue # no streaming started yet + elapsed = time.time() - self.last_stream_time + if elapsed > self.config.auto_nudge_timeout: + agent = self.get_agent() + msg = f"Auto-nudge triggered: no LLM response for {elapsed:.0f}s (Agent {agent.number})" + self.log.log(type="warning", content=msg) + self.nudge() + break + except asyncio.CancelledError: + pass # normal shutdown + + def _start_watchdog(self): + """Start the auto-nudge watchdog if enabled.""" + if self.config.auto_nudge_enabled and self._watchdog_task is None: + self._watchdog_task = asyncio.create_task(self._auto_nudge_watchdog()) + + def _stop_watchdog(self): + """Stop the auto-nudge watchdog.""" + if self._watchdog_task: + self._watchdog_task.cancel() + self._watchdog_task = None def communicate(self, msg: "UserMessage", broadcast_level: int = 1): self.paused = False # unpause if paused @@ -256,7 +290,9 @@ def run_task( self.task = DeferredTask( thread_name=self.__class__.__name__, ) + self.last_stream_time = 0.0 # reset for new task self.task.start_task(func, *args, **kwargs) + self._start_watchdog() # start auto-nudge monitoring return self.task # this wrapper ensures that superior agents are called back if the chat was loaded from file and original callstack is gone @@ -296,6 +332,8 @@ class AgentConfig: code_exec_ssh_port: int = 55022 code_exec_ssh_user: str = "root" code_exec_ssh_pass: str = "" + auto_nudge_enabled: bool = False + auto_nudge_timeout: int = 60 # seconds without LLM streaming before auto-nudge additional: Dict[str, Any] = field(default_factory=dict) @@ -378,6 +416,7 @@ async def monologue(self): while True: self.context.streaming_agent = self # mark self as current streamer + self.context.last_active_agent = self # persist for nudge recovery self.loop_data.iteration += 1 self.loop_data.params_temporary = {} # clear temporary params @@ -397,6 +436,7 @@ async def monologue(self): async def reasoning_callback(chunk: str, full: str): await self.handle_intervention() + self.context.last_stream_time = time.time() # update for auto-nudge if chunk == full: printer.print("Reasoning: ") # start of reasoning # Pass chunk and full data to extensions for processing @@ -414,6 +454,7 @@ async def reasoning_callback(chunk: str, full: str): async def stream_callback(chunk: str, full: str): await self.handle_intervention() + self.context.last_stream_time = time.time() # update for auto-nudge # output the agent response stream if chunk == full: printer.print("Response: ") # start of response diff --git a/docs/designs/2025-01-09-nudge-improvement-design.md b/docs/designs/2025-01-09-nudge-improvement-design.md new file mode 100644 index 0000000000..5788eba1fe --- /dev/null +++ b/docs/designs/2025-01-09-nudge-improvement-design.md @@ -0,0 +1,114 @@ +# Nudge Functionality Improvement Design + +**Date:** 2025-01-09 +**Status:** Approved +**Branch:** feat-nudge-improvement + +## Problem + +The nudge button resets execution to Agent 0 when a subordinate agent is running. Users expect nudge to restart the *current* agent, not the root agent. + +**Root cause:** The `nudge()` method calls `get_agent()`, which returns `streaming_agent or agent0`. Since `streaming_agent` is cleared to `None` at the end of each monologue (`agent.py:504`), nudge always falls back to `agent0`. + +## Solution + +### 1. Core Fix: Track Last Active Agent + +Add a `last_active_agent` field to `AgentContext` that persists beyond monologue completion. + +**Changes to `agent.py`:** + +```python +# In AgentContext.__init__(): +self.last_active_agent: Agent | None = None + +# In Agent.monologue(), where streaming_agent is set (~line 369): +self.context.last_active_agent = self + +# Updated get_agent(): +def get_agent(self): + return self.streaming_agent or self.last_active_agent or self.agent0 +``` + +The `nudge()` method remains unchanged—it already calls `get_agent().monologue()`. The fix is in what `get_agent()` returns. + +**Why this works:** +- `last_active_agent` is set when any agent starts its monologue +- Unlike `streaming_agent`, it is never cleared +- Existing `_process_chain` mechanics handle bubbling responses back to superior agents + +### 2. Auto-Nudge: LLM Streaming Timeout + +Detect when the LLM stops responding mid-stream and trigger automatic nudge. + +**Configuration (in `AgentConfig`):** + +```python +auto_nudge_enabled: bool = False +auto_nudge_timeout: int = 60 # seconds +``` + +**Implementation:** + +```python +# In AgentContext.__init__(): +self.last_stream_time: float = 0.0 + +# In streaming callbacks (reasoning_callback / stream_callback): +self.context.last_stream_time = time.time() + +# Background watchdog task: +async def _auto_nudge_watchdog(self): + while self.task and self.task.is_alive(): + await asyncio.sleep(5) + if not self.auto_nudge_enabled: + continue + if self.last_stream_time == 0: + continue + elapsed = time.time() - self.last_stream_time + if elapsed > self.auto_nudge_timeout: + self.log.log(type="warning", + content=f"Auto-nudge triggered: no LLM response for {elapsed:.0f}s") + self.nudge() + break +``` + +**Watchdog lifecycle:** +- Started when `run_task()` begins a new task +- Stops when task completes or nudge triggers +- Only monitors during active LLM streaming + +### 3. UI Feedback + +Update `python/api/nudge.py` to report which agent was nudged: + +```python +agent = context.get_agent() +context.nudge() +msg = f"Agent {agent.number} nudged." +return { + "message": msg, + "ctxid": context.id, + "agent_number": agent.number, +} +``` + +## Implementation Order + +1. **Core fix** - Add `last_active_agent`, update `get_agent()` +2. **Auto-nudge** - Add config, timestamp tracking, watchdog +3. **API update** - Enhanced nudge response + +## Files Modified + +- `agent.py` - Core changes (~30 lines) +- `python/api/nudge.py` - Enhanced response (~5 lines) + +## Testing + +| Scenario | Steps | Expected | +|----------|-------|----------| +| Nudge subordinate | Agent 0→1→2 chain, nudge at Agent 2 | Agent 2 resumes | +| Nudge after completion | Agent 2 completes, nudge before Agent 1 responds | Agent 2 restarts | +| Auto-nudge triggers | Enable auto-nudge, 60s+ no chunks | Auto-nudge fires | +| Auto-nudge disabled | Default config, stuck LLM | No auto-nudge | diff --git a/python/api/nudge.py b/python/api/nudge.py index 558734cdf0..017e9e6269 100644 --- a/python/api/nudge.py +++ b/python/api/nudge.py @@ -7,12 +7,14 @@ async def process(self, input: dict, request: Request) -> dict | Response: raise Exception("No context id provided") context = self.use_context(ctxid) + agent = context.get_agent() context.nudge() - msg = "Process reset, agent nudged." + msg = f"Agent {agent.number} nudged." context.log.log(type="info", content=msg) - + return { "message": msg, "ctxid": context.id, + "agent_number": agent.number, } \ No newline at end of file diff --git a/tests/test_nudge.py b/tests/test_nudge.py new file mode 100644 index 0000000000..79c4d926b7 --- /dev/null +++ b/tests/test_nudge.py @@ -0,0 +1,250 @@ +""" +Unit tests for the nudge functionality. + +Tests verify that: +1. last_active_agent is set when an agent starts its monologue +2. get_agent() returns the correct agent based on priority +3. nudge() restarts the correct agent (subordinate, not always agent0) +""" + +import sys +import os +import asyncio +from unittest.mock import Mock, MagicMock, AsyncMock, patch + +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import agent as agent_module +from agent import Agent, AgentContext, AgentConfig + + +def create_mock_config(): + """Create a mock AgentConfig with required fields.""" + config = Mock(spec=AgentConfig) + config.auto_nudge_enabled = False + config.auto_nudge_timeout = 60 + return config + + +class TestGetAgent: + """Tests for AgentContext.get_agent() method.""" + + def setup_method(self): + """Set up test fixtures.""" + self.config = create_mock_config() + self.context = Mock(spec=AgentContext) + self.context.streaming_agent = None + self.context.last_active_agent = None + self.context.agent0 = Mock(spec=Agent) + self.context.agent0.number = 0 + + def test_get_agent_returns_streaming_agent_when_set(self): + """get_agent() should return streaming_agent when it's set.""" + streaming = Mock(spec=Agent) + streaming.number = 1 + self.context.streaming_agent = streaming + + # Call the actual method + result = AgentContext.get_agent(self.context) + + assert result == streaming + assert result.number == 1 + + def test_get_agent_returns_last_active_when_streaming_is_none(self): + """get_agent() should return last_active_agent when streaming_agent is None.""" + last_active = Mock(spec=Agent) + last_active.number = 2 + self.context.streaming_agent = None + self.context.last_active_agent = last_active + + result = AgentContext.get_agent(self.context) + + assert result == last_active + assert result.number == 2 + + def test_get_agent_falls_back_to_agent0(self): + """get_agent() should fall back to agent0 when both are None.""" + self.context.streaming_agent = None + self.context.last_active_agent = None + + result = AgentContext.get_agent(self.context) + + assert result == self.context.agent0 + assert result.number == 0 + + def test_get_agent_priority_order(self): + """get_agent() should check streaming_agent first, then last_active_agent, then agent0.""" + streaming = Mock(spec=Agent) + streaming.number = 1 + last_active = Mock(spec=Agent) + last_active.number = 2 + + self.context.streaming_agent = streaming + self.context.last_active_agent = last_active + + # streaming_agent should take priority + result = AgentContext.get_agent(self.context) + assert result.number == 1 + + # When streaming is None, last_active should be used + self.context.streaming_agent = None + result = AgentContext.get_agent(self.context) + assert result.number == 2 + + +class TestNudgeSubordinateAgent: + """Tests for nudging subordinate agents (the main bug fix).""" + + def test_nudge_uses_last_active_agent_not_agent0(self): + """ + When a subordinate agent was last active, nudge should restart + that agent, not agent0. + + This is the core bug fix - before, nudge always restarted agent0 + because streaming_agent was cleared at monologue end. + """ + context = Mock(spec=AgentContext) + context.streaming_agent = None # Cleared after monologue + context.paused = True + context._watchdog_task = None + + # Simulate Agent 2 was the last active (subordinate) + agent2 = Mock(spec=Agent) + agent2.number = 2 + agent2.monologue = AsyncMock() + context.last_active_agent = agent2 + + # Agent 0 should NOT be used + agent0 = Mock(spec=Agent) + agent0.number = 0 + context.agent0 = agent0 + + # Mock the methods + context.kill_process = Mock() + context._stop_watchdog = Mock() + context.run_task = Mock(return_value=Mock()) + + # Bind get_agent to use actual implementation + context.get_agent = lambda: AgentContext.get_agent(context) + + # Call nudge + AgentContext.nudge(context) + + # Verify agent2's monologue was started, not agent0's + context.run_task.assert_called_once() + args = context.run_task.call_args[0] + assert args[0] == agent2.monologue, "Should start agent2's monologue, not agent0's" + + +class TestAutoNudgeConfig: + """Tests for auto-nudge configuration.""" + + def test_auto_nudge_disabled_by_default(self): + """Auto-nudge should be disabled by default (checking dataclass default).""" + # Check the dataclass field default directly + from dataclasses import fields + config_fields = {f.name: f for f in fields(AgentConfig)} + assert config_fields['auto_nudge_enabled'].default is False + + def test_auto_nudge_timeout_default(self): + """Auto-nudge timeout should default to 60 seconds.""" + from dataclasses import fields + config_fields = {f.name: f for f in fields(AgentConfig)} + assert config_fields['auto_nudge_timeout'].default == 60 + + def test_auto_nudge_fields_exist(self): + """Auto-nudge config fields should exist in AgentConfig.""" + from dataclasses import fields + field_names = [f.name for f in fields(AgentConfig)] + assert 'auto_nudge_enabled' in field_names + assert 'auto_nudge_timeout' in field_names + + +class TestLastActiveAgentTracking: + """Tests for last_active_agent field behavior.""" + + def test_last_active_agent_initialized_to_none(self): + """last_active_agent should be None initially.""" + # Create a minimal mock for dependencies + with patch.object(agent_module, 'Log'): + with patch.object(agent_module, 'DeferredTask'): + context = AgentContext.__new__(AgentContext) + context.streaming_agent = None + context.last_active_agent = None + context.last_stream_time = 0.0 + context._watchdog_task = None + + assert context.last_active_agent is None + + def test_last_active_agent_not_cleared_like_streaming_agent(self): + """ + Unlike streaming_agent which is cleared at monologue end, + last_active_agent should persist. + """ + context = Mock(spec=AgentContext) + agent1 = Mock(spec=Agent) + agent1.number = 1 + + # Simulate monologue start + context.streaming_agent = agent1 + context.last_active_agent = agent1 + + # Simulate monologue end (streaming_agent cleared) + context.streaming_agent = None + # last_active_agent should still be set + + assert context.last_active_agent == agent1 + assert context.streaming_agent is None + + +def run_tests(): + """Run all tests and print results.""" + import traceback + + test_classes = [ + TestGetAgent, + TestNudgeSubordinateAgent, + TestAutoNudgeConfig, + TestLastActiveAgentTracking, + ] + + passed = 0 + failed = 0 + + for test_class in test_classes: + print(f"\n{'='*60}") + print(f"Running {test_class.__name__}") + print('='*60) + + instance = test_class() + + for method_name in dir(instance): + if method_name.startswith('test_'): + if hasattr(instance, 'setup_method'): + instance.setup_method() + + try: + method = getattr(instance, method_name) + method() + print(f" PASS: {method_name}") + passed += 1 + except AssertionError as e: + print(f" FAIL: {method_name}") + print(f" {e}") + failed += 1 + except Exception as e: + print(f" ERROR: {method_name}") + print(f" {e}") + traceback.print_exc() + failed += 1 + + print(f"\n{'='*60}") + print(f"Results: {passed} passed, {failed} failed") + print('='*60) + + return failed == 0 + + +if __name__ == "__main__": + success = run_tests() + sys.exit(0 if success else 1)