From a2e50ab1d115420c65f525e007ed9ac19b36523a Mon Sep 17 00:00:00 2001 From: Ranbir Singh Date: Tue, 12 May 2026 21:33:01 +0530 Subject: [PATCH 1/2] fix: checkpoint live session updates --- src/cli/session/runtime.ts | 24 ++++++- src/runtime/engine/manager.ts | 15 +++- src/session/live-checkpoint.ts | 70 ++++++++++++++++++ test/integration.test.ts | 126 ++++++++++++++++++++++++++++++++- test/mock-agent.ts | 19 +++++ 5 files changed, 250 insertions(+), 4 deletions(-) create mode 100644 src/session/live-checkpoint.ts diff --git a/src/cli/session/runtime.ts b/src/cli/session/runtime.ts index 647d3b9f..e9b776bc 100644 --- a/src/cli/session/runtime.ts +++ b/src/cli/session/runtime.ts @@ -29,6 +29,7 @@ import { trimConversationForRuntime, } from "../../session/conversation-model.js"; import { SessionEventWriter } from "../../session/events.js"; +import { LiveSessionCheckpoint } from "../../session/live-checkpoint.js"; import { setCurrentModelId, setDesiredModelId } from "../../session/mode-preference.js"; import { absolutePath, @@ -408,6 +409,22 @@ async function runSessionPrompt(options: RunSessionPromptOptions): Promise { + await flushPendingMessages(false); + record.lastUsedAt = isoNow(); + applyConversation(record, conversation); + record.acpx = acpxState; + await eventWriter.checkpoint(); + }, + onError: (error) => { + if (options.verbose) { + process.stderr.write( + "[acpx] live session checkpoint failed: " + formatErrorMessage(error) + "\n", + ); + } + }, + }); const ownClient = options.client == null; const client = @@ -451,6 +468,7 @@ async function runSessionPrompt(options: RunSessionPromptOptions): Promise { @@ -459,6 +477,7 @@ async function runSessionPrompt(options: RunSessionPromptOptions): Promise { + // best effort on close + }); await flushPendingMessages(false).catch(() => { // best effort on close }); diff --git a/src/runtime/engine/manager.ts b/src/runtime/engine/manager.ts index 73ecf05e..ab232b3e 100644 --- a/src/runtime/engine/manager.ts +++ b/src/runtime/engine/manager.ts @@ -16,6 +16,7 @@ import { trimConversationForRuntime, } from "../../session/conversation-model.js"; import { defaultSessionEventLog } from "../../session/event-log.js"; +import { LiveSessionCheckpoint } from "../../session/live-checkpoint.js"; import { setDesiredConfigOption, setDesiredModeId } from "../../session/mode-preference.js"; import type { ClientOperation, SessionRecord, SessionResumePolicy } from "../../types.js"; import type { @@ -509,6 +510,7 @@ export class AcpRuntimeManager { let record: SessionRecord | null = null; let conversation: ReturnType | null = null; let acpxState: ReturnType; + let liveCheckpoint: LiveSessionCheckpoint | undefined; let client: AcpClient | null = null; try { record = await this.requireRecord(input.handle.acpxRecordId ?? input.handle.sessionKey); @@ -537,6 +539,14 @@ export class AcpRuntimeManager { const runtimeClient = client; const runtimeConversation = conversation; const runtimeRecord = record; + liveCheckpoint = new LiveSessionCheckpoint({ + save: async () => { + runtimeRecord.lastUsedAt = isoNow(); + runtimeRecord.acpx = acpxState; + applyConversation(runtimeRecord, runtimeConversation); + await this.options.sessionStore.save(runtimeRecord); + }, + }); let activeSessionId = record.acpSessionId; const applyPendingCancel = async (): Promise => { @@ -623,6 +633,7 @@ export class AcpRuntimeManager { onSessionUpdate: (notification) => { acpxState = recordSessionUpdate(runtimeConversation, acpxState, notification); trimConversationForRuntime(runtimeConversation); + liveCheckpoint?.request(); emitParsed({ jsonrpc: "2.0", method: "session/update", @@ -632,6 +643,7 @@ export class AcpRuntimeManager { onClientOperation: (operation: ClientOperation) => { acpxState = recordClientOperation(runtimeConversation, acpxState, operation); trimConversationForRuntime(runtimeConversation); + liveCheckpoint?.request(); emitParsed({ type: "client_operation", ...operation, @@ -663,12 +675,12 @@ export class AcpRuntimeManager { }, }); sessionReady.resolve(); - runtimeRecord.lastRequestId = input.requestId; runtimeRecord.lastPromptAt = isoNow(); runtimeRecord.closed = false; runtimeRecord.closedAt = undefined; runtimeRecord.lastUsedAt = isoNow(); + await liveCheckpoint.checkpoint(); if (resumed || loadError) { emitParsed({ type: "status", @@ -735,6 +747,7 @@ export class AcpRuntimeManager { record.acpx = acpxState; applyConversation(record, conversation); record.lastUsedAt = isoNow(); + await liveCheckpoint?.flush().catch(() => {}); const closed = await this.refreshClosedState(record); await this.options.sessionStore.save(record).catch(() => {}); if (!closed && client) { diff --git a/src/session/live-checkpoint.ts b/src/session/live-checkpoint.ts new file mode 100644 index 00000000..9acc0ae1 --- /dev/null +++ b/src/session/live-checkpoint.ts @@ -0,0 +1,70 @@ +const DEFAULT_LIVE_CHECKPOINT_INTERVAL_MS = 500; + +export type LiveSessionCheckpointOptions = { + save: () => Promise; + intervalMs?: number; + onError?: (error: unknown) => void; +}; + +export class LiveSessionCheckpoint { + private readonly save: () => Promise; + private readonly intervalMs: number; + private readonly onError: ((error: unknown) => void) | undefined; + private dirty = false; + private flushing: Promise | undefined; + private timer: ReturnType | undefined; + + constructor(options: LiveSessionCheckpointOptions) { + this.save = options.save; + this.intervalMs = options.intervalMs ?? DEFAULT_LIVE_CHECKPOINT_INTERVAL_MS; + this.onError = options.onError; + } + + request(): void { + this.dirty = true; + if (this.timer) { + return; + } + + this.timer = setTimeout(() => { + this.timer = undefined; + void this.flush().catch((error: unknown) => { + this.onError?.(error); + }); + }, this.intervalMs); + this.timer.unref?.(); + } + + async checkpoint(): Promise { + this.dirty = true; + await this.flush(); + } + + async flush(): Promise { + if (this.timer) { + clearTimeout(this.timer); + this.timer = undefined; + } + + if (this.flushing) { + await this.flushing; + if (!this.dirty) { + return; + } + } + + this.flushing = this.flushDirty(); + try { + await this.flushing; + } finally { + this.flushing = undefined; + } + } + + private async flushDirty(): Promise { + while (this.dirty) { + this.dirty = false; + await this.save(); + } + } +} diff --git a/test/integration.test.ts b/test/integration.test.ts index 862fc6e2..2f665613 100644 --- a/test/integration.test.ts +++ b/test/integration.test.ts @@ -1414,7 +1414,7 @@ test("integration: perf metrics capture checkpoints queue-owner turns before own ); }); assert(queueOwnerRecord, "expected queue owner checkpoint record before owner exit"); - assert.equal(readPerfTimingCount(queueOwnerRecord, "session.write_record"), 2); + assert.equal((readPerfTimingCount(queueOwnerRecord, "session.write_record") ?? 0) >= 2, true); const status = await runCli([...baseAgentArgs(cwd), "--format", "json", "status"], homeDir); assert.equal(status.code, 0, status.stderr); @@ -2984,6 +2984,122 @@ test("integration: sessions history shows in-flight prompt after prompt starts", }); }); +test("integration: sessions read shows assistant updates before the prompt finishes", async () => { + await withTempHome(async (homeDir) => { + const cwd = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-integration-cwd-")); + + try { + const created = await runCli( + [...baseAgentArgs(cwd), "--format", "json", "sessions", "new"], + homeDir, + ); + assert.equal(created.code, 0, created.stderr); + + const promptChild = spawn( + process.execPath, + [ + CLI_PATH, + ...baseAgentArgs(cwd), + "--format", + "quiet", + "prompt", + "stream-sleep 2500 foreground-live-update", + ], + { + env: { + ...process.env, + HOME: homeDir, + }, + stdio: ["ignore", "pipe", "pipe"], + }, + ); + + try { + const history = await waitFor(async () => { + const result = await runCli( + [...baseAgentArgs(cwd), "--format", "quiet", "sessions", "read"], + homeDir, + ); + assert.equal(result.code, 0, result.stderr); + return result.stdout.includes("foreground-live-update") ? result.stdout : null; + }, 5_000); + + assert.equal(promptChild.exitCode, null, "prompt should still be running"); + assert.match(history, /foreground-live-update/); + assert.doesNotMatch(history, /stream-sleep done/); + + const promptResult = await awaitChildClose(promptChild); + assert.equal(promptResult.code, 0, promptResult.stderr); + assert.match(promptResult.stdout, /stream-sleep done: foreground-live-update/); + } finally { + if (promptChild.exitCode == null && promptChild.signalCode == null) { + promptChild.kill("SIGKILL"); + await awaitChildClose(promptChild).catch(() => {}); + } + } + } finally { + await fs.rm(cwd, { recursive: true, force: true }); + } + }); +}); + +test("integration: --no-wait stdin prompt checkpoints live assistant updates", async () => { + await withTempHome(async (homeDir) => { + const cwd = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-integration-cwd-")); + + try { + const created = await runCli( + [...baseAgentArgs(cwd), "--format", "json", "sessions", "new"], + homeDir, + ); + assert.equal(created.code, 0, created.stderr); + + const queued = await runCli( + [ + ...baseAgentArgs(cwd), + "--format", + "json", + "--ttl", + "5", + "prompt", + "--no-wait", + "--file", + "-", + ], + homeDir, + { + stdin: "stream-sleep 5000 background-live-update", + }, + ); + assert.equal(queued.code, 0, queued.stderr); + const queuedPayload = JSON.parse(queued.stdout.trim()) as { + action?: string; + }; + assert.equal(queuedPayload.action, "prompt_queued"); + + const history = await waitFor(async () => { + const result = await runCli( + [...baseAgentArgs(cwd), "--format", "quiet", "sessions", "read"], + homeDir, + ); + assert.equal(result.code, 0, result.stderr); + return result.stdout.includes("background-live-update") ? result.stdout : null; + }, 5_000); + + assert.match(history, /background-live-update/); + assert.doesNotMatch(history, /stream-sleep done/); + + const closed = await runCli( + [...baseAgentArgs(cwd), "--format", "json", "sessions", "close"], + homeDir, + ); + assert.equal(closed.code, 0, closed.stderr); + } finally { + await fs.rm(cwd, { recursive: true, force: true }); + } + }); +}); + test("integration: session remains resumable after queue owner exits and agent has exited", async () => { await withTempHome(async (homeDir) => { const cwd = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-integration-cwd-")); @@ -3294,7 +3410,7 @@ async function runCliWithEntry( ...options.env, }, cwd: options.cwd, - stdio: ["ignore", "pipe", "pipe"], + stdio: ["pipe", "pipe", "pipe"], }); let stdout = ""; @@ -3315,6 +3431,12 @@ async function runCliWithEntry( stderr += chunk; }); + if (options.stdin != null) { + child.stdin.end(options.stdin); + } else { + child.stdin.end(); + } + child.once("error", (error) => { clearTimeout(timer); reject(error); diff --git a/test/mock-agent.ts b/test/mock-agent.ts index c9f0d921..dee515e0 100644 --- a/test/mock-agent.ts +++ b/test/mock-agent.ts @@ -1012,6 +1012,25 @@ class MockAgent implements Agent { return `slept ${Math.round(ms)}ms`; } + if (text.startsWith("stream-sleep ")) { + const rest = text.slice("stream-sleep ".length).trim(); + const firstSpace = rest.search(/\s/); + if (firstSpace <= 0) { + throw new Error("Usage: stream-sleep "); + } + + const rawMs = rest.slice(0, firstSpace).trim(); + const liveText = rest.slice(firstSpace + 1).trim(); + const ms = Number(rawMs); + if (!Number.isFinite(ms) || ms < 0 || liveText.length === 0) { + throw new Error("Usage: stream-sleep "); + } + + await this.sendAssistantMessage(sessionId, liveText); + await sleepWithCancel(Math.round(ms), signal); + return `stream-sleep done: ${liveText}`; + } + if (text.startsWith("disconnect ")) { const rawMs = text.slice("disconnect ".length).trim(); if (!rawMs) { From 27d6a221c0065f67bd2f8eba729f3170636ce533 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 15 May 2026 05:35:01 +0100 Subject: [PATCH 2/2] fix: preserve close state during live checkpoints --- CHANGELOG.md | 1 + src/cli/session/runtime.ts | 20 +++++++ src/runtime/engine/manager.ts | 1 + test/integration.test.ts | 107 ++++++++++++++++++++++++++++++++-- test/runtime-manager.test.ts | 87 +++++++++++++++++++++++++++ 5 files changed, 212 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f8c73905..18a2970c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ Repo: https://github.com/openclaw/acpx ### Fixes - Runtime/embedding: preserve structured ACP `tool_call_update` details on public runtime events, including content, output, locations, kind, and raw payload fields, so embedders can display live tool progress. (#306) Thanks @joeia26. +- CLI/sessions: checkpoint live assistant and tool updates while prompt turns are still running, so `sessions read` and `sessions history` can show in-flight progress instead of only the submitted prompt. (#314) Thanks @AndroidPoet. ## 2026.5.5 (v0.7.0) diff --git a/src/cli/session/runtime.ts b/src/cli/session/runtime.ts index e9b776bc..eb92a89a 100644 --- a/src/cli/session/runtime.ts +++ b/src/cli/session/runtime.ts @@ -409,12 +409,29 @@ async function runSessionPrompt(options: RunSessionPromptOptions): Promise => { + const latest = await resolveSessionRecord(record.acpxRecordId).catch(() => undefined); + if (!latest?.closed) { + return; + } + + record.closed = true; + record.closedAt = latest.closedAt ?? record.closedAt ?? isoNow(); + record.pid = latest.pid; + if (latest.acpx) { + record.acpx = { + ...record.acpx, + ...latest.acpx, + }; + } + }; const liveCheckpoint = new LiveSessionCheckpoint({ save: async () => { await flushPendingMessages(false); record.lastUsedAt = isoNow(); applyConversation(record, conversation); record.acpx = acpxState; + await preserveClosedState(); await eventWriter.checkpoint(); }, onError: (error) => { @@ -720,6 +737,9 @@ async function runSessionPrompt(options: RunSessionPromptOptions): Promise { // best effort on close }); + await preserveClosedState().catch(() => { + // best effort on close + }); await closeEventWriter(true).catch(() => { // best effort on close }); diff --git a/src/runtime/engine/manager.ts b/src/runtime/engine/manager.ts index ab232b3e..a3402826 100644 --- a/src/runtime/engine/manager.ts +++ b/src/runtime/engine/manager.ts @@ -544,6 +544,7 @@ export class AcpRuntimeManager { runtimeRecord.lastUsedAt = isoNow(); runtimeRecord.acpx = acpxState; applyConversation(runtimeRecord, runtimeConversation); + await this.refreshClosedState(runtimeRecord); await this.options.sessionStore.save(runtimeRecord); }, }); diff --git a/test/integration.test.ts b/test/integration.test.ts index 2f665613..e5d045e9 100644 --- a/test/integration.test.ts +++ b/test/integration.test.ts @@ -3017,11 +3017,18 @@ test("integration: sessions read shows assistant updates before the prompt finis try { const history = await waitFor(async () => { const result = await runCli( - [...baseAgentArgs(cwd), "--format", "quiet", "sessions", "read"], + [...baseAgentArgs(cwd), "--format", "json", "sessions", "read"], homeDir, ); assert.equal(result.code, 0, result.stderr); - return result.stdout.includes("foreground-live-update") ? result.stdout : null; + const payload = JSON.parse(result.stdout.trim()) as { + entries?: Array<{ role?: string; textPreview?: string }>; + }; + const assistantEntry = payload.entries?.find( + (entry) => + entry.role === "assistant" && entry.textPreview?.includes("foreground-live-update"), + ); + return assistantEntry ? result.stdout : null; }, 5_000); assert.equal(promptChild.exitCode, null, "prompt should still be running"); @@ -3079,11 +3086,18 @@ test("integration: --no-wait stdin prompt checkpoints live assistant updates", a const history = await waitFor(async () => { const result = await runCli( - [...baseAgentArgs(cwd), "--format", "quiet", "sessions", "read"], + [...baseAgentArgs(cwd), "--format", "json", "sessions", "read"], homeDir, ); assert.equal(result.code, 0, result.stderr); - return result.stdout.includes("background-live-update") ? result.stdout : null; + const payload = JSON.parse(result.stdout.trim()) as { + entries?: Array<{ role?: string; textPreview?: string }>; + }; + const assistantEntry = payload.entries?.find( + (entry) => + entry.role === "assistant" && entry.textPreview?.includes("background-live-update"), + ); + return assistantEntry ? result.stdout : null; }, 5_000); assert.match(history, /background-live-update/); @@ -3100,6 +3114,91 @@ test("integration: --no-wait stdin prompt checkpoints live assistant updates", a }); }); +test("integration: sessions close stays closed after live checkpoints", async () => { + await withTempHome(async (homeDir) => { + const cwd = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-integration-cwd-")); + + try { + const created = await runCli( + [...baseAgentArgs(cwd), "--format", "json", "sessions", "new"], + homeDir, + ); + assert.equal(created.code, 0, created.stderr); + const createdPayload = JSON.parse(created.stdout.trim()) as { + acpxRecordId?: string; + }; + const sessionId = createdPayload.acpxRecordId; + assert.equal(typeof sessionId, "string"); + + const promptChild = spawn( + process.execPath, + [ + CLI_PATH, + ...baseAgentArgs(cwd), + "--format", + "quiet", + "prompt", + "stream-sleep 5000 close-live-update", + ], + { + env: { + ...process.env, + HOME: homeDir, + }, + stdio: ["ignore", "pipe", "pipe"], + }, + ); + + try { + await waitFor(async () => { + const result = await runCli( + [...baseAgentArgs(cwd), "--format", "json", "sessions", "read"], + homeDir, + ); + assert.equal(result.code, 0, result.stderr); + const payload = JSON.parse(result.stdout.trim()) as { + entries?: Array<{ role?: string; textPreview?: string }>; + }; + const assistantEntry = payload.entries?.find( + (entry) => + entry.role === "assistant" && entry.textPreview?.includes("close-live-update"), + ); + return assistantEntry ? true : null; + }, 5_000); + + const closed = await runCli( + [...baseAgentArgs(cwd), "--format", "json", "sessions", "close"], + homeDir, + ); + assert.equal(closed.code, 0, closed.stderr); + if (promptChild.exitCode == null && promptChild.signalCode == null) { + await awaitChildClose(promptChild).catch(() => {}); + } + + const recordPath = path.join( + homeDir, + ".acpx", + "sessions", + `${encodeURIComponent(sessionId as string)}.json`, + ); + const storedRecord = JSON.parse(await fs.readFile(recordPath, "utf8")) as { + closed?: boolean; + closed_at?: string; + }; + assert.equal(storedRecord.closed, true); + assert.equal(typeof storedRecord.closed_at, "string"); + } finally { + if (promptChild.exitCode == null && promptChild.signalCode == null) { + promptChild.kill("SIGKILL"); + await awaitChildClose(promptChild).catch(() => {}); + } + } + } finally { + await fs.rm(cwd, { recursive: true, force: true }); + } + }); +}); + test("integration: session remains resumable after queue owner exits and agent has exited", async () => { await withTempHome(async (homeDir) => { const cwd = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-integration-cwd-")); diff --git a/test/runtime-manager.test.ts b/test/runtime-manager.test.ts index ee219a30..5fd29c38 100644 --- a/test/runtime-manager.test.ts +++ b/test/runtime-manager.test.ts @@ -802,6 +802,93 @@ test("AcpRuntimeManager does not pool a persistent client after active close", a assert.equal(typeof closed?.closedAt, "string"); }); +test("AcpRuntimeManager live checkpoints preserve active close state", async () => { + const record = makeSessionRecord({ + acpxRecordId: "active-close-checkpoint-session", + acpSessionId: "active-close-checkpoint-sid", + agentCommand: "codex --acp", + cwd: "/workspace", + }); + const store = new InMemorySessionStore([record]); + let handlers: FakeClientHandlers = {}; + let promptActive = false; + let resolvePromptStart!: () => void; + let resolvePrompt!: (value: { stopReason: string }) => void; + const promptStarted = new Promise((resolve) => { + resolvePromptStart = resolve; + }); + const promptResult = new Promise<{ stopReason: string }>((resolve) => { + resolvePrompt = resolve; + }); + const client: FakeClient = { + start: async () => {}, + close: async () => { + promptActive = false; + }, + createSession: async () => ({ sessionId: "unused" }), + loadSession: async () => ({ agentSessionId: "unused" }), + hasReusableSession: (sessionId) => sessionId === "active-close-checkpoint-sid", + supportsLoadSession: () => true, + supportsCloseSession: () => true, + closeSession: async () => {}, + loadSessionWithOptions: async () => ({ agentSessionId: "active-close-checkpoint-agent-id" }), + getAgentLifecycleSnapshot: () => ({ running: promptActive }), + prompt: async () => { + promptActive = true; + handlers.onSessionUpdate?.({ + sessionId: "active-close-checkpoint-sid", + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "live checkpoint" }, + }, + }); + resolvePromptStart(); + return await promptResult; + }, + requestCancelActivePrompt: async () => { + promptActive = false; + return true; + }, + hasActivePrompt: () => promptActive, + setSessionMode: async () => {}, + setSessionConfigOption: async () => {}, + clearEventHandlers: () => { + handlers = {}; + }, + setEventHandlers: (nextHandlers) => { + handlers = nextHandlers; + }, + }; + const manager = new AcpRuntimeManager( + createRuntimeOptions({ cwd: "/workspace", sessionStore: store }), + { + clientFactory: () => client as never, + }, + ); + const handle = createHandle("active-close-checkpoint-session"); + + const turn = manager.startTurn({ + handle, + text: "hello", + mode: "prompt", + sessionMode: "persistent", + requestId: "req-active-close-checkpoint", + }); + const eventsPromise = collectEvents(turn.events); + await promptStarted; + + await manager.close(handle, { discardPersistentState: true }); + await new Promise((resolve) => setTimeout(resolve, 650)); + + const checkpointed = await store.load("active-close-checkpoint-session"); + assert.equal(checkpointed?.closed, true); + assert.equal(checkpointed?.acpx?.reset_on_next_ensure, true); + + resolvePrompt({ stopReason: "cancelled" }); + await eventsPromise; + await turn.result; +}); + test("AcpRuntimeManager accepts a session reply even when the prompt RPC times out", async () => { const record = makeSessionRecord({ acpxRecordId: "late-reply-session",