From a20ae2edc835055803b0d845f1cebc80b6187955 Mon Sep 17 00:00:00 2001 From: Michael Kantor <6068672+kantorcodes@users.noreply.github.com> Date: Fri, 3 Jul 2026 18:14:28 -0400 Subject: [PATCH 01/11] fix(guard): page cloud command request snapshots --- .../guard/runtime/command_executors.py | 59 +++++++++++++++++-- 1 file changed, 55 insertions(+), 4 deletions(-) diff --git a/src/codex_plugin_scanner/guard/runtime/command_executors.py b/src/codex_plugin_scanner/guard/runtime/command_executors.py index d92283c85..686062c89 100644 --- a/src/codex_plugin_scanner/guard/runtime/command_executors.py +++ b/src/codex_plugin_scanner/guard/runtime/command_executors.py @@ -2,6 +2,8 @@ from __future__ import annotations +import base64 +import json import tempfile from collections.abc import Callable from datetime import datetime, timezone @@ -47,6 +49,7 @@ _GUARD_REVIEW_MEMORY_REGISTRY_SYNC_KEY = "guard_review_memory_registry" _GUARD_REVIEW_MEMORY_VERSION_SYNC_KEY = "guard_review_memory_policy_version" _GUARD_REVIEW_MEMORY_ACK_SYNC_KEY = "guard_review_memory_last_ack" +_LOCAL_REQUEST_SNAPSHOT_CURSOR_SYNC_KEY = "guard_command_local_request_snapshot_cursor" PACKAGE_SHIM_OPERATIONS: tuple[str, ...] = ( "guard.packageShims.status", @@ -69,8 +72,8 @@ ) SUPPORTED_COMMAND_OPERATIONS: tuple[str, ...] = (*PACKAGE_SHIM_OPERATIONS, *APP_OPERATIONS, *APPROVAL_OPERATIONS) COMMAND_OPERATION_SCHEMA_VERSIONS: dict[str, int] = {operation: 1 for operation in SUPPORTED_COMMAND_OPERATIONS} -LOCAL_REQUEST_PENDING_SNAPSHOT_LIMIT = 10_000 -LOCAL_REQUEST_RESOLVED_SNAPSHOT_LIMIT = 500 +LOCAL_REQUEST_PENDING_SNAPSHOT_LIMIT = 125 +LOCAL_REQUEST_RESOLVED_SNAPSHOT_LIMIT = 25 def execute_guard_command_job( @@ -467,7 +470,16 @@ def _local_request_snapshot_items_for_status( oauth = guard_review_oauth_metadata(store) except GuardReviewContractError: oauth = None - rows = store.list_approval_requests(status=status, limit=limit + 1) + cursor_state = _local_request_snapshot_cursor_state(store) + cursor = cursor_state.get(status) + rows = store.list_approval_requests( + status=status, + limit=limit + 1, + cursor=cursor if isinstance(cursor, str) and cursor else None, + ) + if not rows and isinstance(cursor, str) and cursor: + cursor = None + rows = store.list_approval_requests(status=status, limit=limit + 1) for item in rows[:limit]: request_id = item.get("request_id") if not isinstance(request_id, str) or not request_id: @@ -500,7 +512,46 @@ def _local_request_snapshot_items_for_status( "resolvedAt": str(resolved_at) if isinstance(resolved_at, str) and resolved_at else None, } ) - return items, len(rows) <= limit + if rows: + cursor_state[status] = _local_request_snapshot_next_cursor(rows, limit) + _save_local_request_snapshot_cursor_state(store, cursor_state) + return items, cursor is None and len(rows) <= limit + + +def _local_request_snapshot_cursor_state(store: GuardStore) -> dict[str, object]: + value = store.get_sync_payload(_LOCAL_REQUEST_SNAPSHOT_CURSOR_SYNC_KEY) + return dict(value) if isinstance(value, dict) else {} + + +def _save_local_request_snapshot_cursor_state( + store: GuardStore, + state: dict[str, object], +) -> None: + cleaned = { + key: value + for key, value in state.items() + if key in {"pending", "resolved"} and isinstance(value, str) and value + } + store.set_sync_payload(_LOCAL_REQUEST_SNAPSHOT_CURSOR_SYNC_KEY, cleaned, _now()) + + +def _local_request_snapshot_next_cursor( + rows: list[dict[str, object]], + limit: int, +) -> str | None: + if len(rows) <= limit: + return None + last_item = rows[limit - 1] + payload = { + "last_seen_at": str(last_item.get("last_seen_at") or last_item.get("created_at") or ""), + "request_id": str(last_item.get("request_id") or ""), + } + if not payload["last_seen_at"] or not payload["request_id"]: + return None + encoded = base64.urlsafe_b64encode( + json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8"), + ).decode("ascii") + return encoded.rstrip("=") def _resolve_cloud_receipt_redaction_level(store: GuardStore) -> str: From d5522753086e818436e4f7a9547deed4ea1bd45d Mon Sep 17 00:00:00 2001 From: Michael Kantor <6068672+kantorcodes@users.noreply.github.com> Date: Fri, 3 Jul 2026 18:15:31 -0400 Subject: [PATCH 02/11] test(guard): cover paged command snapshots --- tests/test_guard_command_snapshot_paging.py | 90 +++++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 tests/test_guard_command_snapshot_paging.py diff --git a/tests/test_guard_command_snapshot_paging.py b/tests/test_guard_command_snapshot_paging.py new file mode 100644 index 000000000..8c32a7574 --- /dev/null +++ b/tests/test_guard_command_snapshot_paging.py @@ -0,0 +1,90 @@ +"""Regression coverage for Cloud command local request snapshot paging.""" + +from __future__ import annotations + +import base64 +import json +from pathlib import Path + +from codex_plugin_scanner.guard.runtime import command_executors + + +class PagingStore: + def __init__(self, guard_home: Path) -> None: + self.guard_home = guard_home + self.payloads: dict[str, object] = {} + + def get_sync_payload(self, key: str) -> object | None: + return self.payloads.get(key) + + def set_sync_payload(self, key: str, payload: object, now: str) -> None: + del now + self.payloads[key] = payload + + def get_oauth_local_credentials(self, *, allow_primary: bool = False) -> dict[str, object]: + del allow_primary + return {} + + def list_approval_requests( + self, + *, + status: str | None = "pending", + harness: str | None = None, + limit: int | None = 50, + cursor: str | None = None, + search: str | None = None, + ) -> list[dict[str, object]]: + del harness, search + if status != "pending": + return [] + rows = [_approval_request_row(index) for index in range(130)] + if cursor: + padded = cursor + ("=" * (-len(cursor) % 4)) + decoded = json.loads(base64.urlsafe_b64decode(padded.encode("ascii")).decode("utf-8")) + marker_last_seen = decoded["last_seen_at"] + marker_request_id = decoded["request_id"] + rows = [ + row + for row in rows + if ( + str(row["last_seen_at"]) < marker_last_seen + or (str(row["last_seen_at"]) == marker_last_seen and str(row["request_id"]) < marker_request_id) + ) + ] + return rows if limit is None else rows[:limit] + + +def _approval_request_row(index: int) -> dict[str, object]: + return { + "request_id": f"req-pending-{index:03d}", + "status": "pending", + "harness": "codex", + "artifact_id": f"artifact-{index:03d}", + "artifact_hash": "b" * 64, + "policy_action": "require-reapproval", + "recommended_scope": "artifact", + "created_at": "2026-05-14T11:58:00.000Z", + "last_seen_at": f"2026-05-14T11:{59 - (index // 10):02d}:{59 - (index % 10):02d}.000Z", + "queue_group_id": "queue-group-1", + "action_envelope_json": { + "action_type": "shell_command", + "command": "npm install minimist@1.2.8", + "tool_name": "Bash", + }, + } + + +def test_local_request_snapshot_pages_large_pending_backlog(tmp_path: Path) -> None: + store = PagingStore(tmp_path / "guard-home") + + first_payload = command_executors._local_request_snapshot_payload(store) + second_payload = command_executors._local_request_snapshot_payload(store) + + assert first_payload["pendingComplete"] is False + assert first_payload["pendingCount"] == command_executors.LOCAL_REQUEST_PENDING_SNAPSHOT_LIMIT + assert first_payload["requests"][0]["localRequestId"] == "req-pending-000" + assert first_payload["requests"][-1]["localRequestId"] == "req-pending-124" + assert second_payload["pendingComplete"] is False + assert second_payload["pendingCount"] == 5 + assert second_payload["requests"][0]["localRequestId"] == "req-pending-125" + assert second_payload["requests"][-1]["localRequestId"] == "req-pending-129" From f47c41d7a852d3728e53c73240daa325de1ca421 Mon Sep 17 00:00:00 2001 From: Michael Kantor <6068672+kantorcodes@users.noreply.github.com> Date: Fri, 3 Jul 2026 18:18:44 -0400 Subject: [PATCH 03/11] test(guard): cover paged command snapshots --- tests/test_guard_command_snapshot_paging.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_guard_command_snapshot_paging.py b/tests/test_guard_command_snapshot_paging.py index 8c32a7574..9b86034ab 100644 --- a/tests/test_guard_command_snapshot_paging.py +++ b/tests/test_guard_command_snapshot_paging.py @@ -21,9 +21,9 @@ def set_sync_payload(self, key: str, payload: object, now: str) -> None: del now self.payloads[key] = payload - def get_oauth_local_credentials(self, *, allow_primary: bool = False) -> dict[str, object]: + def get_oauth_local_credentials(self, *, allow_primary: bool = False) -> object | None: del allow_primary - return {} + return None def list_approval_requests( self, From b112fb47a0a4041ad4a9b3a0f1ed7c98de06221c Mon Sep 17 00:00:00 2001 From: Michael Kantor <6068672+kantorcodes@users.noreply.github.com> Date: Fri, 3 Jul 2026 18:31:51 -0400 Subject: [PATCH 04/11] refactor(guard): extract local request snapshots --- .../guard/runtime/command_executors.py | 271 +----------------- 1 file changed, 8 insertions(+), 263 deletions(-) diff --git a/src/codex_plugin_scanner/guard/runtime/command_executors.py b/src/codex_plugin_scanner/guard/runtime/command_executors.py index 686062c89..907c327da 100644 --- a/src/codex_plugin_scanner/guard/runtime/command_executors.py +++ b/src/codex_plugin_scanner/guard/runtime/command_executors.py @@ -2,8 +2,6 @@ from __future__ import annotations -import base64 -import json import tempfile from collections.abc import Callable from datetime import datetime, timezone @@ -18,7 +16,7 @@ list_harness_setup_items, uninstall_confirmation_token, ) -from ..config import VALID_RECEIPT_REDACTION_LEVELS, load_guard_config +from ..config import load_guard_config from ..local_supply_chain import ( build_workspace_audit_payload, managed_install_audit_workspace_dirs, @@ -27,10 +25,8 @@ ) from ..models import DECISION_SCOPE_VALUES, GUARD_ACTION_VALUES, DecisionScope, GuardAction, PolicyDecision from ..package_shim_status import record_package_shim_audit_result -from ..redaction import redact_text from ..review_contracts import ( GuardReviewContractError, - build_local_review_request_claim, guard_review_oauth_metadata, validate_decision_memory_bundle_target, validate_remote_approval_request_binding, @@ -45,11 +41,11 @@ probe_package_shim_intercepts, ) from ..store import GuardStore +from . import local_request_snapshots _GUARD_REVIEW_MEMORY_REGISTRY_SYNC_KEY = "guard_review_memory_registry" _GUARD_REVIEW_MEMORY_VERSION_SYNC_KEY = "guard_review_memory_policy_version" _GUARD_REVIEW_MEMORY_ACK_SYNC_KEY = "guard_review_memory_last_ack" -_LOCAL_REQUEST_SNAPSHOT_CURSOR_SYNC_KEY = "guard_command_local_request_snapshot_cursor" PACKAGE_SHIM_OPERATIONS: tuple[str, ...] = ( "guard.packageShims.status", @@ -72,8 +68,8 @@ ) SUPPORTED_COMMAND_OPERATIONS: tuple[str, ...] = (*PACKAGE_SHIM_OPERATIONS, *APP_OPERATIONS, *APPROVAL_OPERATIONS) COMMAND_OPERATION_SCHEMA_VERSIONS: dict[str, int] = {operation: 1 for operation in SUPPORTED_COMMAND_OPERATIONS} -LOCAL_REQUEST_PENDING_SNAPSHOT_LIMIT = 125 -LOCAL_REQUEST_RESOLVED_SNAPSHOT_LIMIT = 25 +LOCAL_REQUEST_PENDING_SNAPSHOT_LIMIT = local_request_snapshots.LOCAL_REQUEST_PENDING_SNAPSHOT_LIMIT +LOCAL_REQUEST_RESOLVED_SNAPSHOT_LIMIT = local_request_snapshots.LOCAL_REQUEST_RESOLVED_SNAPSHOT_LIMIT def execute_guard_command_job( @@ -423,264 +419,13 @@ def _is_guard_action(value: object) -> TypeGuard[GuardAction]: def _local_request_snapshot_items(store: GuardStore) -> list[dict[str, object]]: - pending_items, _ = _local_request_snapshot_items_for_status( - store, - status="pending", - limit=100, - ) - resolved_items, _ = _local_request_snapshot_items_for_status( - store, - status="resolved", - limit=100, - ) - return [*pending_items, *resolved_items] + return local_request_snapshots.local_request_snapshot_items(store) def _local_request_snapshot_payload(store: GuardStore) -> dict[str, object]: - pending_items, pending_complete = _local_request_snapshot_items_for_status( - store, - status="pending", - limit=LOCAL_REQUEST_PENDING_SNAPSHOT_LIMIT, - ) - resolved_items, resolved_complete = _local_request_snapshot_items_for_status( - store, - status="resolved", - limit=LOCAL_REQUEST_RESOLVED_SNAPSHOT_LIMIT, - ) - return { - "requests": [*pending_items, *resolved_items], - "pendingComplete": pending_complete, - "resolvedComplete": resolved_complete, - "pendingLimit": LOCAL_REQUEST_PENDING_SNAPSHOT_LIMIT, - "resolvedLimit": LOCAL_REQUEST_RESOLVED_SNAPSHOT_LIMIT, - "pendingCount": len(pending_items), - "resolvedCount": len(resolved_items), - } - - -def _local_request_snapshot_items_for_status( - store: GuardStore, - *, - status: str, - limit: int, -) -> tuple[list[dict[str, object]], bool]: - items: list[dict[str, object]] = [] - redaction_level = _resolve_cloud_receipt_redaction_level(store) - try: - oauth = guard_review_oauth_metadata(store) - except GuardReviewContractError: - oauth = None - cursor_state = _local_request_snapshot_cursor_state(store) - cursor = cursor_state.get(status) - rows = store.list_approval_requests( - status=status, - limit=limit + 1, - cursor=cursor if isinstance(cursor, str) and cursor else None, - ) - if not rows and isinstance(cursor, str) and cursor: - cursor = None - rows = store.list_approval_requests(status=status, limit=limit + 1) - for item in rows[:limit]: - request_id = item.get("request_id") - if not isinstance(request_id, str) or not request_id: - continue - created_at = str(item.get("created_at") or _now()) - last_seen_at = str(item.get("last_seen_at") or created_at) - resolved_at = item.get("resolved_at") - claim = None - if oauth is not None: - try: - claim = build_local_review_request_claim( - request_row=item, - oauth=oauth, - store=store, - ) - except GuardReviewContractError: - claim = None - items.append( - { - "claim": claim, - "localRequestId": request_id, - "requestKind": str(item.get("harness") or "guard-review"), - "requestPayload": _cloud_safe_local_request_payload( - item, - redaction_level=redaction_level, - ), - "localStatus": str(item.get("status") or status), - "firstSeenAt": created_at, - "lastSeenAt": last_seen_at, - "resolvedAt": str(resolved_at) if isinstance(resolved_at, str) and resolved_at else None, - } - ) - if rows: - cursor_state[status] = _local_request_snapshot_next_cursor(rows, limit) - _save_local_request_snapshot_cursor_state(store, cursor_state) - return items, cursor is None and len(rows) <= limit - - -def _local_request_snapshot_cursor_state(store: GuardStore) -> dict[str, object]: - value = store.get_sync_payload(_LOCAL_REQUEST_SNAPSHOT_CURSOR_SYNC_KEY) - return dict(value) if isinstance(value, dict) else {} - - -def _save_local_request_snapshot_cursor_state( - store: GuardStore, - state: dict[str, object], -) -> None: - cleaned = { - key: value - for key, value in state.items() - if key in {"pending", "resolved"} and isinstance(value, str) and value - } - store.set_sync_payload(_LOCAL_REQUEST_SNAPSHOT_CURSOR_SYNC_KEY, cleaned, _now()) - - -def _local_request_snapshot_next_cursor( - rows: list[dict[str, object]], - limit: int, -) -> str | None: - if len(rows) <= limit: - return None - last_item = rows[limit - 1] - payload = { - "last_seen_at": str(last_item.get("last_seen_at") or last_item.get("created_at") or ""), - "request_id": str(last_item.get("request_id") or ""), - } - if not payload["last_seen_at"] or not payload["request_id"]: - return None - encoded = base64.urlsafe_b64encode( - json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8"), - ).decode("ascii") - return encoded.rstrip("=") - - -def _resolve_cloud_receipt_redaction_level(store: GuardStore) -> str: - payload = store.get_sync_payload("cloud_receipt_redaction_level") - if isinstance(payload, dict): - level = payload.get("level") - if isinstance(level, str) and level in VALID_RECEIPT_REDACTION_LEVELS: - return level - try: - config = load_guard_config(store.guard_home) - if config.receipt_redaction_level in VALID_RECEIPT_REDACTION_LEVELS: - return config.receipt_redaction_level - except Exception: - pass - return "full" - - -def _optional_payload_mapping(value: object) -> dict[str, object] | None: - return dict(value) if isinstance(value, dict) else None - - -def _cloud_safe_local_request_payload( - item: dict[str, object], - *, - redaction_level: str, -) -> dict[str, object]: - payload: dict[str, object] = {} - for key in ( - "request_id", - "status", - "harness", - "artifact_id", - "artifact_name", - "artifact_type", - "artifact_hash", - "artifact_label", - "source_label", - "trigger_summary", - "why_now", - "risk_headline", - "risk_summary", - "policy_action", - "recommended_scope", - "created_at", - "last_seen_at", - "queue_group_id", - "review_kind", - "risk_category", - "capability_category", - "publisher", - "package_manager", - "package_name", - ): - value = item.get(key) - if isinstance(value, (str, int, float, bool)) or value is None: - payload[key] = value - - envelope = _optional_payload_mapping(item.get("action_envelope_json")) - safe_envelope = _cloud_safe_action_envelope(envelope, redaction_level=redaction_level) - if safe_envelope is not None: - payload["action_envelope_json"] = safe_envelope - - if redaction_level == "full": - payload["raw_command_text"] = None - payload["command_text"] = None - return payload - - command_text = _local_request_command_text(item, envelope) - if command_text: - scrubbed = redact_text(command_text).text - payload["raw_command_text"] = scrubbed - payload["command_text"] = scrubbed - payload_envelope = payload.get("action_envelope_json") - if isinstance(payload_envelope, dict): - payload_envelope["command"] = scrubbed - return payload - - -def _local_request_command_text( - payload: dict[str, object], - envelope: dict[str, object] | None, -) -> str | None: - for key in ("raw_command_text", "rawCommandText", "command_text", "commandText"): - value = payload.get(key) - if isinstance(value, str) and value.strip(): - return value.strip() - if envelope is None: - return None - command = envelope.get("command") - return command.strip() if isinstance(command, str) and command.strip() else None - - -def _cloud_safe_action_envelope( - envelope: dict[str, object] | None, - *, - redaction_level: str, -) -> dict[str, object] | None: - if envelope is None: - return None - safe: dict[str, object] = {} - for key in ( - "schema_version", - "action_id", - "harness", - "event_name", - "action_type", - "workspace_hash", - "tool_name", - "mcp_server", - "mcp_tool", - "target_path_count", - "network_host_count", - "package_manager", - ): - value = envelope.get(key) - if isinstance(value, (str, int, float, bool)) or value is None: - safe[key] = value - if redaction_level != "full": - command = envelope.get("command") - if isinstance(command, str) and command.strip(): - safe["command"] = redact_text(command).text - if redaction_level == "none": - for key in ("target_paths", "network_hosts", "package_name", "package_targets"): - value = envelope.get(key) - if isinstance(value, list): - safe[key] = [item for item in value if isinstance(item, str)] - elif isinstance(value, str): - safe[key] = value - return safe or None + local_request_snapshots.LOCAL_REQUEST_PENDING_SNAPSHOT_LIMIT = LOCAL_REQUEST_PENDING_SNAPSHOT_LIMIT + local_request_snapshots.LOCAL_REQUEST_RESOLVED_SNAPSHOT_LIMIT = LOCAL_REQUEST_RESOLVED_SNAPSHOT_LIMIT + return local_request_snapshots.local_request_snapshot_payload(store) def _package_shim_context( From 0ad2445dfb1224f6bb9d68fae0d6af01c62ffb72 Mon Sep 17 00:00:00 2001 From: Michael Kantor <6068672+kantorcodes@users.noreply.github.com> Date: Fri, 3 Jul 2026 18:33:00 -0400 Subject: [PATCH 05/11] refactor(guard): extract local request snapshots --- .../guard/runtime/local_request_snapshots.py | 285 ++++++++++++++++++ 1 file changed, 285 insertions(+) create mode 100644 src/codex_plugin_scanner/guard/runtime/local_request_snapshots.py diff --git a/src/codex_plugin_scanner/guard/runtime/local_request_snapshots.py b/src/codex_plugin_scanner/guard/runtime/local_request_snapshots.py new file mode 100644 index 000000000..861c2551b --- /dev/null +++ b/src/codex_plugin_scanner/guard/runtime/local_request_snapshots.py @@ -0,0 +1,285 @@ +"""Cloud-safe local approval request snapshots for command queue leases.""" + +from __future__ import annotations + +import base64 +import json +from datetime import datetime, timezone + +from ..config import VALID_RECEIPT_REDACTION_LEVELS, load_guard_config +from ..redaction import redact_text +from ..review_contracts import ( + GuardReviewContractError, + build_local_review_request_claim, + guard_review_oauth_metadata, +) +from ..store import GuardStore + +LOCAL_REQUEST_PENDING_SNAPSHOT_LIMIT = 125 +LOCAL_REQUEST_RESOLVED_SNAPSHOT_LIMIT = 25 +_LOCAL_REQUEST_SNAPSHOT_CURSOR_SYNC_KEY = "guard_command_local_request_snapshot_cursor" + + +def local_request_snapshot_items(store: GuardStore) -> list[dict[str, object]]: + pending_items, _ = _local_request_snapshot_items_for_status( + store, + status="pending", + limit=100, + ) + resolved_items, _ = _local_request_snapshot_items_for_status( + store, + status="resolved", + limit=100, + ) + return [*pending_items, *resolved_items] + + +def local_request_snapshot_payload(store: GuardStore) -> dict[str, object]: + pending_items, pending_complete = _local_request_snapshot_items_for_status( + store, + status="pending", + limit=LOCAL_REQUEST_PENDING_SNAPSHOT_LIMIT, + ) + resolved_items, resolved_complete = _local_request_snapshot_items_for_status( + store, + status="resolved", + limit=LOCAL_REQUEST_RESOLVED_SNAPSHOT_LIMIT, + ) + return { + "requests": [*pending_items, *resolved_items], + "pendingComplete": pending_complete, + "resolvedComplete": resolved_complete, + "pendingLimit": LOCAL_REQUEST_PENDING_SNAPSHOT_LIMIT, + "resolvedLimit": LOCAL_REQUEST_RESOLVED_SNAPSHOT_LIMIT, + "pendingCount": len(pending_items), + "resolvedCount": len(resolved_items), + } + + +def _local_request_snapshot_items_for_status( + store: GuardStore, + *, + status: str, + limit: int, +) -> tuple[list[dict[str, object]], bool]: + items: list[dict[str, object]] = [] + redaction_level = _resolve_cloud_receipt_redaction_level(store) + try: + oauth = guard_review_oauth_metadata(store) + except GuardReviewContractError: + oauth = None + cursor_state = _local_request_snapshot_cursor_state(store) + cursor = cursor_state.get(status) + rows = store.list_approval_requests( + status=status, + limit=limit + 1, + cursor=cursor if isinstance(cursor, str) and cursor else None, + ) + if not rows and isinstance(cursor, str) and cursor: + cursor = None + rows = store.list_approval_requests(status=status, limit=limit + 1) + for item in rows[:limit]: + request_id = item.get("request_id") + if not isinstance(request_id, str) or not request_id: + continue + created_at = str(item.get("created_at") or _now()) + last_seen_at = str(item.get("last_seen_at") or created_at) + resolved_at = item.get("resolved_at") + claim = None + if oauth is not None: + try: + claim = build_local_review_request_claim( + request_row=item, + oauth=oauth, + store=store, + ) + except GuardReviewContractError: + claim = None + items.append( + { + "claim": claim, + "localRequestId": request_id, + "requestKind": str(item.get("harness") or "guard-review"), + "requestPayload": _cloud_safe_local_request_payload( + item, + redaction_level=redaction_level, + ), + "localStatus": str(item.get("status") or status), + "firstSeenAt": created_at, + "lastSeenAt": last_seen_at, + "resolvedAt": str(resolved_at) if isinstance(resolved_at, str) and resolved_at else None, + } + ) + if rows: + cursor_state[status] = _local_request_snapshot_next_cursor(rows, limit) + _save_local_request_snapshot_cursor_state(store, cursor_state) + return items, cursor is None and len(rows) <= limit + + +def _local_request_snapshot_cursor_state(store: GuardStore) -> dict[str, object]: + value = store.get_sync_payload(_LOCAL_REQUEST_SNAPSHOT_CURSOR_SYNC_KEY) + return dict(value) if isinstance(value, dict) else {} + + +def _save_local_request_snapshot_cursor_state( + store: GuardStore, + state: dict[str, object], +) -> None: + cleaned = { + key: value + for key, value in state.items() + if key in {"pending", "resolved"} and isinstance(value, str) and value + } + store.set_sync_payload(_LOCAL_REQUEST_SNAPSHOT_CURSOR_SYNC_KEY, cleaned, _now()) + + +def _local_request_snapshot_next_cursor( + rows: list[dict[str, object]], + limit: int, +) -> str | None: + if len(rows) <= limit: + return None + last_item = rows[limit - 1] + payload = { + "last_seen_at": str(last_item.get("last_seen_at") or last_item.get("created_at") or ""), + "request_id": str(last_item.get("request_id") or ""), + } + if not payload["last_seen_at"] or not payload["request_id"]: + return None + encoded = base64.urlsafe_b64encode( + json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8"), + ).decode("ascii") + return encoded.rstrip("=") + + +def _resolve_cloud_receipt_redaction_level(store: GuardStore) -> str: + payload = store.get_sync_payload("cloud_receipt_redaction_level") + if isinstance(payload, dict): + level = payload.get("level") + if isinstance(level, str) and level in VALID_RECEIPT_REDACTION_LEVELS: + return level + try: + config = load_guard_config(store.guard_home) + if config.receipt_redaction_level in VALID_RECEIPT_REDACTION_LEVELS: + return config.receipt_redaction_level + except Exception: + pass + return "full" + + +def _optional_payload_mapping(value: object) -> dict[str, object] | None: + return dict(value) if isinstance(value, dict) else None + + +def _cloud_safe_local_request_payload( + item: dict[str, object], + *, + redaction_level: str, +) -> dict[str, object]: + payload: dict[str, object] = {} + for key in ( + "request_id", + "status", + "harness", + "artifact_id", + "artifact_name", + "artifact_type", + "artifact_hash", + "artifact_label", + "source_label", + "trigger_summary", + "why_now", + "risk_headline", + "risk_summary", + "policy_action", + "recommended_scope", + "created_at", + "last_seen_at", + "queue_group_id", + "review_kind", + "risk_category", + "capability_category", + "publisher", + "package_manager", + "package_name", + ): + value = item.get(key) + if isinstance(value, (str, int, float, bool)) or value is None: + payload[key] = value + + envelope = _optional_payload_mapping(item.get("action_envelope_json")) + safe_envelope = _cloud_safe_action_envelope(envelope, redaction_level=redaction_level) + if safe_envelope is not None: + payload["action_envelope_json"] = safe_envelope + + if redaction_level == "full": + payload["raw_command_text"] = None + payload["command_text"] = None + return payload + + command_text = _local_request_command_text(item, envelope) + if command_text: + scrubbed = redact_text(command_text).text + payload["raw_command_text"] = scrubbed + payload["command_text"] = scrubbed + payload_envelope = payload.get("action_envelope_json") + if isinstance(payload_envelope, dict): + payload_envelope["command"] = scrubbed + return payload + + +def _local_request_command_text( + payload: dict[str, object], + envelope: dict[str, object] | None, +) -> str | None: + for key in ("raw_command_text", "rawCommandText", "command_text", "commandText"): + value = payload.get(key) + if isinstance(value, str) and value.strip(): + return value.strip() + if envelope is None: + return None + command = envelope.get("command") + return command.strip() if isinstance(command, str) and command.strip() else None + + +def _cloud_safe_action_envelope( + envelope: dict[str, object] | None, + *, + redaction_level: str, +) -> dict[str, object] | None: + if envelope is None: + return None + safe: dict[str, object] = {} + for key in ( + "schema_version", + "action_id", + "harness", + "event_name", + "action_type", + "workspace_hash", + "tool_name", + "mcp_server", + "mcp_tool", + "target_path_count", + "network_host_count", + "package_manager", + ): + value = envelope.get(key) + if isinstance(value, (str, int, float, bool)) or value is None: + safe[key] = value + if redaction_level != "full": + command = envelope.get("command") + if isinstance(command, str) and command.strip(): + safe["command"] = redact_text(command).text + if redaction_level == "none": + for key in ("target_paths", "network_hosts", "package_name", "package_targets"): + value = envelope.get(key) + if isinstance(value, list): + safe[key] = [item for item in value if isinstance(item, str)] + elif isinstance(value, str): + safe[key] = value + return safe or None + + +def _now() -> str: + return datetime.now(timezone.utc).isoformat() From fa1abd44d055b76bd04b69b657a86f1f2d0510e2 Mon Sep 17 00:00:00 2001 From: Michael Kantor <6068672+kantorcodes@users.noreply.github.com> Date: Fri, 3 Jul 2026 19:00:35 -0400 Subject: [PATCH 06/11] fix(guard): keep small local request snapshots complete --- .../guard/runtime/local_request_snapshots.py | 41 +++++++++++++++++-- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/src/codex_plugin_scanner/guard/runtime/local_request_snapshots.py b/src/codex_plugin_scanner/guard/runtime/local_request_snapshots.py index 861c2551b..5fce25c63 100644 --- a/src/codex_plugin_scanner/guard/runtime/local_request_snapshots.py +++ b/src/codex_plugin_scanner/guard/runtime/local_request_snapshots.py @@ -17,6 +17,7 @@ LOCAL_REQUEST_PENDING_SNAPSHOT_LIMIT = 125 LOCAL_REQUEST_RESOLVED_SNAPSHOT_LIMIT = 25 +LOCAL_REQUEST_CURSORLESS_FALLBACK_LIMIT = 500 _LOCAL_REQUEST_SNAPSHOT_CURSOR_SYNC_KEY = "guard_command_local_request_snapshot_cursor" @@ -78,7 +79,18 @@ def _local_request_snapshot_items_for_status( if not rows and isinstance(cursor, str) and cursor: cursor = None rows = store.list_approval_requests(status=status, limit=limit + 1) - for item in rows[:limit]: + cursor_supported = True + if len(rows) > limit: + rows, cursor_supported = _expand_cursorless_small_backlog( + store, + status=status, + rows=rows, + limit=limit, + ) + if not cursor_supported: + cursor = None + page_limit = min(limit if cursor_supported else LOCAL_REQUEST_CURSORLESS_FALLBACK_LIMIT, len(rows)) + for item in rows[:page_limit]: request_id = item.get("request_id") if not isinstance(request_id, str) or not request_id: continue @@ -110,10 +122,33 @@ def _local_request_snapshot_items_for_status( "resolvedAt": str(resolved_at) if isinstance(resolved_at, str) and resolved_at else None, } ) - if rows: + if cursor_supported and len(rows) > limit: cursor_state[status] = _local_request_snapshot_next_cursor(rows, limit) _save_local_request_snapshot_cursor_state(store, cursor_state) - return items, cursor is None and len(rows) <= limit + complete_limit = limit if cursor_supported else LOCAL_REQUEST_CURSORLESS_FALLBACK_LIMIT + return items, cursor is None and len(rows) <= complete_limit + + +def _expand_cursorless_small_backlog( + store: GuardStore, + *, + status: str, + rows: list[dict[str, object]], + limit: int, +) -> tuple[list[dict[str, object]], bool]: + next_cursor = _local_request_snapshot_next_cursor(rows, limit) + if next_cursor is None or not rows: + return rows, True + probe = store.list_approval_requests(status=status, limit=1, cursor=next_cursor) + first_request_id = rows[0].get("request_id") + probe_request_id = probe[0].get("request_id") if probe else None + if not isinstance(first_request_id, str) or probe_request_id != first_request_id: + return rows, True + fallback_rows = store.list_approval_requests( + status=status, + limit=LOCAL_REQUEST_CURSORLESS_FALLBACK_LIMIT + 1, + ) + return fallback_rows, False def _local_request_snapshot_cursor_state(store: GuardStore) -> dict[str, object]: From 07153ec88eb94663f85ac177d0b20cb7af067b71 Mon Sep 17 00:00:00 2001 From: Michael Kantor <6068672+kantorcodes@users.noreply.github.com> Date: Fri, 3 Jul 2026 19:06:10 -0400 Subject: [PATCH 07/11] fix(guard): advance local request snapshot cursor --- .../guard/runtime/local_request_snapshots.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/codex_plugin_scanner/guard/runtime/local_request_snapshots.py b/src/codex_plugin_scanner/guard/runtime/local_request_snapshots.py index 5fce25c63..1095e6c8d 100644 --- a/src/codex_plugin_scanner/guard/runtime/local_request_snapshots.py +++ b/src/codex_plugin_scanner/guard/runtime/local_request_snapshots.py @@ -122,8 +122,11 @@ def _local_request_snapshot_items_for_status( "resolvedAt": str(resolved_at) if isinstance(resolved_at, str) and resolved_at else None, } ) - if cursor_supported and len(rows) > limit: - cursor_state[status] = _local_request_snapshot_next_cursor(rows, limit) + if cursor_supported: + if len(rows) > limit: + cursor_state[status] = _local_request_snapshot_next_cursor(rows, limit) + else: + cursor_state.pop(status, None) _save_local_request_snapshot_cursor_state(store, cursor_state) complete_limit = limit if cursor_supported else LOCAL_REQUEST_CURSORLESS_FALLBACK_LIMIT return items, cursor is None and len(rows) <= complete_limit From ea990cc92f292b1442fd6d6dffeb50962172a284 Mon Sep 17 00:00:00 2001 From: Michael Kantor <6068672+kantorcodes@users.noreply.github.com> Date: Fri, 3 Jul 2026 19:06:49 -0400 Subject: [PATCH 08/11] test(guard): cover snapshot cursor wraparound --- tests/test_guard_command_snapshot_paging.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/test_guard_command_snapshot_paging.py b/tests/test_guard_command_snapshot_paging.py index 9b86034ab..3e5473067 100644 --- a/tests/test_guard_command_snapshot_paging.py +++ b/tests/test_guard_command_snapshot_paging.py @@ -79,6 +79,7 @@ def test_local_request_snapshot_pages_large_pending_backlog(tmp_path: Path) -> N first_payload = command_executors._local_request_snapshot_payload(store) second_payload = command_executors._local_request_snapshot_payload(store) + third_payload = command_executors._local_request_snapshot_payload(store) assert first_payload["pendingComplete"] is False assert first_payload["pendingCount"] == command_executors.LOCAL_REQUEST_PENDING_SNAPSHOT_LIMIT @@ -88,3 +89,7 @@ def test_local_request_snapshot_pages_large_pending_backlog(tmp_path: Path) -> N assert second_payload["pendingCount"] == 5 assert second_payload["requests"][0]["localRequestId"] == "req-pending-125" assert second_payload["requests"][-1]["localRequestId"] == "req-pending-129" + assert third_payload["pendingComplete"] is False + assert third_payload["pendingCount"] == command_executors.LOCAL_REQUEST_PENDING_SNAPSHOT_LIMIT + assert third_payload["requests"][0]["localRequestId"] == "req-pending-000" + assert third_payload["requests"][-1]["localRequestId"] == "req-pending-124" From d829103be22ff5718bafbc037e5e61ebf4311e5b Mon Sep 17 00:00:00 2001 From: Michael Kantor <6068672+kantorcodes@users.noreply.github.com> Date: Fri, 3 Jul 2026 19:27:55 -0400 Subject: [PATCH 09/11] fix(guard): cap local request snapshot payloads --- .../guard/runtime/local_request_snapshots.py | 30 +++++++++++++++---- 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/src/codex_plugin_scanner/guard/runtime/local_request_snapshots.py b/src/codex_plugin_scanner/guard/runtime/local_request_snapshots.py index 1095e6c8d..540d0b7e1 100644 --- a/src/codex_plugin_scanner/guard/runtime/local_request_snapshots.py +++ b/src/codex_plugin_scanner/guard/runtime/local_request_snapshots.py @@ -18,6 +18,9 @@ LOCAL_REQUEST_PENDING_SNAPSHOT_LIMIT = 125 LOCAL_REQUEST_RESOLVED_SNAPSHOT_LIMIT = 25 LOCAL_REQUEST_CURSORLESS_FALLBACK_LIMIT = 500 +LOCAL_REQUEST_SNAPSHOT_MAX_BYTES = 900_000 +LOCAL_REQUEST_TEXT_FIELD_MAX_CHARS = 256 +LOCAL_REQUEST_COMMAND_FIELD_MAX_CHARS = 1_024 _LOCAL_REQUEST_SNAPSHOT_CURSOR_SYNC_KEY = "guard_command_local_request_snapshot_cursor" @@ -54,6 +57,7 @@ def local_request_snapshot_payload(store: GuardStore) -> dict[str, object]: "resolvedLimit": LOCAL_REQUEST_RESOLVED_SNAPSHOT_LIMIT, "pendingCount": len(pending_items), "resolvedCount": len(resolved_items), + "maxBytes": LOCAL_REQUEST_SNAPSHOT_MAX_BYTES, } @@ -242,7 +246,9 @@ def _cloud_safe_local_request_payload( "package_name", ): value = item.get(key) - if isinstance(value, (str, int, float, bool)) or value is None: + if isinstance(value, str): + payload[key] = _bounded_text(value) + elif isinstance(value, (int, float, bool)) or value is None: payload[key] = value envelope = _optional_payload_mapping(item.get("action_envelope_json")) @@ -257,7 +263,7 @@ def _cloud_safe_local_request_payload( command_text = _local_request_command_text(item, envelope) if command_text: - scrubbed = redact_text(command_text).text + scrubbed = _bounded_command(redact_text(command_text).text) payload["raw_command_text"] = scrubbed payload["command_text"] = scrubbed payload_envelope = payload.get("action_envelope_json") @@ -303,21 +309,33 @@ def _cloud_safe_action_envelope( "package_manager", ): value = envelope.get(key) - if isinstance(value, (str, int, float, bool)) or value is None: + if isinstance(value, str): + safe[key] = _bounded_text(value) + elif isinstance(value, (int, float, bool)) or value is None: safe[key] = value if redaction_level != "full": command = envelope.get("command") if isinstance(command, str) and command.strip(): - safe["command"] = redact_text(command).text + safe["command"] = _bounded_command(redact_text(command).text) if redaction_level == "none": for key in ("target_paths", "network_hosts", "package_name", "package_targets"): value = envelope.get(key) if isinstance(value, list): - safe[key] = [item for item in value if isinstance(item, str)] + safe[key] = [_bounded_text(item) for item in value if isinstance(item, str)] elif isinstance(value, str): - safe[key] = value + safe[key] = _bounded_text(value) return safe or None +def _bounded_text(value: str, *, max_chars: int = LOCAL_REQUEST_TEXT_FIELD_MAX_CHARS) -> str: + if len(value) <= max_chars: + return value + return f"{value[:max_chars]}...[truncated {len(value) - max_chars} chars]" + + +def _bounded_command(value: str) -> str: + return _bounded_text(value, max_chars=LOCAL_REQUEST_COMMAND_FIELD_MAX_CHARS) + + def _now() -> str: return datetime.now(timezone.utc).isoformat() From 170c03cff7fe801f21ec11f4415ec6fc39ec950f Mon Sep 17 00:00:00 2001 From: Michael Kantor <6068672+kantorcodes@users.noreply.github.com> Date: Fri, 3 Jul 2026 19:28:37 -0400 Subject: [PATCH 10/11] test(guard): cover bounded local request snapshots --- tests/test_guard_command_snapshot_paging.py | 44 ++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/tests/test_guard_command_snapshot_paging.py b/tests/test_guard_command_snapshot_paging.py index 3e5473067..e01697ef3 100644 --- a/tests/test_guard_command_snapshot_paging.py +++ b/tests/test_guard_command_snapshot_paging.py @@ -6,7 +6,7 @@ import json from pathlib import Path -from codex_plugin_scanner.guard.runtime import command_executors +from codex_plugin_scanner.guard.runtime import command_executors, local_request_snapshots class PagingStore: @@ -74,6 +74,48 @@ def _approval_request_row(index: int) -> dict[str, object]: } +def test_local_request_snapshot_payload_stays_under_cloud_byte_budget(tmp_path: Path) -> None: + class HugePayloadStore(PagingStore): + def list_approval_requests( + self, + *, + status: str | None = "pending", + harness: str | None = None, + limit: int | None = 50, + cursor: str | None = None, + search: str | None = None, + ) -> list[dict[str, object]]: + rows = super().list_approval_requests( + status=status, + harness=harness, + limit=limit, + cursor=cursor, + search=search, + ) + for row in rows: + row["risk_summary"] = "SECRET_TOKEN=" + ("x" * 20_000) + row["why_now"] = "review context " + ("y" * 20_000) + envelope = row["action_envelope_json"] + if isinstance(envelope, dict): + envelope["command"] = "npm install minimist@1.2.8 " + ("--flag value " * 2_000) + return rows + + store = HugePayloadStore(tmp_path / "guard-home") + store.payloads["cloud_receipt_redaction_level"] = {"level": "none"} + payload = command_executors._local_request_snapshot_payload(store) + + payload_size = len(json.dumps(payload, separators=(",", ":"), sort_keys=True).encode("utf-8")) + assert payload_size <= local_request_snapshots.LOCAL_REQUEST_SNAPSHOT_MAX_BYTES + assert payload["pendingComplete"] is False + assert payload["pendingCount"] == command_executors.LOCAL_REQUEST_PENDING_SNAPSHOT_LIMIT + first_request = payload["requests"][0] + assert isinstance(first_request, dict) + request_payload = first_request["requestPayload"] + assert isinstance(request_payload, dict) + assert "truncated" in str(request_payload["risk_summary"]) + assert "truncated" in str(request_payload["command_text"]) + + def test_local_request_snapshot_pages_large_pending_backlog(tmp_path: Path) -> None: store = PagingStore(tmp_path / "guard-home") From 3c26a972173a3a1a68bfb61f66a01df936a2166a Mon Sep 17 00:00:00 2001 From: Michael Kantor <6068672+kantorcodes@users.noreply.github.com> Date: Sat, 4 Jul 2026 02:18:39 -0400 Subject: [PATCH 11/11] fix(guard): cap command request snapshots Signed-off-by: Michael Kantor <6068672+kantorcodes@users.noreply.github.com> --- .../guard/runtime/command_executors.py | 28 +++++ .../guard/runtime/local_request_snapshots.py | 108 +++++++++++++++++- tests/test_guard_command_queue.py | 1 + 3 files changed, 132 insertions(+), 5 deletions(-) diff --git a/src/codex_plugin_scanner/guard/runtime/command_executors.py b/src/codex_plugin_scanner/guard/runtime/command_executors.py index 907c327da..f270d741c 100644 --- a/src/codex_plugin_scanner/guard/runtime/command_executors.py +++ b/src/codex_plugin_scanner/guard/runtime/command_executors.py @@ -70,6 +70,7 @@ COMMAND_OPERATION_SCHEMA_VERSIONS: dict[str, int] = {operation: 1 for operation in SUPPORTED_COMMAND_OPERATIONS} LOCAL_REQUEST_PENDING_SNAPSHOT_LIMIT = local_request_snapshots.LOCAL_REQUEST_PENDING_SNAPSHOT_LIMIT LOCAL_REQUEST_RESOLVED_SNAPSHOT_LIMIT = local_request_snapshots.LOCAL_REQUEST_RESOLVED_SNAPSHOT_LIMIT +LOCAL_REQUEST_SNAPSHOT_MAX_BYTES = local_request_snapshots.LOCAL_REQUEST_SNAPSHOT_MAX_BYTES def execute_guard_command_job( @@ -425,9 +426,36 @@ def _local_request_snapshot_items(store: GuardStore) -> list[dict[str, object]]: def _local_request_snapshot_payload(store: GuardStore) -> dict[str, object]: local_request_snapshots.LOCAL_REQUEST_PENDING_SNAPSHOT_LIMIT = LOCAL_REQUEST_PENDING_SNAPSHOT_LIMIT local_request_snapshots.LOCAL_REQUEST_RESOLVED_SNAPSHOT_LIMIT = LOCAL_REQUEST_RESOLVED_SNAPSHOT_LIMIT + local_request_snapshots.LOCAL_REQUEST_SNAPSHOT_MAX_BYTES = LOCAL_REQUEST_SNAPSHOT_MAX_BYTES return local_request_snapshots.local_request_snapshot_payload(store) +def _local_request_snapshot_items_for_status( + store: GuardStore, + *, + status: str, + limit: int, +) -> tuple[list[dict[str, object]], bool]: + return local_request_snapshots._local_request_snapshot_items_for_status( + store, + status=status, + limit=limit, + ) + + +def _local_request_snapshot_byte_capped_items( + items: list[dict[str, object]], + *, + max_bytes: int, + existing_items: list[dict[str, object]] | None = None, +) -> tuple[list[dict[str, object]], bool]: + return local_request_snapshots._local_request_snapshot_byte_capped_items( + items, + max_bytes=max_bytes, + existing_items=existing_items, + ) + + def _package_shim_context( payload: dict[str, object], *, diff --git a/src/codex_plugin_scanner/guard/runtime/local_request_snapshots.py b/src/codex_plugin_scanner/guard/runtime/local_request_snapshots.py index 540d0b7e1..562677b67 100644 --- a/src/codex_plugin_scanner/guard/runtime/local_request_snapshots.py +++ b/src/codex_plugin_scanner/guard/runtime/local_request_snapshots.py @@ -19,6 +19,8 @@ LOCAL_REQUEST_RESOLVED_SNAPSHOT_LIMIT = 25 LOCAL_REQUEST_CURSORLESS_FALLBACK_LIMIT = 500 LOCAL_REQUEST_SNAPSHOT_MAX_BYTES = 900_000 +LOCAL_REQUEST_SNAPSHOT_MAX_STRING_CHARS = 2_000 +LOCAL_REQUEST_SNAPSHOT_MAX_LIST_ITEMS = 20 LOCAL_REQUEST_TEXT_FIELD_MAX_CHARS = 256 LOCAL_REQUEST_COMMAND_FIELD_MAX_CHARS = 1_024 _LOCAL_REQUEST_SNAPSHOT_CURSOR_SYNC_KEY = "guard_command_local_request_snapshot_cursor" @@ -49,10 +51,15 @@ def local_request_snapshot_payload(store: GuardStore) -> dict[str, object]: status="resolved", limit=LOCAL_REQUEST_RESOLVED_SNAPSHOT_LIMIT, ) + requests, pending_byte_complete, resolved_byte_complete = _local_request_snapshot_byte_capped_statuses( + pending_items, + resolved_items, + max_bytes=LOCAL_REQUEST_SNAPSHOT_MAX_BYTES, + ) return { - "requests": [*pending_items, *resolved_items], - "pendingComplete": pending_complete, - "resolvedComplete": resolved_complete, + "requests": requests, + "pendingComplete": pending_complete and pending_byte_complete, + "resolvedComplete": resolved_complete and resolved_byte_complete, "pendingLimit": LOCAL_REQUEST_PENDING_SNAPSHOT_LIMIT, "resolvedLimit": LOCAL_REQUEST_RESOLVED_SNAPSHOT_LIMIT, "pendingCount": len(pending_items), @@ -61,6 +68,97 @@ def local_request_snapshot_payload(store: GuardStore) -> dict[str, object]: } +def _local_request_snapshot_byte_capped_statuses( + pending_items: list[dict[str, object]], + resolved_items: list[dict[str, object]], + *, + max_bytes: int, +) -> tuple[list[dict[str, object]], bool, bool]: + selected, pending_complete = _local_request_snapshot_byte_capped_items( + pending_items, + max_bytes=max_bytes, + ) + if not pending_complete: + return selected, False, False + + selected, resolved_complete = _local_request_snapshot_byte_capped_items( + resolved_items, + existing_items=selected, + max_bytes=max_bytes, + ) + return selected, True, resolved_complete + + +def _local_request_snapshot_byte_capped_items( + items: list[dict[str, object]], + *, + max_bytes: int, + existing_items: list[dict[str, object]] | None = None, +) -> tuple[list[dict[str, object]], bool]: + selected: list[dict[str, object]] = list(existing_items or []) + initial_len = len(selected) + for item in items: + candidate = [*selected, item] + candidate_bytes = len( + json.dumps({"requests": candidate}, separators=(",", ":"), sort_keys=True).encode("utf-8"), + ) + if candidate_bytes > max_bytes: + if len(selected) == initial_len: + compact_item = _compact_local_request_snapshot_item(item) + compact_candidate = [*selected, compact_item] + compact_bytes = len( + json.dumps({"requests": compact_candidate}, separators=(",", ":"), sort_keys=True).encode("utf-8"), + ) + if compact_bytes <= max_bytes: + selected.append(compact_item) + return selected, False + selected.append(item) + return selected, True + + +def _compact_local_request_snapshot_item(item: dict[str, object]) -> dict[str, object]: + compact = {key: _compact_local_request_snapshot_value(value) for key, value in item.items()} + compact_bytes = len(json.dumps(compact, separators=(",", ":"), sort_keys=True).encode("utf-8")) + if compact_bytes <= LOCAL_REQUEST_SNAPSHOT_MAX_BYTES: + return compact + + safe_keys = ( + "localRequestId", + "requestKind", + "requestPayload", + "localStatus", + "firstSeenAt", + "lastSeenAt", + "resolvedAt", + "status", + "harness", + "artifactId", + "artifactName", + "artifactType", + "policyAction", + "recommendedScope", + "rawCommandText", + "reviewCommand", + "actionEnvelope", + ) + reduced = {key: compact[key] for key in safe_keys if key in compact} + if reduced: + return reduced + return compact + + +def _compact_local_request_snapshot_value(value: object) -> object: + if isinstance(value, str): + if len(value) <= LOCAL_REQUEST_SNAPSHOT_MAX_STRING_CHARS: + return value + return f"{value[:LOCAL_REQUEST_SNAPSHOT_MAX_STRING_CHARS]}...[truncated]" + if isinstance(value, list): + return [_compact_local_request_snapshot_value(item) for item in value[:LOCAL_REQUEST_SNAPSHOT_MAX_LIST_ITEMS]] + if isinstance(value, dict): + return {str(key): _compact_local_request_snapshot_value(item) for key, item in value.items()} + return value + + def _local_request_snapshot_items_for_status( store: GuardStore, *, @@ -93,7 +191,7 @@ def _local_request_snapshot_items_for_status( ) if not cursor_supported: cursor = None - page_limit = min(limit if cursor_supported else LOCAL_REQUEST_CURSORLESS_FALLBACK_LIMIT, len(rows)) + page_limit = min(limit, len(rows)) for item in rows[:page_limit]: request_id = item.get("request_id") if not isinstance(request_id, str) or not request_id: @@ -132,7 +230,7 @@ def _local_request_snapshot_items_for_status( else: cursor_state.pop(status, None) _save_local_request_snapshot_cursor_state(store, cursor_state) - complete_limit = limit if cursor_supported else LOCAL_REQUEST_CURSORLESS_FALLBACK_LIMIT + complete_limit = limit return items, cursor is None and len(rows) <= complete_limit diff --git a/tests/test_guard_command_queue.py b/tests/test_guard_command_queue.py index 294f29aff..d255c15e9 100644 --- a/tests/test_guard_command_queue.py +++ b/tests/test_guard_command_queue.py @@ -820,6 +820,7 @@ def fake_json_request( "resolvedComplete": True, "pendingLimit": command_executors.LOCAL_REQUEST_PENDING_SNAPSHOT_LIMIT, "resolvedLimit": command_executors.LOCAL_REQUEST_RESOLVED_SNAPSHOT_LIMIT, + "maxBytes": command_executors.LOCAL_REQUEST_SNAPSHOT_MAX_BYTES, "pendingCount": 0, "resolvedCount": 0, },