diff --git a/core/framework/runtime/execution_stream.py b/core/framework/runtime/execution_stream.py index b63053f9f0..cf5b1a49b6 100644 --- a/core/framework/runtime/execution_stream.py +++ b/core/framework/runtime/execution_stream.py @@ -239,6 +239,9 @@ def __init__( # State self._running = False + # Tracks executions paused via pause_execution() so the CancelledError + # handler can write status="paused" instead of "cancelled". + self._pause_requested: set[str] = set() async def start(self) -> None: """Start the execution stream.""" @@ -617,9 +620,10 @@ async def _run_execution(self, ctx: ExecutionContext) -> None: ) # Update context status based on result - if has_result and result.paused_at: + if execution_id in self._pause_requested or (has_result and result.paused_at): ctx.status = "paused" ctx.completed_at = datetime.now() + self._pause_requested.discard(execution_id) else: ctx.status = "cancelled" @@ -915,6 +919,29 @@ async def cancel_execution(self, execution_id: str) -> bool: return True return False + async def pause_execution(self, execution_id: str) -> bool: + """ + Pause a running execution so it can be resumed later. + + Marks the execution as pause-requested before cancelling its task so + that the CancelledError handler writes ``status="paused"`` to disk + instead of ``status="cancelled"``. The caller can then resume the + execution via the ``/resume`` endpoint. + + Args: + execution_id: Execution to pause + + Returns: + True if the execution was found and paused, False if not found + """ + task = self._execution_tasks.get(execution_id) + if task and not task.done(): + self._pause_requested.add(execution_id) + task.cancel() + await asyncio.wait({task}, timeout=5.0) + return True + return False + # === STATS AND MONITORING === def get_active_count(self) -> int: diff --git a/core/framework/server/README.md b/core/framework/server/README.md index dfbf292de6..5ff9d4f530 100644 --- a/core/framework/server/README.md +++ b/core/framework/server/README.md @@ -136,7 +136,7 @@ DELETE /api/sessions/{session_id}/worker | `POST` | `/api/sessions/{session_id}/inject` | Inject input into a waiting node | | `POST` | `/api/sessions/{session_id}/chat` | Smart chat routing | | `POST` | `/api/sessions/{session_id}/stop` | Cancel a running execution | -| `POST` | `/api/sessions/{session_id}/pause` | Alias for stop | +| `POST` | `/api/sessions/{session_id}/pause` | Pause a running execution (resumable) | | `POST` | `/api/sessions/{session_id}/resume` | Resume a paused execution | | `POST` | `/api/sessions/{session_id}/replay` | Re-run from a checkpoint | | `GET` | `/api/sessions/{session_id}/goal-progress` | Evaluate goal progress | @@ -173,6 +173,16 @@ POST /api/sessions/{session_id}/inject ```jsonc POST /api/sessions/{session_id}/stop { "execution_id": "..." } +// Returns: { "stopped": true, "execution_id": "..." } +// Terminates execution permanently. Use /trigger to start fresh. +``` + +**Pause:** +```jsonc +POST /api/sessions/{session_id}/pause +{ "execution_id": "..." } +// Returns: { "paused": true, "execution_id": "..." } +// Freezes execution at current state. Use /resume to continue. ``` **Resume:** diff --git a/core/framework/server/routes_execution.py b/core/framework/server/routes_execution.py index ca9f83f315..ae813c3a4b 100644 --- a/core/framework/server/routes_execution.py +++ b/core/framework/server/routes_execution.py @@ -194,6 +194,22 @@ async def handle_resume(request: web.Request) -> web.Response: except (json.JSONDecodeError, OSError) as e: return web.json_response({"error": f"Failed to read session: {e}"}, status=500) + # Only paused sessions can be resumed. Stopped (cancelled) sessions must + # be re-started from scratch using the /trigger endpoint. + session_status = state.get("status", "") + if session_status == "cancelled": + return web.json_response( + { + "error": ( + "Cannot resume a stopped session. " + "Use POST /trigger to start a new execution." + ), + "session_id": worker_session_id, + "status": session_status, + }, + status=409, + ) + if checkpoint_id: resume_session_state = { "resume_session_id": worker_session_id, @@ -223,6 +239,10 @@ async def handle_resume(request: web.Request) -> web.Response: session_state=resume_session_state, ) + # Re-enable timer-driven entry points that were paused alongside + # the execution. + session.worker_runtime.resume_timers() + return web.json_response( { "execution_id": execution_id, @@ -267,6 +287,47 @@ async def handle_stop(request: web.Request) -> web.Response: return web.json_response({"stopped": False, "error": "Execution not found"}, status=404) +async def handle_pause(request: web.Request) -> web.Response: + """POST /api/sessions/{session_id}/pause — pause a running execution. + + Freezes the execution at its current state and writes status=\"paused\" + to disk so it can be resumed later via /resume. + + Body: {"execution_id": "..."} + """ + session, err = resolve_session(request) + if err: + return err + + if not session.worker_runtime: + return web.json_response({"error": "No worker loaded in this session"}, status=503) + + body = await request.json() + execution_id = body.get("execution_id") + + if not execution_id: + return web.json_response({"error": "execution_id is required"}, status=400) + + for graph_id in session.worker_runtime.list_graphs(): + reg = session.worker_runtime.get_graph_registration(graph_id) + if reg is None: + continue + for _ep_id, stream in reg.streams.items(): + paused = await stream.pause_execution(execution_id) + if paused: + # Also pause timer-driven entry points so no new runs + # are triggered while the execution is frozen. + session.worker_runtime.pause_timers() + return web.json_response( + { + "paused": True, + "execution_id": execution_id, + } + ) + + return web.json_response({"paused": False, "error": "Execution not found"}, status=404) + + async def handle_replay(request: web.Request) -> web.Response: """POST /api/sessions/{session_id}/replay — re-run from a checkpoint. @@ -340,7 +401,7 @@ def register_routes(app: web.Application) -> None: app.router.add_post("/api/sessions/{session_id}/trigger", handle_trigger) app.router.add_post("/api/sessions/{session_id}/inject", handle_inject) app.router.add_post("/api/sessions/{session_id}/chat", handle_chat) - app.router.add_post("/api/sessions/{session_id}/pause", handle_stop) + app.router.add_post("/api/sessions/{session_id}/pause", handle_pause) app.router.add_post("/api/sessions/{session_id}/resume", handle_resume) app.router.add_post("/api/sessions/{session_id}/stop", handle_stop) app.router.add_post("/api/sessions/{session_id}/cancel-queen", handle_cancel_queen) diff --git a/core/framework/server/tests/test_api.py b/core/framework/server/tests/test_api.py index 8aac59b86e..d25b395cd6 100644 --- a/core/framework/server/tests/test_api.py +++ b/core/framework/server/tests/test_api.py @@ -78,6 +78,9 @@ class MockStream: async def cancel_execution(self, execution_id: str) -> bool: return execution_id in self._execution_tasks + async def pause_execution(self, execution_id: str) -> bool: + return execution_id in self._execution_tasks + @dataclass class MockGraphRegistration: @@ -126,6 +129,12 @@ def find_awaiting_node(self): def get_stats(self): return {"running": True, "executions": 1} + def pause_timers(self) -> None: + pass + + def resume_timers(self) -> None: + pass + class MockAgentInfo: name: str = "test_agent" @@ -558,6 +567,26 @@ async def test_pause_missing_execution_id(self): ) assert resp.status == 400 + @pytest.mark.asyncio + async def test_pause_success(self): + """Pausing a running execution marks it as paused and stops timer-driven runs.""" + session = _make_session() + # Replace pause_timers with a MagicMock so we can assert it was called + session.worker_runtime.pause_timers = MagicMock() + session.worker_runtime._mock_streams["default"]._execution_tasks["exec_abc"] = MagicMock() + app = _make_app_with_session(session) + async with TestClient(TestServer(app)) as client: + resp = await client.post( + "/api/sessions/test_agent/pause", + json={"execution_id": "exec_abc"}, + ) + assert resp.status == 200 + data = await resp.json() + assert data["paused"] is True + assert data["execution_id"] == "exec_abc" + # Timer-driven entry points must be frozen alongside the live execution + session.worker_runtime.pause_timers.assert_called_once() + @pytest.mark.asyncio async def test_goal_progress(self): session = _make_session() @@ -572,11 +601,13 @@ async def test_goal_progress(self): class TestResume: @pytest.mark.asyncio async def test_resume_from_session_state(self, sample_session, tmp_agent_dir): - """Resume using session state (paused_at).""" + """Resume using session state (paused_at) and re-enables timers.""" session_id, session_dir, state = sample_session tmp_path, agent_name, base = tmp_agent_dir session = _make_session(tmp_dir=tmp_path / ".hive" / "agents" / agent_name) + # Replace resume_timers with a MagicMock so we can assert it was called + session.worker_runtime.resume_timers = MagicMock() app = _make_app_with_session(session) async with TestClient(TestServer(app)) as client: @@ -589,6 +620,8 @@ async def test_resume_from_session_state(self, sample_session, tmp_agent_dir): assert data["execution_id"] == "exec_test_123" assert data["resumed_from"] == session_id assert data["checkpoint_id"] is None + # Timers must be re-enabled when resuming + session.worker_runtime.resume_timers.assert_called_once() @pytest.mark.asyncio async def test_resume_with_checkpoint(self, sample_session, tmp_agent_dir): @@ -633,6 +666,33 @@ async def test_resume_session_not_found(self): ) assert resp.status == 404 + @pytest.mark.asyncio + async def test_resume_rejected_for_cancelled_session(self, tmp_agent_dir): + """Resuming a stopped (cancelled) session must return 409.""" + tmp_path, agent_name, base = tmp_agent_dir + session_id = "session_cancelled" + session_dir = base / "sessions" / session_id + session_dir.mkdir(parents=True) + cancelled_state = { + "status": "cancelled", + "input_data": {}, + "memory": {}, + "progress": {}, + } + (session_dir / "state.json").write_text(json.dumps(cancelled_state)) + + session = _make_session(tmp_dir=tmp_path / ".hive" / "agents" / agent_name) + app = _make_app_with_session(session) + + async with TestClient(TestServer(app)) as client: + resp = await client.post( + "/api/sessions/test_agent/resume", + json={"session_id": session_id}, + ) + assert resp.status == 409 + data = await resp.json() + assert "stopped" in data["error"].lower() or "cancelled" in data["error"].lower() + class TestStop: @pytest.mark.asyncio diff --git a/docs/roadmap.md b/docs/roadmap.md index b9197088fb..2ac9cf63c8 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -644,11 +644,11 @@ Expose basic REST/WebSocket endpoints for external control (Start, Stop, Pause, - [x] State persistence - [x] Recovery mechanisms - [ ] **REST API Endpoints** - - [ ] Start endpoint for agent execution - - [ ] Stop endpoint for graceful shutdown - - [ ] Pause endpoint for execution suspension - - [ ] Resume endpoint for continuation - - [ ] Status query endpoint for monitoring + - [x] Start endpoint for agent execution + - [x] Stop endpoint for graceful shutdown + - [x] Pause endpoint for execution suspension + - [x] Resume endpoint for continuation + - [x] Status query endpoint for monitoring - [ ] **WebSocket API** - [ ] Real-time event streaming to clients - [ ] Bidirectional communication diff --git a/docs/server-cli-arch.md b/docs/server-cli-arch.md index 02c9e098f8..829c76b017 100644 --- a/docs/server-cli-arch.md +++ b/docs/server-cli-arch.md @@ -168,7 +168,8 @@ Every HTTP endpoint is a direct, thin delegation to a shared runtime method. No | `POST /api/agents/{id}/chat` | Auto-route | `runtime.inject_input()` or `runtime.trigger()` | | `POST /api/agents/{id}/inject` | Send user input | `runtime.inject_input(node_id, content)` | | `POST /api/agents/{id}/resume` | Resume session | `runtime.trigger()` with `session_state` | -| `POST /api/agents/{id}/stop` | Pause execution | Cancels the execution task | +| `POST /api/agents/{id}/pause` | Pause execution (resumable) | `stream.pause_execution()` → writes `status="paused"` to disk | +| `POST /api/agents/{id}/stop` | Terminate execution permanently | `stream.cancel_execution()` → writes `status="cancelled"` to disk | | `POST /api/agents/{id}/replay` | Replay checkpoint | Checkpoint restore → `runtime.trigger()` | | `GET /api/agents/{id}/goal-progress` | Goal progress | `runtime.get_goal_progress()` |