diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index a161b79086e..4163d0aa43f 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -47,6 +47,9 @@ export namespace ACP { private config: ACPConfig private sdk: OpencodeClient private sessionManager + private disposed = false + /** AbortControllers for each session's event subscription */ + private sessionAbortControllers = new Map() constructor(connection: AgentSideConnection, config: ACPConfig) { this.connection = connection @@ -55,273 +58,343 @@ export namespace ACP { this.sessionManager = new ACPSessionManager(this.sdk) } + /** + * Clean up a specific session's event subscription. + */ + private cleanupSession(sessionId: string) { + const controller = this.sessionAbortControllers.get(sessionId) + if (controller) { + controller.abort() + this.sessionAbortControllers.delete(sessionId) + log.info("cleaned up session subscription", { sessionId }) + } + } + + /** + * Dispose of all resources held by this agent. + * Should be called when the ACP connection ends. + */ + dispose() { + if (this.disposed) return + this.disposed = true + + log.info("disposing agent", { sessionCount: this.sessionManager.size() }) + + // Abort all event subscriptions + for (const [sessionId, controller] of this.sessionAbortControllers) { + controller.abort() + log.info("aborted session subscription", { sessionId }) + } + this.sessionAbortControllers.clear() + + // Clear all session state + this.sessionManager.clear() + + log.info("agent disposed") + } + private setupEventSubscriptions(session: ACPSessionState) { const sessionId = session.id const directory = session.cwd + // Clean up any existing subscription for this session + this.cleanupSession(sessionId) + + // Create abort controller for this session + const controller = new AbortController() + this.sessionAbortControllers.set(sessionId, controller) + const options: PermissionOption[] = [ { optionId: "once", kind: "allow_once", name: "Allow once" }, { optionId: "always", kind: "allow_always", name: "Always allow" }, { optionId: "reject", kind: "reject_once", name: "Reject" }, ] - this.config.sdk.event.subscribe({ directory }).then(async (events) => { - for await (const event of events.stream) { - switch (event.type) { - case "permission.asked": - try { - const permission = event.properties - const res = await this.connection - .requestPermission({ - sessionId, - toolCall: { - toolCallId: permission.tool?.callID ?? permission.id, - status: "pending", - title: permission.permission, - rawInput: permission.metadata, - kind: toToolKind(permission.permission), - locations: toLocations(permission.permission, permission.metadata), - }, - options, - }) - .catch(async (error) => { - log.error("failed to request permission from ACP", { - error, - permissionID: permission.id, - sessionID: permission.sessionID, + this.config.sdk.event + .subscribe({ directory }, { signal: controller.signal }) + .then(async (events) => { + for await (const event of events.stream) { + // Check if we should stop processing + if (controller.signal.aborted || this.disposed) { + log.info("event subscription stopped", { + sessionId, + aborted: controller.signal.aborted, + disposed: this.disposed, + }) + break + } + switch (event.type) { + case "permission.asked": + if (this.disposed) break + try { + const permission = event.properties + const res = await this.connection + .requestPermission({ + sessionId, + toolCall: { + toolCallId: permission.tool?.callID ?? permission.id, + status: "pending", + title: permission.permission, + rawInput: permission.metadata, + kind: toToolKind(permission.permission), + locations: toLocations(permission.permission, permission.metadata), + }, + options, }) + .catch(async (error) => { + log.error("failed to request permission from ACP", { + error, + permissionID: permission.id, + sessionID: permission.sessionID, + }) + await this.config.sdk.permission.reply({ + requestID: permission.id, + reply: "reject", + directory, + }) + return + }) + if (!res) return + if (this.disposed) return + if (res.outcome.outcome !== "selected") { await this.config.sdk.permission.reply({ requestID: permission.id, reply: "reject", directory, }) return - }) - if (!res) return - if (res.outcome.outcome !== "selected") { + } await this.config.sdk.permission.reply({ requestID: permission.id, - reply: "reject", + reply: res.outcome.optionId as "once" | "always" | "reject", directory, }) - return + } catch (err) { + log.error("unexpected error when handling permission", { error: err }) } - await this.config.sdk.permission.reply({ - requestID: permission.id, - reply: res.outcome.optionId as "once" | "always" | "reject", - directory, - }) - } catch (err) { - log.error("unexpected error when handling permission", { error: err }) - } finally { break - } - case "message.part.updated": - log.info("message part updated", { event: event.properties }) - try { - const props = event.properties - const { part } = props - - const message = await this.config.sdk.session - .message( - { - sessionID: part.sessionID, - messageID: part.messageID, - directory, - }, - { throwOnError: true }, - ) - .then((x) => x.data) - .catch((err) => { - log.error("unexpected error when fetching message", { error: err }) - return undefined - }) + case "message.part.updated": + if (this.disposed) break + log.info("message part updated", { event: event.properties }) + try { + const props = event.properties + const { part } = props - if (!message || message.info.role !== "assistant") return + const message = await this.config.sdk.session + .message( + { + sessionID: part.sessionID, + messageID: part.messageID, + directory, + }, + { throwOnError: true }, + ) + .then((x) => x.data) + .catch((err) => { + log.error("unexpected error when fetching message", { error: err }) + return undefined + }) - if (part.type === "tool") { - switch (part.state.status) { - case "pending": - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call", - toolCallId: part.callID, - title: part.tool, - kind: toToolKind(part.tool), - status: "pending", - locations: [], - rawInput: {}, - }, - }) - .catch((err) => { - log.error("failed to send tool pending to ACP", { error: err }) - }) - break - case "running": - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "in_progress", - locations: toLocations(part.tool, part.state.input), - rawInput: part.state.input, - }, - }) - .catch((err) => { - log.error("failed to send tool in_progress to ACP", { error: err }) - }) - break - case "completed": - const kind = toToolKind(part.tool) - const content: ToolCallContent[] = [ - { - type: "content", - content: { - type: "text", - text: part.state.output, + if (!message || message.info.role !== "assistant") return + if (this.disposed) return + + if (part.type === "tool") { + switch (part.state.status) { + case "pending": + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call", + toolCallId: part.callID, + title: part.tool, + kind: toToolKind(part.tool), + status: "pending", + locations: [], + rawInput: {}, + }, + }) + .catch((err) => { + log.error("failed to send tool pending to ACP", { error: err }) + }) + break + case "running": + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "in_progress", + locations: toLocations(part.tool, part.state.input), + rawInput: part.state.input, + }, + }) + .catch((err) => { + log.error("failed to send tool in_progress to ACP", { error: err }) + }) + break + case "completed": + const kind = toToolKind(part.tool) + const content: ToolCallContent[] = [ + { + type: "content", + content: { + type: "text", + text: part.state.output, + }, }, - }, - ] - - if (kind === "edit") { - const input = part.state.input - const filePath = typeof input["filePath"] === "string" ? input["filePath"] : "" - const oldText = typeof input["oldString"] === "string" ? input["oldString"] : "" - const newText = - typeof input["newString"] === "string" - ? input["newString"] - : typeof input["content"] === "string" - ? input["content"] - : "" - content.push({ - type: "diff", - path: filePath, - oldText, - newText, - }) - } - - if (part.tool === "todowrite") { - const parsedTodos = z.array(Todo.Info).safeParse(JSON.parse(part.state.output)) - if (parsedTodos.success) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "plan", - entries: parsedTodos.data.map((todo) => { - const status: PlanEntry["status"] = - todo.status === "cancelled" ? "completed" : (todo.status as PlanEntry["status"]) - return { - priority: "medium", - status, - content: todo.content, - } - }), - }, - }) - .catch((err) => { - log.error("failed to send session update for todo", { error: err }) - }) - } else { - log.error("failed to parse todo output", { error: parsedTodos.error }) + ] + + if (kind === "edit") { + const input = part.state.input + const filePath = typeof input["filePath"] === "string" ? input["filePath"] : "" + const oldText = typeof input["oldString"] === "string" ? input["oldString"] : "" + const newText = + typeof input["newString"] === "string" + ? input["newString"] + : typeof input["content"] === "string" + ? input["content"] + : "" + content.push({ + type: "diff", + path: filePath, + oldText, + newText, + }) + } + + if (part.tool === "todowrite") { + const parsedTodos = z.array(Todo.Info).safeParse(JSON.parse(part.state.output)) + if (parsedTodos.success) { + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "plan", + entries: parsedTodos.data.map((todo) => { + const status: PlanEntry["status"] = + todo.status === "cancelled" ? "completed" : (todo.status as PlanEntry["status"]) + return { + priority: "medium", + status, + content: todo.content, + } + }), + }, + }) + .catch((err) => { + log.error("failed to send session update for todo", { error: err }) + }) + } else { + log.error("failed to parse todo output", { error: parsedTodos.error }) + } } - } + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "completed", + kind, + content, + title: part.state.title, + rawOutput: { + output: part.state.output, + metadata: part.state.metadata, + }, + }, + }) + .catch((err) => { + log.error("failed to send tool completed to ACP", { error: err }) + }) + break + case "error": + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "failed", + content: [ + { + type: "content", + content: { + type: "text", + text: part.state.error, + }, + }, + ], + rawOutput: { + error: part.state.error, + }, + }, + }) + .catch((err) => { + log.error("failed to send tool error to ACP", { error: err }) + }) + break + } + } else if (part.type === "text") { + const delta = props.delta + if (delta && part.synthetic !== true) { await this.connection .sessionUpdate({ sessionId, update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "completed", - kind, - content, - title: part.state.title, - rawOutput: { - output: part.state.output, - metadata: part.state.metadata, + sessionUpdate: "agent_message_chunk", + content: { + type: "text", + text: delta, }, }, }) .catch((err) => { - log.error("failed to send tool completed to ACP", { error: err }) + log.error("failed to send text to ACP", { error: err }) }) - break - case "error": + } + } else if (part.type === "reasoning") { + const delta = props.delta + if (delta) { await this.connection .sessionUpdate({ sessionId, update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "failed", - content: [ - { - type: "content", - content: { - type: "text", - text: part.state.error, - }, - }, - ], - rawOutput: { - error: part.state.error, + sessionUpdate: "agent_thought_chunk", + content: { + type: "text", + text: delta, }, }, }) .catch((err) => { - log.error("failed to send tool error to ACP", { error: err }) + log.error("failed to send reasoning to ACP", { error: err }) }) - break - } - } else if (part.type === "text") { - const delta = props.delta - if (delta && part.synthetic !== true) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text: delta, - }, - }, - }) - .catch((err) => { - log.error("failed to send text to ACP", { error: err }) - }) - } - } else if (part.type === "reasoning") { - const delta = props.delta - if (delta) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "agent_thought_chunk", - content: { - type: "text", - text: delta, - }, - }, - }) - .catch((err) => { - log.error("failed to send reasoning to ACP", { error: err }) - }) + } } + } catch (err) { + log.error("unexpected error when handling message part update", { error: err }) } - } finally { break - } + } } - } - }) + }) + .catch((err) => { + if (!controller.signal.aborted) { + log.error("error in event subscription", { sessionId, error: err }) + } + }) + .finally(() => { + // Cleanup controller reference when stream ends, but only if it's the same controller + // to prevent race condition with re-subscriptions + const current = this.sessionAbortControllers.get(sessionId) + if (current === controller) { + this.sessionAbortControllers.delete(sessionId) + } + }) } async initialize(params: InitializeRequest): Promise { diff --git a/packages/opencode/src/acp/session.ts b/packages/opencode/src/acp/session.ts index 70b65834705..e46174057cf 100644 --- a/packages/opencode/src/acp/session.ts +++ b/packages/opencode/src/acp/session.ts @@ -98,4 +98,40 @@ export class ACPSessionManager { this.sessions.set(sessionId, session) return session } + + /** + * Remove a session from the manager to free memory. + * Should be called when a session is terminated or the connection closes. + */ + remove(sessionId: string): boolean { + const existed = this.sessions.has(sessionId) + if (existed) { + this.sessions.delete(sessionId) + log.info("removed_session", { sessionId }) + } + return existed + } + + /** + * Get the count of active sessions (useful for monitoring/debugging). + */ + size(): number { + return this.sessions.size + } + + /** + * Get all session IDs (useful for cleanup). + */ + sessionIds(): string[] { + return Array.from(this.sessions.keys()) + } + + /** + * Clear all sessions. Used during cleanup/dispose. + */ + clear(): void { + const count = this.sessions.size + this.sessions.clear() + log.info("cleared_all_sessions", { count }) + } } diff --git a/packages/opencode/src/cli/cmd/acp.ts b/packages/opencode/src/cli/cmd/acp.ts index 30e919d999a..1994da4139e 100644 --- a/packages/opencode/src/cli/cmd/acp.ts +++ b/packages/opencode/src/cli/cmd/acp.ts @@ -52,10 +52,14 @@ export const AcpCommand = cmd({ }) const stream = ndJsonStream(input, output) - const agent = await ACP.init({ sdk }) + const acpFactory = await ACP.init({ sdk }) + + // Track the agent instance so we can dispose it when connection ends + let agentInstance: ReturnType | undefined new AgentSideConnection((conn) => { - return agent.create(conn, { sdk }) + agentInstance = acpFactory.create(conn, { sdk }) + return agentInstance }, stream) log.info("setup connection") @@ -64,6 +68,12 @@ export const AcpCommand = cmd({ process.stdin.on("end", resolve) process.stdin.on("error", reject) }) + + // Clean up agent resources when connection ends + if (agentInstance) { + log.info("disposing agent on connection end") + agentInstance.dispose() + } }) }, }) diff --git a/packages/opencode/test/acp/session.test.ts b/packages/opencode/test/acp/session.test.ts new file mode 100644 index 00000000000..40df144b2e9 --- /dev/null +++ b/packages/opencode/test/acp/session.test.ts @@ -0,0 +1,108 @@ +import { describe, expect, test } from "bun:test" +import { ACPSessionManager } from "../../src/acp/session" + +describe("ACPSessionManager", () => { + // Create a mock SDK + const createMockSdk = (sessionId: string = "test-session-1") => + ({ + session: { + create: async () => ({ data: { id: sessionId } }), + get: async () => ({ data: { id: sessionId, time: { created: new Date().toISOString() } } }), + }, + }) as any + + describe("remove", () => { + test("removes existing session and returns true", async () => { + const manager = new ACPSessionManager(createMockSdk()) + + await manager.create("/test/path", [], undefined) + expect(manager.size()).toBe(1) + + const result = manager.remove("test-session-1") + expect(result).toBe(true) + expect(manager.size()).toBe(0) + }) + + test("returns false for non-existent session", () => { + const manager = new ACPSessionManager(createMockSdk()) + + const result = manager.remove("non-existent") + expect(result).toBe(false) + }) + }) + + describe("size", () => { + test("returns correct count of sessions", async () => { + let counter = 0 + const sdk = { + session: { + create: async () => ({ data: { id: `session-${counter++}` } }), + get: async (params: any) => ({ + data: { id: params.sessionID, time: { created: new Date().toISOString() } }, + }), + }, + } as any + + const manager = new ACPSessionManager(sdk) + expect(manager.size()).toBe(0) + + await manager.create("/path1", [], undefined) + expect(manager.size()).toBe(1) + + await manager.create("/path2", [], undefined) + expect(manager.size()).toBe(2) + }) + }) + + describe("sessionIds", () => { + test("returns all session IDs", async () => { + let counter = 0 + const sdk = { + session: { + create: async () => ({ data: { id: `session-${counter++}` } }), + get: async (params: any) => ({ + data: { id: params.sessionID, time: { created: new Date().toISOString() } }, + }), + }, + } as any + + const manager = new ACPSessionManager(sdk) + + await manager.create("/path1", [], undefined) + await manager.create("/path2", [], undefined) + await manager.create("/path3", [], undefined) + + const ids = manager.sessionIds() + expect(ids).toHaveLength(3) + expect(ids).toContain("session-0") + expect(ids).toContain("session-1") + expect(ids).toContain("session-2") + }) + }) + + describe("clear", () => { + test("removes all sessions", async () => { + let counter = 0 + const sdk = { + session: { + create: async () => ({ data: { id: `session-${counter++}` } }), + get: async (params: any) => ({ + data: { id: params.sessionID, time: { created: new Date().toISOString() } }, + }), + }, + } as any + + const manager = new ACPSessionManager(sdk) + + await manager.create("/path1", [], undefined) + await manager.create("/path2", [], undefined) + await manager.create("/path3", [], undefined) + + expect(manager.size()).toBe(3) + + manager.clear() + expect(manager.size()).toBe(0) + expect(manager.sessionIds()).toEqual([]) + }) + }) +})