Skip to content

Commit 4a35f3f

Browse files
committed
Fix session reset: move JSONL to inbox instead of in-process branching
agent.reset() now moves pi-session.jsonl to staging/inbox/ and restarts Pi fresh, severing the parentId chain that caused unbounded context growth. - Replace SessionReader watermark class with parse_session() function - Add read_sessions ctools command for reflect skill - Simplify reset_clarvis_session (parallel resets, remove redundant voice disconnect) - Add STAGING_INBOX/STAGING_DIGESTED constants to paths.py - Update reflect skill to use read_sessions on inbox files
1 parent a545f9e commit 4a35f3f

File tree

8 files changed

+212
-339
lines changed

8 files changed

+212
-339
lines changed

clarvis/agent/agent.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,14 @@
99
import json
1010
import logging
1111
import signal
12+
import time
1213
from collections.abc import AsyncGenerator
1314
from dataclasses import dataclass
1415
from pathlib import Path
1516
from typing import TYPE_CHECKING
1617

18+
from ..core.paths import STAGING_INBOX
19+
1720
if TYPE_CHECKING:
1821
from .context import ContextInjector
1922

@@ -230,10 +233,18 @@ async def shutdown(self) -> None:
230233
await self.disconnect()
231234

232235
async def reset(self) -> None:
233-
"""Reset the Pi session (new conversation, retains JSONL history)."""
234-
if not self._connected:
235-
return
236-
self._send_command({"type": "new_session"})
236+
"""Reset by moving session file to inbox and restarting Pi fresh."""
237+
was_connected = self._connected
238+
await self.disconnect()
239+
240+
if self._session_file.exists():
241+
STAGING_INBOX.mkdir(parents=True, exist_ok=True)
242+
dest = STAGING_INBOX / f"session_{self._session_key}_{int(time.time())}.jsonl"
243+
self._session_file.rename(dest)
244+
logger.info("Moved session to %s", dest.name)
245+
246+
if was_connected:
247+
await self.connect()
237248
if self.context:
238249
self.context.reset()
239250

clarvis/core/commands/agent.py

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -33,46 +33,37 @@ def reload_agents(self: CommandHandlers, **kwargs) -> dict:
3333

3434

3535
def reset_clarvis_session(self: CommandHandlers, **kw) -> str:
36-
"""Reset Clarvis agent session (new_session RPC)."""
36+
"""Reset all agent sessions — moves session files to inbox and restarts."""
3737
import asyncio
3838

39-
from ..paths import CLARVIS_HOME, agent_home
40-
41-
# Flush unreflected session content to inbox before resetting
42-
session_reader = self._get_service("session_reader")
43-
if session_reader:
44-
inbox = CLARVIS_HOME / "staging" / "inbox"
45-
for source in ("clarvis", "factoria"):
46-
try:
47-
session_reader.flush_to_inbox(source, inbox)
48-
except Exception as exc:
49-
logger.warning("Failed to flush %s session to inbox: %s", source, exc)
39+
from ..paths import agent_home
5040

5141
for sid_file in [
5242
agent_home("clarvis") / "session_id",
5343
agent_home("factoria") / "session_id",
5444
]:
5545
sid_file.unlink(missing_ok=True)
5646

57-
# Reset the Clarvis agent session
47+
# Reset both agents in parallel (each handles its own file move + restart)
5848
agents = self._get_service("agents") or {}
59-
clarvis_agent = agents.get("clarvis")
60-
if clarvis_agent and clarvis_agent.connected:
61-
try:
62-
asyncio.run_coroutine_threadsafe(clarvis_agent.reset(), self.ctx.loop).result(timeout=30)
63-
except Exception as exc:
64-
logger.warning("Failed to reset Clarvis agent: %s", exc)
49+
if agents:
50+
51+
async def _reset_all():
52+
results = await asyncio.gather(
53+
*(a.reset() for a in agents.values()),
54+
return_exceptions=True,
55+
)
56+
for name, result in zip(agents, results):
57+
if isinstance(result, Exception):
58+
logger.warning("Failed to reset %s agent: %s", name, result)
6559

66-
# Disconnect voice orchestrator's agent if active
67-
orchestrator = self._get_service("voice")
68-
if orchestrator and orchestrator.agent.connected:
69-
asyncio.run_coroutine_threadsafe(orchestrator.agent.disconnect(), orchestrator._loop)
60+
asyncio.run_coroutine_threadsafe(_reset_all(), self.ctx.loop).result(timeout=30)
7061

7162
return "ok"
7263

7364

7465
def reflect_complete(self: CommandHandlers, **kw) -> dict:
75-
"""Signal that reflect is done — advance watermarks and reset agents."""
66+
"""Signal that reflect is done — archive inbox and reset agents."""
7667
import asyncio
7768

7869
daemon = self._get_service("daemon")
@@ -114,10 +105,21 @@ def nudge_agent(self: CommandHandlers, *, reason: str = "timer", **kw) -> dict:
114105
return {"error": str(exc)}
115106

116107

108+
def read_sessions(self: CommandHandlers, *, path: str, **kw) -> dict:
109+
"""Parse a Pi session JSONL file and return structured messages."""
110+
from pathlib import Path
111+
112+
from ...memory.session_reader import parse_session
113+
114+
messages = parse_session(Path(path))
115+
return {"messages": messages, "count": len(messages)}
116+
117+
117118
COMMANDS: dict[str, str] = {
118119
"reload_agents": "reload_agents",
119120
"reset_clarvis_session": "reset_clarvis_session",
120121
"reflect_complete": "reflect_complete",
121122
"listen": "listen",
122123
"nudge": "nudge_agent",
124+
"read_sessions": "read_sessions",
123125
}

clarvis/core/paths.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
from pathlib import Path
22

33
CLARVIS_HOME = Path.home() / ".clarvis"
4+
STAGING_DIR = CLARVIS_HOME / "staging"
5+
STAGING_INBOX = STAGING_DIR / "inbox"
6+
STAGING_DIGESTED = STAGING_DIR / "digested"
47

58

69
def agent_home(name: str) -> Path:

clarvis/daemon.py

Lines changed: 15 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from .core.commands import CommandHandlers
2323
from .core.context import AppContext
2424
from .core.ipc import DaemonServer
25-
from .core.paths import CLARVIS_HOME
25+
from .core.paths import STAGING_DIGESTED, STAGING_INBOX
2626
from .core.persistence import json_load_safe
2727
from .core.scheduler import Scheduler
2828
from .core.signals import SignalBus
@@ -128,8 +128,6 @@ def __init__(
128128
self.voice_orchestrator = None
129129
self.channel_manager = None
130130
self._chat_bridge = None
131-
self.staging_dir = CLARVIS_HOME / "staging"
132-
self._session_reader = None
133131
self._owned_services: list = [] # services with no other refs (prevent GC)
134132

135133
# Deferred — initialized in run()
@@ -700,25 +698,24 @@ async def stop(self) -> None:
700698
pass
701699

702700
async def reset_all_agents(self) -> None:
703-
"""Reset all agent sessions (called after reflect)."""
704-
for name, agent in self._agents.items():
705-
try:
706-
await agent.reset()
701+
"""Reset all agent sessions in parallel."""
702+
results = await asyncio.gather(
703+
*(agent.reset() for agent in self._agents.values()),
704+
return_exceptions=True,
705+
)
706+
for name, result in zip(self._agents, results):
707+
if isinstance(result, Exception):
708+
logger.warning("Failed to reset agent %s: %s", name, result, exc_info=True)
709+
else:
707710
logger.info("Reset agent session: %s", name)
708-
except Exception:
709-
logger.warning("Failed to reset agent %s", name, exc_info=True)
710711

711712
async def complete_reflect(self) -> dict:
712-
"""Finalize reflect: advance watermarks, archive inbox, reset agents."""
713-
if self._session_reader:
714-
self._session_reader.advance_all()
715-
inbox = self.staging_dir / "inbox"
716-
if inbox.is_dir():
717-
read_dir = self.staging_dir / "digested"
718-
read_dir.mkdir(exist_ok=True)
719-
for f in inbox.glob("*"):
713+
"""Finalize reflect: move inbox → digested, then reset agents."""
714+
if STAGING_INBOX.is_dir():
715+
STAGING_DIGESTED.mkdir(parents=True, exist_ok=True)
716+
for f in STAGING_INBOX.glob("*"):
720717
if f.is_file():
721-
f.rename(read_dir / f.name)
718+
f.rename(STAGING_DIGESTED / f.name)
722719
await self.reset_all_agents()
723720
return {"status": "reflect complete"}
724721

@@ -741,18 +738,6 @@ async def run(self) -> None:
741738
# Initialize display pipeline (scene, socket, rendering)
742739
self._init_display()
743740

744-
# Create SessionReader for Pi session files
745-
from .core.paths import agent_home
746-
from .memory.session_reader import SessionReader
747-
748-
self._session_reader = SessionReader(
749-
sources={
750-
"clarvis": agent_home("clarvis") / "pi-session.jsonl",
751-
"factoria": agent_home("factoria") / "pi-session.jsonl",
752-
},
753-
watermark_file=self.staging_dir / "session_watermarks.json",
754-
)
755-
756741
self.commands = CommandHandlers(
757742
ctx=self.ctx,
758743
session_tracker=self.session_tracker,
@@ -766,7 +751,6 @@ async def run(self) -> None:
766751
"timer_service": lambda: self.timer_service,
767752
"channel_manager": lambda: self.channel_manager,
768753
"daemon": lambda: self,
769-
"session_reader": lambda: self._session_reader,
770754
},
771755
)
772756

clarvis/data/skills/reflect/SKILL.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,11 @@ Memory maintenance — extract facts from conversation history and staging inbox
1111

1212
Four sources to check:
1313
1. **Your current session** — already in your context. Extract facts from what you know.
14-
2. **Inbox** — check `~/.clarvis/staging/inbox/` for prior session transcripts (flushed automatically on session reset), user-submitted summaries, and staged files.
15-
3. **Factoria transcript** — read `~/.clarvis/factoria/pi-session.jsonl` for Factoria's conversations since last reflect.
14+
2. **Inbox sessions** — list `~/.clarvis/staging/inbox/` for `session_*.jsonl` files. Parse each with `ctools read_sessions '{"path": "<file>"}'`.
15+
3. **Factoria's live session** — parse `~/.clarvis/factoria/pi-session.jsonl` with `read_sessions` (still active, not in inbox).
16+
4. **Other inbox items** — check for non-session files in inbox (user-submitted summaries from `/remember`, staged markdown files).
1617

17-
If nothing new across all three sources, report "nothing to reflect on" and stop.
18+
If nothing new across all sources, report "nothing to reflect on" and stop.
1819

1920
## Phase 2: Extract facts
2021

@@ -58,5 +59,5 @@ Guidelines:
5859

5960
## Phase 5: Complete
6061

61-
1. Call `reflect_complete` to reset all agent sessions
62+
1. Call `reflect_complete` — moves inbox to `staging/digested/`, resets all agent sessions (current sessions move to inbox and restart fresh)
6263
2. Report what was processed

clarvis/memory/session_reader.py

Lines changed: 9 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,22 @@
1-
"""Multi-source JSONL session reader with per-source watermarks.
2-
3-
Reads Pi session files (pi-session.jsonl) incrementally using byte-offset
4-
watermarks. Each source (e.g. clarvis, factoria) has an independent watermark.
5-
"""
1+
"""Parse Pi session JSONL files into structured messages."""
62

73
import json
84
import logging
9-
import time
105
from pathlib import Path
116

12-
from clarvis.core.persistence import json_load_safe, json_save_atomic
13-
147
logger = logging.getLogger(__name__)
158

16-
# Entry types that are metadata, not conversation content.
17-
_SKIP_TYPES = {"session", "model_change", "thinking_level_change"}
189

10+
def parse_session(path: Path) -> list[dict[str, str]]:
11+
"""Parse a Pi session JSONL file, return [{"role": ..., "text": ...}].
1912
20-
def _parse_pi_messages(raw: str) -> list[dict[str, str]]:
21-
"""Parse Pi JSONL and extract user/assistant text messages.
22-
23-
Returns list of {"role": ..., "text": ...} dicts.
13+
Extracts user and assistant text messages, skipping metadata entries
14+
(session, model_change, system prompts, etc.).
2415
"""
16+
try:
17+
raw = path.read_text(encoding="utf-8", errors="replace")
18+
except FileNotFoundError:
19+
return []
2520
messages = []
2621
for line in raw.splitlines():
2722
line = line.strip()
@@ -48,113 +43,3 @@ def _parse_pi_messages(raw: str) -> list[dict[str, str]]:
4843
if text_parts:
4944
messages.append({"role": role, "text": "\n".join(text_parts)})
5045
return messages
51-
52-
53-
def _filter_for_inbox(raw: str) -> list[str]:
54-
"""Filter JSONL lines for inbox dump.
55-
56-
Keeps all message entries (user, assistant, toolResult — including
57-
ambient context, tool calls, thinking blocks). Drops session metadata
58-
and system prompts.
59-
"""
60-
kept = []
61-
for line in raw.splitlines():
62-
line = line.strip()
63-
if not line:
64-
continue
65-
try:
66-
entry = json.loads(line)
67-
except json.JSONDecodeError:
68-
continue
69-
if entry.get("type") in _SKIP_TYPES:
70-
continue
71-
if entry.get("type") == "message":
72-
role = entry.get("message", {}).get("role")
73-
if role == "system":
74-
continue
75-
kept.append(line)
76-
return kept
77-
78-
79-
class SessionReader:
80-
"""Reads multiple Pi session JSONL files with per-source byte watermarks."""
81-
82-
def __init__(self, sources: dict[str, Path], watermark_file: Path) -> None:
83-
self._sources = {k: Path(v) for k, v in sources.items()}
84-
self._watermark_file = Path(watermark_file)
85-
self._watermarks: dict[str, int] = json_load_safe(self._watermark_file) or {}
86-
self._pending_offsets: dict[str, int] = {}
87-
88-
def _read_since_watermark(self, source: str) -> tuple[str, int] | None:
89-
"""Read raw bytes from *source* since its watermark.
90-
91-
Returns (raw_text, file_size) or None if nothing new.
92-
"""
93-
path = self._sources.get(source)
94-
if not path or not path.exists():
95-
return None
96-
watermark = self._watermarks.get(source, 0)
97-
file_size = path.stat().st_size
98-
if watermark >= file_size:
99-
return None
100-
with open(path, "rb") as f:
101-
f.seek(watermark)
102-
raw = f.read().decode("utf-8", errors="replace")
103-
return raw, file_size
104-
105-
def read_pending(self) -> dict[str, list[dict[str, str]]]:
106-
"""Read new messages from all sources since their watermarks.
107-
108-
Returns {source_name: [{"role": ..., "text": ...}, ...]}.
109-
"""
110-
result: dict[str, list[dict[str, str]]] = {}
111-
for name in self._sources:
112-
chunk = self._read_since_watermark(name)
113-
if chunk is None:
114-
result[name] = []
115-
continue
116-
raw, file_size = chunk
117-
result[name] = _parse_pi_messages(raw)
118-
self._pending_offsets[name] = file_size
119-
return result
120-
121-
def advance(self, source: str) -> None:
122-
"""Advance watermark for a source after successful processing."""
123-
if source in self._pending_offsets:
124-
self._watermarks[source] = self._pending_offsets.pop(source)
125-
json_save_atomic(self._watermark_file, self._watermarks)
126-
127-
def advance_all(self) -> None:
128-
"""Advance watermarks for all sources."""
129-
if not self._pending_offsets:
130-
return
131-
for source in list(self._pending_offsets):
132-
self._watermarks[source] = self._pending_offsets.pop(source)
133-
json_save_atomic(self._watermark_file, self._watermarks)
134-
135-
def flush_to_inbox(self, source: str, inbox_dir: Path) -> Path | None:
136-
"""Dump unreflected content for *source* into inbox and advance watermark.
137-
138-
Filters out session metadata and system prompts. Returns the
139-
path of the written file, or None if there was nothing to flush.
140-
"""
141-
chunk = self._read_since_watermark(source)
142-
if chunk is None:
143-
return None
144-
raw, file_size = chunk
145-
146-
lines = _filter_for_inbox(raw)
147-
if not lines:
148-
# Only metadata — advance watermark, nothing to dump.
149-
self._watermarks[source] = file_size
150-
json_save_atomic(self._watermark_file, self._watermarks)
151-
return None
152-
153-
inbox_dir.mkdir(parents=True, exist_ok=True)
154-
out = inbox_dir / f"{source}_{int(time.time())}.jsonl"
155-
out.write_text("\n".join(lines) + "\n")
156-
157-
self._watermarks[source] = file_size
158-
json_save_atomic(self._watermark_file, self._watermarks)
159-
logger.info("Flushed %d lines from %s to %s", len(lines), source, out.name)
160-
return out

0 commit comments

Comments
 (0)