diff --git a/codec_agent_runner.py b/codec_agent_runner.py index ac2fb57..7427cde 100644 --- a/codec_agent_runner.py +++ b/codec_agent_runner.py @@ -97,6 +97,12 @@ def permission_gate(action: Action, agent_grants: Dict[str, Any], if action.skill not in skills: raise PermissionViolation("skill_not_authorized", action.skill) + # Asymmetry: only write_paths is enforced here. PermissionManifest.read_paths + # is declared at approval time for user transparency ("this agent will read X") + # but not gated at runtime — Action's grammar has no read/write distinction + # (only `touches_path`, treated as a write), and skill-internal reads bypass + # the runner anyway. Runtime read enforcement would need a new Action field + # + LLM prompt update — out of scope for Step 9. if action.touches_path: write_paths = (set(agent_grants.get("write_paths", [])) | set(global_grants.get("write_paths", []))) @@ -378,21 +384,27 @@ def _atomic_set_status(agent_id: str, new_status: str, log.warning("[%s] set_status %s failed: %s", agent_id, new_status, e) -def _run_agent(agent_id: str) -> None: +def _run_agent(agent_id: str, cid: Optional[str] = None) -> None: """The main per-agent thread function. Loads plan + grants, verifies plan_hash, walks checkpoints via _execute_checkpoint, persists state, emits audit events. On any unhandled exception: atomic save status=aborted, log, emit agent_aborted. Never propagates exceptions to caller (the - daemon's thread pool depends on this).""" + daemon's thread pool depends on this). + + `cid` lets the daemon's crash-recovery path mint a single correlation_id, + emit AGENT_RESUMED under it, then chain all of this run's emits to the + same id (Step 1 §1.4 paired-cid contract). When None, generate fresh. 
+ """ from codec_agent_plan import ( load_plan, load_state, load_manifest, load_grants, load_global_grants, save_state, save_manifest, compute_plan_hash, ) - cid = secrets.token_hex(6) + if cid is None: + cid = secrets.token_hex(6) try: plan = load_plan(agent_id) @@ -444,14 +456,18 @@ def _run_agent(agent_id: str) -> None: # Walk checkpoints history: List[Dict[str, Any]] = [] + # Review fix I2: per-checkpoint step_budget overrides applied on resume + # after /extend_budget endpoint bumps the cap. Keys are checkpoint IDs. + budget_overrides = state.get("step_budget_overrides", {}) or {} for idx, cp in enumerate(plan.checkpoints): if idx < current_idx: continue # resume: skip already-completed checkpoints + effective_budget = int(budget_overrides.get(cp.id, cp.step_budget)) cp_dict = { "id": cp.id, "title": cp.title, "description": cp.description, "skills_needed": cp.skills_needed, "expected_output": cp.expected_output, - "step_budget": cp.step_budget, + "step_budget": effective_budget, } _audit(AGENT_CHECKPOINT_STARTED, @@ -494,10 +510,16 @@ def _run_agent(agent_id: str) -> None: extra={"agent_id": agent_id, "checkpoint_id": cp.id, "reason": "destructive_consent_timeout"}) else: - _atomic_set_status(agent_id, "blocked_on_permission", + # Review fix I2: real budget hit → paused (not blocked_on_permission). + # User can resolve via POST /api/agents/{id}/extend_budget which + # writes step_budget_overrides[checkpoint_id] to state.json and + # transitions status=paused → running. The plan stays immutable + # (plan_hash tamper check remains intact); the override lives in + # mutable state.json. 
+ _atomic_set_status(agent_id, "paused", reason="step_budget_exhausted") - _audit(AGENT_BLOCKED_ON_PERMISSION, - message="step budget exhausted", + _audit(AGENT_PAUSED, + message="paused on step budget exhaustion", correlation_id=cid, outcome="warning", level="warning", extra={"agent_id": agent_id, "checkpoint_id": cp.id, "reason": "step_budget_exhausted"}) @@ -633,13 +655,19 @@ def _daemon_one_tick() -> None: with _threads_lock: has_thread = agent_id in _active_threads and _active_threads[agent_id].is_alive() if not has_thread and occupied < MAX_CONCURRENT: + # Mint cid here and propagate into _run_agent so AGENT_RESUMED + # chains with the agent_started/checkpoint/completed emits that + # follow (Step 1 §1.4 paired-cid contract; review I4). + recovery_cid = secrets.token_hex(6) _atomic_set_status(agent_id, "crashed_resumed") _audit(AGENT_RESUMED, message=f"resumed {agent_id} after crash/restart", + correlation_id=recovery_cid, extra={"agent_id": agent_id, "recovery": True}) # Transition to running and re-spawn _atomic_set_status(agent_id, "running") - t = threading.Thread(target=_run_agent, args=(agent_id,), daemon=True, + t = threading.Thread(target=_run_agent, args=(agent_id,), + kwargs={"cid": recovery_cid}, daemon=True, name=f"agent-{agent_id}") t.start() with _threads_lock: diff --git a/codec_heartbeat.py b/codec_heartbeat.py index 940974b..fb6a004 100644 --- a/codec_heartbeat.py +++ b/codec_heartbeat.py @@ -59,7 +59,12 @@ def _check_one_service(name: str, url: str) -> tuple: def check_system_health(): - """Verify all CODEC services are running (checks run in parallel).""" + """Verify all CODEC services are running (checks run in parallel). + + Only HTTP-exposing services are probed here. PM2-supervised daemons + (codec-observer, codec-agent-runner) rely on PM2's autorestart for + crash recovery — see AGENTS.md §3 "Background Execution". 
# ── Phase 3 Step 9 review fix I2 — extend step_budget for paused agents ────
class ExtendBudgetBody(BaseModel):
    """Request body for POST /api/agents/{agent_id}/extend_budget."""

    # Extra steps to add to the current checkpoint's budget; validated to
    # the inclusive range 1..100 per request.
    additional_steps: int = Field(..., ge=1, le=100)


@router.post("/api/agents/{agent_id}/extend_budget")
def extend_budget(agent_id: str, body: ExtendBudgetBody):
    """Bump the current checkpoint's step_budget for an agent paused on
    step_budget_exhausted.

    Writes step_budget_overrides[checkpoint_id] in state.json (mutable; does
    NOT modify plan.json so plan_hash tamper check stays intact), then
    transitions paused → running so the daemon respawns the thread on its
    next tick.

    The override is written *before* the status flip so a respawned runner
    never starts with the old budget. If the status flip is rejected, the
    override write is rolled back so a 409 response never leaves a silently
    enlarged budget behind (otherwise each retry would stack another
    ``additional_steps`` on top).

    Body: {"additional_steps": int} where 1 <= int <= 100.

    Returns:
        Dict with agent_id, checkpoint_id, previous_budget, new_budget,
        additional_steps and status ("running").

    Raises:
        HTTPException 404: no manifest exists for ``agent_id``.
        HTTPException 409: status != paused, status_reason !=
            step_budget_exhausted, the agent has no plan, the current
            checkpoint index is out of range, or the paused → running
            transition is refused.
    """
    manifest = _cap.load_manifest(agent_id)
    if not manifest:
        raise HTTPException(status_code=404, detail=f"agent {agent_id} not found")

    status = manifest.get("status", "")
    reason = manifest.get("status_reason", "")
    if status != "paused" or reason != "step_budget_exhausted":
        raise HTTPException(
            status_code=409,
            detail=f"agent must be paused with reason=step_budget_exhausted "
                   f"(currently status={status!r}, reason={reason!r})",
        )

    plan = _cap.load_plan(agent_id)
    if plan is None:
        raise HTTPException(status_code=409, detail="agent has no plan")
    state = _cap.load_state(agent_id)
    current_idx = int(state.get("current_checkpoint", 0))
    # A negative index would silently select a checkpoint from the end of the
    # list, so treat it the same as running past the last checkpoint.
    if not 0 <= current_idx < len(plan.checkpoints):
        raise HTTPException(status_code=409, detail="agent has no current checkpoint")

    cp = plan.checkpoints[current_idx]
    # Keep the stored value untouched (work on a copy) so it can be restored
    # verbatim if the status transition below fails.
    prior_overrides = state.get("step_budget_overrides")
    overrides = dict(prior_overrides or {})
    base = int(overrides.get(cp.id, cp.step_budget))
    new_budget = base + int(body.additional_steps)
    overrides[cp.id] = new_budget
    state["step_budget_overrides"] = overrides
    _cap.save_state(agent_id, state)

    try:
        _cap.set_status(agent_id, "running")
    except _cap.InvalidStatusTransition as e:
        # Roll back the override so the failed request has no lasting effect.
        if prior_overrides is None:
            state.pop("step_budget_overrides", None)
        else:
            state["step_budget_overrides"] = prior_overrides
        _cap.save_state(agent_id, state)
        raise HTTPException(status_code=409, detail=str(e)) from e

    return {
        "agent_id": agent_id,
        "checkpoint_id": cp.id,
        "previous_budget": base,
        "new_budget": new_budget,
        "additional_steps": int(body.additional_steps),
        "status": "running",
    }
def test_permission_gate_blocks_domain_not_in_grants(basic_grants, empty_global_grants):
    """A network call to a domain missing from the grants must be refused.

    Expects ``permission_gate`` to raise ``PermissionViolation`` whose
    ``reason`` is ``"domain_not_authorized"`` and whose ``needed`` names the
    offending domain.
    """
    import codec_agent_runner as car

    # Non-destructive, path-free action so only the network check can trip.
    # NOTE(review): presumes "weather" is an authorized skill in basic_grants;
    # otherwise the skill check would fire first — confirm against the fixture.
    outbound = car.Action(
        skill="weather",
        task="x",
        is_destructive=False,
        network_call=True,
        network_domain="evil.com",
        touches_path=False,
    )

    with pytest.raises(car.PermissionViolation) as caught:
        car.permission_gate(outbound, basic_grants, empty_global_grants)

    violation = caught.value
    assert violation.reason == "domain_not_authorized"
    assert violation.needed == "evil.com"
def test_daemon_resume_emits_correlation_id_matching_run_agent(monkeypatch, temp_codec_dir):
    """Step 1 §1.4 paired-cid contract: AGENT_RESUMED on crash recovery must
    share its correlation_id with the _run_agent operation that follows, so
    the resume event chains with subsequent agent_started / agent_checkpoint_*
    / agent_completed emits instead of being orphaned in the audit log.

    Phase 3 Step 9 review I4 fix.

    The daemon hands _run_agent to a background thread, so the test waits on
    an Event set by the fake rather than sleeping for a fixed interval: it
    returns as soon as the thread has run and tolerates a slow scheduler.
    """
    import threading

    import codec_agent_runner as car
    import codec_agent_plan as cap

    # A manifest left in "running" with no live thread is what the daemon
    # treats as a crashed agent needing recovery.
    cap.save_manifest("crashed", {"agent_id": "crashed",
                                  "status": "running", "title": "x"})

    audit_calls: List[dict] = []

    def fake_audit(event, source="codec-agent-runner", message="",
                   correlation_id="", outcome="ok", level="info", extra=None):
        audit_calls.append({"event": event, "correlation_id": correlation_id})
    monkeypatch.setattr(car, "_audit", fake_audit)

    received_cid: List[str] = []
    run_agent_called = threading.Event()

    def fake_run_agent(agent_id, cid=None):
        received_cid.append(cid or "")
        run_agent_called.set()
    monkeypatch.setattr(car, "_run_agent", fake_run_agent)
    monkeypatch.setattr(car, "_active_threads", {})

    car._daemon_one_tick()
    # Bounded wait for the spawned thread; if it never runs, the assertions
    # below report the missing cid exactly as before.
    run_agent_called.wait(timeout=2.0)

    resumed = [c for c in audit_calls if c["event"] == car.AGENT_RESUMED]
    assert len(resumed) == 1, f"expected exactly 1 AGENT_RESUMED emit, got {len(resumed)}"
    resume_cid = resumed[0]["correlation_id"]
    assert resume_cid, ("AGENT_RESUMED emitted without correlation_id "
                        "(Step 1 §1.4 paired-cid contract violation)")
    assert received_cid == [resume_cid], (
        f"_run_agent must receive the resume cid to chain emits: "
        f"got {received_cid}, expected [{resume_cid}]")
# ─────────────────────────────────────────────────────────────────────────────
# Review fix I2 — paused on step_budget + /extend_budget endpoint (3 tests)
# ─────────────────────────────────────────────────────────────────────────────

def test_run_agent_step_budget_exhausted_pauses_not_blocks(monkeypatch, temp_codec_dir):
    """Real budget hit (not destructive_consent_timeout) → status=paused
    with reason=step_budget_exhausted (was: blocked_on_permission)."""
    import codec_agent_runner as car
    import codec_agent_plan as cap
    _setup_approved_agent(temp_codec_dir, monkeypatch, num_checkpoints=1)

    # Always return skill_call (never checkpoint_done) → budget exhausts
    monkeypatch.setattr(car, "_qwen_next_action", lambda *a, **k:
                        car.Action(skill="weather", task="loop", kind="skill_call",
                                   is_destructive=False, network_call=False,
                                   touches_path=False))
    monkeypatch.setattr(car, "_run_skill", MagicMock(return_value="r"))

    car._run_agent("test_agent")

    m = cap.load_manifest("test_agent")
    assert m["status"] == "paused"
    assert m["status_reason"] == "step_budget_exhausted"


def test_extend_budget_endpoint_bumps_and_resumes(monkeypatch, temp_codec_dir):
    """POST /api/agents/{id}/extend_budget writes state.json override,
    transitions paused → running.

    Checks the response, the persisted override, and the stored manifest
    status, so a handler that only *reports* success cannot pass.
    """
    from fastapi.testclient import TestClient
    from fastapi import FastAPI
    import codec_agent_plan as cap
    _setup_approved_agent(temp_codec_dir, monkeypatch, num_checkpoints=1)

    # Set up paused-on-budget state
    cap.save_manifest("test_agent", {
        **cap.load_manifest("test_agent"),
        "status": "paused",
        "status_reason": "step_budget_exhausted",
    })

    from routes.agents import router
    app = FastAPI()
    app.include_router(router)
    client = TestClient(app)

    r = client.post("/api/agents/test_agent/extend_budget",
                    json={"additional_steps": 20})
    assert r.status_code == 200
    body = r.json()
    assert body["status"] == "running"
    assert body["additional_steps"] == 20
    # The reported budgets must differ by exactly the requested increment.
    assert body["new_budget"] == body["previous_budget"] + 20

    # State.json has the override, and it matches what the endpoint reported
    state = cap.load_state("test_agent")
    cp_id = cap.load_plan("test_agent").checkpoints[0].id
    assert body["checkpoint_id"] == cp_id
    assert state["step_budget_overrides"][cp_id] == body["new_budget"]
    assert state["step_budget_overrides"][cp_id] >= 25  # base 5 + 20

    # The transition must be persisted, not just echoed in the response.
    assert cap.load_manifest("test_agent")["status"] == "running"


def test_extend_budget_endpoint_409_when_not_paused_on_budget(temp_codec_dir):
    """409 if status != paused or status_reason != step_budget_exhausted."""
    from fastapi.testclient import TestClient
    from fastapi import FastAPI
    import codec_agent_plan as cap

    # A running agent is not eligible for a budget extension.
    cap.save_manifest("a1", {"agent_id": "a1", "status": "running",
                             "title": "x"})

    from routes.agents import router
    app = FastAPI()
    app.include_router(router)
    client = TestClient(app)

    r = client.post("/api/agents/a1/extend_budget",
                    json={"additional_steps": 10})
    assert r.status_code == 409