From 8142552eeeddc95e213abb85f2b5c1bdd2440ef2 Mon Sep 17 00:00:00 2001 From: Mark Henderson Date: Sun, 11 Jan 2026 22:30:57 -0700 Subject: [PATCH 1/7] fix(core): add dispose functions to prevent subscription memory leaks - Add dispose() to Share, ShareNext, Plugin, and Format namespaces - Add cleanupSession() and dispose() to ACP Agent with AbortControllers - Add Bus._getSubscriptionCount() test helpers - Add memory tests to verify cleanup works correctly Supersedes #7032 Fixes #3013 --- packages/opencode/src/acp/agent.ts | 526 ++++++++++-------- packages/opencode/src/bus/index.ts | 15 + packages/opencode/src/format/index.ts | 16 +- packages/opencode/src/plugin/index.ts | 16 +- packages/opencode/src/share/share-next.ts | 47 +- packages/opencode/src/share/share.ts | 30 +- .../opencode/test/memory/acp-cleanup.test.ts | 164 ++++++ .../test/memory/leak-comparison.test.ts | 162 ++++++ packages/opencode/test/memory/profile.ts | 339 +++++++++++ .../test/memory/subscription-cleanup.test.ts | 284 ++++++++++ 10 files changed, 1349 insertions(+), 250 deletions(-) create mode 100644 packages/opencode/test/memory/acp-cleanup.test.ts create mode 100644 packages/opencode/test/memory/leak-comparison.test.ts create mode 100644 packages/opencode/test/memory/profile.ts create mode 100644 packages/opencode/test/memory/subscription-cleanup.test.ts diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index ebd65bb26da..635ee192399 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -48,303 +48,337 @@ export namespace ACP { private config: ACPConfig private sdk: OpencodeClient private sessionManager + private sessionAbortControllers = new Map() constructor(connection: AgentSideConnection, config: ACPConfig) { this.connection = connection this.config = config this.sdk = config.sdk this.sessionManager = new ACPSessionManager(this.sdk) + + // Cleanup all subscriptions when connection closes + this.connection.closed.then(() => { + log.info("connection closed, disposing agent") + this.dispose() + }) + } + + private cleanupSession(sessionId: string) { + const controller = this.sessionAbortControllers.get(sessionId) + if (controller) { + controller.abort() + this.sessionAbortControllers.delete(sessionId) + log.info("cleaned up event subscription", { sessionId }) + } } private setupEventSubscriptions(session: ACPSessionState) { const sessionId = session.id const directory = session.cwd + // Cleanup any existing subscription for this session + this.cleanupSession(sessionId) + + // Create abort controller for this session's event subscription + const controller = new AbortController() + this.sessionAbortControllers.set(sessionId, controller) + const options: PermissionOption[] = [ { optionId: "once", kind: "allow_once", name: "Allow once" }, { optionId: "always", kind: "allow_always", name: "Always allow" }, { optionId: "reject", kind: "reject_once", name: "Reject" }, ] - this.config.sdk.event.subscribe({ directory }).then(async (events) => { - for await (const event of events.stream) { - switch (event.type) { - case "permission.asked": - try { - const permission = event.properties - const res = await this.connection - .requestPermission({ - sessionId, - toolCall: { - toolCallId: permission.tool?.callID ?? permission.id, - status: "pending", - title: permission.permission, - rawInput: permission.metadata, - kind: toToolKind(permission.permission), - locations: toLocations(permission.permission, permission.metadata), - }, - options, - }) - .catch(async (error) => { - log.error("failed to request permission from ACP", { - error, - permissionID: permission.id, - sessionID: permission.sessionID, + this.config.sdk.event + .subscribe({ directory }) + .then(async (events) => { + for await (const event of events.stream) { + // Check if subscription was aborted + if (controller.signal.aborted) { + log.info("event subscription aborted", { sessionId }) + break + } + switch (event.type) { + case "permission.asked": + try { + const permission = event.properties + const res = await this.connection + .requestPermission({ + sessionId, + toolCall: { + toolCallId: permission.tool?.callID ?? permission.id, + status: "pending", + title: permission.permission, + rawInput: permission.metadata, + kind: toToolKind(permission.permission), + locations: toLocations(permission.permission, permission.metadata), + }, + options, + }) + .catch(async (error) => { + log.error("failed to request permission from ACP", { + error, + permissionID: permission.id, + sessionID: permission.sessionID, + }) + await this.config.sdk.permission.reply({ + requestID: permission.id, + reply: "reject", + directory, + }) + return }) + if (!res) return + if (res.outcome.outcome !== "selected") { await this.config.sdk.permission.reply({ requestID: permission.id, reply: "reject", directory, }) return - }) - if (!res) return - if (res.outcome.outcome !== "selected") { + } + if (res.outcome.optionId !== "reject" && permission.permission == "edit") { + const metadata = permission.metadata || {} + const filepath = typeof metadata["filepath"] === "string" ? metadata["filepath"] : "" + const diff = typeof metadata["diff"] === "string" ? metadata["diff"] : "" + + const content = await Bun.file(filepath).text() + const newContent = getNewContent(content, diff) + + if (newContent) { + this.connection.writeTextFile({ + sessionId: sessionId, + path: filepath, + content: newContent, + }) + } + } await this.config.sdk.permission.reply({ requestID: permission.id, - reply: "reject", + reply: res.outcome.optionId as "once" | "always" | "reject", directory, }) - return - } - if (res.outcome.optionId !== "reject" && permission.permission == "edit") { - const metadata = permission.metadata || {} - const filepath = typeof metadata["filepath"] === "string" ? metadata["filepath"] : "" - const diff = typeof metadata["diff"] === "string" ? metadata["diff"] : "" - - const content = await Bun.file(filepath).text() - const newContent = getNewContent(content, diff) - - if (newContent) { - this.connection.writeTextFile({ - sessionId: sessionId, - path: filepath, - content: newContent, - }) - } + } catch (err) { + log.error("unexpected error when handling permission", { error: err }) + } finally { + break } - await this.config.sdk.permission.reply({ - requestID: permission.id, - reply: res.outcome.optionId as "once" | "always" | "reject", - directory, - }) - } catch (err) { - log.error("unexpected error when handling permission", { error: err }) - } finally { - break - } - case "message.part.updated": - log.info("message part updated", { event: event.properties }) - try { - const props = event.properties - const { part } = props - - const message = await this.config.sdk.session - .message( - { - sessionID: part.sessionID, - messageID: part.messageID, - directory, - }, - { throwOnError: true }, - ) - .then((x) => x.data) - .catch((err) => { - log.error("unexpected error when fetching message", { error: err }) - return undefined - }) + case "message.part.updated": + log.info("message part updated", { event: event.properties }) + try { + const props = event.properties + const { part } = props - if (!message || message.info.role !== "assistant") return + const message = await this.config.sdk.session + .message( + { + sessionID: part.sessionID, + messageID: part.messageID, + directory, + }, + { throwOnError: true }, + ) + .then((x) => x.data) + .catch((err) => { + log.error("unexpected error when fetching message", { error: err }) + return undefined + }) - if (part.type === "tool") { - switch (part.state.status) { - case "pending": - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call", - toolCallId: part.callID, - title: part.tool, - kind: toToolKind(part.tool), - status: "pending", - locations: [], - rawInput: {}, - }, - }) - .catch((err) => { - log.error("failed to send tool pending to ACP", { error: err }) - }) - break - case "running": - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "in_progress", - kind: toToolKind(part.tool), - title: part.tool, - locations: toLocations(part.tool, part.state.input), - rawInput: part.state.input, - }, - }) - .catch((err) => { - log.error("failed to send tool in_progress to ACP", { error: err }) - }) - break - case "completed": - const kind = toToolKind(part.tool) - const content: ToolCallContent[] = [ - { - type: "content", - content: { - type: "text", - text: part.state.output, + if (!message || message.info.role !== "assistant") return + + if (part.type === "tool") { + switch (part.state.status) { + case "pending": + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call", + toolCallId: part.callID, + title: part.tool, + kind: toToolKind(part.tool), + status: "pending", + locations: [], + rawInput: {}, + }, + }) + .catch((err) => { + log.error("failed to send tool pending to ACP", { error: err }) + }) + break + case "running": + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "in_progress", + kind: toToolKind(part.tool), + title: part.tool, + locations: toLocations(part.tool, part.state.input), + rawInput: part.state.input, + }, + }) + .catch((err) => { + log.error("failed to send tool in_progress to ACP", { error: err }) + }) + break + case "completed": + const kind = toToolKind(part.tool) + const content: ToolCallContent[] = [ + { + type: "content", + content: { + type: "text", + text: part.state.output, + }, }, - }, - ] - - if (kind === "edit") { - const input = part.state.input - const filePath = typeof input["filePath"] === "string" ? input["filePath"] : "" - const oldText = typeof input["oldString"] === "string" ? input["oldString"] : "" - const newText = - typeof input["newString"] === "string" - ? input["newString"] - : typeof input["content"] === "string" - ? input["content"] - : "" - content.push({ - type: "diff", - path: filePath, - oldText, - newText, - }) - } - - if (part.tool === "todowrite") { - const parsedTodos = z.array(Todo.Info).safeParse(JSON.parse(part.state.output)) - if (parsedTodos.success) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "plan", - entries: parsedTodos.data.map((todo) => { - const status: PlanEntry["status"] = - todo.status === "cancelled" ? "completed" : (todo.status as PlanEntry["status"]) - return { - priority: "medium", - status, - content: todo.content, - } - }), - }, - }) - .catch((err) => { - log.error("failed to send session update for todo", { error: err }) - }) - } else { - log.error("failed to parse todo output", { error: parsedTodos.error }) + ] + + if (kind === "edit") { + const input = part.state.input + const filePath = typeof input["filePath"] === "string" ? input["filePath"] : "" + const oldText = typeof input["oldString"] === "string" ? input["oldString"] : "" + const newText = + typeof input["newString"] === "string" + ? input["newString"] + : typeof input["content"] === "string" + ? input["content"] + : "" + content.push({ + type: "diff", + path: filePath, + oldText, + newText, + }) + } + + if (part.tool === "todowrite") { + const parsedTodos = z.array(Todo.Info).safeParse(JSON.parse(part.state.output)) + if (parsedTodos.success) { + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "plan", + entries: parsedTodos.data.map((todo) => { + const status: PlanEntry["status"] = + todo.status === "cancelled" ? "completed" : (todo.status as PlanEntry["status"]) + return { + priority: "medium", + status, + content: todo.content, + } + }), + }, + }) + .catch((err) => { + log.error("failed to send session update for todo", { error: err }) + }) + } else { + log.error("failed to parse todo output", { error: parsedTodos.error }) + } } - } + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "completed", + kind, + content, + title: part.state.title, + rawInput: part.state.input, + rawOutput: { + output: part.state.output, + metadata: part.state.metadata, + }, + }, + }) + .catch((err) => { + log.error("failed to send tool completed to ACP", { error: err }) + }) + break + case "error": + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "failed", + kind: toToolKind(part.tool), + title: part.tool, + rawInput: part.state.input, + content: [ + { + type: "content", + content: { + type: "text", + text: part.state.error, + }, + }, + ], + rawOutput: { + error: part.state.error, + }, + }, + }) + .catch((err) => { + log.error("failed to send tool error to ACP", { error: err }) + }) + break + } + } else if (part.type === "text") { + const delta = props.delta + if (delta && part.synthetic !== true) { await this.connection .sessionUpdate({ sessionId, update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "completed", - kind, - content, - title: part.state.title, - rawInput: part.state.input, - rawOutput: { - output: part.state.output, - metadata: part.state.metadata, + sessionUpdate: "agent_message_chunk", + content: { + type: "text", + text: delta, }, }, }) .catch((err) => { - log.error("failed to send tool completed to ACP", { error: err }) + log.error("failed to send text to ACP", { error: err }) }) - break - case "error": + } + } else if (part.type === "reasoning") { + const delta = props.delta + if (delta) { await this.connection .sessionUpdate({ sessionId, update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "failed", - kind: toToolKind(part.tool), - title: part.tool, - rawInput: part.state.input, - content: [ - { - type: "content", - content: { - type: "text", - text: part.state.error, - }, - }, - ], - rawOutput: { - error: part.state.error, + sessionUpdate: "agent_thought_chunk", + content: { + type: "text", + text: delta, }, }, }) .catch((err) => { - log.error("failed to send tool error to ACP", { error: err }) + log.error("failed to send reasoning to ACP", { error: err }) }) - break - } - } else if (part.type === "text") { - const delta = props.delta - if (delta && part.synthetic !== true) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text: delta, - }, - }, - }) - .catch((err) => { - log.error("failed to send text to ACP", { error: err }) - }) - } - } else if (part.type === "reasoning") { - const delta = props.delta - if (delta) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "agent_thought_chunk", - content: { - type: "text", - text: delta, - }, - }, - }) - .catch((err) => { - log.error("failed to send reasoning to ACP", { error: err }) - }) + } } + } finally { + break } - } finally { - break - } + } } - } - }) + }) + .finally(() => { + // Cleanup controller reference when stream ends + this.sessionAbortControllers.delete(sessionId) + }) } async initialize(params: InitializeRequest): Promise { @@ -951,6 +985,8 @@ export namespace ACP { async cancel(params: CancelNotification) { const session = this.sessionManager.get(params.sessionId) + // Cleanup event subscription for this session + this.cleanupSession(params.sessionId) await this.config.sdk.session.abort( { sessionID: params.sessionId, @@ -959,6 +995,14 @@ export namespace ACP { { throwOnError: true }, ) } + + dispose() { + // Cleanup all session event subscriptions + for (const sessionId of this.sessionAbortControllers.keys()) { + this.cleanupSession(sessionId) + } + log.info("disposed all event subscriptions") + } } function toToolKind(toolName: string): ToolKind { diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index edb093f1974..26e220fe3e3 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -102,4 +102,19 @@ export namespace Bus { match.splice(index, 1) } } + + /** @internal Test helper to get subscription count for a specific event type */ + export function _getSubscriptionCount(type: string): number { + const match = state().subscriptions.get(type) + return match?.length ?? 0 + } + + /** @internal Test helper to get total subscription count across all event types */ + export function _getTotalSubscriptionCount(): number { + let total = 0 + for (const subs of state().subscriptions.values()) { + total += subs.length + } + return total + } } diff --git a/packages/opencode/src/format/index.ts b/packages/opencode/src/format/index.ts index bab758030b9..539a2be36fb 100644 --- a/packages/opencode/src/format/index.ts +++ b/packages/opencode/src/format/index.ts @@ -100,9 +100,14 @@ export namespace Format { return result } + // Store unsubscribe functions for cleanup + const unsubscribers: Array<() => void> = [] + export function init() { log.info("init") - Bus.subscribe(File.Event.Edited, async (payload) => { + // Clean up any existing subscriptions before adding new ones + dispose() + const unsub = Bus.subscribe(File.Event.Edited, async (payload) => { const file = payload.properties.file log.info("formatting", { file }) const ext = path.extname(file) @@ -133,5 +138,14 @@ export namespace Format { } } }) + unsubscribers.push(unsub) + } + + export function dispose() { + for (const unsub of unsubscribers) { + unsub() + } + unsubscribers.length = 0 + log.info("disposed format subscriptions") } } diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts index f57b46a3521..51952f27c3d 100644 --- a/packages/opencode/src/plugin/index.ts +++ b/packages/opencode/src/plugin/index.ts @@ -99,14 +99,19 @@ export namespace Plugin { return state().then((x) => x.hooks) } + // Store unsubscribe functions for cleanup + const unsubscribers: Array<() => void> = [] + export async function init() { + // Clean up any existing subscriptions before adding new ones + dispose() const hooks = await state().then((x) => x.hooks) const config = await Config.get() for (const hook of hooks) { // @ts-expect-error this is because we haven't moved plugin to sdk v2 await hook.config?.(config) } - Bus.subscribeAll(async (input) => { + const unsub = Bus.subscribeAll(async (input) => { const hooks = await state().then((x) => x.hooks) for (const hook of hooks) { hook["event"]?.({ @@ -114,5 +119,14 @@ export namespace Plugin { }) } }) + unsubscribers.push(unsub) + } + + export function dispose() { + for (const unsub of unsubscribers) { + unsub() + } + unsubscribers.length = 0 + log.info("disposed plugin subscriptions") } } diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index 95271f8c827..35954903f7c 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -10,13 +10,22 @@ import type * as SDK from "@opencode-ai/sdk/v2" export namespace ShareNext { const log = Log.create({ service: "share-next" }) + let disposed = false + + // Store unsubscribe functions for cleanup + const unsubscribers: Array<() => void> = [] async function url() { return Config.get().then((x) => x.enterprise?.url ?? "https://opncd.ai") } export async function init() { - Bus.subscribe(Session.Event.Updated, async (evt) => { + // Clean up any existing subscriptions before adding new ones + dispose() + disposed = false + + const unsub1 = Bus.subscribe(Session.Event.Updated, async (evt) => { + if (disposed) return await sync(evt.properties.info.id, [ { type: "session", @@ -24,7 +33,8 @@ export namespace ShareNext { }, ]) }) - Bus.subscribe(MessageV2.Event.Updated, async (evt) => { + const unsub2 = Bus.subscribe(MessageV2.Event.Updated, async (evt) => { + if (disposed) return await sync(evt.properties.info.sessionID, [ { type: "message", @@ -44,7 +54,8 @@ export namespace ShareNext { ]) } }) - Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { + const unsub3 = Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { + if (disposed) return await sync(evt.properties.part.sessionID, [ { type: "part", @@ -52,7 +63,8 @@ export namespace ShareNext { }, ]) }) - Bus.subscribe(Session.Event.Diff, async (evt) => { + const unsub4 = Bus.subscribe(Session.Event.Diff, async (evt) => { + if (disposed) return await sync(evt.properties.sessionID, [ { type: "session_diff", @@ -60,6 +72,21 @@ export namespace ShareNext { }, ]) }) + unsubscribers.push(unsub1, unsub2, unsub3, unsub4) + } + + export function dispose() { + disposed = true + for (const unsub of unsubscribers) { + unsub() + } + unsubscribers.length = 0 + // Clear pending timeouts + for (const entry of queue.values()) { + clearTimeout(entry.timeout) + } + queue.clear() + log.info("disposed share-next subscriptions") } export async function create(sessionID: string) { @@ -191,4 +218,16 @@ export namespace ShareNext { }, ]) } + + /** @internal Test helper to get queue size */ + export function _getQueueSize(): number { + return queue.size + } + + /** @internal Test helper to add items to queue for testing dispose cleanup */ + export function _addToQueueForTesting(sessionID: string) { + const dataMap = new Map() + const timeout = setTimeout(() => {}, 10000) + queue.set(sessionID, { timeout, data: dataMap }) + } } diff --git a/packages/opencode/src/share/share.ts b/packages/opencode/src/share/share.ts index 1006b23d556..ff01be5ef24 100644 --- a/packages/opencode/src/share/share.ts +++ b/packages/opencode/src/share/share.ts @@ -9,8 +9,14 @@ export namespace Share { let queue: Promise = Promise.resolve() const pending = new Map() + let disposed = false + + // Store unsubscribe functions for cleanup + const unsubscribers: Array<() => void> = [] export async function sync(key: string, content: any) { + // Skip if disposed + if (disposed) return const [root, ...splits] = key.split("/") if (root !== "session") return const [sub, sessionID] = splits @@ -21,6 +27,8 @@ export namespace Share { pending.set(key, content) queue = queue .then(async () => { + // Check if disposed before processing + if (disposed) return const content = pending.get(key) if (content === undefined) return pending.delete(key) @@ -46,13 +54,17 @@ export namespace Share { } export function init() { - Bus.subscribe(Session.Event.Updated, async (evt) => { + // Clean up any existing subscriptions before adding new ones + dispose() + disposed = false + + const unsub1 = Bus.subscribe(Session.Event.Updated, async (evt) => { await sync("session/info/" + evt.properties.info.id, evt.properties.info) }) - Bus.subscribe(MessageV2.Event.Updated, async (evt) => { + const unsub2 = Bus.subscribe(MessageV2.Event.Updated, async (evt) => { await sync("session/message/" + evt.properties.info.sessionID + "/" + evt.properties.info.id, evt.properties.info) }) - Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { + const unsub3 = Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { await sync( "session/part/" + evt.properties.part.sessionID + @@ -63,6 +75,18 @@ export namespace Share { evt.properties.part, ) }) + unsubscribers.push(unsub1, unsub2, unsub3) + } + + export function dispose() { + disposed = true + for (const unsub of unsubscribers) { + unsub() + } + unsubscribers.length = 0 + pending.clear() + queue = Promise.resolve() + log.info("disposed share subscriptions") } export const URL = diff --git a/packages/opencode/test/memory/acp-cleanup.test.ts b/packages/opencode/test/memory/acp-cleanup.test.ts new file mode 100644 index 00000000000..3d976c850e9 --- /dev/null +++ b/packages/opencode/test/memory/acp-cleanup.test.ts @@ -0,0 +1,164 @@ +import { test, expect, describe } from "bun:test" +import { ACP } from "../../src/acp/agent" + +/** + * Tests for ACP Agent session cleanup. + * Verifies that session event subscriptions are properly cleaned up. + */ + +describe("ACP.Agent session cleanup", () => { + test("cleanupSession removes abort controller", () => { + // Create a mock connection and config + const mockConnection = { + requestPermission: async () => ({ outcome: { outcome: "selected", optionId: "once" } }), + sessionUpdate: async () => {}, + closed: new Promise(() => {}), // Never resolves during test + } + + const mockConfig = { + sdk: { + event: { + subscribe: async () => ({ + stream: (async function* () { + // Empty stream + })(), + }), + }, + permission: { + reply: async () => {}, + }, + session: { + message: async () => ({ data: null }), + abort: async () => {}, + }, + }, + } + + // @ts-expect-error - testing with mocks + const agent = new ACP.Agent(mockConnection, mockConfig) + + // Access private map for testing + // @ts-expect-error - accessing private for testing + const controllers = agent.sessionAbortControllers + + // Simulate adding a controller + const controller = new AbortController() + controllers.set("test-session-1", controller) + + expect(controllers.size).toBe(1) + expect(controller.signal.aborted).toBe(false) + + // Call cleanup + // @ts-expect-error - accessing private for testing + agent.cleanupSession("test-session-1") + + expect(controllers.size).toBe(0) + expect(controller.signal.aborted).toBe(true) + }) + + test("dispose cleans up all session controllers", () => { + const mockConnection = { + closed: new Promise(() => {}), // Never resolves during test + } + const mockConfig = { + sdk: { + event: { subscribe: async () => ({ stream: (async function* () {})() }) }, + permission: { reply: async () => {} }, + session: { message: async () => ({ data: null }), abort: async () => {} }, + }, + } + + // @ts-expect-error - testing with mocks + const agent = new ACP.Agent(mockConnection, mockConfig) + + // @ts-expect-error - accessing private for testing + const controllers = agent.sessionAbortControllers + + // Add multiple controllers + const controller1 = new AbortController() + const controller2 = new AbortController() + const controller3 = new AbortController() + + controllers.set("session-1", controller1) + controllers.set("session-2", controller2) + controllers.set("session-3", controller3) + + expect(controllers.size).toBe(3) + + // Dispose all + agent.dispose() + + expect(controllers.size).toBe(0) + expect(controller1.signal.aborted).toBe(true) + expect(controller2.signal.aborted).toBe(true) + expect(controller3.signal.aborted).toBe(true) + }) + + test("setupEventSubscriptions replaces existing subscription for same session", async () => { + const mockConnection = { + closed: new Promise(() => {}), // Never resolves during test + } + const subscribeCallCount = { count: 0 } + // Track active generators so we can verify they terminate + const activeGenerators = new Set() + const mockConfig = { + sdk: { + event: { + subscribe: async () => { + subscribeCallCount.count++ + // Create a signal to control this generator's lifecycle + const genController = new AbortController() + activeGenerators.add(genController) + return { + stream: (async function* () { + // Use finite loop with abort check to prevent background runaway + for (let i = 0; i < 100 && !genController.signal.aborted; i++) { + await new Promise((r) => setTimeout(r, 100)) + if (genController.signal.aborted) break + yield { type: "test" } + } + activeGenerators.delete(genController) + })(), + // Expose abort to allow cleanup + abort: () => genController.abort(), + } + }, + }, + permission: { reply: async () => {} }, + session: { message: async () => ({ data: null }), abort: async () => {} }, + }, + } + + // @ts-expect-error - testing with mocks + const agent = new ACP.Agent(mockConnection, mockConfig) + + // @ts-expect-error - accessing private for testing + const controllers = agent.sessionAbortControllers + + // Manually add an existing controller to simulate an existing subscription + const existingController = new AbortController() + controllers.set("session-1", existingController) + + // Setup event subscriptions for the same session + const mockSession = { id: "session-1", cwd: "/test" } + // @ts-expect-error - accessing private for testing + agent.setupEventSubscriptions(mockSession) + + // The existing controller should be aborted + expect(existingController.signal.aborted).toBe(true) + + // A new controller should exist + expect(controllers.has("session-1")).toBe(true) + const newController = controllers.get("session-1") + expect(newController).not.toBe(existingController) + expect(newController?.signal.aborted).toBe(false) + + // Cleanup - dispose the agent and abort all active generators + agent.dispose() + for (const genController of activeGenerators) { + genController.abort() + } + // Give generators time to exit cleanly + await new Promise((r) => setTimeout(r, 50)) + }) +}) diff --git a/packages/opencode/test/memory/leak-comparison.test.ts b/packages/opencode/test/memory/leak-comparison.test.ts new file mode 100644 index 00000000000..b3cee589957 --- /dev/null +++ b/packages/opencode/test/memory/leak-comparison.test.ts @@ -0,0 +1,162 @@ +import { describe, it, expect, beforeAll } from "bun:test" +import { Log } from "../../src/util/log" +import { Instance } from "../../src/project/instance" +import { Bus } from "../../src/bus" +import { Session } from "../../src/session" +import path from "path" +import os from "os" +import fs from "fs/promises" + +Log.init({ print: false, dev: false, level: "ERROR" }) + +let testDir: string + +beforeAll(async () => { + testDir = path.join(os.tmpdir(), `opencode-leak-test-${Date.now()}`) + await fs.mkdir(testDir, { recursive: true }) + await fs.writeFile(path.join(testDir, "opencode.json"), JSON.stringify({})) +}) + +describe("Memory Leak Comparison", () => { + it("WITHOUT dispose: subscriptions accumulate (demonstrates the leak)", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + // Simulate the OLD behavior - calling init() without dispose() + const subscriptions: Array<() => void> = [] + + const before = Bus._getTotalSubscriptionCount() + + // Simulate 100 "init" cycles where subscribe is called but never unsubscribed + for (let i = 0; i < 100; i++) { + const unsub = Bus.subscribe(Session.Event.Updated, () => {}) + subscriptions.push(unsub) // We track them but don't call them - simulating the bug + } + + const after = Bus._getTotalSubscriptionCount() + + // Subscriptions accumulated! + expect(after - before).toBe(100) + console.log(` Leaked subscriptions: ${after - before}`) + + // Cleanup for this test + for (const unsub of subscriptions) unsub() + }, + }) + }) + + it("WITH dispose: subscriptions stay at zero (fix works)", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const before = Bus._getTotalSubscriptionCount() + + // Simulate 100 init/dispose cycles with proper cleanup + for (let i = 0; i < 100; i++) { + const unsub = Bus.subscribe(Session.Event.Updated, () => {}) + unsub() // Proper cleanup - simulating the fix + } + + const after = Bus._getTotalSubscriptionCount() + + // No accumulation! + expect(after).toBe(before) + console.log(` Subscription delta: ${after - before}`) + }, + }) + }) + + it("Share: init/dispose cycles keep subscription count stable", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { Share } = await import("../../src/share/share") + + const before = Bus._getTotalSubscriptionCount() + + // Run 50 init/dispose cycles + for (let i = 0; i < 50; i++) { + Share.init() + Share.dispose() + } + + const after = Bus._getTotalSubscriptionCount() + + expect(after).toBe(before) + console.log(` Share cycles: 50, subscription delta: ${after - before}`) + }, + }) + }) + + it("ShareNext: init/dispose cycles keep subscription count stable", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { ShareNext } = await import("../../src/share/share-next") + + const before = Bus._getTotalSubscriptionCount() + + // Run 50 init/dispose cycles + for (let i = 0; i < 50; i++) { + await ShareNext.init() + ShareNext.dispose() + } + + const after = Bus._getTotalSubscriptionCount() + + expect(after).toBe(before) + console.log(` ShareNext cycles: 50, subscription delta: ${after - before}`) + }, + }) + }) + + it("Format: init/dispose cycles keep subscription count stable", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { Format } = await import("../../src/format") + + const before = Bus._getTotalSubscriptionCount() + + // Run 50 init/dispose cycles + for (let i = 0; i < 50; i++) { + Format.init() + Format.dispose() + } + + const after = Bus._getTotalSubscriptionCount() + + expect(after).toBe(before) + console.log(` Format cycles: 50, subscription delta: ${after - before}`) + }, + }) + }) + + it("demonstrates heap memory stays flat with dispose()", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { ShareNext } = await import("../../src/share/share-next") + + // Take baseline + const baseline = process.memoryUsage().heapUsed + + // Run 500 cycles + for (let i = 0; i < 500; i++) { + await ShareNext.init() + ShareNext.dispose() + } + + // Check heap didn't grow significantly (allow 2MB variance) + const after = process.memoryUsage().heapUsed + const growth = after - baseline + const growthMB = growth / 1024 / 1024 + + console.log(` Heap growth after 500 cycles: ${growthMB.toFixed(2)} MB`) + + // Should be minimal growth (< 2MB for 500 cycles) + expect(Math.abs(growthMB)).toBeLessThan(2) + }, + }) + }) +}) diff --git a/packages/opencode/test/memory/profile.ts b/packages/opencode/test/memory/profile.ts new file mode 100644 index 00000000000..1797a35ad70 --- /dev/null +++ b/packages/opencode/test/memory/profile.ts @@ -0,0 +1,339 @@ +#!/usr/bin/env bun +/** + * Memory Profiling Script for OpenCode + * + * This script simulates subscription lifecycle patterns and monitors memory usage + * to verify that memory leaks have been fixed. + * + * Usage: + * bun run test/memory/profile.ts + * + * For heap snapshots (requires --expose-gc flag): + * bun --expose-gc run test/memory/profile.ts + */ + +import { Log } from "../../src/util/log" +import { Instance } from "../../src/project/instance" +import path from "path" +import os from "os" +import fs from "fs/promises" + +Log.init({ print: false, dev: false, level: "ERROR" }) + +// Create a temp directory for Instance.provide +let testDir: string + +async function setupTestDir() { + testDir = path.join(os.tmpdir(), `opencode-profile-${Date.now()}`) + await fs.mkdir(testDir, { recursive: true }) + await fs.writeFile(path.join(testDir, "opencode.json"), JSON.stringify({})) +} + +async function cleanupTestDir() { + if (testDir) { + await fs.rm(testDir, { recursive: true, force: true }).catch(() => {}) + } +} + +interface MemorySnapshot { + label: string + heapUsed: number + heapTotal: number + external: number + rss: number + timestamp: number +} + +function takeSnapshot(label: string): MemorySnapshot { + const mem = process.memoryUsage() + return { + label, + heapUsed: mem.heapUsed, + heapTotal: mem.heapTotal, + external: mem.external, + rss: mem.rss, + timestamp: Date.now(), + } +} + +function formatBytes(bytes: number): string { + if (bytes < 1024) return `${bytes} B` + if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(2)} KB` + return `${(bytes / 1024 / 1024).toFixed(2)} MB` +} + +function printSnapshot(snapshot: MemorySnapshot) { + console.log(`[${snapshot.label}]`) + console.log(` Heap Used: ${formatBytes(snapshot.heapUsed)}`) + console.log(` Heap Total: ${formatBytes(snapshot.heapTotal)}`) + console.log(` RSS: ${formatBytes(snapshot.rss)}`) +} + +function compareSnapshots(before: MemorySnapshot, after: MemorySnapshot) { + const heapDiff = after.heapUsed - before.heapUsed + const rssDiff = after.rss - before.rss + console.log(`\n[Delta: ${before.label} -> ${after.label}]`) + console.log(` Heap Used: ${heapDiff >= 0 ? "+" : ""}${formatBytes(heapDiff)}`) + console.log(` RSS: ${rssDiff >= 0 ? "+" : ""}${formatBytes(rssDiff)}`) + return { heapDiff, rssDiff } +} + +function forceGC() { + if (global.gc) { + global.gc() + console.log(" (GC forced)") + } +} + +async function sleep(ms: number) { + await new Promise((r) => setTimeout(r, ms)) +} + +// ============================================================================ +// Test Scenarios +// ============================================================================ + +async function testShareSubscriptionCycles() { + console.log("\n" + "=".repeat(60)) + console.log("TEST: Share subscription init/dispose cycles") + console.log("=".repeat(60)) + + return Instance.provide({ + directory: testDir, + fn: async () => { + const { Share } = await import("../../src/share/share") + + forceGC() + await sleep(100) + const baseline = takeSnapshot("Baseline") + printSnapshot(baseline) + + const iterations = 1000 + console.log(`\nRunning ${iterations} init/dispose cycles...`) + + for (let i = 0; i < iterations; i++) { + Share.init() + Share.dispose() + if (i % 100 === 0 && i > 0) { + process.stdout.write(` ${i} cycles completed\r`) + } + } + console.log(` ${iterations} cycles completed`) + + forceGC() + await sleep(100) + const afterCycles = takeSnapshot("After cycles") + printSnapshot(afterCycles) + + const result = compareSnapshots(baseline, afterCycles) + + // Check for leaks (should be less than 1MB growth for 1000 cycles) + if (result.heapDiff > 1024 * 1024) { + console.log("\n⚠️ WARNING: Potential memory leak detected!") + return false + } + console.log("\n✅ PASS: Memory stable after subscription cycles") + return true + }, + }) +} + +async function testShareNextSubscriptionCycles() { + console.log("\n" + "=".repeat(60)) + console.log("TEST: ShareNext subscription init/dispose cycles") + console.log("=".repeat(60)) + + return Instance.provide({ + directory: testDir, + fn: async () => { + const { ShareNext } = await import("../../src/share/share-next") + + forceGC() + await sleep(100) + const baseline = takeSnapshot("Baseline") + printSnapshot(baseline) + + const iterations = 1000 + console.log(`\nRunning ${iterations} init/dispose cycles...`) + + for (let i = 0; i < iterations; i++) { + await ShareNext.init() + ShareNext.dispose() + if (i % 100 === 0 && i > 0) { + process.stdout.write(` ${i} cycles completed\r`) + } + } + console.log(` ${iterations} cycles completed`) + + forceGC() + await sleep(100) + const afterCycles = takeSnapshot("After cycles") + printSnapshot(afterCycles) + + const result = compareSnapshots(baseline, afterCycles) + + if (result.heapDiff > 1024 * 1024) { + console.log("\n⚠️ WARNING: Potential memory leak detected!") + return false + } + console.log("\n✅ PASS: Memory stable after subscription cycles") + return true + }, + }) +} + +async function testFormatSubscriptionCycles() { + console.log("\n" + "=".repeat(60)) + console.log("TEST: Format subscription init/dispose cycles") + console.log("=".repeat(60)) + + return Instance.provide({ + directory: testDir, + fn: async () => { + const { Format } = await import("../../src/format") + + forceGC() + await sleep(100) + const baseline = takeSnapshot("Baseline") + printSnapshot(baseline) + + const iterations = 1000 + console.log(`\nRunning ${iterations} init/dispose cycles...`) + + for (let i = 0; i < iterations; i++) { + Format.init() + Format.dispose() + if (i % 100 === 0 && i > 0) { + process.stdout.write(` ${i} cycles completed\r`) + } + } + console.log(` ${iterations} cycles completed`) + + forceGC() + await sleep(100) + const afterCycles = takeSnapshot("After cycles") + printSnapshot(afterCycles) + + const result = compareSnapshots(baseline, afterCycles) + + if (result.heapDiff > 1024 * 1024) { + console.log("\n⚠️ WARNING: Potential memory leak detected!") + return false + } + console.log("\n✅ PASS: Memory stable after subscription cycles") + return true + }, + }) +} + +async function testACPControllerCleanup() { + console.log("\n" + "=".repeat(60)) + console.log("TEST: ACP Agent controller cleanup") + console.log("=".repeat(60)) + + const { ACP } = await import("../../src/acp/agent") + + const mockConnection = { + closed: new Promise(() => {}), // Never resolves during test + } + const mockConfig = { + sdk: { + event: { subscribe: async () => ({ stream: (async function* () {})() }) }, + permission: { reply: async () => {} }, + session: { message: async () => ({ data: null }), abort: async () => {} }, + }, + } + + forceGC() + await sleep(100) + const baseline = takeSnapshot("Baseline") + printSnapshot(baseline) + + const iterations = 1000 + console.log(`\nCreating and cleaning up ${iterations} sessions...`) + + // @ts-expect-error - testing with mocks + const agent = new ACP.Agent(mockConnection, mockConfig) + + for (let i = 0; i < iterations; i++) { + // Simulate session creation with abort controller + // @ts-expect-error - accessing private for testing + const controllers = agent.sessionAbortControllers + controllers.set(`session-${i}`, new AbortController()) + + // Clean it up + // @ts-expect-error - accessing private for testing + agent.cleanupSession(`session-${i}`) + + if (i % 100 === 0 && i > 0) { + process.stdout.write(` ${i} sessions cleaned\r`) + } + } + console.log(` ${iterations} sessions cleaned`) + + agent.dispose() + + forceGC() + await sleep(100) + const afterCycles = takeSnapshot("After cycles") + printSnapshot(afterCycles) + + const result = compareSnapshots(baseline, afterCycles) + + if (result.heapDiff > 1024 * 1024) { + console.log("\n⚠️ WARNING: Potential memory leak detected!") + return false + } + console.log("\n✅ PASS: Memory stable after controller cleanup cycles") + return true +} + +// ============================================================================ +// Main +// ============================================================================ + +async function main() { + console.log("OpenCode Memory Profiling") + console.log("=".repeat(60)) + console.log(`GC Available: ${global.gc ? "Yes" : "No (run with --expose-gc for accurate results)"}`) + console.log(`Platform: ${process.platform}`) + console.log(`Bun Version: ${Bun.version}`) + + // Setup test directory for Instance.provide + await setupTestDir() + + const results: boolean[] = [] + + try { + results.push(await testShareSubscriptionCycles()) + results.push(await testShareNextSubscriptionCycles()) + results.push(await testFormatSubscriptionCycles()) + results.push(await testACPControllerCleanup()) + } catch (err) { + console.error("\nError during profiling:", err) + await cleanupTestDir() + process.exit(1) + } + + // Cleanup test directory + await cleanupTestDir() + + console.log("\n" + "=".repeat(60)) + console.log("SUMMARY") + console.log("=".repeat(60)) + + const passed = results.filter(Boolean).length + const total = results.length + + console.log(`Tests passed: ${passed}/${total}`) + + if (passed === total) { + console.log("\n✅ All memory tests passed!") + process.exit(0) + } else { + console.log("\n❌ Some memory tests failed!") + process.exit(1) + } +} + +main() diff --git a/packages/opencode/test/memory/subscription-cleanup.test.ts b/packages/opencode/test/memory/subscription-cleanup.test.ts new file mode 100644 index 00000000000..4dac834bef3 --- /dev/null +++ b/packages/opencode/test/memory/subscription-cleanup.test.ts @@ -0,0 +1,284 @@ +import { test, expect, describe, beforeAll, afterAll } from "bun:test" +import { Bus } from "../../src/bus" +import { Session } from "../../src/session" +import { MessageV2 } from "../../src/session/message-v2" +import { File } from "../../src/file" +import { Instance } from "../../src/project/instance" +import path from "path" +import os from "os" +import fs from "fs/promises" + +/** + * Tests to verify that subscription cleanup functions work correctly. + * These tests verify that dispose() functions properly unsubscribe from the Bus, + * preventing memory leaks from accumulated event handlers. + */ + +let testDir: string + +beforeAll(async () => { + // Create a temp directory for the test instance + testDir = path.join(os.tmpdir(), `opencode-memory-test-${Date.now()}`) + await fs.mkdir(testDir, { recursive: true }) + await fs.writeFile(path.join(testDir, "opencode.json"), JSON.stringify({})) +}) + +afterAll(async () => { + // Clean up the temp directory to prevent test artifacts from accumulating + if (testDir) { + await fs.rm(testDir, { recursive: true, force: true }).catch(() => {}) + } +}) + +describe("subscription cleanup", () => { + describe("Share.dispose()", () => { + test("should unsubscribe from all events", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { Share } = await import("../../src/share/share") + + const beforeSession = Bus._getSubscriptionCount(Session.Event.Updated.type) + const beforeMessage = Bus._getSubscriptionCount(MessageV2.Event.Updated.type) + const beforePart = Bus._getSubscriptionCount(MessageV2.Event.PartUpdated.type) + + Share.init() + + const afterInitSession = Bus._getSubscriptionCount(Session.Event.Updated.type) + const afterInitMessage = Bus._getSubscriptionCount(MessageV2.Event.Updated.type) + const afterInitPart = Bus._getSubscriptionCount(MessageV2.Event.PartUpdated.type) + + // Verify subscriptions were added + expect(afterInitSession).toBe(beforeSession + 1) + expect(afterInitMessage).toBe(beforeMessage + 1) + expect(afterInitPart).toBe(beforePart + 1) + + Share.dispose() + + const afterDisposeSession = Bus._getSubscriptionCount(Session.Event.Updated.type) + const afterDisposeMessage = Bus._getSubscriptionCount(MessageV2.Event.Updated.type) + const afterDisposePart = Bus._getSubscriptionCount(MessageV2.Event.PartUpdated.type) + + // Verify subscriptions were removed + expect(afterDisposeSession).toBe(beforeSession) + expect(afterDisposeMessage).toBe(beforeMessage) + expect(afterDisposePart).toBe(beforePart) + }, + }) + }) + + test("multiple init/dispose cycles maintain correct count", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { Share } = await import("../../src/share/share") + + const baseline = Bus._getTotalSubscriptionCount() + + for (let i = 0; i < 10; i++) { + Share.init() + Share.dispose() + } + + const afterCycles = Bus._getTotalSubscriptionCount() + expect(afterCycles).toBe(baseline) + }, + }) + }) + }) + + describe("ShareNext.dispose()", () => { + test("should unsubscribe from all events and clear queue", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { ShareNext } = await import("../../src/share/share-next") + + const beforeSession = Bus._getSubscriptionCount(Session.Event.Updated.type) + const beforeMessage = Bus._getSubscriptionCount(MessageV2.Event.Updated.type) + const beforePart = Bus._getSubscriptionCount(MessageV2.Event.PartUpdated.type) + const beforeDiff = Bus._getSubscriptionCount(Session.Event.Diff.type) + + await ShareNext.init() + + const afterInitSession = Bus._getSubscriptionCount(Session.Event.Updated.type) + const afterInitMessage = Bus._getSubscriptionCount(MessageV2.Event.Updated.type) + const afterInitPart = Bus._getSubscriptionCount(MessageV2.Event.PartUpdated.type) + const afterInitDiff = Bus._getSubscriptionCount(Session.Event.Diff.type) + + expect(afterInitSession).toBe(beforeSession + 1) + expect(afterInitMessage).toBe(beforeMessage + 1) + expect(afterInitPart).toBe(beforePart + 1) + expect(afterInitDiff).toBe(beforeDiff + 1) + + ShareNext.dispose() + + const afterDisposeSession = Bus._getSubscriptionCount(Session.Event.Updated.type) + const afterDisposeMessage = Bus._getSubscriptionCount(MessageV2.Event.Updated.type) + const afterDisposePart = Bus._getSubscriptionCount(MessageV2.Event.PartUpdated.type) + const afterDisposeDiff = Bus._getSubscriptionCount(Session.Event.Diff.type) + + expect(afterDisposeSession).toBe(beforeSession) + expect(afterDisposeMessage).toBe(beforeMessage) + expect(afterDisposePart).toBe(beforePart) + expect(afterDisposeDiff).toBe(beforeDiff) + + // Verify queue is cleared + expect(ShareNext._getQueueSize()).toBe(0) + }, + }) + }) + + test("multiple init calls should not accumulate subscriptions", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { ShareNext } = await import("../../src/share/share-next") + + const baseline = Bus._getTotalSubscriptionCount() + + // Call init multiple times without dispose - should not accumulate + // because init() now calls dispose() at the start + await ShareNext.init() + const afterFirstInit = Bus._getTotalSubscriptionCount() + + await ShareNext.init() + const afterSecondInit = Bus._getTotalSubscriptionCount() + + await ShareNext.init() + const afterThirdInit = Bus._getTotalSubscriptionCount() + + // All counts should be the same - no accumulation + expect(afterSecondInit).toBe(afterFirstInit) + expect(afterThirdInit).toBe(afterFirstInit) + + ShareNext.dispose() + expect(Bus._getTotalSubscriptionCount()).toBe(baseline) + }, + }) + }) + + test("dispose should clear pending queue items and their timeouts", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { ShareNext } = await import("../../src/share/share-next") + + await ShareNext.init() + + // Verify queue starts empty + expect(ShareNext._getQueueSize()).toBe(0) + + // Add items to the queue using the test helper + ShareNext._addToQueueForTesting("session-1") + ShareNext._addToQueueForTesting("session-2") + ShareNext._addToQueueForTesting("session-3") + + // Verify items were added + expect(ShareNext._getQueueSize()).toBe(3) + + // dispose() should clear all queue items and their timeouts + ShareNext.dispose() + + // Verify queue is cleared + expect(ShareNext._getQueueSize()).toBe(0) + }, + }) + }) + }) + + describe("Format.dispose()", () => { + test("should unsubscribe from file edit events", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { Format } = await import("../../src/format") + + const before = Bus._getSubscriptionCount(File.Event.Edited.type) + + Format.init() + + const afterInit = Bus._getSubscriptionCount(File.Event.Edited.type) + expect(afterInit).toBe(before + 1) + + Format.dispose() + + const afterDispose = Bus._getSubscriptionCount(File.Event.Edited.type) + expect(afterDispose).toBe(before) + }, + }) + }) + }) + + describe("Plugin.dispose()", () => { + test("should unsubscribe from wildcard events", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + // Test the dispose pattern directly without Plugin.init() which is slow + // Plugin.init() loads npm packages which can timeout in tests + const before = Bus._getSubscriptionCount("*") + + // Simulate what Plugin.init() does - subscribe to all events + const unsub = Bus.subscribeAll(() => {}) + const afterSubscribe = Bus._getSubscriptionCount("*") + expect(afterSubscribe).toBe(before + 1) + + // Unsubscribe like dispose() would + unsub() + const afterUnsubscribe = Bus._getSubscriptionCount("*") + expect(afterUnsubscribe).toBe(before) + }, + }) + }) + }) +}) + +describe("memory stability", () => { + test("repeated init/dispose cycles should not leak subscriptions", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { Share } = await import("../../src/share/share") + + const baseline = Bus._getTotalSubscriptionCount() + const iterations = 100 + + for (let i = 0; i < iterations; i++) { + Share.init() + Share.dispose() + } + + const afterCycles = Bus._getTotalSubscriptionCount() + expect(afterCycles).toBe(baseline) + }, + }) + }) + + test("init without dispose should accumulate subscriptions (verifies test validity)", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + // This test verifies that without dispose(), subscriptions DO accumulate + // This proves our dispose() tests are meaningful + const before = Bus._getTotalSubscriptionCount() + + // Manually subscribe without disposing + const unsub1 = Bus.subscribe(Session.Event.Updated, () => {}) + const unsub2 = Bus.subscribe(Session.Event.Updated, () => {}) + const unsub3 = Bus.subscribe(Session.Event.Updated, () => {}) + + const afterSubscribe = Bus._getTotalSubscriptionCount() + expect(afterSubscribe).toBe(before + 3) + + // Clean up manually + unsub1() + unsub2() + unsub3() + + const afterUnsubscribe = Bus._getTotalSubscriptionCount() + expect(afterUnsubscribe).toBe(before) + }, + }) + }) +}) From 3fb83f72094042662322b23e8a42c43e4c1629b7 Mon Sep 17 00:00:00 2001 From: Mark Henderson Date: Sun, 11 Jan 2026 22:42:52 -0700 Subject: [PATCH 2/7] fix: address Copilot review feedback - Add multiple disposed checks in Share.sync() after async boundaries - Use splice(0) instead of length = 0 for clearer array clearing - Reduce test helper timeout from 10s to 100ms - Use Array.from() for iteration safety in Bus test helpers --- packages/opencode/src/bus/index.ts | 3 ++- packages/opencode/src/plugin/index.ts | 2 +- packages/opencode/src/share/share-next.ts | 5 +++-- packages/opencode/src/share/share.ts | 10 +++++++--- 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index 26e220fe3e3..6b69c6a5077 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -112,7 +112,8 @@ export namespace Bus { /** @internal Test helper to get total subscription count across all event types */ export function _getTotalSubscriptionCount(): number { let total = 0 - for (const subs of state().subscriptions.values()) { + // Use Array.from to snapshot values in case of concurrent modification + for (const subs of Array.from(state().subscriptions.values())) { total += subs.length } return total diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts index 51952f27c3d..499f7ae0af7 100644 --- a/packages/opencode/src/plugin/index.ts +++ b/packages/opencode/src/plugin/index.ts @@ -126,7 +126,7 @@ export namespace Plugin { for (const unsub of unsubscribers) { unsub() } - unsubscribers.length = 0 + unsubscribers.splice(0) log.info("disposed plugin subscriptions") } } diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index 35954903f7c..a0cf49ca400 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -80,7 +80,7 @@ export namespace ShareNext { for (const unsub of unsubscribers) { unsub() } - unsubscribers.length = 0 + unsubscribers.splice(0) // Clear pending timeouts for (const entry of queue.values()) { clearTimeout(entry.timeout) @@ -227,7 +227,8 @@ export namespace ShareNext { /** @internal Test helper to add items to queue for testing dispose cleanup */ export function _addToQueueForTesting(sessionID: string) { const dataMap = new Map() - const timeout = setTimeout(() => {}, 10000) + // Use short timeout for tests - this is a no-op callback that won't cause issues if it fires + const timeout = setTimeout(() => {}, 100) queue.set(sessionID, { timeout, data: dataMap }) } } diff --git a/packages/opencode/src/share/share.ts b/packages/opencode/src/share/share.ts index ff01be5ef24..13d88b7512e 100644 --- a/packages/opencode/src/share/share.ts +++ b/packages/opencode/src/share/share.ts @@ -15,7 +15,7 @@ export namespace Share { const unsubscribers: Array<() => void> = [] export async function sync(key: string, content: any) { - // Skip if disposed + // Skip if disposed - check at entry point if (disposed) return const [root, ...splits] = key.split("/") if (root !== "session") return @@ -23,15 +23,19 @@ export namespace Share { if (sub === "share") return const share = await Session.getShare(sessionID).catch(() => {}) if (!share) return + // Re-check disposed after async operation + if (disposed) return const { secret } = share pending.set(key, content) queue = queue .then(async () => { - // Check if disposed before processing + // Check disposed at start of queued operation if (disposed) return const content = pending.get(key) if (content === undefined) return pending.delete(key) + // Final check before network request + if (disposed) return return fetch(`${URL}/share_sync`, { method: "POST", @@ -83,7 +87,7 @@ export namespace Share { for (const unsub of unsubscribers) { unsub() } - unsubscribers.length = 0 + unsubscribers.splice(0) pending.clear() queue = Promise.resolve() log.info("disposed share subscriptions") From 520ca09ce6d2aa4dce8bd0653f33136a1be1fa01 Mon Sep 17 00:00:00 2001 From: Mark Henderson Date: Sun, 11 Jan 2026 23:03:18 -0700 Subject: [PATCH 3/7] fix: address round 2 Copilot review feedback - Add try-catch around unsubscribe calls to ensure cleanup completes even if one fails - Use splice(0) pattern consistently before iteration for safe array clearing - Rename cleanupSession to cleanupSessionEventSubscription for clarity - Add try-finally to ACP test for generator cleanup on test failure - Fix Format.dispose() to use splice(0) for consistency with other modules --- packages/opencode/src/acp/agent.ts | 8 ++-- packages/opencode/src/format/index.ts | 10 ++-- packages/opencode/src/plugin/index.ts | 10 ++-- packages/opencode/src/share/share-next.ts | 10 ++-- packages/opencode/src/share/share.ts | 10 ++-- .../opencode/test/memory/acp-cleanup.test.ts | 46 ++++++++++--------- 6 files changed, 56 insertions(+), 38 deletions(-) diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index 635ee192399..64e9d0c434e 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -63,7 +63,7 @@ export namespace ACP { }) } - private cleanupSession(sessionId: string) { + private cleanupSessionEventSubscription(sessionId: string) { const controller = this.sessionAbortControllers.get(sessionId) if (controller) { controller.abort() @@ -77,7 +77,7 @@ export namespace ACP { const directory = session.cwd // Cleanup any existing subscription for this session - this.cleanupSession(sessionId) + this.cleanupSessionEventSubscription(sessionId) // Create abort controller for this session's event subscription const controller = new AbortController() @@ -986,7 +986,7 @@ export namespace ACP { async cancel(params: CancelNotification) { const session = this.sessionManager.get(params.sessionId) // Cleanup event subscription for this session - this.cleanupSession(params.sessionId) + this.cleanupSessionEventSubscription(params.sessionId) await this.config.sdk.session.abort( { sessionID: params.sessionId, @@ -999,7 +999,7 @@ export namespace ACP { dispose() { // Cleanup all session event subscriptions for (const sessionId of this.sessionAbortControllers.keys()) { - this.cleanupSession(sessionId) + this.cleanupSessionEventSubscription(sessionId) } log.info("disposed all event subscriptions") } diff --git a/packages/opencode/src/format/index.ts b/packages/opencode/src/format/index.ts index 539a2be36fb..d026aefdf1c 100644 --- a/packages/opencode/src/format/index.ts +++ b/packages/opencode/src/format/index.ts @@ -142,10 +142,14 @@ export namespace Format { } export function dispose() { - for (const unsub of unsubscribers) { - unsub() + const toUnsubscribe = unsubscribers.splice(0) + for (const unsub of toUnsubscribe) { + try { + unsub() + } catch (error) { + log.error("failed to unsubscribe format handler", { error }) + } } - unsubscribers.length = 0 log.info("disposed format subscriptions") } } diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts index 499f7ae0af7..b2615e1f360 100644 --- a/packages/opencode/src/plugin/index.ts +++ b/packages/opencode/src/plugin/index.ts @@ -123,10 +123,14 @@ export namespace Plugin { } export function dispose() { - for (const unsub of unsubscribers) { - unsub() + const toUnsubscribe = unsubscribers.splice(0) + for (const unsub of toUnsubscribe) { + try { + unsub() + } catch (error) { + log.error("failed to unsubscribe plugin handler", { error }) + } } - unsubscribers.splice(0) log.info("disposed plugin subscriptions") } } diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index a0cf49ca400..e99b7048ea9 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -77,10 +77,14 @@ export namespace ShareNext { export function dispose() { disposed = true - for (const unsub of unsubscribers) { - unsub() + const toUnsubscribe = unsubscribers.splice(0) + for (const unsub of toUnsubscribe) { + try { + unsub() + } catch (error) { + log.error("failed to unsubscribe", { error }) + } } - unsubscribers.splice(0) // Clear pending timeouts for (const entry of queue.values()) { clearTimeout(entry.timeout) diff --git a/packages/opencode/src/share/share.ts b/packages/opencode/src/share/share.ts index 13d88b7512e..cdda6432e38 100644 --- a/packages/opencode/src/share/share.ts +++ b/packages/opencode/src/share/share.ts @@ -84,10 +84,14 @@ export namespace Share { export function dispose() { disposed = true - for (const unsub of unsubscribers) { - unsub() + const toUnsubscribe = unsubscribers.splice(0) + for (const unsub of toUnsubscribe) { + try { + unsub() + } catch (error) { + log.error("failed to unsubscribe", { error }) + } } - unsubscribers.splice(0) pending.clear() queue = Promise.resolve() log.info("disposed share subscriptions") diff --git a/packages/opencode/test/memory/acp-cleanup.test.ts b/packages/opencode/test/memory/acp-cleanup.test.ts index 3d976c850e9..4f1cd6503ed 100644 --- a/packages/opencode/test/memory/acp-cleanup.test.ts +++ b/packages/opencode/test/memory/acp-cleanup.test.ts @@ -7,7 +7,7 @@ import { ACP } from "../../src/acp/agent" */ describe("ACP.Agent session cleanup", () => { - test("cleanupSession removes abort controller", () => { + test("cleanupSessionEventSubscription removes abort controller", () => { // Create a mock connection and config const mockConnection = { requestPermission: async () => ({ outcome: { outcome: "selected", optionId: "once" } }), @@ -50,7 +50,7 @@ describe("ACP.Agent session cleanup", () => { // Call cleanup // @ts-expect-error - accessing private for testing - agent.cleanupSession("test-session-1") + agent.cleanupSessionEventSubscription("test-session-1") expect(controllers.size).toBe(0) expect(controller.signal.aborted).toBe(true) @@ -139,26 +139,28 @@ describe("ACP.Agent session cleanup", () => { const existingController = new AbortController() controllers.set("session-1", existingController) - // Setup event subscriptions for the same session - const mockSession = { id: "session-1", cwd: "/test" } - // @ts-expect-error - accessing private for testing - agent.setupEventSubscriptions(mockSession) - - // The existing controller should be aborted - expect(existingController.signal.aborted).toBe(true) - - // A new controller should exist - expect(controllers.has("session-1")).toBe(true) - const newController = controllers.get("session-1") - expect(newController).not.toBe(existingController) - expect(newController?.signal.aborted).toBe(false) - - // Cleanup - dispose the agent and abort all active generators - agent.dispose() - for (const genController of activeGenerators) { - genController.abort() + try { + // Setup event subscriptions for the same session + const mockSession = { id: "session-1", cwd: "/test" } + // @ts-expect-error - accessing private for testing + agent.setupEventSubscriptions(mockSession) + + // The existing controller should be aborted + expect(existingController.signal.aborted).toBe(true) + + // A new controller should exist + expect(controllers.has("session-1")).toBe(true) + const newController = controllers.get("session-1") + expect(newController).not.toBe(existingController) + expect(newController?.signal.aborted).toBe(false) + } finally { + // Cleanup - dispose the agent and abort all active generators + agent.dispose() + for (const genController of activeGenerators) { + genController.abort() + } + // Give generators time to exit cleanly + await new Promise((r) => setTimeout(r, 50)) } - // Give generators time to exit cleanly - await new Promise((r) => setTimeout(r, 50)) }) }) From 663546a770af5502ae62cd750565f6115711fb13 Mon Sep 17 00:00:00 2001 From: Mark Henderson Date: Sun, 11 Jan 2026 23:31:16 -0700 Subject: [PATCH 4/7] fix: update profile.ts to use renamed method cleanupSessionEventSubscription --- packages/opencode/test/memory/profile.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/opencode/test/memory/profile.ts b/packages/opencode/test/memory/profile.ts index 1797a35ad70..244d433e797 100644 --- a/packages/opencode/test/memory/profile.ts +++ b/packages/opencode/test/memory/profile.ts @@ -263,7 +263,7 @@ async function testACPControllerCleanup() { // Clean it up // @ts-expect-error - accessing private for testing - agent.cleanupSession(`session-${i}`) + agent.cleanupSessionEventSubscription(`session-${i}`) if (i % 100 === 0 && i > 0) { process.stdout.write(` ${i} sessions cleaned\r`) From 48a68e6ff781cf24a06a529144fee44efc390fd4 Mon Sep 17 00:00:00 2001 From: Mark Henderson Date: Sun, 11 Jan 2026 23:33:02 -0700 Subject: [PATCH 5/7] fix: harden share-next dispose to avoid race on queue iteration --- packages/opencode/src/share/share-next.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index e99b7048ea9..2769e25db5d 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -85,11 +85,12 @@ export namespace ShareNext { log.error("failed to unsubscribe", { error }) } } - // Clear pending timeouts - for (const entry of queue.values()) { + // Hardened: snapshot and clear atomically to avoid race during iteration + const pending = Array.from(queue.values()) + queue.clear() + for (const entry of pending) { clearTimeout(entry.timeout) } - queue.clear() log.info("disposed share-next subscriptions") } From d1b5d6e791adaa06fbbc5c5a903252f11e30d198 Mon Sep 17 00:00:00 2001 From: Mark Henderson Date: Sun, 11 Jan 2026 23:37:44 -0700 Subject: [PATCH 6/7] fix: use generation counter in share-next to prevent cross-init interference --- packages/opencode/src/share/share-next.ts | 40 ++++++++++++++++------- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index 2769e25db5d..f1f038325f0 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -10,6 +10,8 @@ import type * as SDK from "@opencode-ai/sdk/v2" export namespace ShareNext { const log = Log.create({ service: "share-next" }) + // Generation counter to invalidate in-flight operations from previous init cycles + let generation = 0 let disposed = false // Store unsubscribe functions for cleanup @@ -23,10 +25,12 @@ export namespace ShareNext { // Clean up any existing subscriptions before adding new ones dispose() disposed = false + // Increment generation so in-flight operations from previous cycle are invalidated + const gen = ++generation const unsub1 = Bus.subscribe(Session.Event.Updated, async (evt) => { - if (disposed) return - await sync(evt.properties.info.id, [ + if (disposed || gen !== generation) return + await sync(gen, evt.properties.info.id, [ { type: "session", data: evt.properties.info, @@ -34,15 +38,16 @@ export namespace ShareNext { ]) }) const unsub2 = Bus.subscribe(MessageV2.Event.Updated, async (evt) => { - if (disposed) return - await sync(evt.properties.info.sessionID, [ + if (disposed || gen !== generation) return + await sync(gen, evt.properties.info.sessionID, [ { type: "message", data: evt.properties.info, }, ]) + if (gen !== generation) return if (evt.properties.info.role === "user") { - await sync(evt.properties.info.sessionID, [ + await sync(gen, evt.properties.info.sessionID, [ { type: "model", data: [ @@ -55,8 +60,8 @@ export namespace ShareNext { } }) const unsub3 = Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { - if (disposed) return - await sync(evt.properties.part.sessionID, [ + if (disposed || gen !== generation) return + await sync(gen, evt.properties.part.sessionID, [ { type: "part", data: evt.properties.part, @@ -64,8 +69,8 @@ export namespace ShareNext { ]) }) const unsub4 = Bus.subscribe(Session.Event.Diff, async (evt) => { - if (disposed) return - await sync(evt.properties.sessionID, [ + if (disposed || gen !== generation) return + await sync(gen, evt.properties.sessionID, [ { type: "session_diff", data: evt.properties.diff, @@ -141,7 +146,10 @@ export namespace ShareNext { } const queue = new Map }>() - async function sync(sessionID: string, data: Data[]) { + async function sync(gen: number, sessionID: string, data: Data[]) { + // Check generation before any work + if (gen !== generation) return + const existing = queue.get(sessionID) if (existing) { for (const item of data) { @@ -156,11 +164,15 @@ export namespace ShareNext { } const timeout = setTimeout(async () => { + // Check generation before processing queued data + if (gen !== generation) return const queued = queue.get(sessionID) if (!queued) return queue.delete(sessionID) const share = await get(sessionID).catch(() => undefined) if (!share) return + // Check generation after async operation + if (gen !== generation) return await fetch(`${await url()}/api/share/${share.id}/sync`, { method: "POST", @@ -193,17 +205,23 @@ export namespace ShareNext { } async function fullSync(sessionID: string) { + // Capture current generation for this sync operation + const gen = generation log.info("full sync", { sessionID }) const session = await Session.get(sessionID) + if (gen !== generation) return const diffs = await Session.diff(sessionID) + if (gen !== generation) return const messages = await Array.fromAsync(MessageV2.stream(sessionID)) + if (gen !== generation) return const models = await Promise.all( messages .filter((m) => m.info.role === "user") .map((m) => (m.info as SDK.UserMessage).model) .map((m) => Provider.getModel(m.providerID, m.modelID).then((m) => m)), ) - await sync(sessionID, [ + if (gen !== generation) return + await sync(gen, sessionID, [ { type: "session", data: session, From 7b271abbe1df4546b144afe1542d963ad4c3b159 Mon Sep 17 00:00:00 2001 From: Mark Henderson Date: Sun, 11 Jan 2026 23:38:56 -0700 Subject: [PATCH 7/7] fix: use generation counter in share.ts to prevent cross-init interference --- packages/opencode/src/share/share.ts | 29 ++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/packages/opencode/src/share/share.ts b/packages/opencode/src/share/share.ts index cdda6432e38..4d08ba1b963 100644 --- a/packages/opencode/src/share/share.ts +++ b/packages/opencode/src/share/share.ts @@ -9,33 +9,35 @@ export namespace Share { let queue: Promise = Promise.resolve() const pending = new Map() + // Generation counter to invalidate in-flight operations from previous init cycles + let generation = 0 let disposed = false // Store unsubscribe functions for cleanup const unsubscribers: Array<() => void> = [] - export async function sync(key: string, content: any) { - // Skip if disposed - check at entry point - if (disposed) return + export async function sync(gen: number, key: string, content: any) { + // Skip if disposed or wrong generation + if (disposed || gen !== generation) return const [root, ...splits] = key.split("/") if (root !== "session") return const [sub, sessionID] = splits if (sub === "share") return const share = await Session.getShare(sessionID).catch(() => {}) if (!share) return - // Re-check disposed after async operation - if (disposed) return + // Re-check after async operation + if (disposed || gen !== generation) return const { secret } = share pending.set(key, content) queue = queue .then(async () => { - // Check disposed at start of queued operation - if (disposed) return + // Check at start of queued operation + if (disposed || gen !== generation) return const content = pending.get(key) if (content === undefined) return pending.delete(key) // Final check before network request - if (disposed) return + if (disposed || gen !== generation) return return fetch(`${URL}/share_sync`, { method: "POST", @@ -61,15 +63,22 @@ export namespace Share { // Clean up any existing subscriptions before adding new ones dispose() disposed = false + // Increment generation so in-flight operations from previous cycle are invalidated + const gen = ++generation const unsub1 = Bus.subscribe(Session.Event.Updated, async (evt) => { - await sync("session/info/" + evt.properties.info.id, evt.properties.info) + await sync(gen, "session/info/" + evt.properties.info.id, evt.properties.info) }) const unsub2 = Bus.subscribe(MessageV2.Event.Updated, async (evt) => { - await sync("session/message/" + evt.properties.info.sessionID + "/" + evt.properties.info.id, evt.properties.info) + await sync( + gen, + "session/message/" + evt.properties.info.sessionID + "/" + evt.properties.info.id, + evt.properties.info, + ) }) const unsub3 = Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { await sync( + gen, "session/part/" + evt.properties.part.sessionID + "/" +