Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion core/framework/runtime/execution_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@ def __init__(

# State
self._running = False
# Tracks executions paused via pause_execution() so the CancelledError
# handler can write status="paused" instead of "cancelled".
self._pause_requested: set[str] = set()

async def start(self) -> None:
"""Start the execution stream."""
Expand Down Expand Up @@ -617,9 +620,10 @@ async def _run_execution(self, ctx: ExecutionContext) -> None:
)

# Update context status based on result
if has_result and result.paused_at:
if execution_id in self._pause_requested or (has_result and result.paused_at):
ctx.status = "paused"
ctx.completed_at = datetime.now()
self._pause_requested.discard(execution_id)
else:
ctx.status = "cancelled"

Expand Down Expand Up @@ -915,6 +919,29 @@ async def cancel_execution(self, execution_id: str) -> bool:
return True
return False

async def pause_execution(self, execution_id: str) -> bool:
    """
    Pause a running execution so it can be resumed later.

    Marks the execution as pause-requested before cancelling its task so
    that the CancelledError handler writes ``status="paused"`` to disk
    instead of ``status="cancelled"``. The caller can then resume the
    execution via the ``/resume`` endpoint.

    After requesting cancellation this waits up to five seconds for the
    task to finish. If the task is still running when the wait times out,
    the pause marker is left in place so the handler can still consume it
    whenever the task eventually unwinds.

    Args:
        execution_id: Execution to pause

    Returns:
        True if a running execution was found and its pause was requested,
        False if there is no such execution or its task already finished
    """
    task = self._execution_tasks.get(execution_id)
    if task is None or task.done():
        return False

    # The marker must be set *before* cancel() so the handler sees it.
    self._pause_requested.add(execution_id)
    task.cancel()
    await asyncio.wait({task}, timeout=5.0)

    if task.done():
        # The handler normally consumes the marker itself.  If the task
        # ended some other way (finished normally, failed, or swallowed the
        # cancellation) nothing else would ever remove it, so drop it here
        # to keep the set from accumulating stale execution ids.
        self._pause_requested.discard(execution_id)
    return True

# === STATS AND MONITORING ===

def get_active_count(self) -> int:
Expand Down
12 changes: 11 additions & 1 deletion core/framework/server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ DELETE /api/sessions/{session_id}/worker
| `POST` | `/api/sessions/{session_id}/inject` | Inject input into a waiting node |
| `POST` | `/api/sessions/{session_id}/chat` | Smart chat routing |
| `POST` | `/api/sessions/{session_id}/stop` | Cancel a running execution |
| `POST` | `/api/sessions/{session_id}/pause` | Alias for stop |
| `POST` | `/api/sessions/{session_id}/pause` | Pause a running execution (resumable) |
| `POST` | `/api/sessions/{session_id}/resume` | Resume a paused execution |
| `POST` | `/api/sessions/{session_id}/replay` | Re-run from a checkpoint |
| `GET` | `/api/sessions/{session_id}/goal-progress` | Evaluate goal progress |
Expand Down Expand Up @@ -173,6 +173,16 @@ POST /api/sessions/{session_id}/inject
```jsonc
POST /api/sessions/{session_id}/stop
{ "execution_id": "..." }
// Returns: { "stopped": true, "execution_id": "..." }
// Terminates execution permanently. Use /trigger to start fresh.
```

**Pause:**
```jsonc
POST /api/sessions/{session_id}/pause
{ "execution_id": "..." }
// Returns: { "paused": true, "execution_id": "..." }
// Freezes execution at current state. Use /resume to continue.
```

**Resume:**
Expand Down
63 changes: 62 additions & 1 deletion core/framework/server/routes_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,22 @@ async def handle_resume(request: web.Request) -> web.Response:
except (json.JSONDecodeError, OSError) as e:
return web.json_response({"error": f"Failed to read session: {e}"}, status=500)

# Stopped (cancelled) sessions cannot be resumed; they must be re-started
# from scratch using the /trigger endpoint.
session_status = state.get("status", "")
if session_status == "cancelled":
return web.json_response(
{
"error": (
"Cannot resume a stopped session. "
"Use POST /trigger to start a new execution."
),
"session_id": worker_session_id,
"status": session_status,
},
status=409,
)

if checkpoint_id:
resume_session_state = {
"resume_session_id": worker_session_id,
Expand Down Expand Up @@ -223,6 +239,10 @@ async def handle_resume(request: web.Request) -> web.Response:
session_state=resume_session_state,
)

# Re-enable timer-driven entry points that were paused alongside
# the execution.
session.worker_runtime.resume_timers()

return web.json_response(
{
"execution_id": execution_id,
Expand Down Expand Up @@ -267,6 +287,47 @@ async def handle_stop(request: web.Request) -> web.Response:
return web.json_response({"stopped": False, "error": "Execution not found"}, status=404)


async def handle_pause(request: web.Request) -> web.Response:
    """POST /api/sessions/{session_id}/pause — pause a running execution.

    Asks the stream that owns the execution to pause it (which writes
    status="paused" to disk so it can be resumed later via /resume) and
    pauses the runtime's timers so no new runs start in the meantime.

    Body: {"execution_id": "..."}

    Responses:
        200 {"paused": true, "execution_id": "..."} when the execution was found
        400 when the body is not a JSON object or lacks ``execution_id``
        404 when no stream knows the execution
        503 when the session has no worker loaded
    """
    session, err = resolve_session(request)
    if err:
        return err

    runtime = session.worker_runtime
    if not runtime:
        return web.json_response({"error": "No worker loaded in this session"}, status=503)

    # A malformed or empty body is a client error, not a server failure.
    try:
        body = await request.json()
    except json.JSONDecodeError:
        return web.json_response({"error": "Request body must be valid JSON"}, status=400)
    if not isinstance(body, dict):
        return web.json_response({"error": "Request body must be a JSON object"}, status=400)

    execution_id = body.get("execution_id")
    if not execution_id:
        return web.json_response({"error": "execution_id is required"}, status=400)

    # The request does not say which stream owns the execution, so ask each
    # registered stream in turn and stop at the first one that pauses it.
    for graph_id in runtime.list_graphs():
        reg = runtime.get_graph_registration(graph_id)
        if reg is None:
            continue
        for stream in reg.streams.values():
            if await stream.pause_execution(execution_id):
                # Also pause timer-driven entry points so no new runs
                # are triggered while the execution is frozen.
                runtime.pause_timers()
                return web.json_response(
                    {
                        "paused": True,
                        "execution_id": execution_id,
                    }
                )

    return web.json_response({"paused": False, "error": "Execution not found"}, status=404)


async def handle_replay(request: web.Request) -> web.Response:
"""POST /api/sessions/{session_id}/replay — re-run from a checkpoint.

Expand Down Expand Up @@ -340,7 +401,7 @@ def register_routes(app: web.Application) -> None:
app.router.add_post("/api/sessions/{session_id}/trigger", handle_trigger)
app.router.add_post("/api/sessions/{session_id}/inject", handle_inject)
app.router.add_post("/api/sessions/{session_id}/chat", handle_chat)
app.router.add_post("/api/sessions/{session_id}/pause", handle_stop)
app.router.add_post("/api/sessions/{session_id}/pause", handle_pause)
app.router.add_post("/api/sessions/{session_id}/resume", handle_resume)
app.router.add_post("/api/sessions/{session_id}/stop", handle_stop)
app.router.add_post("/api/sessions/{session_id}/cancel-queen", handle_cancel_queen)
Expand Down
62 changes: 61 additions & 1 deletion core/framework/server/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ class MockStream:
async def cancel_execution(self, execution_id: str) -> bool:
    """Mock cancel: report whether *execution_id* is a registered task key."""
    known_tasks = self._execution_tasks
    return execution_id in known_tasks

async def pause_execution(self, execution_id: str) -> bool:
    """Mock pause: report whether *execution_id* is a registered task key."""
    known_tasks = self._execution_tasks
    return execution_id in known_tasks


@dataclass
class MockGraphRegistration:
Expand Down Expand Up @@ -126,6 +129,12 @@ def find_awaiting_node(self):
def get_stats(self):
    """Return a fixed stats payload for the API tests."""
    return dict(running=True, executions=1)

def pause_timers(self) -> None:
    """Do nothing; tests swap this for a MagicMock when they assert on it."""
    return None

def resume_timers(self) -> None:
    """Do nothing; tests swap this for a MagicMock when they assert on it."""
    return None


class MockAgentInfo:
name: str = "test_agent"
Expand Down Expand Up @@ -558,6 +567,26 @@ async def test_pause_missing_execution_id(self):
)
assert resp.status == 400

@pytest.mark.asyncio
async def test_pause_success(self):
    """A known execution id pauses successfully and the runtime's timers are paused too."""
    target_execution = "exec_abc"
    session = _make_session()
    runtime = session.worker_runtime
    # Swap in a MagicMock so the pause_timers call can be asserted below.
    runtime.pause_timers = MagicMock()
    # Register a fake task on the default stream so the execution is found.
    runtime._mock_streams["default"]._execution_tasks[target_execution] = MagicMock()

    app = _make_app_with_session(session)
    async with TestClient(TestServer(app)) as client:
        resp = await client.post(
            "/api/sessions/test_agent/pause",
            json={"execution_id": target_execution},
        )
        assert resp.status == 200
        payload = await resp.json()
        assert payload["paused"] is True
        assert payload["execution_id"] == target_execution
        # Timer-driven entry points must be frozen alongside the live execution.
        runtime.pause_timers.assert_called_once()

@pytest.mark.asyncio
async def test_goal_progress(self):
session = _make_session()
Expand All @@ -572,11 +601,13 @@ async def test_goal_progress(self):
class TestResume:
@pytest.mark.asyncio
async def test_resume_from_session_state(self, sample_session, tmp_agent_dir):
"""Resume using session state (paused_at)."""
"""Resume using session state (paused_at) and re-enables timers."""
session_id, session_dir, state = sample_session
tmp_path, agent_name, base = tmp_agent_dir

session = _make_session(tmp_dir=tmp_path / ".hive" / "agents" / agent_name)
# Replace resume_timers with a MagicMock so we can assert it was called
session.worker_runtime.resume_timers = MagicMock()
app = _make_app_with_session(session)

async with TestClient(TestServer(app)) as client:
Expand All @@ -589,6 +620,8 @@ async def test_resume_from_session_state(self, sample_session, tmp_agent_dir):
assert data["execution_id"] == "exec_test_123"
assert data["resumed_from"] == session_id
assert data["checkpoint_id"] is None
# Timers must be re-enabled when resuming
session.worker_runtime.resume_timers.assert_called_once()

@pytest.mark.asyncio
async def test_resume_with_checkpoint(self, sample_session, tmp_agent_dir):
Expand Down Expand Up @@ -633,6 +666,33 @@ async def test_resume_session_not_found(self):
)
assert resp.status == 404

@pytest.mark.asyncio
async def test_resume_rejected_for_cancelled_session(self, tmp_agent_dir):
    """A session whose persisted status is "cancelled" cannot be resumed (HTTP 409)."""
    tmp_path, agent_name, base = tmp_agent_dir
    session_id = "session_cancelled"

    # Persist a minimal state file that marks the session as cancelled.
    session_dir = base / "sessions" / session_id
    session_dir.mkdir(parents=True)
    state_file = session_dir / "state.json"
    state_file.write_text(
        json.dumps(
            {
                "status": "cancelled",
                "input_data": {},
                "memory": {},
                "progress": {},
            }
        )
    )

    agent_root = tmp_path / ".hive" / "agents" / agent_name
    app = _make_app_with_session(_make_session(tmp_dir=agent_root))

    async with TestClient(TestServer(app)) as client:
        resp = await client.post(
            "/api/sessions/test_agent/resume",
            json={"session_id": session_id},
        )
        assert resp.status == 409
        payload = await resp.json()
        error_text = payload["error"].lower()
        assert "stopped" in error_text or "cancelled" in error_text


class TestStop:
@pytest.mark.asyncio
Expand Down
10 changes: 5 additions & 5 deletions docs/roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -644,11 +644,11 @@ Expose basic REST/WebSocket endpoints for external control (Start, Stop, Pause,
- [x] State persistence
- [x] Recovery mechanisms
- [x] **REST API Endpoints**
- [ ] Start endpoint for agent execution
- [ ] Stop endpoint for graceful shutdown
- [ ] Pause endpoint for execution suspension
- [ ] Resume endpoint for continuation
- [ ] Status query endpoint for monitoring
- [x] Start endpoint for agent execution
- [x] Stop endpoint for graceful shutdown
- [x] Pause endpoint for execution suspension
- [x] Resume endpoint for continuation
- [x] Status query endpoint for monitoring
- [ ] **WebSocket API**
- [ ] Real-time event streaming to clients
- [ ] Bidirectional communication
Expand Down
3 changes: 2 additions & 1 deletion docs/server-cli-arch.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ Every HTTP endpoint is a direct, thin delegation to a shared runtime method. No
| `POST /api/agents/{id}/chat` | Auto-route | `runtime.inject_input()` or `runtime.trigger()` |
| `POST /api/agents/{id}/inject` | Send user input | `runtime.inject_input(node_id, content)` |
| `POST /api/agents/{id}/resume` | Resume session | `runtime.trigger()` with `session_state` |
| `POST /api/agents/{id}/stop` | Pause execution | Cancels the execution task |
| `POST /api/agents/{id}/pause` | Pause execution (resumable) | `stream.pause_execution()` → writes `status="paused"` to disk |
| `POST /api/agents/{id}/stop` | Terminate execution permanently | `stream.cancel_execution()` → writes `status="cancelled"` to disk |
| `POST /api/agents/{id}/replay` | Replay checkpoint | Checkpoint restore → `runtime.trigger()` |
| `GET /api/agents/{id}/goal-progress` | Goal progress | `runtime.get_goal_progress()` |

Expand Down