Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Repo: https://github.com/openclaw/acpx
### Fixes

- Runtime/embedding: preserve structured ACP `tool_call_update` details on public runtime events, including content, output, locations, kind, and raw payload fields, so embedders can display live tool progress. (#306) Thanks @joeia26.
- CLI/sessions: checkpoint live assistant and tool updates while prompt turns are still running, so `sessions read` and `sessions history` can show in-flight progress instead of only the submitted prompt. (#314) Thanks @AndroidPoet.

## 2026.5.5 (v0.7.0)

Expand Down
44 changes: 43 additions & 1 deletion src/cli/session/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
trimConversationForRuntime,
} from "../../session/conversation-model.js";
import { SessionEventWriter } from "../../session/events.js";
import { LiveSessionCheckpoint } from "../../session/live-checkpoint.js";
import { setCurrentModelId, setDesiredModelId } from "../../session/mode-preference.js";
import {
absolutePath,
Expand Down Expand Up @@ -408,6 +409,39 @@ async function runSessionPrompt(options: RunSessionPromptOptions): Promise<Sessi
await eventWriter.appendMessages(batch, { checkpoint });
});
};
const preserveClosedState = async (): Promise<void> => {
const latest = await resolveSessionRecord(record.acpxRecordId).catch(() => undefined);
if (!latest?.closed) {
return;
}

record.closed = true;
record.closedAt = latest.closedAt ?? record.closedAt ?? isoNow();
record.pid = latest.pid;
if (latest.acpx) {
record.acpx = {
...record.acpx,
...latest.acpx,
};
}
};
const liveCheckpoint = new LiveSessionCheckpoint({
save: async () => {
await flushPendingMessages(false);
record.lastUsedAt = isoNow();
applyConversation(record, conversation);
record.acpx = acpxState;
await preserveClosedState();
await eventWriter.checkpoint();
},
onError: (error) => {
if (options.verbose) {
process.stderr.write(
"[acpx] live session checkpoint failed: " + formatErrorMessage(error) + "\n",
);
}
},
});

const ownClient = options.client == null;
const client =
Expand Down Expand Up @@ -451,6 +485,7 @@ async function runSessionPrompt(options: RunSessionPromptOptions): Promise<Sessi
}
acpxState = recordConversationSessionUpdate(conversation, acpxState, notification);
trimConversationForRuntime(conversation);
liveCheckpoint.request();
options.onSessionUpdate?.(notification);
},
onClientOperation: (operation) => {
Expand All @@ -459,6 +494,7 @@ async function runSessionPrompt(options: RunSessionPromptOptions): Promise<Sessi
}
acpxState = recordConversationClientOperation(conversation, acpxState, operation);
trimConversationForRuntime(conversation);
liveCheckpoint.request();
options.onClientOperation?.(operation);
},
});
Expand Down Expand Up @@ -541,7 +577,7 @@ async function runSessionPrompt(options: RunSessionPromptOptions): Promise<Sessi
output.setContext({
sessionId: record.acpxRecordId,
});
await flushPendingMessages(false);
await liveCheckpoint.checkpoint();

const maxRetries = options.promptRetries ?? 0;
let response;
Expand Down Expand Up @@ -695,9 +731,15 @@ async function runSessionPrompt(options: RunSessionPromptOptions): Promise<Sessi
applyLifecycleSnapshotToRecord(record, client.getAgentLifecycleSnapshot());
applyConversation(record, conversation);
record.acpx = acpxState;
await liveCheckpoint.flush().catch(() => {
// best effort on close
});
await flushPendingMessages(false).catch(() => {
// best effort on close
});
await preserveClosedState().catch(() => {
// best effort on close
});
await closeEventWriter(true).catch(() => {
// best effort on close
});
Expand Down
16 changes: 15 additions & 1 deletion src/runtime/engine/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
trimConversationForRuntime,
} from "../../session/conversation-model.js";
import { defaultSessionEventLog } from "../../session/event-log.js";
import { LiveSessionCheckpoint } from "../../session/live-checkpoint.js";
import { setDesiredConfigOption, setDesiredModeId } from "../../session/mode-preference.js";
import type { ClientOperation, SessionRecord, SessionResumePolicy } from "../../types.js";
import type {
Expand Down Expand Up @@ -509,6 +510,7 @@ export class AcpRuntimeManager {
let record: SessionRecord | null = null;
let conversation: ReturnType<typeof cloneSessionConversation> | null = null;
let acpxState: ReturnType<typeof cloneSessionAcpxState>;
let liveCheckpoint: LiveSessionCheckpoint | undefined;
let client: AcpClient | null = null;
try {
record = await this.requireRecord(input.handle.acpxRecordId ?? input.handle.sessionKey);
Expand Down Expand Up @@ -537,6 +539,15 @@ export class AcpRuntimeManager {
const runtimeClient = client;
const runtimeConversation = conversation;
const runtimeRecord = record;
liveCheckpoint = new LiveSessionCheckpoint({
save: async () => {
runtimeRecord.lastUsedAt = isoNow();
runtimeRecord.acpx = acpxState;
applyConversation(runtimeRecord, runtimeConversation);
await this.refreshClosedState(runtimeRecord);
await this.options.sessionStore.save(runtimeRecord);
},
});
let activeSessionId = record.acpSessionId;

const applyPendingCancel = async (): Promise<boolean> => {
Expand Down Expand Up @@ -623,6 +634,7 @@ export class AcpRuntimeManager {
onSessionUpdate: (notification) => {
acpxState = recordSessionUpdate(runtimeConversation, acpxState, notification);
trimConversationForRuntime(runtimeConversation);
liveCheckpoint?.request();
emitParsed({
jsonrpc: "2.0",
method: "session/update",
Expand All @@ -632,6 +644,7 @@ export class AcpRuntimeManager {
onClientOperation: (operation: ClientOperation) => {
acpxState = recordClientOperation(runtimeConversation, acpxState, operation);
trimConversationForRuntime(runtimeConversation);
liveCheckpoint?.request();
emitParsed({
type: "client_operation",
...operation,
Expand Down Expand Up @@ -663,12 +676,12 @@ export class AcpRuntimeManager {
},
});
sessionReady.resolve();

runtimeRecord.lastRequestId = input.requestId;
runtimeRecord.lastPromptAt = isoNow();
runtimeRecord.closed = false;
runtimeRecord.closedAt = undefined;
runtimeRecord.lastUsedAt = isoNow();
await liveCheckpoint.checkpoint();
if (resumed || loadError) {
emitParsed({
type: "status",
Expand Down Expand Up @@ -735,6 +748,7 @@ export class AcpRuntimeManager {
record.acpx = acpxState;
applyConversation(record, conversation);
record.lastUsedAt = isoNow();
await liveCheckpoint?.flush().catch(() => {});
const closed = await this.refreshClosedState(record);
await this.options.sessionStore.save(record).catch(() => {});
if (!closed && client) {
Expand Down
70 changes: 70 additions & 0 deletions src/session/live-checkpoint.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
const DEFAULT_LIVE_CHECKPOINT_INTERVAL_MS = 500;

export type LiveSessionCheckpointOptions = {
save: () => Promise<void>;
intervalMs?: number;
onError?: (error: unknown) => void;
};

export class LiveSessionCheckpoint {
private readonly save: () => Promise<void>;
private readonly intervalMs: number;
private readonly onError: ((error: unknown) => void) | undefined;
private dirty = false;
private flushing: Promise<void> | undefined;
private timer: ReturnType<typeof setTimeout> | undefined;

constructor(options: LiveSessionCheckpointOptions) {
this.save = options.save;
this.intervalMs = options.intervalMs ?? DEFAULT_LIVE_CHECKPOINT_INTERVAL_MS;
this.onError = options.onError;
}

request(): void {
this.dirty = true;
if (this.timer) {
return;
}

this.timer = setTimeout(() => {
this.timer = undefined;
void this.flush().catch((error: unknown) => {
this.onError?.(error);
});
}, this.intervalMs);
this.timer.unref?.();
}

async checkpoint(): Promise<void> {
this.dirty = true;
await this.flush();
}

async flush(): Promise<void> {
if (this.timer) {
clearTimeout(this.timer);
this.timer = undefined;
}

if (this.flushing) {
await this.flushing;
if (!this.dirty) {
return;
}
}

this.flushing = this.flushDirty();
try {
await this.flushing;
} finally {
this.flushing = undefined;
}
}

private async flushDirty(): Promise<void> {
while (this.dirty) {
this.dirty = false;
await this.save();
}
}
}
Loading