Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 36 additions & 8 deletions codec_agent_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ def permission_gate(action: Action, agent_grants: Dict[str, Any],
if action.skill not in skills:
raise PermissionViolation("skill_not_authorized", action.skill)

# Asymmetry: only write_paths is enforced here. PermissionManifest.read_paths
# is declared at approval time for user transparency ("this agent will read X")
# but not gated at runtime — Action's grammar has no read/write distinction
# (only `touches_path`, treated as a write), and skill-internal reads bypass
# the runner anyway. Runtime read enforcement would need a new Action field
# + LLM prompt update — out of scope for Step 9.
if action.touches_path:
write_paths = (set(agent_grants.get("write_paths", [])) |
set(global_grants.get("write_paths", [])))
Expand Down Expand Up @@ -378,21 +384,27 @@ def _atomic_set_status(agent_id: str, new_status: str,
log.warning("[%s] set_status %s failed: %s", agent_id, new_status, e)


def _run_agent(agent_id: str) -> None:
def _run_agent(agent_id: str, cid: Optional[str] = None) -> None:
"""The main per-agent thread function. Loads plan + grants,
verifies plan_hash, walks checkpoints via _execute_checkpoint,
persists state, emits audit events.

On any unhandled exception: atomic save status=aborted, log,
emit agent_aborted. Never propagates exceptions to caller (the
daemon's thread pool depends on this)."""
daemon's thread pool depends on this).

`cid` lets the daemon's crash-recovery path mint a single correlation_id,
emit AGENT_RESUMED under it, then chain all of this run's emits to the
same id (Step 1 §1.4 paired-cid contract). When None, generate fresh.
"""
from codec_agent_plan import (
load_plan, load_state, load_manifest, load_grants,
load_global_grants, save_state, save_manifest,
compute_plan_hash,
)

cid = secrets.token_hex(6)
if cid is None:
cid = secrets.token_hex(6)

try:
plan = load_plan(agent_id)
Expand Down Expand Up @@ -444,14 +456,18 @@ def _run_agent(agent_id: str) -> None:

# Walk checkpoints
history: List[Dict[str, Any]] = []
# Review fix I2: per-checkpoint step_budget overrides applied on resume
# after /extend_budget endpoint bumps the cap. Keys are checkpoint IDs.
budget_overrides = state.get("step_budget_overrides", {}) or {}
for idx, cp in enumerate(plan.checkpoints):
if idx < current_idx:
continue # resume: skip already-completed checkpoints
effective_budget = int(budget_overrides.get(cp.id, cp.step_budget))
cp_dict = {
"id": cp.id, "title": cp.title, "description": cp.description,
"skills_needed": cp.skills_needed,
"expected_output": cp.expected_output,
"step_budget": cp.step_budget,
"step_budget": effective_budget,
}

_audit(AGENT_CHECKPOINT_STARTED,
Expand Down Expand Up @@ -494,10 +510,16 @@ def _run_agent(agent_id: str) -> None:
extra={"agent_id": agent_id, "checkpoint_id": cp.id,
"reason": "destructive_consent_timeout"})
else:
_atomic_set_status(agent_id, "blocked_on_permission",
# Review fix I2: real budget hit → paused (not blocked_on_permission).
# User can resolve via POST /api/agents/{id}/extend_budget which
# writes step_budget_overrides[checkpoint_id] to state.json and
# transitions status=paused → running. The plan stays immutable
# (plan_hash tamper check remains intact); the override lives in
# mutable state.json.
_atomic_set_status(agent_id, "paused",
reason="step_budget_exhausted")
_audit(AGENT_BLOCKED_ON_PERMISSION,
message="step budget exhausted",
_audit(AGENT_PAUSED,
message="paused on step budget exhaustion",
correlation_id=cid, outcome="warning", level="warning",
extra={"agent_id": agent_id, "checkpoint_id": cp.id,
"reason": "step_budget_exhausted"})
Expand Down Expand Up @@ -633,13 +655,19 @@ def _daemon_one_tick() -> None:
with _threads_lock:
has_thread = agent_id in _active_threads and _active_threads[agent_id].is_alive()
if not has_thread and occupied < MAX_CONCURRENT:
# Mint cid here and propagate into _run_agent so AGENT_RESUMED
# chains with the agent_started/checkpoint/completed emits that
# follow (Step 1 §1.4 paired-cid contract; review I4).
recovery_cid = secrets.token_hex(6)
_atomic_set_status(agent_id, "crashed_resumed")
_audit(AGENT_RESUMED,
message=f"resumed {agent_id} after crash/restart",
correlation_id=recovery_cid,
extra={"agent_id": agent_id, "recovery": True})
# Transition to running and re-spawn
_atomic_set_status(agent_id, "running")
t = threading.Thread(target=_run_agent, args=(agent_id,), daemon=True,
t = threading.Thread(target=_run_agent, args=(agent_id,),
kwargs={"cid": recovery_cid}, daemon=True,
name=f"agent-{agent_id}")
t.start()
with _threads_lock:
Expand Down
7 changes: 6 additions & 1 deletion codec_heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ def _check_one_service(name: str, url: str) -> tuple:


def check_system_health():
"""Verify all CODEC services are running (checks run in parallel)."""
"""Verify all CODEC services are running (checks run in parallel).

Only HTTP-exposing services are probed here. PM2-supervised daemons
(codec-observer, codec-agent-runner) rely on PM2's autorestart for
crash recovery — see AGENTS.md §3 "Background Execution".
"""
services = {
"LLM": "http://localhost:8083/v1/models",
"Whisper": "http://localhost:8084/health",
Expand Down
60 changes: 60 additions & 0 deletions routes/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,3 +459,63 @@ def grant_permission(agent_id: str, body: GrantBody):

return {"agent_id": agent_id, "grants": grants,
"status": _cap.load_manifest(agent_id).get("status")}


# ── Phase 3 Step 9 review fix I2 — extend step_budget for paused agents ────
class ExtendBudgetBody(BaseModel):
additional_steps: int = Field(..., ge=1, le=100)


@router.post("/api/agents/{agent_id}/extend_budget")
def extend_budget(agent_id: str, body: ExtendBudgetBody):
"""Bump the current checkpoint's step_budget for an agent paused on
step_budget_exhausted. Writes step_budget_overrides[checkpoint_id]
in state.json (mutable; does NOT modify plan.json so plan_hash
tamper check stays intact). Transitions paused → running so the
daemon respawns the thread on its next tick.

409 if status != paused or status_reason != step_budget_exhausted.
Body: {"additional_steps": int} where 1 <= int <= 100.
"""
manifest = _cap.load_manifest(agent_id)
if not manifest:
raise HTTPException(status_code=404, detail=f"agent {agent_id} not found")

status = manifest.get("status", "")
reason = manifest.get("status_reason", "")
if status != "paused" or reason != "step_budget_exhausted":
raise HTTPException(
status_code=409,
detail=f"agent must be paused with reason=step_budget_exhausted "
f"(currently status={status!r}, reason={reason!r})",
)

plan = _cap.load_plan(agent_id)
if plan is None:
raise HTTPException(status_code=409, detail="agent has no plan")
state = _cap.load_state(agent_id)
current_idx = int(state.get("current_checkpoint", 0))
if current_idx >= len(plan.checkpoints):
raise HTTPException(status_code=409, detail="agent has no current checkpoint")

cp = plan.checkpoints[current_idx]
overrides = state.get("step_budget_overrides", {}) or {}
base = int(overrides.get(cp.id, cp.step_budget))
new_budget = base + int(body.additional_steps)
overrides[cp.id] = new_budget
state["step_budget_overrides"] = overrides
_cap.save_state(agent_id, state)

try:
_cap.set_status(agent_id, "running")
except _cap.InvalidStatusTransition as e:
raise HTTPException(status_code=409, detail=str(e))

return {
"agent_id": agent_id,
"checkpoint_id": cp.id,
"previous_budget": base,
"new_budget": new_budget,
"additional_steps": int(body.additional_steps),
"status": "running",
}
133 changes: 130 additions & 3 deletions tests/test_agent_runner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Phase 3 Step 9 tests — codec_agent_runner.

31 tests covering: audit constants, state machine, permission gate,
32 tests covering: audit constants, state machine, permission gate,
Action dataclass, qwen next-action driver, strict-consent integration,
checkpoint executor, run_agent paths, daemon outer loop, multi-agent
concurrency, resume-after-restart, plan-hash tamper, PWA endpoints.
Expand Down Expand Up @@ -76,7 +76,7 @@ def test_step9_state_transitions_extend_valid_map():


# ─────────────────────────────────────────────────────────────────────────────
# Task 3 — PermissionViolation + permission_gate (4 tests)
# Task 3 — PermissionViolation + permission_gate (5 tests)
# ─────────────────────────────────────────────────────────────────────────────

@pytest.fixture
Expand Down Expand Up @@ -140,6 +140,17 @@ def test_permission_gate_allows_via_global_allowlist(basic_grants):
permission_gate(action, basic_grants, global_grants)


def test_permission_gate_blocks_domain_not_in_grants(basic_grants, empty_global_grants):
from codec_agent_runner import permission_gate, Action, PermissionViolation
action = Action(skill="weather", task="x",
is_destructive=False, network_call=True,
network_domain="evil.com", touches_path=False)
with pytest.raises(PermissionViolation) as exc:
permission_gate(action, basic_grants, empty_global_grants)
assert exc.value.reason == "domain_not_authorized"
assert exc.value.needed == "evil.com"


# ─────────────────────────────────────────────────────────────────────────────
# Task 4 — Qwen-3.6 next-action driver (3 tests)
# ─────────────────────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -618,7 +629,8 @@ def test_daemon_resumes_after_pm2_restart(monkeypatch, temp_codec_dir):
"status": "running", "title": "x"})

spawned: List[str] = []
monkeypatch.setattr(car, "_run_agent", lambda a: spawned.append(a))
# Crash-recovery path passes cid kwarg into the thread (review I4)
monkeypatch.setattr(car, "_run_agent", lambda a, cid=None: spawned.append(a))
# Mark NO active thread for "crashed" — simulating fresh PM2 boot
monkeypatch.setattr(car, "_active_threads", {})

Expand All @@ -632,6 +644,45 @@ def test_daemon_resumes_after_pm2_restart(monkeypatch, temp_codec_dir):
assert m["status"] in ("crashed_resumed", "running", "completed", "aborted")


def test_daemon_resume_emits_correlation_id_matching_run_agent(monkeypatch, temp_codec_dir):
"""Step 1 §1.4 paired-cid contract: AGENT_RESUMED on crash recovery must
share its correlation_id with the _run_agent operation that follows, so
the resume event chains with subsequent agent_started / agent_checkpoint_*
/ agent_completed emits instead of being orphaned in the audit log.

Phase 3 Step 9 review I4 fix.
"""
import codec_agent_runner as car
import codec_agent_plan as cap

cap.save_manifest("crashed", {"agent_id": "crashed",
"status": "running", "title": "x"})

audit_calls: List[dict] = []
def fake_audit(event, source="codec-agent-runner", message="",
correlation_id="", outcome="ok", level="info", extra=None):
audit_calls.append({"event": event, "correlation_id": correlation_id})
monkeypatch.setattr(car, "_audit", fake_audit)

received_cid: List[str] = []
def fake_run_agent(agent_id, cid=None):
received_cid.append(cid or "")
monkeypatch.setattr(car, "_run_agent", fake_run_agent)
monkeypatch.setattr(car, "_active_threads", {})

car._daemon_one_tick()
time.sleep(0.3)

resumed = [c for c in audit_calls if c["event"] == car.AGENT_RESUMED]
assert len(resumed) == 1, f"expected exactly 1 AGENT_RESUMED emit, got {len(resumed)}"
resume_cid = resumed[0]["correlation_id"]
assert resume_cid, ("AGENT_RESUMED emitted without correlation_id "
"(Step 1 §1.4 paired-cid contract violation)")
assert received_cid == [resume_cid], (
f"_run_agent must receive the resume cid to chain emits: "
f"got {received_cid}, expected [{resume_cid}]")


def test_daemon_global_kill_switch(monkeypatch, temp_codec_dir):
"""AGENT_RUNNER_ENABLED=false → daemon idles even with approved agents."""
import codec_agent_runner as car
Expand Down Expand Up @@ -731,3 +782,79 @@ def test_post_api_agents_404_for_unknown_id(temp_codec_dir):

r = client.post("/api/agents/nonexistent/abort")
assert r.status_code == 404


# ─────────────────────────────────────────────────────────────────────────────
# Review fix I2 — paused on step_budget + /extend_budget endpoint (3 tests)
# ─────────────────────────────────────────────────────────────────────────────

def test_run_agent_step_budget_exhausted_pauses_not_blocks(monkeypatch, temp_codec_dir):
"""Real budget hit (not destructive_consent_timeout) → status=paused
with reason=step_budget_exhausted (was: blocked_on_permission)."""
import codec_agent_runner as car
import codec_agent_plan as cap
_setup_approved_agent(temp_codec_dir, monkeypatch, num_checkpoints=1)

# Always return skill_call (never checkpoint_done) → budget exhausts
monkeypatch.setattr(car, "_qwen_next_action", lambda *a, **k:
car.Action(skill="weather", task="loop", kind="skill_call",
is_destructive=False, network_call=False, touches_path=False))
monkeypatch.setattr(car, "_run_skill", MagicMock(return_value="r"))

car._run_agent("test_agent")

m = cap.load_manifest("test_agent")
assert m["status"] == "paused"
assert m["status_reason"] == "step_budget_exhausted"


def test_extend_budget_endpoint_bumps_and_resumes(monkeypatch, temp_codec_dir):
"""POST /api/agents/{id}/extend_budget writes state.json override,
transitions paused → running."""
from fastapi.testclient import TestClient
from fastapi import FastAPI
import codec_agent_plan as cap
_setup_approved_agent(temp_codec_dir, monkeypatch, num_checkpoints=1)

# Set up paused-on-budget state
cap.save_manifest("test_agent", {
**cap.load_manifest("test_agent"),
"status": "paused",
"status_reason": "step_budget_exhausted",
})

from routes.agents import router
app = FastAPI()
app.include_router(router)
client = TestClient(app)

r = client.post("/api/agents/test_agent/extend_budget",
json={"additional_steps": 20})
assert r.status_code == 200
body = r.json()
assert body["status"] == "running"
assert body["additional_steps"] == 20

# State.json has the override
state = cap.load_state("test_agent")
cp_id = cap.load_plan("test_agent").checkpoints[0].id
assert state["step_budget_overrides"][cp_id] >= 25 # base 5 + 20


def test_extend_budget_endpoint_409_when_not_paused_on_budget(temp_codec_dir):
"""409 if status != paused or status_reason != step_budget_exhausted."""
from fastapi.testclient import TestClient
from fastapi import FastAPI
import codec_agent_plan as cap

cap.save_manifest("a1", {"agent_id": "a1", "status": "running",
"title": "x"})

from routes.agents import router
app = FastAPI()
app.include_router(router)
client = TestClient(app)

r = client.post("/api/agents/a1/extend_budget",
json={"additional_steps": 10})
assert r.status_code == 409
Loading