Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 69 additions & 29 deletions codec_agent_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,52 @@ class StepBudgetExhausted(Exception):
"""Per-checkpoint step budget cap reached without checkpoint_done."""


def _build_correction_nudge(pv: "PermissionViolation",
action: Action,
agent_grants: Dict[str, Any],
global_grants: Dict[str, Any]) -> Optional[str]:
"""PR #35: build a single-shot correction string for the LLM when
it picks something outside the allowlist. Returns None for unknown
reasons (caller falls back to raise).

The string is appended to history.result so the next
_qwen_next_action call sees it as the most-recent step output, and
the model corrects itself instead of looping. We list the FULL
allowed set so the model has a closed-world choice — listing
nothing was the original PR #34 bug for skills; same logic applies
to paths and domains."""
reason = pv.reason
if reason == "skill_not_authorized":
allowed = sorted(set(agent_grants.get("skills", [])) |
set(global_grants.get("skills", [])))
return (f"<skill_error: '{action.skill}' is NOT in this agent's "
f"permission_manifest.skills. Allowed skills: "
f"{', '.join(allowed) or '(none)'}. Pick one of those "
f"instead.>")
if reason == "path_not_authorized":
allowed = sorted(set(agent_grants.get("write_paths", [])) |
set(global_grants.get("write_paths", [])))
return (f"<path_error: write to '{action.path}' is NOT under "
f"permission_manifest.write_paths. Allowed write_paths "
f"(glob patterns): {', '.join(allowed) or '(none)'}. "
f"Pick a path that matches one of those globs.>")
if reason == "read_path_not_authorized":
allowed = sorted(set(agent_grants.get("read_paths", [])) |
set(global_grants.get("read_paths", [])))
return (f"<read_path_error: read of '{action.read_path}' is NOT "
f"under permission_manifest.read_paths. Allowed read_paths "
f"(glob patterns): {', '.join(allowed) or '(none)'}. "
f"Pick a read path that matches one of those globs.>")
if reason == "domain_not_authorized":
allowed = sorted(set(agent_grants.get("network_domains", [])) |
set(global_grants.get("network_domains", [])))
return (f"<domain_error: '{action.network_domain}' is NOT in "
f"permission_manifest.network_domains. Allowed domains: "
f"{', '.join(allowed) or '(none)'}. Use one of those "
f"exact domains (no schema, no path).>")
return None


def _run_skill(skill_name: str, task: str, agent_id: str) -> str:
"""Lazy-imported codec_dispatch.run_skill. Step 1+2 hooks fire
automatically inside run_skill via run_with_hooks."""
Expand Down Expand Up @@ -365,39 +411,33 @@ def _execute_checkpoint(plan_dict: Dict[str, Any],
return history

# Permission gate (raises PermissionViolation if outside manifest).
# Phase 3.5 hotfix: if the LLM hallucinates a skill name (e.g.
# "fetch_url" instead of the real "web_fetch"), give it ONE retry
# with the corrected skill list as context. Most skill-hallucination
# errors recover with a single correction pass; only block on the
# SECOND consecutive miss. This dramatically reduces user-visible
# Phase 3.5 hotfix (PR #34 + #35): if the LLM hallucinates a skill
# name, write path, read path, or network domain, give it ONE retry
# with the corrected allowlist as context. Most hallucinations
# recover with a single correction pass; only block on the SECOND
# consecutive miss. This dramatically reduces user-visible
# blocked_on_permission events caused by LLM naming drift.
try:
permission_gate(action, agent_grants, global_grants)
except PermissionViolation as pv:
if pv.reason == "skill_not_authorized":
# Append the failed action to history with a correction nudge
# so the next _qwen_next_action call sees the error context.
allowed = sorted(set(agent_grants.get("skills", [])) |
set(global_grants.get("skills", [])))
history.append({
"step": len(history),
"skill": action.skill,
"task": action.task[:200],
"result": (f"<skill_error: '{action.skill}' is NOT in this "
f"agent's permission_manifest.skills. Allowed skills: "
f"{', '.join(allowed)}. Pick one of those instead.>"),
"is_destructive": False,
"_skill_correction_nudge": True,
})
# Re-call Qwen — if it still picks the wrong skill, fall through
# and the SECOND permission_gate call will raise normally.
action2 = _qwen_next_action(plan_dict, checkpoint, history)
if action2.kind == "checkpoint_done":
return history
permission_gate(action2, agent_grants, global_grants)
action = action2 # use the corrected action going forward
else:
raise # path / domain violations not auto-recoverable
nudge = _build_correction_nudge(pv, action, agent_grants, global_grants)
if nudge is None:
raise # unknown reason — fall through unchanged
history.append({
"step": len(history),
"skill": action.skill,
"task": action.task[:200],
"result": nudge,
"is_destructive": False,
"_skill_correction_nudge": True,
})
# Re-call Qwen — if it still picks something invalid, fall through
# and the SECOND permission_gate call will raise normally.
action2 = _qwen_next_action(plan_dict, checkpoint, history)
if action2.kind == "checkpoint_done":
return history
permission_gate(action2, agent_grants, global_grants)
action = action2 # use the corrected action going forward

# Destructive gate (raises DestructiveOpRejected on user reject)
if action.is_destructive:
Expand Down
137 changes: 137 additions & 0 deletions tests/test_agent_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -995,3 +995,140 @@ def fake_next(*a, **k):
nudges = [h for h in history if h.get("_skill_correction_nudge")]
assert len(nudges) == 1
assert "fetch_url" in nudges[0]["skill"]


# ─────────────────────────────────────────────────────────────────────────────
# PR #35 — extend hallucination retry to path / read_path / domain (3 tests)
# ─────────────────────────────────────────────────────────────────────────────

def test_domain_hallucination_retries_with_corrected_domain_list(monkeypatch, temp_codec_dir):
"""When LLM picks an unauthorized network_domain (e.g. bare host instead
of the api.* one in the manifest), runner appends a correction nudge
listing allowed domains and re-calls Qwen. The corrected call runs."""
import codec_agent_runner as car

grants = {"skills": ["web_fetch"], "read_paths": [], "write_paths": [],
"network_domains": ["api.exchangerate-api.com"]}
global_grants = {"schema": 1, "version": 0,
"skills": [], "read_paths": [], "write_paths": [], "network_domains": []}
checkpoint = {"id": "cp1", "title": "t", "description": "d",
"expected_output": "o", "step_budget": 5}

actions = [
car.Action(skill="web_fetch", task="get rates", kind="skill_call",
is_destructive=False, network_call=True,
network_domain="exchangerate-api.com"), # bare host — not in manifest
car.Action(skill="web_fetch", task="get rates", kind="skill_call",
is_destructive=False, network_call=True,
network_domain="api.exchangerate-api.com"), # corrected
car.Action(skill="", task="", kind="checkpoint_done"),
]
idx = {"n": 0}
def fake_next(*a, **k):
out = actions[idx["n"]]
idx["n"] += 1
return out
monkeypatch.setattr(car, "_qwen_next_action", fake_next)
fake_run = MagicMock(return_value="USD/EUR=0.92")
monkeypatch.setattr(car, "_run_skill", fake_run)

history = car._execute_checkpoint(
plan_dict={"goals": ["g"]}, checkpoint=checkpoint,
agent_grants=grants, global_grants=global_grants,
agent_id="agent_test",
)

# Corrected web_fetch ran exactly once; bare-host attempt was rejected pre-execution
fake_run.assert_called_once_with("web_fetch", "get rates", "agent_test")
nudges = [h for h in history if h.get("_skill_correction_nudge")]
assert len(nudges) == 1
assert "domain_error" in nudges[0]["result"]
assert "api.exchangerate-api.com" in nudges[0]["result"]


def test_write_path_hallucination_retries_with_corrected_path_list(monkeypatch, temp_codec_dir):
"""When LLM picks an unauthorized write path, runner appends a
correction nudge listing allowed write_paths globs and re-calls Qwen."""
import codec_agent_runner as car

grants = {"skills": ["file_write"],
"read_paths": [], "write_paths": ["~/Documents/agents/**"],
"network_domains": []}
global_grants = {"schema": 1, "version": 0,
"skills": [], "read_paths": [], "write_paths": [], "network_domains": []}
checkpoint = {"id": "cp1", "title": "t", "description": "d",
"expected_output": "o", "step_budget": 5}

actions = [
car.Action(skill="file_write", task="write report", kind="skill_call",
is_destructive=False, touches_path=True,
path="/tmp/report.md"), # outside glob
car.Action(skill="file_write", task="write report", kind="skill_call",
is_destructive=False, touches_path=True,
path="~/Documents/agents/forex/report.md"), # corrected
car.Action(skill="", task="", kind="checkpoint_done"),
]
idx = {"n": 0}
def fake_next(*a, **k):
out = actions[idx["n"]]
idx["n"] += 1
return out
monkeypatch.setattr(car, "_qwen_next_action", fake_next)
fake_run = MagicMock(return_value="ok")
monkeypatch.setattr(car, "_run_skill", fake_run)

history = car._execute_checkpoint(
plan_dict={"goals": ["g"]}, checkpoint=checkpoint,
agent_grants=grants, global_grants=global_grants,
agent_id="agent_test",
)

fake_run.assert_called_once_with("file_write", "write report", "agent_test")
nudges = [h for h in history if h.get("_skill_correction_nudge")]
assert len(nudges) == 1
assert "path_error" in nudges[0]["result"]
assert "~/Documents/agents/**" in nudges[0]["result"]


def test_read_path_hallucination_retries_with_corrected_path_list(monkeypatch, temp_codec_dir):
"""When LLM picks an unauthorized read path, runner appends a
correction nudge listing allowed read_paths and re-calls Qwen."""
import codec_agent_runner as car

grants = {"skills": ["file_read"],
"read_paths": ["~/Documents/research/**"],
"write_paths": [], "network_domains": []}
global_grants = {"schema": 1, "version": 0,
"skills": [], "read_paths": [], "write_paths": [], "network_domains": []}
checkpoint = {"id": "cp1", "title": "t", "description": "d",
"expected_output": "o", "step_budget": 5}

actions = [
car.Action(skill="file_read", task="read", kind="skill_call",
is_destructive=False, reads_path=True,
read_path="/etc/passwd"), # outside allowlist
car.Action(skill="file_read", task="read", kind="skill_call",
is_destructive=False, reads_path=True,
read_path="~/Documents/research/notes.md"), # corrected
car.Action(skill="", task="", kind="checkpoint_done"),
]
idx = {"n": 0}
def fake_next(*a, **k):
out = actions[idx["n"]]
idx["n"] += 1
return out
monkeypatch.setattr(car, "_qwen_next_action", fake_next)
fake_run = MagicMock(return_value="content")
monkeypatch.setattr(car, "_run_skill", fake_run)

history = car._execute_checkpoint(
plan_dict={"goals": ["g"]}, checkpoint=checkpoint,
agent_grants=grants, global_grants=global_grants,
agent_id="agent_test",
)

fake_run.assert_called_once_with("file_read", "read", "agent_test")
nudges = [h for h in history if h.get("_skill_correction_nudge")]
assert len(nudges) == 1
assert "read_path_error" in nudges[0]["result"]
assert "~/Documents/research/**" in nudges[0]["result"]
Loading