Skip to content

Commit 27d6a22

Browse files
committed
fix: preserve close state during live checkpoints
1 parent a2e50ab commit 27d6a22

5 files changed

Lines changed: 212 additions & 4 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Repo: https://github.com/openclaw/acpx
1313
### Fixes
1414

1515
- Runtime/embedding: preserve structured ACP `tool_call_update` details on public runtime events, including content, output, locations, kind, and raw payload fields, so embedders can display live tool progress. (#306) Thanks @joeia26.
16+
- CLI/sessions: checkpoint live assistant and tool updates while prompt turns are still running, so `sessions read` and `sessions history` can show in-flight progress instead of only the submitted prompt. (#314) Thanks @AndroidPoet.
1617

1718
## 2026.5.5 (v0.7.0)
1819

src/cli/session/runtime.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,12 +409,29 @@ async function runSessionPrompt(options: RunSessionPromptOptions): Promise<Sessi
409409
await eventWriter.appendMessages(batch, { checkpoint });
410410
});
411411
};
412+
const preserveClosedState = async (): Promise<void> => {
413+
const latest = await resolveSessionRecord(record.acpxRecordId).catch(() => undefined);
414+
if (!latest?.closed) {
415+
return;
416+
}
417+
418+
record.closed = true;
419+
record.closedAt = latest.closedAt ?? record.closedAt ?? isoNow();
420+
record.pid = latest.pid;
421+
if (latest.acpx) {
422+
record.acpx = {
423+
...record.acpx,
424+
...latest.acpx,
425+
};
426+
}
427+
};
412428
const liveCheckpoint = new LiveSessionCheckpoint({
413429
save: async () => {
414430
await flushPendingMessages(false);
415431
record.lastUsedAt = isoNow();
416432
applyConversation(record, conversation);
417433
record.acpx = acpxState;
434+
await preserveClosedState();
418435
await eventWriter.checkpoint();
419436
},
420437
onError: (error) => {
@@ -720,6 +737,9 @@ async function runSessionPrompt(options: RunSessionPromptOptions): Promise<Sessi
720737
await flushPendingMessages(false).catch(() => {
721738
// best effort on close
722739
});
740+
await preserveClosedState().catch(() => {
741+
// best effort on close
742+
});
723743
await closeEventWriter(true).catch(() => {
724744
// best effort on close
725745
});

src/runtime/engine/manager.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,7 @@ export class AcpRuntimeManager {
544544
runtimeRecord.lastUsedAt = isoNow();
545545
runtimeRecord.acpx = acpxState;
546546
applyConversation(runtimeRecord, runtimeConversation);
547+
await this.refreshClosedState(runtimeRecord);
547548
await this.options.sessionStore.save(runtimeRecord);
548549
},
549550
});

test/integration.test.ts

Lines changed: 103 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3017,11 +3017,18 @@ test("integration: sessions read shows assistant updates before the prompt finis
30173017
try {
30183018
const history = await waitFor(async () => {
30193019
const result = await runCli(
3020-
[...baseAgentArgs(cwd), "--format", "quiet", "sessions", "read"],
3020+
[...baseAgentArgs(cwd), "--format", "json", "sessions", "read"],
30213021
homeDir,
30223022
);
30233023
assert.equal(result.code, 0, result.stderr);
3024-
return result.stdout.includes("foreground-live-update") ? result.stdout : null;
3024+
const payload = JSON.parse(result.stdout.trim()) as {
3025+
entries?: Array<{ role?: string; textPreview?: string }>;
3026+
};
3027+
const assistantEntry = payload.entries?.find(
3028+
(entry) =>
3029+
entry.role === "assistant" && entry.textPreview?.includes("foreground-live-update"),
3030+
);
3031+
return assistantEntry ? result.stdout : null;
30253032
}, 5_000);
30263033

30273034
assert.equal(promptChild.exitCode, null, "prompt should still be running");
@@ -3079,11 +3086,18 @@ test("integration: --no-wait stdin prompt checkpoints live assistant updates", a
30793086

30803087
const history = await waitFor(async () => {
30813088
const result = await runCli(
3082-
[...baseAgentArgs(cwd), "--format", "quiet", "sessions", "read"],
3089+
[...baseAgentArgs(cwd), "--format", "json", "sessions", "read"],
30833090
homeDir,
30843091
);
30853092
assert.equal(result.code, 0, result.stderr);
3086-
return result.stdout.includes("background-live-update") ? result.stdout : null;
3093+
const payload = JSON.parse(result.stdout.trim()) as {
3094+
entries?: Array<{ role?: string; textPreview?: string }>;
3095+
};
3096+
const assistantEntry = payload.entries?.find(
3097+
(entry) =>
3098+
entry.role === "assistant" && entry.textPreview?.includes("background-live-update"),
3099+
);
3100+
return assistantEntry ? result.stdout : null;
30873101
}, 5_000);
30883102

30893103
assert.match(history, /background-live-update/);
@@ -3100,6 +3114,91 @@ test("integration: --no-wait stdin prompt checkpoints live assistant updates", a
31003114
});
31013115
});
31023116

3117+
test("integration: sessions close stays closed after live checkpoints", async () => {
3118+
await withTempHome(async (homeDir) => {
3119+
const cwd = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-integration-cwd-"));
3120+
3121+
try {
3122+
const created = await runCli(
3123+
[...baseAgentArgs(cwd), "--format", "json", "sessions", "new"],
3124+
homeDir,
3125+
);
3126+
assert.equal(created.code, 0, created.stderr);
3127+
const createdPayload = JSON.parse(created.stdout.trim()) as {
3128+
acpxRecordId?: string;
3129+
};
3130+
const sessionId = createdPayload.acpxRecordId;
3131+
assert.equal(typeof sessionId, "string");
3132+
3133+
const promptChild = spawn(
3134+
process.execPath,
3135+
[
3136+
CLI_PATH,
3137+
...baseAgentArgs(cwd),
3138+
"--format",
3139+
"quiet",
3140+
"prompt",
3141+
"stream-sleep 5000 close-live-update",
3142+
],
3143+
{
3144+
env: {
3145+
...process.env,
3146+
HOME: homeDir,
3147+
},
3148+
stdio: ["ignore", "pipe", "pipe"],
3149+
},
3150+
);
3151+
3152+
try {
3153+
await waitFor(async () => {
3154+
const result = await runCli(
3155+
[...baseAgentArgs(cwd), "--format", "json", "sessions", "read"],
3156+
homeDir,
3157+
);
3158+
assert.equal(result.code, 0, result.stderr);
3159+
const payload = JSON.parse(result.stdout.trim()) as {
3160+
entries?: Array<{ role?: string; textPreview?: string }>;
3161+
};
3162+
const assistantEntry = payload.entries?.find(
3163+
(entry) =>
3164+
entry.role === "assistant" && entry.textPreview?.includes("close-live-update"),
3165+
);
3166+
return assistantEntry ? true : null;
3167+
}, 5_000);
3168+
3169+
const closed = await runCli(
3170+
[...baseAgentArgs(cwd), "--format", "json", "sessions", "close"],
3171+
homeDir,
3172+
);
3173+
assert.equal(closed.code, 0, closed.stderr);
3174+
if (promptChild.exitCode == null && promptChild.signalCode == null) {
3175+
await awaitChildClose(promptChild).catch(() => {});
3176+
}
3177+
3178+
const recordPath = path.join(
3179+
homeDir,
3180+
".acpx",
3181+
"sessions",
3182+
`${encodeURIComponent(sessionId as string)}.json`,
3183+
);
3184+
const storedRecord = JSON.parse(await fs.readFile(recordPath, "utf8")) as {
3185+
closed?: boolean;
3186+
closed_at?: string;
3187+
};
3188+
assert.equal(storedRecord.closed, true);
3189+
assert.equal(typeof storedRecord.closed_at, "string");
3190+
} finally {
3191+
if (promptChild.exitCode == null && promptChild.signalCode == null) {
3192+
promptChild.kill("SIGKILL");
3193+
await awaitChildClose(promptChild).catch(() => {});
3194+
}
3195+
}
3196+
} finally {
3197+
await fs.rm(cwd, { recursive: true, force: true });
3198+
}
3199+
});
3200+
});
3201+
31033202
test("integration: session remains resumable after queue owner exits and agent has exited", async () => {
31043203
await withTempHome(async (homeDir) => {
31053204
const cwd = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-integration-cwd-"));

test/runtime-manager.test.ts

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -802,6 +802,93 @@ test("AcpRuntimeManager does not pool a persistent client after active close", a
802802
assert.equal(typeof closed?.closedAt, "string");
803803
});
804804

805+
test("AcpRuntimeManager live checkpoints preserve active close state", async () => {
806+
const record = makeSessionRecord({
807+
acpxRecordId: "active-close-checkpoint-session",
808+
acpSessionId: "active-close-checkpoint-sid",
809+
agentCommand: "codex --acp",
810+
cwd: "/workspace",
811+
});
812+
const store = new InMemorySessionStore([record]);
813+
let handlers: FakeClientHandlers = {};
814+
let promptActive = false;
815+
let resolvePromptStart!: () => void;
816+
let resolvePrompt!: (value: { stopReason: string }) => void;
817+
const promptStarted = new Promise<void>((resolve) => {
818+
resolvePromptStart = resolve;
819+
});
820+
const promptResult = new Promise<{ stopReason: string }>((resolve) => {
821+
resolvePrompt = resolve;
822+
});
823+
const client: FakeClient = {
824+
start: async () => {},
825+
close: async () => {
826+
promptActive = false;
827+
},
828+
createSession: async () => ({ sessionId: "unused" }),
829+
loadSession: async () => ({ agentSessionId: "unused" }),
830+
hasReusableSession: (sessionId) => sessionId === "active-close-checkpoint-sid",
831+
supportsLoadSession: () => true,
832+
supportsCloseSession: () => true,
833+
closeSession: async () => {},
834+
loadSessionWithOptions: async () => ({ agentSessionId: "active-close-checkpoint-agent-id" }),
835+
getAgentLifecycleSnapshot: () => ({ running: promptActive }),
836+
prompt: async () => {
837+
promptActive = true;
838+
handlers.onSessionUpdate?.({
839+
sessionId: "active-close-checkpoint-sid",
840+
update: {
841+
sessionUpdate: "agent_message_chunk",
842+
content: { type: "text", text: "live checkpoint" },
843+
},
844+
});
845+
resolvePromptStart();
846+
return await promptResult;
847+
},
848+
requestCancelActivePrompt: async () => {
849+
promptActive = false;
850+
return true;
851+
},
852+
hasActivePrompt: () => promptActive,
853+
setSessionMode: async () => {},
854+
setSessionConfigOption: async () => {},
855+
clearEventHandlers: () => {
856+
handlers = {};
857+
},
858+
setEventHandlers: (nextHandlers) => {
859+
handlers = nextHandlers;
860+
},
861+
};
862+
const manager = new AcpRuntimeManager(
863+
createRuntimeOptions({ cwd: "/workspace", sessionStore: store }),
864+
{
865+
clientFactory: () => client as never,
866+
},
867+
);
868+
const handle = createHandle("active-close-checkpoint-session");
869+
870+
const turn = manager.startTurn({
871+
handle,
872+
text: "hello",
873+
mode: "prompt",
874+
sessionMode: "persistent",
875+
requestId: "req-active-close-checkpoint",
876+
});
877+
const eventsPromise = collectEvents(turn.events);
878+
await promptStarted;
879+
880+
await manager.close(handle, { discardPersistentState: true });
881+
await new Promise((resolve) => setTimeout(resolve, 650));
882+
883+
const checkpointed = await store.load("active-close-checkpoint-session");
884+
assert.equal(checkpointed?.closed, true);
885+
assert.equal(checkpointed?.acpx?.reset_on_next_ensure, true);
886+
887+
resolvePrompt({ stopReason: "cancelled" });
888+
await eventsPromise;
889+
await turn.result;
890+
});
891+
805892
test("AcpRuntimeManager accepts a session reply even when the prompt RPC times out", async () => {
806893
const record = makeSessionRecord({
807894
acpxRecordId: "late-reply-session",

0 commit comments

Comments
 (0)