Skip to content

Commit 06da8f1

Browse files
Mikarina13claude
andcommitted
test: A-12 inline-LLM-POST guard + OAuth scope-escalation coverage (Fix #10)
- tests/test_a12_invariant.py: fails if a new inline `.post(...chat/completions ...)` appears outside codec_llm. Allowlists the documented vision sites (dashboard, watcher, both bridges, screenshot_text) + codec_core's generated session-script string. Runs in CI via the full pytest suite (no separate workflow step needed). Includes a synthetic-violation test proving the guard is not a no-op. - tests/test_oauth_provider.py: regression tests for the refresh-token scope-escalation defense (requested scopes must be a subset of the original grant) — previously untested. permission_gate mutation cases are already covered by test_agent_runner.py (8 tests incl. D-5 traversal/symlink); not duplicated. The flaky time.sleep -> threading.Event refactor is deferred — no specific flaky test identified, and rewriting passing tests without evidence risks regressions. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent c465f99 commit 06da8f1

2 files changed

Lines changed: 129 additions & 0 deletions

File tree

tests/test_a12_invariant.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
"""Fix #10: CI guard for the A-12 invariant.
2+
3+
A-12 (see AGENTS.md §2) routed every chat/completions *text* call site onto
4+
codec_llm. The only inline `requests.post(.../chat/completions)` calls left
5+
are vision sites (pending A-11 cleanup) and codec_core's generated session
6+
script. This guard fails if a NEW inline chat/completions POST appears
7+
anywhere else — i.e. someone bypassed codec_llm.
8+
9+
The detector matches the precise anti-pattern: a `.post(` whose first argument
10+
literally contains `chat/completions`. URL-in-a-variable callers (codec_llm,
11+
codec_vision) don't match and don't need allowlisting; that's intended — the
12+
guard targets the literal inline-POST shape that bypasses the canonical caller.
13+
"""
14+
import re
15+
from pathlib import Path
16+
17+
REPO = Path(__file__).resolve().parent.parent
18+
19+
# Files permitted to contain an inline `.post(...chat/completions...)`.
20+
# Every entry is a vision site (pending A-11 migration onto codec_vision) or
21+
# codec_core's build_session_script, which EMITS the call as a string into the
22+
# generated session script (not a live POST in codec_core itself).
23+
_ALLOWLIST = {
24+
"codec_dashboard.py", # screen-vision POSTs (A-11 pending)
25+
"codec_watcher.py", # screen-vision POST (A-11 pending)
26+
"codec_imessage.py", # bridge vision POST (A-11 pending)
27+
"codec_telegram.py", # bridge vision POST (A-11 pending)
28+
"skills/screenshot_text.py", # OCR vision POST (A-11 pending)
29+
"codec_core.py", # generated session-script string, not a live POST
30+
}
31+
32+
_INLINE_POST_RE = re.compile(r"\.post\s*\([^)]*chat/completions")
33+
_SKIP_PREFIXES = ("tests/", ".claude/", "scripts/")
34+
35+
36+
def _scan(root: Path) -> set:
37+
"""Return the set of repo-relative .py paths containing an inline
38+
chat/completions POST."""
39+
found = set()
40+
for p in root.rglob("*.py"):
41+
rel = p.relative_to(root).as_posix()
42+
if rel.startswith(_SKIP_PREFIXES) or "__pycache__" in rel:
43+
continue
44+
try:
45+
text = p.read_text(encoding="utf-8")
46+
except OSError:
47+
continue
48+
if _INLINE_POST_RE.search(text):
49+
found.add(rel)
50+
return found
51+
52+
53+
def test_no_new_inline_llm_post_outside_codec_llm():
54+
found = _scan(REPO)
55+
offenders = found - _ALLOWLIST
56+
assert not offenders, (
57+
"New inline chat/completions POST(s) outside codec_llm "
58+
f"(A-12 invariant violated): {sorted(offenders)}.\n"
59+
"Route LLM text calls through codec_llm.call/stream/acall/astream. "
60+
"If this is a legitimate vision site pending A-11, add it to the "
61+
"documented _ALLOWLIST in this test with a reason."
62+
)
63+
64+
65+
def test_a12_guard_actually_detects_a_violation(tmp_path):
66+
# Proves the detector is not a no-op: a synthetic rogue inline POST is found.
67+
(tmp_path / "rogue_skill.py").write_text(
68+
"import requests\n"
69+
"def run(t):\n"
70+
' return requests.post("http://127.0.0.1:8090/v1/chat/completions", json={}).text\n'
71+
)
72+
(tmp_path / "innocent.py").write_text("x = 1\n")
73+
found = _scan(tmp_path)
74+
assert "rogue_skill.py" in found, "guard failed to detect an inline chat/completions POST"
75+
assert "innocent.py" not in found, "guard false-positived on an unrelated file"

tests/test_oauth_provider.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,60 @@ def _counting_fsync(fd):
154154
assert mode == 0o600, f"fallback file must be 0600; got {oct(mode)}"
155155

156156

157+
# ── Fix #10: OAuth scope-escalation regression coverage ──────────────────────
158+
def test_refresh_rejects_scope_escalation(tmp_path):
159+
"""A refresh token issued for ['read'] must NOT be exchangeable for
160+
['read','write'] — exchange_refresh_token enforces requested ⊆ original.
161+
Pins the scope-escalation defense (codec_oauth_provider.py:244-249) so a
162+
future refactor that drops the subset check fails CI."""
163+
import time
164+
165+
from mcp.server.auth.provider import RefreshToken, TokenError
166+
167+
p = PersistentOAuthProvider(
168+
base_url="https://test.example.com",
169+
client_registration_options=ClientRegistrationOptions(enabled=True),
170+
state_path=tmp_path / "state.json",
171+
)
172+
client = _make_client("escal")
173+
asyncio.run(p.register_client(client))
174+
rt = RefreshToken(
175+
token="codec_rt_orig",
176+
client_id="escal",
177+
scopes=["read"],
178+
expires_at=int(time.time() + 3600),
179+
)
180+
with pytest.raises(TokenError) as exc:
181+
asyncio.run(p.exchange_refresh_token(client, rt, ["read", "write"]))
182+
assert "scope" in str(exc.value).lower()
183+
184+
185+
def test_refresh_allows_subset_scopes(tmp_path):
186+
"""A refresh for a SUBSET of the original scopes succeeds, and the new
187+
access token carries only the narrowed scopes (no silent widening)."""
188+
import time
189+
190+
from mcp.server.auth.provider import RefreshToken
191+
192+
p = PersistentOAuthProvider(
193+
base_url="https://test.example.com",
194+
client_registration_options=ClientRegistrationOptions(enabled=True),
195+
state_path=tmp_path / "state.json",
196+
)
197+
client = _make_client("narrow")
198+
asyncio.run(p.register_client(client))
199+
rt = RefreshToken(
200+
token="codec_rt_orig2",
201+
client_id="narrow",
202+
scopes=["read", "write"],
203+
expires_at=int(time.time() + 3600),
204+
)
205+
p.refresh_tokens[rt.token] = rt # register so the internal revoke is clean
206+
token = asyncio.run(p.exchange_refresh_token(client, rt, ["read"]))
207+
assert "read" in (token.scope or ""), "narrowed scope must include the requested scope"
208+
assert "write" not in (token.scope or ""), "new token must not carry the dropped scope"
209+
210+
157211
if __name__ == "__main__":
158212
import tempfile
159213
with tempfile.TemporaryDirectory() as d:

0 commit comments

Comments
 (0)