Merged
67 changes: 63 additions & 4 deletions AGENTS.md
@@ -271,11 +271,52 @@ Implementation: `codec_agent_plan.py` (~640 LOC), `routes/agents.py` (~250 LOC o

Implementation: `codec_agent_runner.py` (~700 LOC), `routes/agents.py` (+120 for Step 9 endpoints), `ecosystem.config.js` (+22 for PM2 entry).

### Proactive Messaging + Project Mode (Phase 3 Step 10)

`codec_agent_messaging.py` is the agent ↔ user message dispatch module. The backend is complete; the PWA HTML for the Project-mode dropdown + status pills is deferred to Phase 3.5 alongside the proactive intelligence overlay.

**Per-message flow:**
1. `_run_agent` (Step 9) calls `post_message(agent_id, type, title, body, actions)` at 5 lifecycle points: agent start, checkpoint completion, blocked-on-permission, destructive-rejected abort, and final completion
2. `post_message` writes the record to `~/.codec/agents/<id>/messages.jsonl` (1:1 timeline preservation, never batched)
3. `post_message` then updates `~/.codec/notifications.json` — but only ONE banner per agent per `BATCH_WINDOW_SECONDS=60` (Q10). Within the window, the existing banner's `batch_count` increments and `title` updates to "N updates from <agent>: <latest>".
4. `post_message` emits the `agent_message_sent` audit event with an `extra.batched` flag
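The banner-batching rule in step 3 can be modeled as a small pure function. This is a simplified sketch of the behavior described above, not the actual `post_message` implementation; it ignores silence, persistence, and audit, and operates on an in-memory list standing in for `notifications.json`:

```python
import time

BATCH_WINDOW_SECONDS = 60  # Q10 batching contract


def apply_banner(notifs, agent_id, title, body, now_ts=None):
    """Merge an agent_update into a recent same-agent banner, or append a new one.

    notifs: list of banner dicts (in-memory stand-in for notifications.json).
    Returns True if the update was batched into an existing banner.
    """
    now_ts = time.time() if now_ts is None else now_ts
    for n in notifs:
        if n.get("agent_id") == agent_id and n.get("type") == "agent_update":
            if now_ts - n.get("_post_ts", 0) <= BATCH_WINDOW_SECONDS:
                n["batch_count"] = n.get("batch_count", 1) + 1
                n["title"] = f"{n['batch_count']} updates from {agent_id}: {title[:60]}"
                n["body"] = body  # latest body wins
                n["_post_ts"] = now_ts  # refresh of the window, as in the real code
                return True
    notifs.append({"agent_id": agent_id, "type": "agent_update",
                   "title": title, "body": body,
                   "batch_count": 1, "_post_ts": now_ts})
    return False
```

Note the window slides: each batched update refreshes `_post_ts`, so a steady stream of updates less than 60s apart keeps collapsing into one banner; a new banner only opens after a quiet gap.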

**Message types** (frozen vocabulary): `agent_update` / `agent_blocked` / `agent_question` / `agent_done` / `agent_aborted` / `user_reply`.

**Silence kill-switch:** `is_silenced(agent_id)` reads `~/.codec/agent_silence.json`. When silenced: `post_message` still writes the timeline but skips notifications. Toggled via `POST /api/agents/{id}/silence {"silenced": bool}`. State is per-agent + persistent (atomic R/W).
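The silence-file contract can be sketched as a pair of pure helpers. This is a simplified, non-atomic model for illustration (the real module routes writes through its atomic-write helper and takes no path argument; the explicit `state_path` parameter here exists only to make the sketch self-contained):

```python
import json
from pathlib import Path


def set_silenced(state_path: Path, agent_id: str, silenced: bool) -> None:
    """Silenced agents are stored as true entries; un-silencing removes
    the entry, so the file only ever lists currently-silenced agents."""
    try:
        state = json.loads(state_path.read_text())
    except (FileNotFoundError, json.JSONDecodeError):
        state = {}
    if silenced:
        state[agent_id] = True
    else:
        state.pop(agent_id, None)
    state_path.write_text(json.dumps(state))  # real code writes atomically


def is_silenced(state_path: Path, agent_id: str) -> bool:
    """Missing file or absent entry both mean 'not silenced'."""
    try:
        return bool(json.loads(state_path.read_text()).get(agent_id, False))
    except (FileNotFoundError, json.JSONDecodeError):
        return False
```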

**User reply pickup:** `POST /api/agents/{id}/messages {"body": "..."}` writes a `type=user_reply` line to `messages.jsonl` and emits `agent_message_received`. Step 9's `_run_agent` calls `get_unread_user_replies(agent_id, since_ts)` between checkpoints to feed replies into the next `_qwen_next_action` call as additional context.
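For illustration, a hypothetical helper (`replies_as_context` is not part of the source) showing one way the picked-up `user_reply` records could be folded into the next `_qwen_next_action` call as additional context:

```python
from typing import Any, Dict, List


def replies_as_context(replies: List[Dict[str, Any]]) -> str:
    """Format records as returned by get_unread_user_replies into an
    extra context block for the next planner call. Hypothetical sketch;
    the actual formatting used by the runner is not specified here."""
    if not replies:
        return ""
    lines = ["User replies since last checkpoint:"]
    lines += [f"- {r.get('body', '')}" for r in replies]
    return "\n".join(lines)
```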

**Auto-escalation from chat (Q11):** `_classify_chat_message(text)` calls Qwen-3.6 with a structured-JSON prompt and returns `(is_project: bool, estimated_checkpoints: int, reason: str)`. `_should_escalate_to_project(user_text, session_id)` is the 2-signal gate:
- Signal 1: classifier verdict `is_project=True`
- Signal 2: `estimated_checkpoints >= ESCALATE_CHECKPOINTS_THRESHOLD = 3`
- Plus 2 kill conditions: `AGENT_AUTO_ESCALATE_ENABLED=false` env var, OR `session_id` in `_autoescalate_silence_set` (in-memory, mutated under `_AUTOESCALATE_SILENCE_LOCK`)

After the user says "No" once for a session, that session_id is silenced for the rest of the conversation. Resets on new chat session (because `_autoescalate_silence_set` is in-memory).
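The two signals plus the two kill conditions reduce to a small predicate. This is a simplified model of `_should_escalate_to_project`; the real function derives its inputs from the Qwen classifier verdict, the `AGENT_AUTO_ESCALATE_ENABLED` env var, and the in-memory silence set rather than taking them as arguments:

```python
ESCALATE_CHECKPOINTS_THRESHOLD = 3  # matches the codec_dashboard.py constant


def should_escalate(is_project: bool, estimated_checkpoints: int,
                    auto_escalate_enabled: bool, session_silenced: bool) -> bool:
    """Escalate only when both classifier signals fire and neither
    kill condition holds."""
    if not auto_escalate_enabled or session_silenced:
        return False  # kill conditions veto unconditionally
    return bool(is_project and
                estimated_checkpoints >= ESCALATE_CHECKPOINTS_THRESHOLD)
```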

**3 audit events:** `agent_message_sent`, `agent_message_received`, `agent_auto_escalated_from_chat`. `PHASE3_STEP10_EVENTS` frozenset exposed.

**Reuses:** `~/.codec/notifications.json` (existing PWA infrastructure since Phase 1) · Step 8 storage layout · Step 9 `_run_agent` emit sites · Qwen-3.6 (existing local LLM at `http://127.0.0.1:8090/v1/chat/completions`).

**Kill switches:**
- `AGENT_AUTO_ESCALATE_ENABLED=false` — chat handler never suggests project promotion
- Per-agent: `POST /api/agents/{id}/silence {"silenced": true}` — agent runs but no notification banners
- Per-conversation auto-escalation silence (Q11) — first "No" suppresses for that session

**PWA endpoints (Step 10):**
- `GET /api/agents/{id}/messages` — returns messages.jsonl as a list (newest last)
- `POST /api/agents/{id}/messages {"body": "..."}` — user reply
- `POST /api/agents/{id}/silence {"silenced": bool}` — toggle silence

Implementation: `codec_agent_messaging.py` (~270 LOC), `routes/agents.py` (+61 for Step 10 endpoints), `codec_dashboard.py` (+125 for classifier + escalation gate), `codec_agent_runner.py` (+42 for `post_message` integration into 5 emit sites).

### Other known gaps (tracked for Phase 3.5 follow-on)
- **Project mode UI** — `codec_dashboard.html` does not yet have a mode-dropdown selector or agent status pills. Backend supports project dispatch via `POST /api/agents`; UI affordances deferred to Phase 3.5 alongside proactive overlay.
- **Proactive intelligence overlay** — observer-driven contextual nudges ("you've been on this Notion doc 30 min, want a summary?") deferred per Q12. Step 10 backend done; Phase 3.5 layers proactive on top.
- **`blocked_on_qwen` dedicated status** (Step 9 review C2) — Qwen unavailability currently maps to `blocked_on_permission` with reason. Phase 3.5 may introduce dedicated status with daemon-driven auto-resume.
- **Read-paths runtime enforcement** (Step 9 review M4) — `PermissionManifest.read_paths` declared but not gated; documented inline. Phase 3.5 may add `Action.reads_path` field + LLM prompt update.
- No formal teammate / sub-agent recursion — Crew is the only multi-agent primitive
- (Phase 3 backend complete after Step 10 ships; Phase 3.5 = UI + proactive + Step 9 review polish)

## 4. Skill system

@@ -457,6 +498,18 @@ Eight event names. `agent_started` opens the per-agent operation envelope; subse

`PHASE3_STEP9_EVENTS` frozenset exposed.

#### Phase 3 Step 10 events — agent ↔ user messaging

Three event names, all info-level. `agent_message_sent` and `agent_message_received` thread the per-agent `correlation_id` from `_run_agent`'s envelope when called from there; `agent_auto_escalated_from_chat` is independent (chat-handler invocation, no agent yet).

| Event | Source | level | extra fields |
|---|---|---|---|
| `agent_message_sent` | `codec-agent-messaging` | info | `agent_id`, `type` (one of `agent_update` \| `agent_blocked` \| `agent_question` \| `agent_done` \| `agent_aborted` \| `user_reply`), `batched` (bool) |
| `agent_message_received` | `codec-agent-messaging` | info | `agent_id`, `body_len` |
| `agent_auto_escalated_from_chat` | `codec-dashboard` | info | `session_id`, `estimated_checkpoints`, `verdict`, `silenced` (bool, true if subsequent No) |

`PHASE3_STEP10_EVENTS` frozenset exposed.

### Notifications (`~/.codec/notifications.json`)
Four sources can produce notifications: scheduler (crew completion), heartbeat (threshold alert), autopilot (ambient trigger), and Phase 1 Step 3's AskUserQuestion (`type="question"`). All write through `routes/_shared.py:51-127` except AskUserQuestion which writes via `codec_ask_user._write_question_notification`.

Expand Down Expand Up @@ -615,6 +668,12 @@ These zones break running infrastructure if changed without coordination. NEVER
- `AGENT_RUNNER_ENABLED` and `AGENT_RUNNER_MAX_CONCURRENT` env vars (Phase 3 Step 9, defaults `true` / `3`). `AGENT_RUNNER_ENABLED=false` idles the daemon.
- PM2 `codec-agent-runner` service (Phase 3 Step 9). Stop/restart through PM2; `autorestart: true` provides crash recovery automatically. Don't add HTTP heartbeat probes — daemon doesn't expose HTTP by design.
- `~/.codec/agents/<id>/state.json` after Step 9 deploy — read/written by `codec_agent_runner._run_agent` mid-checkpoint. Manual edits while an agent is `running` will desync the resume mechanism. To pause an agent: `POST /api/agents/{id}/pause`.
- `codec_agent_messaging.py` (Phase 3 Step 10) — agent ↔ user message dispatch + 60s batching. Don't refactor without re-running PHASE3-STEP10 design gate. The `BATCH_WINDOW_SECONDS=60` constant is the user-facing batching contract; tune cautiously. The `MAX_MESSAGE_BODY_LEN=5000` constant caps body size in `to_dict()` — never raise without considering audit-log impact.
- `~/.codec/agents/<id>/messages.jsonl` (Phase 3 Step 10) — append-only message log. Never edit directly; use `post_message` / `post_user_reply` / endpoints. Bare-edits during a running agent will desync the daemon's `since_ts` read position for user replies.
- `~/.codec/agent_silence.json` (Phase 3 Step 10) — per-agent silence state. Modify only via `set_silenced` or `POST /api/agents/{id}/silence`. Atomic-write contract.
- `_autoescalate_silence_set` global in `codec_dashboard.py` (Phase 3 Step 10) — in-memory per-session silence state for chat → project escalation. Mutated under `_AUTOESCALATE_SILENCE_LOCK` (`threading.Lock()`); never touch from outside. Resets on dashboard restart by design.
- `AGENT_AUTO_ESCALATE_ENABLED` env var (Phase 3 Step 10, default `true`). Setting `false` disables the chat-handler "Promote to Project mode?" prompt entirely.
- `ESCALATE_CHECKPOINTS_THRESHOLD` constant in `codec_dashboard.py` (default 3). Lowering to 1-2 will prompt-escalate even single-skill asks; raising past 5 effectively disables auto-escalation.

## 11. Working with this repo as a coding agent

268 changes: 268 additions & 0 deletions codec_agent_messaging.py
@@ -0,0 +1,268 @@
"""CODEC Phase 3 Step 10 — Proactive Messaging.

Agent → user message system. Posts simultaneously to:
1. ~/.codec/agents/<id>/messages.jsonl (append-only, durable)
2. ~/.codec/notifications.json (banner; batched per 60s window per Q10)

Reuses:
- codec_audit.audit() — Step 1 envelope
- ~/.codec/notifications.json (existing infrastructure since Phase 1)
- codec_agent_plan storage layout (Step 8)
- codec_agent_runner _run_agent emit sites (Step 9)

See docs/PHASE3-BLUEPRINT.md §4 for design rationale.
"""
from __future__ import annotations

import json
import logging
import os
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

log = logging.getLogger("codec_agent_messaging")

# ── Storage paths (overridable for tests) ─────────────────────────────────────
_CODEC_DIR = Path(os.path.expanduser("~/.codec"))
_AGENTS_DIR = _CODEC_DIR / "agents"
_NOTIFICATIONS_PATH = _CODEC_DIR / "notifications.json"

# ── Configurable knobs ────────────────────────────────────────────────────────
BATCH_WINDOW_SECONDS = 60 # Q10: messages within this window merge into one banner
MAX_MESSAGE_BODY_LEN = 5000 # truncate beyond this


# ── Audit event constants (mirror codec_audit) ────────────────────────────────
try:
    from codec_audit import (
        AGENT_MESSAGE_SENT, AGENT_MESSAGE_RECEIVED,
        AGENT_AUTO_ESCALATED_FROM_CHAT,
    )
except ImportError:
    AGENT_MESSAGE_SENT = "agent_message_sent"
    AGENT_MESSAGE_RECEIVED = "agent_message_received"
    AGENT_AUTO_ESCALATED_FROM_CHAT = "agent_auto_escalated_from_chat"


# ── Message types (frozen vocabulary for Step 10) ─────────────────────────────
VALID_MESSAGE_TYPES = frozenset({
    "agent_update",    # checkpoint complete, here's what I did
    "agent_blocked",   # blocked on permission, grant or skip?
    "agent_question",  # clarifying question (reuses Step 3 ask_user infra)
    "agent_done",      # plan complete, here's the summary + artifacts
    "agent_aborted",   # aborted (user / crash / step-budget / destructive-rejected)
    "user_reply",      # user → agent reply (consumed by runner)
})


# ── AgentMessage dataclass ────────────────────────────────────────────────────
@dataclass
class AgentMessage:
    agent_id: str
    type: str  # one of VALID_MESSAGE_TYPES
    title: str
    body: str  # markdown
    actions: List[Dict[str, Any]] = field(default_factory=list)
    correlation_id: str = ""

    def to_dict(self) -> Dict[str, Any]:
        if self.type not in VALID_MESSAGE_TYPES:
            raise ValueError(f"invalid type {self.type!r}; expected {sorted(VALID_MESSAGE_TYPES)}")
        return {
            "ts": datetime.now(timezone.utc).isoformat(timespec="milliseconds"),
            "agent_id": self.agent_id,
            "type": self.type,
            "title": self.title[:200],
            "body": self.body[:MAX_MESSAGE_BODY_LEN],
            "actions": list(self.actions),
            "correlation_id": self.correlation_id,
        }


# ── Atomic file I/O ───────────────────────────────────────────────────────────
def _atomic_write_json(path: Path, data: Any) -> None:
    """Write JSON atomically: write to .tmp, fsync, rename. Mirrors Step 8."""
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = path.with_suffix(path.suffix + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, sort_keys=False)
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp_path, path)


def _append_jsonl(path: Path, record: Dict[str, Any]) -> None:
    """Append a single JSON-encoded line. fsync after each write."""
    path.parent.mkdir(parents=True, exist_ok=True)
    line = json.dumps(record, separators=(",", ":")) + "\n"
    with open(path, "a", encoding="utf-8") as f:
        f.write(line)
        f.flush()
        os.fsync(f.fileno())


def _read_notifications() -> List[Dict[str, Any]]:
    if not _NOTIFICATIONS_PATH.exists():
        return []
    try:
        with open(_NOTIFICATIONS_PATH, "r", encoding="utf-8") as f:
            data = json.load(f)
        return data if isinstance(data, list) else []
    except (json.JSONDecodeError, OSError) as e:
        log.warning("read notifications failed: %s", e)
        return []


# ── Audit emit helper ─────────────────────────────────────────────────────────
def _audit(event: str, source: str = "codec-agent-messaging",
           message: str = "", correlation_id: str = "",
           extra: Optional[Dict[str, Any]] = None) -> None:
    try:
        from codec_audit import audit
    except Exception:
        return
    audit(event=event, source=source, message=message,
          correlation_id=correlation_id,
          extra=dict(extra or {}))


# ── Silence storage ───────────────────────────────────────────────────────────
import threading

_SILENCE_LOCK = threading.Lock()  # guards the read-modify-write in set_silenced


def _silence_state_path() -> Path:
    return _CODEC_DIR / "agent_silence.json"


def _read_silence_state() -> Dict[str, bool]:
    p = _silence_state_path()
    if not p.exists():
        return {}
    try:
        return json.loads(p.read_text())
    except (json.JSONDecodeError, OSError):
        return {}


def is_silenced(agent_id: str) -> bool:
    return bool(_read_silence_state().get(agent_id, False))


def set_silenced(agent_id: str, silenced: bool) -> None:
    """Toggle silence for an agent. When True, post_message still writes
    to messages.jsonl but skips the notifications.json banner."""
    with _SILENCE_LOCK:  # serialize concurrent toggles
        state = _read_silence_state()
        if silenced:
            state[agent_id] = True
        else:
            state.pop(agent_id, None)
        _atomic_write_json(_silence_state_path(), state)


# ── Core post_message + batching ──────────────────────────────────────────────
def post_message(agent_id: str, type: str, title: str, body: str,
                 actions: Optional[List[Dict[str, Any]]] = None,
                 correlation_id: str = "") -> Dict[str, Any]:
    """Post an agent message. Writes to messages.jsonl (append-only,
    timeline preserved) AND notifications.json (banner — batched if a
    recent same-agent banner exists within BATCH_WINDOW_SECONDS).

    Returns the record dict (with injected ts).

    Per Q10: timeline messages are 1:1 with calls; banner notifications
    are batched to avoid notification-badge spam.
    """
    msg = AgentMessage(
        agent_id=agent_id, type=type, title=title, body=body,
        actions=list(actions or []), correlation_id=correlation_id,
    )
    record = msg.to_dict()

    # Always append to messages.jsonl (timeline, no batching)
    msg_path = _AGENTS_DIR / agent_id / "messages.jsonl"
    _append_jsonl(msg_path, record)

    # Silence kill-switch (Step 10): skip notification, keep messages.jsonl write.
    batched = False
    if not is_silenced(agent_id):
        # Update notifications.json (with batching for agent_update)
        notifs = _read_notifications()
        now_ts = time.time()
        if type == "agent_update":
            # Look for recent banner from same agent
            for n in notifs:
                if (n.get("agent_id") == agent_id and
                        n.get("type") == "agent_update"):
                    n_ts = n.get("_post_ts", 0)
                    if now_ts - n_ts <= BATCH_WINDOW_SECONDS:
                        n["batch_count"] = int(n.get("batch_count", 1)) + 1
                        n["title"] = f"{n['batch_count']} updates from {agent_id}: {title[:60]}"
                        n["body"] = body  # latest body wins
                        n["_post_ts"] = now_ts
                        n["correlation_id"] = correlation_id
                        batched = True
                        break

        if not batched:
            notif = dict(record)
            notif["_post_ts"] = now_ts
            notif["batch_count"] = 1
            notifs.append(notif)

        _atomic_write_json(_NOTIFICATIONS_PATH, notifs)

    # Audit emit
    _audit(AGENT_MESSAGE_SENT, message=f"{type} for {agent_id}",
           correlation_id=correlation_id,
           extra={"agent_id": agent_id, "type": type, "batched": batched})

    return record


def post_user_reply(agent_id: str, body: str) -> Dict[str, Any]:
    """User → agent reply. Written to messages.jsonl with type=user_reply.
    Daemon picks up next tick, feeds to next _qwen_next_action call.
    Emits AGENT_MESSAGE_RECEIVED."""
    msg = AgentMessage(
        agent_id=agent_id, type="user_reply",
        title="(user reply)", body=body,
        actions=[], correlation_id="",
    )
    record = msg.to_dict()
    msg_path = _AGENTS_DIR / agent_id / "messages.jsonl"
    _append_jsonl(msg_path, record)
    _audit(AGENT_MESSAGE_RECEIVED, message=f"user reply for {agent_id}",
           extra={"agent_id": agent_id, "body_len": len(body)})
    return record


def get_unread_user_replies(agent_id: str, since_ts: float) -> List[Dict[str, Any]]:
    """Return user_reply entries with ts > since_ts (epoch seconds).
    Used by codec_agent_runner._run_agent to feed replies to the next
    qwen call as additional context."""
    msg_path = _AGENTS_DIR / agent_id / "messages.jsonl"
    if not msg_path.exists():
        return []
    out: List[Dict[str, Any]] = []
    with open(msg_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                rec = json.loads(line)
            except json.JSONDecodeError:
                continue
            if rec.get("type") != "user_reply":
                continue
            ts_str = rec.get("ts", "")
            try:
                rec_ts = datetime.fromisoformat(ts_str).timestamp()
            except (ValueError, TypeError):
                continue
            if rec_ts > since_ts:
                out.append(rec)
    return out