From 6200d3ace1f8b7730ec689a50c4345c59c99402d Mon Sep 17 00:00:00 2001 From: Frank Date: Thu, 8 Jan 2026 19:24:20 -0500 Subject: [PATCH 01/15] wip: zen Revert "wip: zen" This reverts commit 9cedbfc2e758aa79abceeeb4446f0732d9707867. From ff5ceba9730595bededf9b2c8e9563c678001ca7 Mon Sep 17 00:00:00 2001 From: Mark Henderson Date: Mon, 5 Jan 2026 21:21:55 -0700 Subject: [PATCH 02/15] fix(core): add dispose functions to prevent subscription memory leaks - Add sessionAbortControllers and cleanupSession() to ACP Agent for event subscription cleanup - Add subscriptions array and dispose() to Share, ShareNext, Plugin, and Format namespaces - Add Bus._getSubscriptionCount() and Bus._getTotalSubscriptionCount() test helpers - Add memory tests to verify subscription cleanup works correctly Fixes memory leak where Bus subscriptions accumulated during extended use, eventually causing Bun to run out of memory. --- packages/opencode/src/acp/agent.ts | 494 ++++++++++-------- packages/opencode/src/bus/index.ts | 15 + packages/opencode/src/format/index.ts | 62 ++- packages/opencode/src/plugin/index.ts | 26 +- packages/opencode/src/share/share-next.ts | 100 ++-- packages/opencode/src/share/share.ts | 53 +- .../opencode/test/memory/acp-cleanup.test.ts | 145 +++++ packages/opencode/test/memory/profile.ts | 296 +++++++++++ .../test/memory/subscription-cleanup.test.ts | 217 ++++++++ 9 files changed, 1086 insertions(+), 322 deletions(-) create mode 100644 packages/opencode/test/memory/acp-cleanup.test.ts create mode 100644 packages/opencode/test/memory/profile.ts create mode 100644 packages/opencode/test/memory/subscription-cleanup.test.ts diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index 6d8a64b7d02..b3f8ae7cc4b 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -47,287 +47,315 @@ export namespace ACP { private config: ACPConfig private sdk: OpencodeClient private sessionManager + private sessionAbortControllers = new Map() constructor(connection: AgentSideConnection, config: ACPConfig) { this.connection = connection this.config = config this.sdk = config.sdk this.sessionManager = new ACPSessionManager(this.sdk) + + // Cleanup all subscriptions when connection closes + this.connection.closed.then(() => { + log.info("connection closed, disposing agent") + this.dispose() + }) + } + + private cleanupSession(sessionId: string) { + const controller = this.sessionAbortControllers.get(sessionId) + if (controller) { + controller.abort() + this.sessionAbortControllers.delete(sessionId) + log.info("cleaned up event subscription", { sessionId }) + } } private setupEventSubscriptions(session: ACPSessionState) { const sessionId = session.id const directory = session.cwd + // Cleanup any existing subscription for this session + this.cleanupSession(sessionId) + + // Create abort controller for this session's event subscription + const controller = new AbortController() + this.sessionAbortControllers.set(sessionId, controller) + const options: PermissionOption[] = [ { optionId: "once", kind: "allow_once", name: "Allow once" }, { optionId: "always", kind: "allow_always", name: "Always allow" }, { optionId: "reject", kind: "reject_once", name: "Reject" }, ] - this.config.sdk.event.subscribe({ directory }).then(async (events) => { - for await (const event of events.stream) { - switch (event.type) { - case "permission.asked": - try { - const permission = event.properties - const res = await this.connection - .requestPermission({ - sessionId, - toolCall: { - toolCallId: permission.tool?.callID ?? permission.id, - status: "pending", - title: permission.permission, - rawInput: permission.metadata, - kind: toToolKind(permission.permission), - locations: toLocations(permission.permission, permission.metadata), - }, - options, - }) - .catch(async (error) => { - log.error("failed to request permission from ACP", { - error, - permissionID: permission.id, - sessionID: permission.sessionID, + this.config.sdk.event + .subscribe({ directory }) + .then(async (events) => { + for await (const event of events.stream) { + // Check if subscription was aborted + if (controller.signal.aborted) { + log.info("event subscription aborted", { sessionId }) + break + } + switch (event.type) { + case "permission.asked": + try { + const permission = event.properties + const res = await this.connection + .requestPermission({ + sessionId, + toolCall: { + toolCallId: permission.tool?.callID ?? permission.id, + status: "pending", + title: permission.permission, + rawInput: permission.metadata, + kind: toToolKind(permission.permission), + locations: toLocations(permission.permission, permission.metadata), + }, + options, + }) + .catch(async (error) => { + log.error("failed to request permission from ACP", { + error, + permissionID: permission.id, + sessionID: permission.sessionID, + }) + await this.config.sdk.permission.reply({ + requestID: permission.id, + reply: "reject", + directory, + }) + return }) + if (!res) return + if (res.outcome.outcome !== "selected") { await this.config.sdk.permission.reply({ requestID: permission.id, reply: "reject", directory, }) return - }) - if (!res) return - if (res.outcome.outcome !== "selected") { + } await this.config.sdk.permission.reply({ requestID: permission.id, - reply: "reject", + reply: res.outcome.optionId as "once" | "always" | "reject", directory, }) - return + } catch (err) { + log.error("unexpected error when handling permission", { error: err }) + } finally { + break } - await this.config.sdk.permission.reply({ - requestID: permission.id, - reply: res.outcome.optionId as "once" | "always" | "reject", - directory, - }) - } catch (err) { - log.error("unexpected error when handling permission", { error: err }) - } finally { - break - } - case "message.part.updated": - log.info("message part updated", { event: event.properties }) - try { - const props = event.properties - const { part } = props - - const message = await this.config.sdk.session - .message( - { - sessionID: part.sessionID, - messageID: part.messageID, - directory, - }, - { throwOnError: true }, - ) - .then((x) => x.data) - .catch((err) => { - log.error("unexpected error when fetching message", { error: err }) - return undefined - }) + case "message.part.updated": + log.info("message part updated", { event: event.properties }) + try { + const props = event.properties + const { part } = props - if (!message || message.info.role !== "assistant") return + const message = await this.config.sdk.session + .message( + { + sessionID: part.sessionID, + messageID: part.messageID, + directory, + }, + { throwOnError: true }, + ) + .then((x) => x.data) + .catch((err) => { + log.error("unexpected error when fetching message", { error: err }) + return undefined + }) - if (part.type === "tool") { - switch (part.state.status) { - case "pending": - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call", - toolCallId: part.callID, - title: part.tool, - kind: toToolKind(part.tool), - status: "pending", - locations: [], - rawInput: {}, - }, - }) - .catch((err) => { - log.error("failed to send tool pending to ACP", { error: err }) - }) - break - case "running": - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "in_progress", - kind: toToolKind(part.tool), - title: part.tool, - locations: toLocations(part.tool, part.state.input), - rawInput: part.state.input, - }, - }) - .catch((err) => { - log.error("failed to send tool in_progress to ACP", { error: err }) - }) - break - case "completed": - const kind = toToolKind(part.tool) - const content: ToolCallContent[] = [ - { - type: "content", - content: { - type: "text", - text: part.state.output, + if (!message || message.info.role !== "assistant") return + + if (part.type === "tool") { + switch (part.state.status) { + case "pending": + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call", + toolCallId: part.callID, + title: part.tool, + kind: toToolKind(part.tool), + status: "pending", + locations: [], + rawInput: {}, + }, + }) + .catch((err) => { + log.error("failed to send tool pending to ACP", { error: err }) + }) + break + case "running": + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "in_progress", + locations: toLocations(part.tool, part.state.input), + rawInput: part.state.input, + }, + }) + .catch((err) => { + log.error("failed to send tool in_progress to ACP", { error: err }) + }) + break + case "completed": + const kind = toToolKind(part.tool) + const content: ToolCallContent[] = [ + { + type: "content", + content: { + type: "text", + text: part.state.output, + }, }, - }, - ] - - if (kind === "edit") { - const input = part.state.input - const filePath = typeof input["filePath"] === "string" ? input["filePath"] : "" - const oldText = typeof input["oldString"] === "string" ? input["oldString"] : "" - const newText = - typeof input["newString"] === "string" - ? input["newString"] - : typeof input["content"] === "string" - ? input["content"] - : "" - content.push({ - type: "diff", - path: filePath, - oldText, - newText, - }) - } - - if (part.tool === "todowrite") { - const parsedTodos = z.array(Todo.Info).safeParse(JSON.parse(part.state.output)) - if (parsedTodos.success) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "plan", - entries: parsedTodos.data.map((todo) => { - const status: PlanEntry["status"] = - todo.status === "cancelled" ? "completed" : (todo.status as PlanEntry["status"]) - return { - priority: "medium", - status, - content: todo.content, - } - }), - }, - }) - .catch((err) => { - log.error("failed to send session update for todo", { error: err }) - }) - } else { - log.error("failed to parse todo output", { error: parsedTodos.error }) + ] + + if (kind === "edit") { + const input = part.state.input + const filePath = typeof input["filePath"] === "string" ? input["filePath"] : "" + const oldText = typeof input["oldString"] === "string" ? input["oldString"] : "" + const newText = + typeof input["newString"] === "string" + ? input["newString"] + : typeof input["content"] === "string" + ? input["content"] + : "" + content.push({ + type: "diff", + path: filePath, + oldText, + newText, + }) } - } + if (part.tool === "todowrite") { + const parsedTodos = z.array(Todo.Info).safeParse(JSON.parse(part.state.output)) + if (parsedTodos.success) { + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "plan", + entries: parsedTodos.data.map((todo) => { + const status: PlanEntry["status"] = + todo.status === "cancelled" ? "completed" : (todo.status as PlanEntry["status"]) + return { + priority: "medium", + status, + content: todo.content, + } + }), + }, + }) + .catch((err) => { + log.error("failed to send session update for todo", { error: err }) + }) + } else { + log.error("failed to parse todo output", { error: parsedTodos.error }) + } + } + + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "completed", + kind, + content, + title: part.state.title, + rawOutput: { + output: part.state.output, + metadata: part.state.metadata, + }, + }, + }) + .catch((err) => { + log.error("failed to send tool completed to ACP", { error: err }) + }) + break + case "error": + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "failed", + content: [ + { + type: "content", + content: { + type: "text", + text: part.state.error, + }, + }, + ], + rawOutput: { + error: part.state.error, + }, + }, + }) + .catch((err) => { + log.error("failed to send tool error to ACP", { error: err }) + }) + break + } + } else if (part.type === "text") { + const delta = props.delta + if (delta && part.synthetic !== true) { await this.connection .sessionUpdate({ sessionId, update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "completed", - kind, - content, - title: part.state.title, - rawInput: part.state.input, - rawOutput: { - output: part.state.output, - metadata: part.state.metadata, + sessionUpdate: "agent_message_chunk", + content: { + type: "text", + text: delta, }, }, }) .catch((err) => { - log.error("failed to send tool completed to ACP", { error: err }) + log.error("failed to send text to ACP", { error: err }) }) - break - case "error": + } + } else if (part.type === "reasoning") { + const delta = props.delta + if (delta) { await this.connection .sessionUpdate({ sessionId, update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "failed", - kind: toToolKind(part.tool), - title: part.tool, - rawInput: part.state.input, - content: [ - { - type: "content", - content: { - type: "text", - text: part.state.error, - }, - }, - ], - rawOutput: { - error: part.state.error, + sessionUpdate: "agent_thought_chunk", + content: { + type: "text", + text: delta, }, }, }) .catch((err) => { - log.error("failed to send tool error to ACP", { error: err }) + log.error("failed to send reasoning to ACP", { error: err }) }) - break - } - } else if (part.type === "text") { - const delta = props.delta - if (delta && part.synthetic !== true) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text: delta, - }, - }, - }) - .catch((err) => { - log.error("failed to send text to ACP", { error: err }) - }) - } - } else if (part.type === "reasoning") { - const delta = props.delta - if (delta) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "agent_thought_chunk", - content: { - type: "text", - text: delta, - }, - }, - }) - .catch((err) => { - log.error("failed to send reasoning to ACP", { error: err }) - }) + } } + } finally { + break } - } finally { - break - } + } } - } - }) + }) + .finally(() => { + // Cleanup controller reference when stream ends + this.sessionAbortControllers.delete(sessionId) + }) } async initialize(params: InitializeRequest): Promise { @@ -497,8 +525,6 @@ export namespace ACP { sessionUpdate: "tool_call_update", toolCallId: part.callID, status: "in_progress", - kind: toToolKind(part.tool), - title: part.tool, locations: toLocations(part.tool, part.state.input), rawInput: part.state.input, }, @@ -574,7 +600,6 @@ export namespace ACP { kind, content, title: part.state.title, - rawInput: part.state.input, rawOutput: { output: part.state.output, metadata: part.state.metadata, @@ -593,9 +618,6 @@ export namespace ACP { sessionUpdate: "tool_call_update", toolCallId: part.callID, status: "failed", - kind: toToolKind(part.tool), - title: part.tool, - rawInput: part.state.input, content: [ { type: "content", @@ -934,6 +956,8 @@ export namespace ACP { async cancel(params: CancelNotification) { const session = this.sessionManager.get(params.sessionId) + // Cleanup event subscription for this session + this.cleanupSession(params.sessionId) await this.config.sdk.session.abort( { sessionID: params.sessionId, @@ -942,6 +966,14 @@ export namespace ACP { { throwOnError: true }, ) } + + dispose() { + // Cleanup all session event subscriptions + for (const sessionId of this.sessionAbortControllers.keys()) { + this.cleanupSession(sessionId) + } + log.info("disposed all event subscriptions") + } } function toToolKind(toolName: string): ToolKind { diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index edb093f1974..9d0b71410e8 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -102,4 +102,19 @@ export namespace Bus { match.splice(index, 1) } } + + /** @internal For testing purposes only - returns subscription count for a given event type */ + export function _getSubscriptionCount(type: string): number { + const subs = state().subscriptions.get(type) + return subs?.length ?? 0 + } + + /** @internal For testing purposes only - returns total subscription count across all event types */ + export function _getTotalSubscriptionCount(): number { + let total = 0 + for (const subs of state().subscriptions.values()) { + total += subs.length + } + return total + } } diff --git a/packages/opencode/src/format/index.ts b/packages/opencode/src/format/index.ts index bab758030b9..ee5134dfe03 100644 --- a/packages/opencode/src/format/index.ts +++ b/packages/opencode/src/format/index.ts @@ -11,6 +11,7 @@ import { Instance } from "../project/instance" export namespace Format { const log = Log.create({ service: "format" }) + const subscriptions: (() => void)[] = [] export const Status = z .object({ @@ -102,36 +103,45 @@ export namespace Format { export function init() { log.info("init") - Bus.subscribe(File.Event.Edited, async (payload) => { - const file = payload.properties.file - log.info("formatting", { file }) - const ext = path.extname(file) + subscriptions.push( + Bus.subscribe(File.Event.Edited, async (payload) => { + const file = payload.properties.file + log.info("formatting", { file }) + const ext = path.extname(file) - for (const item of await getFormatter(ext)) { - log.info("running", { command: item.command }) - try { - const proc = Bun.spawn({ - cmd: item.command.map((x) => x.replace("$FILE", file)), - cwd: Instance.directory, - env: { ...process.env, ...item.environment }, - stdout: "ignore", - stderr: "ignore", - }) - const exit = await proc.exited - if (exit !== 0) - log.error("failed", { + for (const item of await getFormatter(ext)) { + log.info("running", { command: item.command }) + try { + const proc = Bun.spawn({ + cmd: item.command.map((x) => x.replace("$FILE", file)), + cwd: Instance.directory, + env: { ...process.env, ...item.environment }, + stdout: "ignore", + stderr: "ignore", + }) + const exit = await proc.exited + if (exit !== 0) + log.error("failed", { + command: item.command, + ...item.environment, + }) + } catch (error) { + log.error("failed to format file", { + error, command: item.command, ...item.environment, + file, }) - } catch (error) { - log.error("failed to format file", { - error, - command: item.command, - ...item.environment, - file, - }) + } } - } - }) + }), + ) + } + + export function dispose() { + for (const unsub of subscriptions) { + unsub() + } + subscriptions.length = 0 } } diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts index 4912b8f74ba..c7b3fc44960 100644 --- a/packages/opencode/src/plugin/index.ts +++ b/packages/opencode/src/plugin/index.ts @@ -11,6 +11,7 @@ import { CodexAuthPlugin } from "./codex" export namespace Plugin { const log = Log.create({ service: "plugin" }) + const subscriptions: (() => void)[] = [] const BUILTIN = ["opencode-copilot-auth@0.0.11", "opencode-anthropic-auth@0.0.8"] @@ -109,13 +110,22 @@ export namespace Plugin { // @ts-expect-error this is because we haven't moved plugin to sdk v2 await hook.config?.(config) } - Bus.subscribeAll(async (input) => { - const hooks = await state().then((x) => x.hooks) - for (const hook of hooks) { - hook["event"]?.({ - event: input, - }) - } - }) + subscriptions.push( + Bus.subscribeAll(async (input) => { + const hooks = await state().then((x) => x.hooks) + for (const hook of hooks) { + hook["event"]?.({ + event: input, + }) + } + }), + ) + } + + export function dispose() { + for (const unsub of subscriptions) { + unsub() + } + subscriptions.length = 0 } } diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index 95271f8c827..9b85bf2d844 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -10,56 +10,76 @@ import type * as SDK from "@opencode-ai/sdk/v2" export namespace ShareNext { const log = Log.create({ service: "share-next" }) + const subscriptions: (() => void)[] = [] async function url() { return Config.get().then((x) => x.enterprise?.url ?? "https://opncd.ai") } export async function init() { - Bus.subscribe(Session.Event.Updated, async (evt) => { - await sync(evt.properties.info.id, [ - { - type: "session", - data: evt.properties.info, - }, - ]) - }) - Bus.subscribe(MessageV2.Event.Updated, async (evt) => { - await sync(evt.properties.info.sessionID, [ - { - type: "message", - data: evt.properties.info, - }, - ]) - if (evt.properties.info.role === "user") { + subscriptions.push( + Bus.subscribe(Session.Event.Updated, async (evt) => { + await sync(evt.properties.info.id, [ + { + type: "session", + data: evt.properties.info, + }, + ]) + }), + ) + subscriptions.push( + Bus.subscribe(MessageV2.Event.Updated, async (evt) => { await sync(evt.properties.info.sessionID, [ { - type: "model", - data: [ - await Provider.getModel(evt.properties.info.model.providerID, evt.properties.info.model.modelID).then( - (m) => m, - ), - ], + type: "message", + data: evt.properties.info, }, ]) - } - }) - Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { - await sync(evt.properties.part.sessionID, [ - { - type: "part", - data: evt.properties.part, - }, - ]) - }) - Bus.subscribe(Session.Event.Diff, async (evt) => { - await sync(evt.properties.sessionID, [ - { - type: "session_diff", - data: evt.properties.diff, - }, - ]) - }) + if (evt.properties.info.role === "user") { + await sync(evt.properties.info.sessionID, [ + { + type: "model", + data: [ + await Provider.getModel(evt.properties.info.model.providerID, evt.properties.info.model.modelID).then( + (m) => m, + ), + ], + }, + ]) + } + }), + ) + subscriptions.push( + Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { + await sync(evt.properties.part.sessionID, [ + { + type: "part", + data: evt.properties.part, + }, + ]) + }), + ) + subscriptions.push( + Bus.subscribe(Session.Event.Diff, async (evt) => { + await sync(evt.properties.sessionID, [ + { + type: "session_diff", + data: evt.properties.diff, + }, + ]) + }), + ) + } + + export function dispose() { + for (const unsub of subscriptions) { + unsub() + } + subscriptions.length = 0 + for (const entry of queue.values()) { + clearTimeout(entry.timeout) + } + queue.clear() } export async function create(sessionID: string) { diff --git a/packages/opencode/src/share/share.ts b/packages/opencode/src/share/share.ts index 1006b23d556..c48855a0514 100644 --- a/packages/opencode/src/share/share.ts +++ b/packages/opencode/src/share/share.ts @@ -9,6 +9,7 @@ export namespace Share { let queue: Promise = Promise.resolve() const pending = new Map() + const subscriptions: (() => void)[] = [] export async function sync(key: string, content: any) { const [root, ...splits] = key.split("/") @@ -46,23 +47,41 @@ export namespace Share { } export function init() { - Bus.subscribe(Session.Event.Updated, async (evt) => { - await sync("session/info/" + evt.properties.info.id, evt.properties.info) - }) - Bus.subscribe(MessageV2.Event.Updated, async (evt) => { - await sync("session/message/" + evt.properties.info.sessionID + "/" + evt.properties.info.id, evt.properties.info) - }) - Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { - await sync( - "session/part/" + - evt.properties.part.sessionID + - "/" + - evt.properties.part.messageID + - "/" + - evt.properties.part.id, - evt.properties.part, - ) - }) + subscriptions.push( + Bus.subscribe(Session.Event.Updated, async (evt) => { + await sync("session/info/" + evt.properties.info.id, evt.properties.info) + }), + ) + subscriptions.push( + Bus.subscribe(MessageV2.Event.Updated, async (evt) => { + await sync( + "session/message/" + evt.properties.info.sessionID + "/" + evt.properties.info.id, + evt.properties.info, + ) + }), + ) + subscriptions.push( + Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { + await sync( + "session/part/" + + evt.properties.part.sessionID + + "/" + + evt.properties.part.messageID + + "/" + + evt.properties.part.id, + evt.properties.part, + ) + }), + ) + } + + export function dispose() { + for (const unsub of subscriptions) { + unsub() + } + subscriptions.length = 0 + pending.clear() + log.info("disposed share subscriptions") } export const URL = diff --git a/packages/opencode/test/memory/acp-cleanup.test.ts b/packages/opencode/test/memory/acp-cleanup.test.ts new file mode 100644 index 00000000000..ade90895fbc --- /dev/null +++ b/packages/opencode/test/memory/acp-cleanup.test.ts @@ -0,0 +1,145 @@ +import { test, expect, describe } from "bun:test" +import { ACP } from "../../src/acp/agent" + +/** + * Tests for ACP Agent session cleanup. + * Verifies that session event subscriptions are properly cleaned up. + */ + +describe("ACP.Agent session cleanup", () => { + test("cleanupSession removes abort controller", () => { + // Create a mock connection and config + const mockConnection = { + requestPermission: async () => ({ outcome: { outcome: "selected", optionId: "once" } }), + sessionUpdate: async () => {}, + } + + const mockConfig = { + sdk: { + event: { + subscribe: async () => ({ + stream: (async function* () { + // Empty stream + })(), + }), + }, + permission: { + reply: async () => {}, + }, + session: { + message: async () => ({ data: null }), + abort: async () => {}, + }, + }, + } + + // @ts-expect-error - testing with mocks + const agent = new ACP.Agent(mockConnection, mockConfig) + + // Access private map for testing + // @ts-expect-error - accessing private for testing + const controllers = agent.sessionAbortControllers + + // Simulate adding a controller + const controller = new AbortController() + controllers.set("test-session-1", controller) + + expect(controllers.size).toBe(1) + expect(controller.signal.aborted).toBe(false) + + // Call cleanup + // @ts-expect-error - accessing private for testing + agent.cleanupSession("test-session-1") + + expect(controllers.size).toBe(0) + expect(controller.signal.aborted).toBe(true) + }) + + test("dispose cleans up all session controllers", () => { + const mockConnection = {} + const mockConfig = { + sdk: { + event: { subscribe: async () => ({ stream: (async function* () {})() }) }, + permission: { reply: async () => {} }, + session: { message: async () => ({ data: null }), abort: async () => {} }, + }, + } + + // @ts-expect-error - testing with mocks + const agent = new ACP.Agent(mockConnection, mockConfig) + + // @ts-expect-error - accessing private for testing + const controllers = agent.sessionAbortControllers + + // Add multiple controllers + const controller1 = new AbortController() + const controller2 = new AbortController() + const controller3 = new AbortController() + + controllers.set("session-1", controller1) + controllers.set("session-2", controller2) + controllers.set("session-3", controller3) + + expect(controllers.size).toBe(3) + + // Dispose all + agent.dispose() + + expect(controllers.size).toBe(0) + expect(controller1.signal.aborted).toBe(true) + expect(controller2.signal.aborted).toBe(true) + expect(controller3.signal.aborted).toBe(true) + }) + + test("setupEventSubscriptions replaces existing subscription for same session", () => { + const mockConnection = {} + const subscribeCallCount = { count: 0 } + const mockConfig = { + sdk: { + event: { + subscribe: async () => { + subscribeCallCount.count++ + return { + stream: (async function* () { + // Simulate a long-running stream that checks abort + while (true) { + await new Promise((r) => setTimeout(r, 100)) + yield { type: "test" } + } + })(), + } + }, + }, + permission: { reply: async () => {} }, + session: { message: async () => ({ data: null }), abort: async () => {} }, + }, + } + + // @ts-expect-error - testing with mocks + const agent = new ACP.Agent(mockConnection, mockConfig) + + // @ts-expect-error - accessing private for testing + const controllers = agent.sessionAbortControllers + + // Manually add an existing controller to simulate an existing subscription + const existingController = new AbortController() + controllers.set("session-1", existingController) + + // Setup event subscriptions for the same session + const mockSession = { id: "session-1", cwd: "/test" } + // @ts-expect-error - accessing private for testing + agent.setupEventSubscriptions(mockSession) + + // The existing controller should be aborted + expect(existingController.signal.aborted).toBe(true) + + // A new controller should exist + expect(controllers.has("session-1")).toBe(true) + const newController = controllers.get("session-1") + expect(newController).not.toBe(existingController) + expect(newController?.signal.aborted).toBe(false) + + // Cleanup + agent.dispose() + }) +}) diff --git a/packages/opencode/test/memory/profile.ts b/packages/opencode/test/memory/profile.ts new file mode 100644 index 00000000000..e679f663be3 --- /dev/null +++ b/packages/opencode/test/memory/profile.ts @@ -0,0 +1,296 @@ +#!/usr/bin/env bun +/** + * Memory Profiling Script for OpenCode + * + * This script simulates subscription lifecycle patterns and monitors memory usage + * to verify that memory leaks have been fixed. + * + * Usage: + * bun run test/memory/profile.ts + * + * For heap snapshots (requires --expose-gc flag): + * bun --expose-gc run test/memory/profile.ts + */ + +import { Log } from "../../src/util/log" + +Log.init({ print: false, dev: false, level: "ERROR" }) + +interface MemorySnapshot { + label: string + heapUsed: number + heapTotal: number + external: number + rss: number + timestamp: number +} + +function takeSnapshot(label: string): MemorySnapshot { + const mem = process.memoryUsage() + return { + label, + heapUsed: mem.heapUsed, + heapTotal: mem.heapTotal, + external: mem.external, + rss: mem.rss, + timestamp: Date.now(), + } +} + +function formatBytes(bytes: number): string { + if (bytes < 1024) return `${bytes} B` + if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(2)} KB` + return `${(bytes / 1024 / 1024).toFixed(2)} MB` +} + +function printSnapshot(snapshot: MemorySnapshot) { + console.log(`[${snapshot.label}]`) + console.log(` Heap Used: ${formatBytes(snapshot.heapUsed)}`) + console.log(` Heap Total: ${formatBytes(snapshot.heapTotal)}`) + console.log(` RSS: ${formatBytes(snapshot.rss)}`) +} + +function compareSnapshots(before: MemorySnapshot, after: MemorySnapshot) { + const heapDiff = after.heapUsed - before.heapUsed + const rssDiff = after.rss - before.rss + console.log(`\n[Delta: ${before.label} -> ${after.label}]`) + console.log(` Heap Used: ${heapDiff >= 0 ? "+" : ""}${formatBytes(heapDiff)}`) + console.log(` RSS: ${rssDiff >= 0 ? "+" : ""}${formatBytes(rssDiff)}`) + return { heapDiff, rssDiff } +} + +function forceGC() { + if (global.gc) { + global.gc() + console.log(" (GC forced)") + } +} + +async function sleep(ms: number) { + await new Promise((r) => setTimeout(r, ms)) +} + +// ============================================================================ +// Test Scenarios +// ============================================================================ + +async function testShareSubscriptionCycles() { + console.log("\n" + "=".repeat(60)) + console.log("TEST: Share subscription init/dispose cycles") + console.log("=".repeat(60)) + + const { Share } = await import("../../src/share/share") + + forceGC() + await sleep(100) + const baseline = takeSnapshot("Baseline") + printSnapshot(baseline) + + const iterations = 1000 + console.log(`\nRunning ${iterations} init/dispose cycles...`) + + for (let i = 0; i < iterations; i++) { + Share.init() + Share.dispose() + if (i % 100 === 0 && i > 0) { + process.stdout.write(` ${i} cycles completed\r`) + } + } + console.log(` ${iterations} cycles completed`) + + forceGC() + await sleep(100) + const afterCycles = takeSnapshot("After cycles") + printSnapshot(afterCycles) + + const result = compareSnapshots(baseline, afterCycles) + + // Check for leaks (should be less than 1MB growth for 1000 cycles) + if (result.heapDiff > 1024 * 1024) { + console.log("\n⚠️ WARNING: Potential memory leak detected!") + return false + } + console.log("\n✅ PASS: Memory stable after subscription cycles") + return true +} + +async function testShareNextSubscriptionCycles() { + console.log("\n" + "=".repeat(60)) + console.log("TEST: ShareNext subscription init/dispose cycles") + console.log("=".repeat(60)) + + const { ShareNext } = await import("../../src/share/share-next") + + forceGC() + await sleep(100) + const baseline = takeSnapshot("Baseline") + printSnapshot(baseline) + + const iterations = 1000 + console.log(`\nRunning ${iterations} init/dispose cycles...`) + + for (let i = 0; i < iterations; i++) { + await ShareNext.init() + ShareNext.dispose() + if (i % 100 === 0 && i > 0) { + process.stdout.write(` ${i} cycles completed\r`) + } + } + console.log(` ${iterations} cycles completed`) + + forceGC() + await sleep(100) + const afterCycles = takeSnapshot("After cycles") + printSnapshot(afterCycles) + + const result = compareSnapshots(baseline, afterCycles) + + if (result.heapDiff > 1024 * 1024) { + console.log("\n⚠️ WARNING: Potential memory leak detected!") + return false + } + console.log("\n✅ PASS: Memory stable after subscription cycles") + return true +} + +async function testFormatSubscriptionCycles() { + console.log("\n" + "=".repeat(60)) + console.log("TEST: Format subscription init/dispose cycles") + console.log("=".repeat(60)) + + const { Format } = await import("../../src/format") + + forceGC() + await sleep(100) + const baseline = takeSnapshot("Baseline") + printSnapshot(baseline) + + const iterations = 1000 + console.log(`\nRunning ${iterations} init/dispose cycles...`) + + for (let i = 0; i < iterations; i++) { + Format.init() + Format.dispose() + if (i % 100 === 0 && i > 0) { + process.stdout.write(` ${i} cycles completed\r`) + } + } + console.log(` ${iterations} cycles completed`) + + forceGC() + await sleep(100) + const afterCycles = takeSnapshot("After cycles") + printSnapshot(afterCycles) + + const result = compareSnapshots(baseline, afterCycles) + + if (result.heapDiff > 1024 * 1024) { + console.log("\n⚠️ WARNING: Potential memory leak detected!") + return false + } + console.log("\n✅ PASS: Memory stable after subscription cycles") + return true +} + +async function testACPControllerCleanup() { + console.log("\n" + "=".repeat(60)) + console.log("TEST: ACP Agent controller cleanup") + console.log("=".repeat(60)) + + const { ACP } = await import("../../src/acp/agent") + + const mockConnection = {} + const mockConfig = { + sdk: { + event: { subscribe: async () => ({ stream: (async function* () {})() }) }, + permission: { reply: async () => {} }, + session: { message: async () => ({ data: null }), abort: async () => {} }, + }, + } + + forceGC() + await sleep(100) + const baseline = takeSnapshot("Baseline") + printSnapshot(baseline) + + const iterations = 1000 + console.log(`\nCreating and cleaning up ${iterations} sessions...`) + + // @ts-expect-error - testing with mocks + const agent = new ACP.Agent(mockConnection, mockConfig) + + for (let i = 0; i < iterations; i++) { + // Simulate session creation with abort controller + // @ts-expect-error - accessing private for testing + const controllers = agent.sessionAbortControllers + controllers.set(`session-${i}`, new AbortController()) + + // Clean it up + // @ts-expect-error - accessing private for testing + agent.cleanupSession(`session-${i}`) + + if (i % 100 === 0 && i > 0) { + process.stdout.write(` ${i} sessions cleaned\r`) + } + } + console.log(` ${iterations} sessions cleaned`) + + agent.dispose() + + forceGC() + await sleep(100) + const afterCycles = takeSnapshot("After cycles") + printSnapshot(afterCycles) + + const result = compareSnapshots(baseline, afterCycles) + + if (result.heapDiff > 1024 * 1024) { + console.log("\n⚠️ WARNING: Potential memory leak detected!") + return false + } + console.log("\n✅ PASS: Memory stable after controller cleanup cycles") + return true +} + +// ============================================================================ +// Main +// ============================================================================ + +async function main() { + console.log("OpenCode Memory Profiling") + console.log("=".repeat(60)) + console.log(`GC Available: ${global.gc ? "Yes" : "No (run with --expose-gc for accurate results)"}`) + console.log(`Platform: ${process.platform}`) + console.log(`Bun Version: ${Bun.version}`) + + const results: boolean[] = [] + + try { + results.push(await testShareSubscriptionCycles()) + results.push(await testShareNextSubscriptionCycles()) + results.push(await testFormatSubscriptionCycles()) + results.push(await testACPControllerCleanup()) + } catch (err) { + console.error("\nError during profiling:", err) + process.exit(1) + } + + console.log("\n" + "=".repeat(60)) + console.log("SUMMARY") + console.log("=".repeat(60)) + + const passed = results.filter(Boolean).length + const total = results.length + + console.log(`Tests passed: ${passed}/${total}`) + + if (passed === total) { + console.log("\n✅ All memory tests passed!") + process.exit(0) + } else { + console.log("\n❌ Some memory tests failed!") + process.exit(1) + } +} + +main() diff --git a/packages/opencode/test/memory/subscription-cleanup.test.ts b/packages/opencode/test/memory/subscription-cleanup.test.ts new file mode 100644 index 00000000000..64210b8d24d --- /dev/null +++ b/packages/opencode/test/memory/subscription-cleanup.test.ts @@ -0,0 +1,217 @@ +import { test, expect, describe, beforeAll } from "bun:test" +import { Bus } from "../../src/bus" +import { Session } from "../../src/session" +import { MessageV2 } from "../../src/session/message-v2" +import { File } from "../../src/file" +import { Instance } from "../../src/project/instance" +import path from "path" +import os from "os" +import fs from "fs/promises" + +/** + * Tests to verify that subscription cleanup functions work correctly. + * These tests verify that dispose() functions properly unsubscribe from the Bus, + * preventing memory leaks from accumulated event handlers. + */ + +let testDir: string + +beforeAll(async () => { + // Create a temp directory for the test instance + testDir = path.join(os.tmpdir(), `opencode-memory-test-${Date.now()}`) + await fs.mkdir(testDir, { recursive: true }) + await fs.writeFile(path.join(testDir, "opencode.json"), JSON.stringify({})) +}) + +describe("subscription cleanup", () => { + describe("Share.dispose()", () => { + test("should unsubscribe from all events", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { Share } = await import("../../src/share/share") + + const beforeSession = Bus._getSubscriptionCount(Session.Event.Updated.type) + const beforeMessage = Bus._getSubscriptionCount(MessageV2.Event.Updated.type) + const beforePart = Bus._getSubscriptionCount(MessageV2.Event.PartUpdated.type) + + Share.init() + + const afterInitSession = Bus._getSubscriptionCount(Session.Event.Updated.type) + const afterInitMessage = Bus._getSubscriptionCount(MessageV2.Event.Updated.type) + const afterInitPart = Bus._getSubscriptionCount(MessageV2.Event.PartUpdated.type) + + // Verify subscriptions were added + expect(afterInitSession).toBe(beforeSession + 1) + expect(afterInitMessage).toBe(beforeMessage + 1) + expect(afterInitPart).toBe(beforePart + 1) + + Share.dispose() + + const afterDisposeSession = Bus._getSubscriptionCount(Session.Event.Updated.type) + const afterDisposeMessage = Bus._getSubscriptionCount(MessageV2.Event.Updated.type) + const afterDisposePart = Bus._getSubscriptionCount(MessageV2.Event.PartUpdated.type) + + // Verify subscriptions were removed + expect(afterDisposeSession).toBe(beforeSession) + expect(afterDisposeMessage).toBe(beforeMessage) + expect(afterDisposePart).toBe(beforePart) + }, + }) + }) + + test("multiple init/dispose cycles maintain correct count", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { Share } = await import("../../src/share/share") + + const baseline = Bus._getTotalSubscriptionCount() + + for (let i = 0; i < 10; i++) { + Share.init() + Share.dispose() + } + + const afterCycles = Bus._getTotalSubscriptionCount() + expect(afterCycles).toBe(baseline) + }, + }) + }) + }) + + describe("ShareNext.dispose()", () => { + test("should unsubscribe from all events and clear queue", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { ShareNext } = await import("../../src/share/share-next") + + const beforeSession = Bus._getSubscriptionCount(Session.Event.Updated.type) + const beforeMessage = Bus._getSubscriptionCount(MessageV2.Event.Updated.type) + const beforePart = Bus._getSubscriptionCount(MessageV2.Event.PartUpdated.type) + const beforeDiff = Bus._getSubscriptionCount(Session.Event.Diff.type) + + await ShareNext.init() + + const afterInitSession = Bus._getSubscriptionCount(Session.Event.Updated.type) + const afterInitMessage = Bus._getSubscriptionCount(MessageV2.Event.Updated.type) + const afterInitPart = Bus._getSubscriptionCount(MessageV2.Event.PartUpdated.type) + const afterInitDiff = Bus._getSubscriptionCount(Session.Event.Diff.type) + + expect(afterInitSession).toBe(beforeSession + 1) + expect(afterInitMessage).toBe(beforeMessage + 1) + expect(afterInitPart).toBe(beforePart + 1) + expect(afterInitDiff).toBe(beforeDiff + 1) + + ShareNext.dispose() + + const afterDisposeSession = Bus._getSubscriptionCount(Session.Event.Updated.type) + const afterDisposeMessage = Bus._getSubscriptionCount(MessageV2.Event.Updated.type) + const afterDisposePart = Bus._getSubscriptionCount(MessageV2.Event.PartUpdated.type) + const afterDisposeDiff = Bus._getSubscriptionCount(Session.Event.Diff.type) + + expect(afterDisposeSession).toBe(beforeSession) + expect(afterDisposeMessage).toBe(beforeMessage) + expect(afterDisposePart).toBe(beforePart) + expect(afterDisposeDiff).toBe(beforeDiff) + }, + }) + }) + }) + + describe("Format.dispose()", () => { + test("should unsubscribe from file edit events", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { Format } = await import("../../src/format") + + const before = Bus._getSubscriptionCount(File.Event.Edited.type) + + Format.init() + + const afterInit = Bus._getSubscriptionCount(File.Event.Edited.type) + expect(afterInit).toBe(before + 1) + + Format.dispose() + + const afterDispose = Bus._getSubscriptionCount(File.Event.Edited.type) + expect(afterDispose).toBe(before) + }, + }) + }) + }) + + describe("Plugin.dispose()", () => { + test("should unsubscribe from wildcard events", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + // Test the dispose pattern directly without Plugin.init() which is slow + // Plugin.init() loads npm packages which can timeout in tests + const before = Bus._getSubscriptionCount("*") + + // Simulate what Plugin.init() does - subscribe to all events + const unsub = Bus.subscribeAll(() => {}) + const afterSubscribe = Bus._getSubscriptionCount("*") + expect(afterSubscribe).toBe(before + 1) + + // Unsubscribe like dispose() would + unsub() + const afterUnsubscribe = Bus._getSubscriptionCount("*") + expect(afterUnsubscribe).toBe(before) + }, + }) + }) + }) +}) + +describe("memory stability", () => { + test("repeated init/dispose cycles should not leak subscriptions", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { Share } = await import("../../src/share/share") + + const baseline = Bus._getTotalSubscriptionCount() + const iterations = 100 + + for (let i = 0; i < iterations; i++) { + Share.init() + Share.dispose() + } + + const afterCycles = Bus._getTotalSubscriptionCount() + expect(afterCycles).toBe(baseline) + }, + }) + }) + + test("init without dispose should accumulate subscriptions (verifies test validity)", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + // This test verifies that without dispose(), subscriptions DO accumulate + // This proves our dispose() tests are meaningful + const before = Bus._getTotalSubscriptionCount() + + // Manually subscribe without disposing + const unsub1 = Bus.subscribe(Session.Event.Updated, () => {}) + const unsub2 = Bus.subscribe(Session.Event.Updated, () => {}) + const unsub3 = Bus.subscribe(Session.Event.Updated, () => {}) + + const afterSubscribe = Bus._getTotalSubscriptionCount() + expect(afterSubscribe).toBe(before + 3) + + // Clean up manually + unsub1() + unsub2() + unsub3() + + const afterUnsubscribe = Bus._getTotalSubscriptionCount() + expect(afterUnsubscribe).toBe(before) + }, + }) + }) +}) From 00210a4c80c1f74062910f70508a42b5f90e3f54 Mon Sep 17 00:00:00 2001 From: Mark Henderson Date: Mon, 5 Jan 2026 21:41:55 -0700 Subject: [PATCH 03/15] fix: address PR review feedback - prevent duplicate subscriptions on re-init - Add dispose() call at start of init() in Share, ShareNext, Format, Plugin to prevent subscription accumulation if init() is called multiple times - Add .catch() handler to ACP event subscription promise to log errors - Fix race condition in ACP finally block by checking controller identity - Add test for multiple init() calls not accumulating subscriptions --- packages/opencode/src/acp/agent.ts | 11 +++++-- packages/opencode/src/format/index.ts | 2 ++ packages/opencode/src/plugin/index.ts | 2 ++ packages/opencode/src/share/share-next.ts | 2 ++ packages/opencode/src/share/share.ts | 2 ++ .../test/memory/subscription-cleanup.test.ts | 29 +++++++++++++++++++ 6 files changed, 46 insertions(+), 2 deletions(-) diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index b3f8ae7cc4b..e2fea5bdca4 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -352,9 +352,16 @@ export namespace ACP { } } }) + .catch((err) => { + log.error("error in event subscription", { sessionId, error: err }) + }) .finally(() => { - // Cleanup controller reference when stream ends - this.sessionAbortControllers.delete(sessionId) + // Cleanup controller reference when stream ends, but only if it's the same controller + // to prevent race condition with re-subscriptions + const current = this.sessionAbortControllers.get(sessionId) + if (current === controller) { + this.sessionAbortControllers.delete(sessionId) + } }) } diff --git a/packages/opencode/src/format/index.ts b/packages/opencode/src/format/index.ts index ee5134dfe03..592b9be1c1c 100644 --- a/packages/opencode/src/format/index.ts +++ b/packages/opencode/src/format/index.ts @@ -103,6 +103,8 @@ export namespace Format { export function init() { log.info("init") + // Clean up any existing subscriptions to prevent duplicates on re-init + dispose() subscriptions.push( Bus.subscribe(File.Event.Edited, async (payload) => { const file = payload.properties.file diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts index c7b3fc44960..8191d9959fc 100644 --- a/packages/opencode/src/plugin/index.ts +++ b/packages/opencode/src/plugin/index.ts @@ -104,6 +104,8 @@ export namespace Plugin { } export async function init() { + // Clean up any existing subscriptions to prevent duplicates on re-init + dispose() const hooks = await state().then((x) => x.hooks) const config = await Config.get() for (const hook of hooks) { diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index 9b85bf2d844..5492a78137e 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -17,6 +17,8 @@ export namespace ShareNext { } export async function init() { + // Clean up any existing subscriptions to prevent duplicates on re-init + dispose() subscriptions.push( Bus.subscribe(Session.Event.Updated, async (evt) => { await sync(evt.properties.info.id, [ diff --git a/packages/opencode/src/share/share.ts b/packages/opencode/src/share/share.ts index c48855a0514..3835d5d9c14 100644 --- a/packages/opencode/src/share/share.ts +++ b/packages/opencode/src/share/share.ts @@ -47,6 +47,8 @@ export namespace Share { } export function init() { + // Clean up any existing subscriptions to prevent duplicates on re-init + dispose() subscriptions.push( Bus.subscribe(Session.Event.Updated, async (evt) => { await sync("session/info/" + evt.properties.info.id, evt.properties.info) diff --git a/packages/opencode/test/memory/subscription-cleanup.test.ts b/packages/opencode/test/memory/subscription-cleanup.test.ts index 64210b8d24d..5956cf6d129 100644 --- a/packages/opencode/test/memory/subscription-cleanup.test.ts +++ b/packages/opencode/test/memory/subscription-cleanup.test.ts @@ -118,6 +118,35 @@ describe("subscription cleanup", () => { }, }) }) + + test("multiple init calls should not accumulate subscriptions", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { ShareNext } = await import("../../src/share/share-next") + + const baseline = Bus._getTotalSubscriptionCount() + + // Call init multiple times without dispose - should not accumulate + // because init() now calls dispose() at the start + await ShareNext.init() + const afterFirstInit = Bus._getTotalSubscriptionCount() + + await ShareNext.init() + const afterSecondInit = Bus._getTotalSubscriptionCount() + + await ShareNext.init() + const afterThirdInit = Bus._getTotalSubscriptionCount() + + // All counts should be the same - no accumulation + expect(afterSecondInit).toBe(afterFirstInit) + expect(afterThirdInit).toBe(afterFirstInit) + + ShareNext.dispose() + expect(Bus._getTotalSubscriptionCount()).toBe(baseline) + }, + }) + }) }) describe("Format.dispose()", () => { From f0953f76d073926ee85cb0cb10850ac19a192b87 Mon Sep 17 00:00:00 2001 From: Mark Henderson Date: Mon, 5 Jan 2026 21:44:27 -0700 Subject: [PATCH 04/15] fix: add queue cleanup verification to ShareNext test - Add _getQueueSize() test helper to ShareNext - Add assertion to verify queue is cleared after dispose() --- packages/opencode/src/share/share-next.ts | 5 +++++ packages/opencode/test/memory/subscription-cleanup.test.ts | 3 +++ 2 files changed, 8 insertions(+) diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index 5492a78137e..f400c5fa50a 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -84,6 +84,11 @@ export namespace ShareNext { queue.clear() } + /** @internal Test helper to get queue size */ + export function _getQueueSize() { + return queue.size + } + export async function create(sessionID: string) { log.info("creating share", { sessionID }) const result = await fetch(`${await url()}/api/share`, { diff --git a/packages/opencode/test/memory/subscription-cleanup.test.ts b/packages/opencode/test/memory/subscription-cleanup.test.ts index 5956cf6d129..084defd9dbc 100644 --- a/packages/opencode/test/memory/subscription-cleanup.test.ts +++ b/packages/opencode/test/memory/subscription-cleanup.test.ts @@ -115,6 +115,9 @@ describe("subscription cleanup", () => { expect(afterDisposeMessage).toBe(beforeMessage) expect(afterDisposePart).toBe(beforePart) expect(afterDisposeDiff).toBe(beforeDiff) + + // Verify queue is cleared + expect(ShareNext._getQueueSize()).toBe(0) }, }) }) From e01e0913b76e02a47c295f612eb7b03c23ccfd4e Mon Sep 17 00:00:00 2001 From: Mark Henderson Date: Tue, 6 Jan 2026 21:03:33 -0700 Subject: [PATCH 05/15] test: add queue cleanup verification to ShareNext dispose test Added _addToQueueForTesting helper and a new test that verifies dispose() properly clears queue items with pending timeouts. --- packages/opencode/src/share/share-next.ts | 6 ++++ .../test/memory/subscription-cleanup.test.ts | 28 +++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index f400c5fa50a..c9a0e67d562 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -89,6 +89,12 @@ export namespace ShareNext { return queue.size } + /** @internal Test helper to add items to queue for testing dispose cleanup */ + export function _addToQueueForTesting(sessionID: string) { + const timeout = setTimeout(() => {}, 10000) + queue.set(sessionID, { timeout, data: new Map() }) + } + export async function create(sessionID: string) { log.info("creating share", { sessionID }) const result = await fetch(`${await url()}/api/share`, { diff --git a/packages/opencode/test/memory/subscription-cleanup.test.ts b/packages/opencode/test/memory/subscription-cleanup.test.ts index 084defd9dbc..e5b02b9211d 100644 --- a/packages/opencode/test/memory/subscription-cleanup.test.ts +++ b/packages/opencode/test/memory/subscription-cleanup.test.ts @@ -150,6 +150,34 @@ describe("subscription cleanup", () => { }, }) }) + + test("dispose should clear pending queue items and their timeouts", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { ShareNext } = await import("../../src/share/share-next") + + await ShareNext.init() + + // Verify queue starts empty + expect(ShareNext._getQueueSize()).toBe(0) + + // Add items to the queue using the test helper + ShareNext._addToQueueForTesting("session-1") + ShareNext._addToQueueForTesting("session-2") + ShareNext._addToQueueForTesting("session-3") + + // Verify items were added + expect(ShareNext._getQueueSize()).toBe(3) + + // dispose() should clear all queue items and their timeouts + ShareNext.dispose() + + // Verify queue is cleared + expect(ShareNext._getQueueSize()).toBe(0) + }, + }) + }) }) describe("Format.dispose()", () => { From 57a3134689d60d09bb5d501675710f2bf6ae0cbf Mon Sep 17 00:00:00 2001 From: Mark Henderson Date: Tue, 6 Jan 2026 21:24:40 -0700 Subject: [PATCH 06/15] fix: address PR review comments for queue reset and test generator cleanup - Reset queue Promise chain in Share.dispose() to fully clean up state - Fix async generator in test to use finite loop with abort signal check to prevent background runaway after test completion --- packages/opencode/src/share/share.ts | 1 + packages/opencode/test/memory/acp-cleanup.test.ts | 13 +++++++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/packages/opencode/src/share/share.ts b/packages/opencode/src/share/share.ts index 3835d5d9c14..4ef5fb11f76 100644 --- a/packages/opencode/src/share/share.ts +++ b/packages/opencode/src/share/share.ts @@ -83,6 +83,7 @@ export namespace Share { } subscriptions.length = 0 pending.clear() + queue = Promise.resolve() log.info("disposed share subscriptions") } diff --git a/packages/opencode/test/memory/acp-cleanup.test.ts b/packages/opencode/test/memory/acp-cleanup.test.ts index ade90895fbc..3ef75c87d84 100644 --- a/packages/opencode/test/memory/acp-cleanup.test.ts +++ b/packages/opencode/test/memory/acp-cleanup.test.ts @@ -94,19 +94,28 @@ describe("ACP.Agent session cleanup", () => { test("setupEventSubscriptions replaces existing subscription for same session", () => { const mockConnection = {} const subscribeCallCount = { count: 0 } + // Track active generators so we can verify they terminate + const activeGenerators = new Set() const mockConfig = { sdk: { event: { subscribe: async () => { subscribeCallCount.count++ + // Create a signal to control this generator's lifecycle + const genController = new AbortController() + activeGenerators.add(genController) return { stream: (async function* () { - // Simulate a long-running stream that checks abort - while (true) { + // Use finite loop with abort check to prevent background runaway + for (let i = 0; i < 100 && !genController.signal.aborted; i++) { await new Promise((r) => setTimeout(r, 100)) + if (genController.signal.aborted) break yield { type: "test" } } + activeGenerators.delete(genController) })(), + // Expose abort to allow cleanup + abort: () => genController.abort(), } }, }, From 95284eaf6a0e3740b3e4b14fd2f90bba256cfee5 Mon Sep 17 00:00:00 2001 From: Mark Henderson Date: Tue, 6 Jan 2026 22:13:42 -0700 Subject: [PATCH 07/15] chore: trigger CI From 58ef6b83aeeebd7e4b845c337d60beb819cd861c Mon Sep 17 00:00:00 2001 From: Mark Henderson Date: Wed, 7 Jan 2026 08:46:28 -0700 Subject: [PATCH 08/15] fix: add null check for session in cancel handler to prevent runtime error --- packages/opencode/src/acp/agent.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index e2fea5bdca4..56741561971 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -965,6 +965,10 @@ export namespace ACP { const session = this.sessionManager.get(params.sessionId) // Cleanup event subscription for this session this.cleanupSession(params.sessionId) + if (!session) { + log.warn("cancel called for unknown session", { sessionId: params.sessionId }) + return + } await this.config.sdk.session.abort( { sessionID: params.sessionId, From efc7b55b72676a32ba057bd3edcafd592aeb509d Mon Sep 17 00:00:00 2001 From: Mark Henderson Date: Wed, 7 Jan 2026 21:30:25 -0700 Subject: [PATCH 09/15] fix: always call SDK abort in cancel handler, use optional chaining for directory Since directory is optional in the SDK, we can always call abort even if session is not found in the manager. This ensures the SDK is never left in an inconsistent state. --- packages/opencode/src/acp/agent.ts | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index 56741561971..466b93f914e 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -965,14 +965,11 @@ export namespace ACP { const session = this.sessionManager.get(params.sessionId) // Cleanup event subscription for this session this.cleanupSession(params.sessionId) - if (!session) { - log.warn("cancel called for unknown session", { sessionId: params.sessionId }) - return - } + // Always call SDK abort - directory is optional, session.cwd provides optimization hint await this.config.sdk.session.abort( { sessionID: params.sessionId, - directory: session.cwd, + directory: session?.cwd, }, { throwOnError: true }, ) From 46c2fec941bcd461f6030a6e0654d3a77978009c Mon Sep 17 00:00:00 2001 From: Mark Henderson Date: Wed, 7 Jan 2026 21:48:31 -0700 Subject: [PATCH 10/15] fix: register dispose callbacks with Instance.state() and hook Agent dispose to connection close - Refactor share.ts, share-next.ts, format/index.ts, plugin/index.ts to use Instance.state() with dispose callbacks for automatic subscription cleanup - Add connection.closed handler in ACP Agent constructor to call dispose() when connection ends - Update test mocks to include closed property --- packages/opencode/src/format/index.ts | 16 ++++- packages/opencode/src/plugin/index.ts | 16 ++++- packages/opencode/src/share/share-next.ts | 59 ++++++++++++++----- packages/opencode/src/share/share.ts | 51 +++++++++++----- .../opencode/test/memory/acp-cleanup.test.ts | 9 ++- 5 files changed, 117 insertions(+), 34 deletions(-) diff --git a/packages/opencode/src/format/index.ts b/packages/opencode/src/format/index.ts index 592b9be1c1c..fe9e485d48c 100644 --- a/packages/opencode/src/format/index.ts +++ b/packages/opencode/src/format/index.ts @@ -11,7 +11,6 @@ import { Instance } from "../project/instance" export namespace Format { const log = Log.create({ service: "format" }) - const subscriptions: (() => void)[] = [] export const Status = z .object({ @@ -64,6 +63,18 @@ export namespace Format { } }) + // Separate state for subscriptions with dispose callback + const subscriptionState = Instance.state<(() => void)[]>( + () => [], + async (subscriptions) => { + for (const unsub of subscriptions) { + unsub() + } + subscriptions.length = 0 + log.info("disposed format subscriptions") + }, + ) + async function isEnabled(item: Formatter.Info) { const s = await state() let status = s.enabled[item.name] @@ -103,6 +114,7 @@ export namespace Format { export function init() { log.info("init") + const subscriptions = subscriptionState() // Clean up any existing subscriptions to prevent duplicates on re-init dispose() subscriptions.push( @@ -141,9 +153,11 @@ export namespace Format { } export function dispose() { + const subscriptions = subscriptionState() for (const unsub of subscriptions) { unsub() } subscriptions.length = 0 + log.info("disposed format subscriptions") } } diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts index 8191d9959fc..3a5eede4f8d 100644 --- a/packages/opencode/src/plugin/index.ts +++ b/packages/opencode/src/plugin/index.ts @@ -11,7 +11,6 @@ import { CodexAuthPlugin } from "./codex" export namespace Plugin { const log = Log.create({ service: "plugin" }) - const subscriptions: (() => void)[] = [] const BUILTIN = ["opencode-copilot-auth@0.0.11", "opencode-anthropic-auth@0.0.8"] @@ -82,6 +81,18 @@ export namespace Plugin { } }) + // Separate state for subscriptions with dispose callback + const subscriptionState = Instance.state<(() => void)[]>( + () => [], + async (subscriptions) => { + for (const unsub of subscriptions) { + unsub() + } + subscriptions.length = 0 + log.info("disposed plugin subscriptions") + }, + ) + export async function trigger< Name extends Exclude, "auth" | "event" | "tool">, Input = Parameters[Name]>[0], @@ -104,6 +115,7 @@ export namespace Plugin { } export async function init() { + const subscriptions = subscriptionState() // Clean up any existing subscriptions to prevent duplicates on re-init dispose() const hooks = await state().then((x) => x.hooks) @@ -125,9 +137,11 @@ export namespace Plugin { } export function dispose() { + const subscriptions = subscriptionState() for (const unsub of subscriptions) { unsub() } subscriptions.length = 0 + log.info("disposed plugin subscriptions") } } diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index c9a0e67d562..f509b75e1ab 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -2,6 +2,7 @@ import { Bus } from "@/bus" import { Config } from "@/config/config" import { ulid } from "ulid" import { Provider } from "@/provider/provider" +import { Instance } from "@/project/instance" import { Session } from "@/session" import { MessageV2 } from "@/session/message-v2" import { Storage } from "@/storage/storage" @@ -10,16 +11,39 @@ import type * as SDK from "@opencode-ai/sdk/v2" export namespace ShareNext { const log = Log.create({ service: "share-next" }) - const subscriptions: (() => void)[] = [] + + interface ShareNextState { + subscriptions: (() => void)[] + queue: Map }> + } + + const state = Instance.state( + () => ({ + subscriptions: [], + queue: new Map(), + }), + async (s) => { + for (const unsub of s.subscriptions) { + unsub() + } + s.subscriptions.length = 0 + for (const entry of s.queue.values()) { + clearTimeout(entry.timeout) + } + s.queue.clear() + log.info("disposed share-next subscriptions") + }, + ) async function url() { return Config.get().then((x) => x.enterprise?.url ?? "https://opncd.ai") } export async function init() { + const s = state() // Clean up any existing subscriptions to prevent duplicates on re-init dispose() - subscriptions.push( + s.subscriptions.push( Bus.subscribe(Session.Event.Updated, async (evt) => { await sync(evt.properties.info.id, [ { @@ -29,7 +53,7 @@ export namespace ShareNext { ]) }), ) - subscriptions.push( + s.subscriptions.push( Bus.subscribe(MessageV2.Event.Updated, async (evt) => { await sync(evt.properties.info.sessionID, [ { @@ -51,7 +75,7 @@ export namespace ShareNext { } }), ) - subscriptions.push( + s.subscriptions.push( Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { await sync(evt.properties.part.sessionID, [ { @@ -61,7 +85,7 @@ export namespace ShareNext { ]) }), ) - subscriptions.push( + s.subscriptions.push( Bus.subscribe(Session.Event.Diff, async (evt) => { await sync(evt.properties.sessionID, [ { @@ -74,25 +98,28 @@ export namespace ShareNext { } export function dispose() { - for (const unsub of subscriptions) { + const s = state() + for (const unsub of s.subscriptions) { unsub() } - subscriptions.length = 0 - for (const entry of queue.values()) { + s.subscriptions.length = 0 + for (const entry of s.queue.values()) { clearTimeout(entry.timeout) } - queue.clear() + s.queue.clear() + log.info("disposed share-next subscriptions") } /** @internal Test helper to get queue size */ export function _getQueueSize() { - return queue.size + return state().queue.size } /** @internal Test helper to add items to queue for testing dispose cleanup */ export function _addToQueueForTesting(sessionID: string) { + const s = state() const timeout = setTimeout(() => {}, 10000) - queue.set(sessionID, { timeout, data: new Map() }) + s.queue.set(sessionID, { timeout, data: new Map() }) } export async function create(sessionID: string) { @@ -141,9 +168,9 @@ export namespace ShareNext { data: SDK.Model[] } - const queue = new Map }>() async function sync(sessionID: string, data: Data[]) { - const existing = queue.get(sessionID) + const s = state() + const existing = s.queue.get(sessionID) if (existing) { for (const item of data) { existing.data.set("id" in item ? (item.id as string) : ulid(), item) @@ -157,9 +184,9 @@ export namespace ShareNext { } const timeout = setTimeout(async () => { - const queued = queue.get(sessionID) + const queued = s.queue.get(sessionID) if (!queued) return - queue.delete(sessionID) + s.queue.delete(sessionID) const share = await get(sessionID).catch(() => undefined) if (!share) return @@ -174,7 +201,7 @@ export namespace ShareNext { }), }) }, 1000) - queue.set(sessionID, { timeout, data: dataMap }) + s.queue.set(sessionID, { timeout, data: dataMap }) } export async function remove(sessionID: string) { diff --git a/packages/opencode/src/share/share.ts b/packages/opencode/src/share/share.ts index 4ef5fb11f76..53bf64a9b3c 100644 --- a/packages/opencode/src/share/share.ts +++ b/packages/opencode/src/share/share.ts @@ -1,5 +1,6 @@ import { Bus } from "../bus" import { Installation } from "../installation" +import { Instance } from "../project/instance" import { Session } from "../session" import { MessageV2 } from "../session/message-v2" import { Log } from "../util/log" @@ -7,11 +8,31 @@ import { Log } from "../util/log" export namespace Share { const log = Log.create({ service: "share" }) - let queue: Promise = Promise.resolve() - const pending = new Map() - const subscriptions: (() => void)[] = [] + interface ShareState { + queue: Promise + pending: Map + subscriptions: (() => void)[] + } + + const state = Instance.state( + () => ({ + queue: Promise.resolve(), + pending: new Map(), + subscriptions: [], + }), + async (s) => { + for (const unsub of s.subscriptions) { + unsub() + } + s.subscriptions.length = 0 + s.pending.clear() + s.queue = Promise.resolve() + log.info("disposed share subscriptions") + }, + ) export async function sync(key: string, content: any) { + const s = state() const [root, ...splits] = key.split("/") if (root !== "session") return const [sub, sessionID] = splits @@ -19,12 +40,12 @@ export namespace Share { const share = await Session.getShare(sessionID).catch(() => {}) if (!share) return const { secret } = share - pending.set(key, content) - queue = queue + s.pending.set(key, content) + s.queue = s.queue .then(async () => { - const content = pending.get(key) + const content = s.pending.get(key) if (content === undefined) return - pending.delete(key) + s.pending.delete(key) return fetch(`${URL}/share_sync`, { method: "POST", @@ -47,14 +68,15 @@ export namespace Share { } export function init() { + const s = state() // Clean up any existing subscriptions to prevent duplicates on re-init dispose() - subscriptions.push( + s.subscriptions.push( Bus.subscribe(Session.Event.Updated, async (evt) => { await sync("session/info/" + evt.properties.info.id, evt.properties.info) }), ) - subscriptions.push( + s.subscriptions.push( Bus.subscribe(MessageV2.Event.Updated, async (evt) => { await sync( "session/message/" + evt.properties.info.sessionID + "/" + evt.properties.info.id, @@ -62,7 +84,7 @@ export namespace Share { ) }), ) - subscriptions.push( + s.subscriptions.push( Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { await sync( "session/part/" + @@ -78,12 +100,13 @@ export namespace Share { } export function dispose() { - for (const unsub of subscriptions) { + const s = state() + for (const unsub of s.subscriptions) { unsub() } - subscriptions.length = 0 - pending.clear() - queue = Promise.resolve() + s.subscriptions.length = 0 + s.pending.clear() + s.queue = Promise.resolve() log.info("disposed share subscriptions") } diff --git a/packages/opencode/test/memory/acp-cleanup.test.ts b/packages/opencode/test/memory/acp-cleanup.test.ts index 3ef75c87d84..0267d65e90d 100644 --- a/packages/opencode/test/memory/acp-cleanup.test.ts +++ b/packages/opencode/test/memory/acp-cleanup.test.ts @@ -12,6 +12,7 @@ describe("ACP.Agent session cleanup", () => { const mockConnection = { requestPermission: async () => ({ outcome: { outcome: "selected", optionId: "once" } }), sessionUpdate: async () => {}, + closed: new Promise(() => {}), // Never resolves during test } const mockConfig = { @@ -56,7 +57,9 @@ describe("ACP.Agent session cleanup", () => { }) test("dispose cleans up all session controllers", () => { - const mockConnection = {} + const mockConnection = { + closed: new Promise(() => {}), // Never resolves during test + } const mockConfig = { sdk: { event: { subscribe: async () => ({ stream: (async function* () {})() }) }, @@ -92,7 +95,9 @@ describe("ACP.Agent session cleanup", () => { }) test("setupEventSubscriptions replaces existing subscription for same session", () => { - const mockConnection = {} + const mockConnection = { + closed: new Promise(() => {}), // Never resolves during test + } const subscribeCallCount = { count: 0 } // Track active generators so we can verify they terminate const activeGenerators = new Set() From 2945dd85ed743dd5af20f24582b484a7387fa758 Mon Sep 17 00:00:00 2001 From: Mark Henderson Date: Wed, 7 Jan 2026 22:36:51 -0700 Subject: [PATCH 11/15] fix: consolidate dispose logic and add Instance.provide wrappers - Have Instance.state dispose callbacks delegate to exported dispose() functions to avoid duplication - Reorder init() to call dispose() before getting state reference for clearer semantics - Add afterAll cleanup in subscription-cleanup.test.ts to clean up temp directory - Wrap profile.ts test functions in Instance.provide for proper Instance context - Add closed property to ACP mock in profile.ts --- packages/opencode/src/format/index.ts | 11 +- packages/opencode/src/plugin/index.ts | 11 +- packages/opencode/src/share/share-next.ts | 15 +- packages/opencode/src/share/share.ts | 13 +- packages/opencode/test/memory/profile.ts | 239 +++++++++++------- .../test/memory/subscription-cleanup.test.ts | 9 +- 6 files changed, 165 insertions(+), 133 deletions(-) diff --git a/packages/opencode/src/format/index.ts b/packages/opencode/src/format/index.ts index fe9e485d48c..741696140b8 100644 --- a/packages/opencode/src/format/index.ts +++ b/packages/opencode/src/format/index.ts @@ -66,12 +66,9 @@ export namespace Format { // Separate state for subscriptions with dispose callback const subscriptionState = Instance.state<(() => void)[]>( () => [], - async (subscriptions) => { - for (const unsub of subscriptions) { - unsub() - } - subscriptions.length = 0 - log.info("disposed format subscriptions") + async () => { + // Delegate to the exported dispose function to keep cleanup logic centralized + dispose() }, ) @@ -114,9 +111,9 @@ export namespace Format { export function init() { log.info("init") - const subscriptions = subscriptionState() // Clean up any existing subscriptions to prevent duplicates on re-init dispose() + const subscriptions = subscriptionState() subscriptions.push( Bus.subscribe(File.Event.Edited, async (payload) => { const file = payload.properties.file diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts index 3a5eede4f8d..6ab2a0814d3 100644 --- a/packages/opencode/src/plugin/index.ts +++ b/packages/opencode/src/plugin/index.ts @@ -84,12 +84,9 @@ export namespace Plugin { // Separate state for subscriptions with dispose callback const subscriptionState = Instance.state<(() => void)[]>( () => [], - async (subscriptions) => { - for (const unsub of subscriptions) { - unsub() - } - subscriptions.length = 0 - log.info("disposed plugin subscriptions") + async () => { + // Delegate to the exported dispose function to keep cleanup logic centralized + dispose() }, ) @@ -115,9 +112,9 @@ export namespace Plugin { } export async function init() { - const subscriptions = subscriptionState() // Clean up any existing subscriptions to prevent duplicates on re-init dispose() + const subscriptions = subscriptionState() const hooks = await state().then((x) => x.hooks) const config = await Config.get() for (const hook of hooks) { diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index f509b75e1ab..9cec394aebd 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -22,16 +22,9 @@ export namespace ShareNext { subscriptions: [], queue: new Map(), }), - async (s) => { - for (const unsub of s.subscriptions) { - unsub() - } - s.subscriptions.length = 0 - for (const entry of s.queue.values()) { - clearTimeout(entry.timeout) - } - s.queue.clear() - log.info("disposed share-next subscriptions") + async () => { + // Delegate to the exported dispose function to keep cleanup logic centralized + dispose() }, ) @@ -40,9 +33,9 @@ export namespace ShareNext { } export async function init() { - const s = state() // Clean up any existing subscriptions to prevent duplicates on re-init dispose() + const s = state() s.subscriptions.push( Bus.subscribe(Session.Event.Updated, async (evt) => { await sync(evt.properties.info.id, [ diff --git a/packages/opencode/src/share/share.ts b/packages/opencode/src/share/share.ts index 53bf64a9b3c..7a8011bd08a 100644 --- a/packages/opencode/src/share/share.ts +++ b/packages/opencode/src/share/share.ts @@ -20,14 +20,9 @@ export namespace Share { pending: new Map(), subscriptions: [], }), - async (s) => { - for (const unsub of s.subscriptions) { - unsub() - } - s.subscriptions.length = 0 - s.pending.clear() - s.queue = Promise.resolve() - log.info("disposed share subscriptions") + async () => { + // Delegate to the exported dispose function to keep cleanup logic centralized + dispose() }, ) @@ -68,9 +63,9 @@ export namespace Share { } export function init() { - const s = state() // Clean up any existing subscriptions to prevent duplicates on re-init dispose() + const s = state() s.subscriptions.push( Bus.subscribe(Session.Event.Updated, async (evt) => { await sync("session/info/" + evt.properties.info.id, evt.properties.info) diff --git a/packages/opencode/test/memory/profile.ts b/packages/opencode/test/memory/profile.ts index e679f663be3..1797a35ad70 100644 --- a/packages/opencode/test/memory/profile.ts +++ b/packages/opencode/test/memory/profile.ts @@ -13,9 +13,28 @@ */ import { Log } from "../../src/util/log" +import { Instance } from "../../src/project/instance" +import path from "path" +import os from "os" +import fs from "fs/promises" Log.init({ print: false, dev: false, level: "ERROR" }) +// Create a temp directory for Instance.provide +let testDir: string + +async function setupTestDir() { + testDir = path.join(os.tmpdir(), `opencode-profile-${Date.now()}`) + await fs.mkdir(testDir, { recursive: true }) + await fs.writeFile(path.join(testDir, "opencode.json"), JSON.stringify({})) +} + +async function cleanupTestDir() { + if (testDir) { + await fs.rm(testDir, { recursive: true, force: true }).catch(() => {}) + } +} + interface MemorySnapshot { label: string heapUsed: number @@ -79,39 +98,44 @@ async function testShareSubscriptionCycles() { console.log("TEST: Share subscription init/dispose cycles") console.log("=".repeat(60)) - const { Share } = await import("../../src/share/share") - - forceGC() - await sleep(100) - const baseline = takeSnapshot("Baseline") - printSnapshot(baseline) - - const iterations = 1000 - console.log(`\nRunning ${iterations} init/dispose cycles...`) - - for (let i = 0; i < iterations; i++) { - Share.init() - Share.dispose() - if (i % 100 === 0 && i > 0) { - process.stdout.write(` ${i} cycles completed\r`) - } - } - console.log(` ${iterations} cycles completed`) - - forceGC() - await sleep(100) - const afterCycles = takeSnapshot("After cycles") - printSnapshot(afterCycles) - - const result = compareSnapshots(baseline, afterCycles) - - // Check for leaks (should be less than 1MB growth for 1000 cycles) - if (result.heapDiff > 1024 * 1024) { - console.log("\n⚠️ WARNING: Potential memory leak detected!") - return false - } - console.log("\n✅ PASS: Memory stable after subscription cycles") - return true + return Instance.provide({ + directory: testDir, + fn: async () => { + const { Share } = await import("../../src/share/share") + + forceGC() + await sleep(100) + const baseline = takeSnapshot("Baseline") + printSnapshot(baseline) + + const iterations = 1000 + console.log(`\nRunning ${iterations} init/dispose cycles...`) + + for (let i = 0; i < iterations; i++) { + Share.init() + Share.dispose() + if (i % 100 === 0 && i > 0) { + process.stdout.write(` ${i} cycles completed\r`) + } + } + console.log(` ${iterations} cycles completed`) + + forceGC() + await sleep(100) + const afterCycles = takeSnapshot("After cycles") + printSnapshot(afterCycles) + + const result = compareSnapshots(baseline, afterCycles) + + // Check for leaks (should be less than 1MB growth for 1000 cycles) + if (result.heapDiff > 1024 * 1024) { + console.log("\n⚠️ WARNING: Potential memory leak detected!") + return false + } + console.log("\n✅ PASS: Memory stable after subscription cycles") + return true + }, + }) } async function testShareNextSubscriptionCycles() { @@ -119,38 +143,43 @@ async function testShareNextSubscriptionCycles() { console.log("TEST: ShareNext subscription init/dispose cycles") console.log("=".repeat(60)) - const { ShareNext } = await import("../../src/share/share-next") - - forceGC() - await sleep(100) - const baseline = takeSnapshot("Baseline") - printSnapshot(baseline) - - const iterations = 1000 - console.log(`\nRunning ${iterations} init/dispose cycles...`) - - for (let i = 0; i < iterations; i++) { - await ShareNext.init() - ShareNext.dispose() - if (i % 100 === 0 && i > 0) { - process.stdout.write(` ${i} cycles completed\r`) - } - } - console.log(` ${iterations} cycles completed`) - - forceGC() - await sleep(100) - const afterCycles = takeSnapshot("After cycles") - printSnapshot(afterCycles) - - const result = compareSnapshots(baseline, afterCycles) - - if (result.heapDiff > 1024 * 1024) { - console.log("\n⚠️ WARNING: Potential memory leak detected!") - return false - } - console.log("\n✅ PASS: Memory stable after subscription cycles") - return true + return Instance.provide({ + directory: testDir, + fn: async () => { + const { ShareNext } = await import("../../src/share/share-next") + + forceGC() + await sleep(100) + const baseline = takeSnapshot("Baseline") + printSnapshot(baseline) + + const iterations = 1000 + console.log(`\nRunning ${iterations} init/dispose cycles...`) + + for (let i = 0; i < iterations; i++) { + await ShareNext.init() + ShareNext.dispose() + if (i % 100 === 0 && i > 0) { + process.stdout.write(` ${i} cycles completed\r`) + } + } + console.log(` ${iterations} cycles completed`) + + forceGC() + await sleep(100) + const afterCycles = takeSnapshot("After cycles") + printSnapshot(afterCycles) + + const result = compareSnapshots(baseline, afterCycles) + + if (result.heapDiff > 1024 * 1024) { + console.log("\n⚠️ WARNING: Potential memory leak detected!") + return false + } + console.log("\n✅ PASS: Memory stable after subscription cycles") + return true + }, + }) } async function testFormatSubscriptionCycles() { @@ -158,38 +187,43 @@ async function testFormatSubscriptionCycles() { console.log("TEST: Format subscription init/dispose cycles") console.log("=".repeat(60)) - const { Format } = await import("../../src/format") - - forceGC() - await sleep(100) - const baseline = takeSnapshot("Baseline") - printSnapshot(baseline) - - const iterations = 1000 - console.log(`\nRunning ${iterations} init/dispose cycles...`) - - for (let i = 0; i < iterations; i++) { - Format.init() - Format.dispose() - if (i % 100 === 0 && i > 0) { - process.stdout.write(` ${i} cycles completed\r`) - } - } - console.log(` ${iterations} cycles completed`) - - forceGC() - await sleep(100) - const afterCycles = takeSnapshot("After cycles") - printSnapshot(afterCycles) - - const result = compareSnapshots(baseline, afterCycles) - - if (result.heapDiff > 1024 * 1024) { - console.log("\n⚠️ WARNING: Potential memory leak detected!") - return false - } - console.log("\n✅ PASS: Memory stable after subscription cycles") - return true + return Instance.provide({ + directory: testDir, + fn: async () => { + const { Format } = await import("../../src/format") + + forceGC() + await sleep(100) + const baseline = takeSnapshot("Baseline") + printSnapshot(baseline) + + const iterations = 1000 + console.log(`\nRunning ${iterations} init/dispose cycles...`) + + for (let i = 0; i < iterations; i++) { + Format.init() + Format.dispose() + if (i % 100 === 0 && i > 0) { + process.stdout.write(` ${i} cycles completed\r`) + } + } + console.log(` ${iterations} cycles completed`) + + forceGC() + await sleep(100) + const afterCycles = takeSnapshot("After cycles") + printSnapshot(afterCycles) + + const result = compareSnapshots(baseline, afterCycles) + + if (result.heapDiff > 1024 * 1024) { + console.log("\n⚠️ WARNING: Potential memory leak detected!") + return false + } + console.log("\n✅ PASS: Memory stable after subscription cycles") + return true + }, + }) } async function testACPControllerCleanup() { @@ -199,7 +233,9 @@ async function testACPControllerCleanup() { const { ACP } = await import("../../src/acp/agent") - const mockConnection = {} + const mockConnection = { + closed: new Promise(() => {}), // Never resolves during test + } const mockConfig = { sdk: { event: { subscribe: async () => ({ stream: (async function* () {})() }) }, @@ -263,6 +299,9 @@ async function main() { console.log(`Platform: ${process.platform}`) console.log(`Bun Version: ${Bun.version}`) + // Setup test directory for Instance.provide + await setupTestDir() + const results: boolean[] = [] try { @@ -272,9 +311,13 @@ async function main() { results.push(await testACPControllerCleanup()) } catch (err) { console.error("\nError during profiling:", err) + await cleanupTestDir() process.exit(1) } + // Cleanup test directory + await cleanupTestDir() + console.log("\n" + "=".repeat(60)) console.log("SUMMARY") console.log("=".repeat(60)) diff --git a/packages/opencode/test/memory/subscription-cleanup.test.ts b/packages/opencode/test/memory/subscription-cleanup.test.ts index e5b02b9211d..4dac834bef3 100644 --- a/packages/opencode/test/memory/subscription-cleanup.test.ts +++ b/packages/opencode/test/memory/subscription-cleanup.test.ts @@ -1,4 +1,4 @@ -import { test, expect, describe, beforeAll } from "bun:test" +import { test, expect, describe, beforeAll, afterAll } from "bun:test" import { Bus } from "../../src/bus" import { Session } from "../../src/session" import { MessageV2 } from "../../src/session/message-v2" @@ -23,6 +23,13 @@ beforeAll(async () => { await fs.writeFile(path.join(testDir, "opencode.json"), JSON.stringify({})) }) +afterAll(async () => { + // Clean up the temp directory to prevent test artifacts from accumulating + if (testDir) { + await fs.rm(testDir, { recursive: true, force: true }).catch(() => {}) + } +}) + describe("subscription cleanup", () => { describe("Share.dispose()", () => { test("should unsubscribe from all events", async () => { From 03a298e5e9b802d09413989e56861aa39cb0958b Mon Sep 17 00:00:00 2001 From: Mark Henderson Date: Wed, 7 Jan 2026 22:50:08 -0700 Subject: [PATCH 12/15] fix: address race conditions and disposal issues in share modules - Perform Instance.state dispose cleanup inline to prevent state() reinitialization during Instance disposal - Add AbortController to cancel in-flight fetch requests during dispose - Pass abort signal to SDK event.subscribe to properly cancel SSE stream - Update comments to accurately reflect full dispose behavior - Add proper async generator cleanup in test --- packages/opencode/src/acp/agent.ts | 5 ++- packages/opencode/src/share/share-next.ts | 39 +++++++++++++++---- packages/opencode/src/share/share.ts | 38 ++++++++++++++++-- .../opencode/test/memory/acp-cleanup.test.ts | 9 ++++- 4 files changed, 76 insertions(+), 15 deletions(-) diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index 466b93f914e..90549bc6975 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -87,11 +87,12 @@ export namespace ACP { { optionId: "always", kind: "allow_always", name: "Always allow" }, { optionId: "reject", kind: "reject_once", name: "Reject" }, ] + // Pass the abort signal to the SDK so it can cancel the underlying HTTP connection this.config.sdk.event - .subscribe({ directory }) + .subscribe({ directory }, { signal: controller.signal }) .then(async (events) => { for await (const event of events.stream) { - // Check if subscription was aborted + // Check if subscription was aborted (belt and suspenders with signal) if (controller.signal.aborted) { log.info("event subscription aborted", { sessionId }) break diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index 9cec394aebd..9706ed34655 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -14,17 +14,30 @@ export namespace ShareNext { interface ShareNextState { subscriptions: (() => void)[] - queue: Map }> + queue: Map; abortController: AbortController }> + disposed: boolean } const state = Instance.state( () => ({ subscriptions: [], queue: new Map(), + disposed: false, }), - async () => { - // Delegate to the exported dispose function to keep cleanup logic centralized - dispose() + async (s) => { + // Perform cleanup inline to avoid calling state() during Instance disposal. + // Calling state() here could reinitialize after the Instance has been disposed. + s.disposed = true + for (const unsub of s.subscriptions) { + unsub() + } + s.subscriptions.length = 0 + for (const entry of s.queue.values()) { + clearTimeout(entry.timeout) + entry.abortController.abort() + } + s.queue.clear() + log.info("disposed share-next subscriptions (via Instance)") }, ) @@ -33,7 +46,7 @@ export namespace ShareNext { } export async function init() { - // Clean up any existing subscriptions to prevent duplicates on re-init + // Fully dispose existing state (subscriptions, queue, pending timeouts) to prevent duplicates on re-init dispose() const s = state() s.subscriptions.push( @@ -92,12 +105,14 @@ export namespace ShareNext { export function dispose() { const s = state() + s.disposed = false for (const unsub of s.subscriptions) { unsub() } s.subscriptions.length = 0 for (const entry of s.queue.values()) { clearTimeout(entry.timeout) + entry.abortController.abort() } s.queue.clear() log.info("disposed share-next subscriptions") @@ -112,7 +127,7 @@ export namespace ShareNext { export function _addToQueueForTesting(sessionID: string) { const s = state() const timeout = setTimeout(() => {}, 10000) - s.queue.set(sessionID, { timeout, data: new Map() }) + s.queue.set(sessionID, { timeout, data: new Map(), abortController: new AbortController() }) } export async function create(sessionID: string) { @@ -163,6 +178,8 @@ export namespace ShareNext { async function sync(sessionID: string, data: Data[]) { const s = state() + // Skip if already disposed + if (s.disposed) return const existing = s.queue.get(sessionID) if (existing) { for (const item of data) { @@ -176,9 +193,12 @@ export namespace ShareNext { dataMap.set("id" in item ? (item.id as string) : ulid(), item) } + const abortController = new AbortController() const timeout = setTimeout(async () => { const queued = s.queue.get(sessionID) if (!queued) return + // Check if aborted before starting fetch + if (queued.abortController.signal.aborted) return s.queue.delete(sessionID) const share = await get(sessionID).catch(() => undefined) if (!share) return @@ -192,9 +212,14 @@ export namespace ShareNext { secret: share.secret, data: Array.from(queued.data.values()), }), + signal: queued.abortController.signal, + }).catch((err) => { + // Ignore abort errors during disposal + if (err.name === "AbortError") return + log.error("sync error", { sessionID, error: err }) }) }, 1000) - s.queue.set(sessionID, { timeout, data: dataMap }) + s.queue.set(sessionID, { timeout, data: dataMap, abortController }) } export async function remove(sessionID: string) { diff --git a/packages/opencode/src/share/share.ts b/packages/opencode/src/share/share.ts index 7a8011bd08a..c7c3fb98c50 100644 --- a/packages/opencode/src/share/share.ts +++ b/packages/opencode/src/share/share.ts @@ -12,6 +12,8 @@ export namespace Share { queue: Promise pending: Map subscriptions: (() => void)[] + abortController: AbortController + disposed: boolean } const state = Instance.state( @@ -19,15 +21,29 @@ export namespace Share { queue: Promise.resolve(), pending: new Map(), subscriptions: [], + abortController: new AbortController(), + disposed: false, }), - async () => { - // Delegate to the exported dispose function to keep cleanup logic centralized - dispose() + async (s) => { + // Perform cleanup inline to avoid calling state() during Instance disposal. + // Calling state() here could reinitialize after the Instance has been disposed. + s.disposed = true + s.abortController.abort() + for (const unsub of s.subscriptions) { + unsub() + } + s.subscriptions.length = 0 + s.pending.clear() + s.queue = Promise.resolve() + log.info("disposed share subscriptions (via Instance)") }, ) export async function sync(key: string, content: any) { const s = state() + // Skip if already disposed + if (s.disposed) return + const signal = s.abortController.signal const [root, ...splits] = key.split("/") if (root !== "session") return const [sub, sessionID] = splits @@ -38,6 +54,8 @@ export namespace Share { s.pending.set(key, content) s.queue = s.queue .then(async () => { + // Check if disposed before starting fetch + if (signal.aborted) return const content = s.pending.get(key) if (content === undefined) return s.pending.delete(key) @@ -50,6 +68,7 @@ export namespace Share { key: key, content, }), + signal, }) }) .then((x) => { @@ -60,10 +79,16 @@ export namespace Share { }) } }) + .catch((err) => { + // Ignore abort errors during disposal + if (err.name === "AbortError") return + log.error("sync error", { key, error: err }) + }) } export function init() { - // Clean up any existing subscriptions to prevent duplicates on re-init + // Fully dispose existing share state (subscriptions, pending map, queue, abort controller) + // before re-init to prevent duplicates and orphaned requests dispose() const s = state() s.subscriptions.push( @@ -96,6 +121,11 @@ export namespace Share { export function dispose() { const s = state() + // Abort any in-flight fetch requests + s.abortController.abort() + // Create a new controller for potential re-init + s.abortController = new AbortController() + s.disposed = false for (const unsub of s.subscriptions) { unsub() } diff --git a/packages/opencode/test/memory/acp-cleanup.test.ts b/packages/opencode/test/memory/acp-cleanup.test.ts index 0267d65e90d..3d976c850e9 100644 --- a/packages/opencode/test/memory/acp-cleanup.test.ts +++ b/packages/opencode/test/memory/acp-cleanup.test.ts @@ -94,7 +94,7 @@ describe("ACP.Agent session cleanup", () => { expect(controller3.signal.aborted).toBe(true) }) - test("setupEventSubscriptions replaces existing subscription for same session", () => { + test("setupEventSubscriptions replaces existing subscription for same session", async () => { const mockConnection = { closed: new Promise(() => {}), // Never resolves during test } @@ -153,7 +153,12 @@ describe("ACP.Agent session cleanup", () => { expect(newController).not.toBe(existingController) expect(newController?.signal.aborted).toBe(false) - // Cleanup + // Cleanup - dispose the agent and abort all active generators agent.dispose() + for (const genController of activeGenerators) { + genController.abort() + } + // Give generators time to exit cleanly + await new Promise((r) => setTimeout(r, 50)) }) }) From ecb66e6cc2ddaa0dca89020fc7877564297e0b7c Mon Sep 17 00:00:00 2001 From: Mark Henderson Date: Wed, 7 Jan 2026 23:02:58 -0700 Subject: [PATCH 13/15] fix: set disposed flag during cleanup before resetting for re-init --- packages/opencode/src/share/share-next.ts | 5 ++++- packages/opencode/src/share/share.ts | 3 +++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index 9706ed34655..ecd731c8960 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -105,7 +105,8 @@ export namespace ShareNext { export function dispose() { const s = state() - s.disposed = false + // Mark as disposed to prevent new sync operations during cleanup + s.disposed = true for (const unsub of s.subscriptions) { unsub() } @@ -115,6 +116,8 @@ export namespace ShareNext { entry.abortController.abort() } s.queue.clear() + // Reset disposed flag to allow operations after re-init via init() + s.disposed = false log.info("disposed share-next subscriptions") } diff --git a/packages/opencode/src/share/share.ts b/packages/opencode/src/share/share.ts index c7c3fb98c50..feaecacedd7 100644 --- a/packages/opencode/src/share/share.ts +++ b/packages/opencode/src/share/share.ts @@ -121,10 +121,13 @@ export namespace Share { export function dispose() { const s = state() + // Mark as disposed to prevent new sync operations during cleanup + s.disposed = true // Abort any in-flight fetch requests s.abortController.abort() // Create a new controller for potential re-init s.abortController = new AbortController() + // Reset disposed flag to allow operations after re-init via init() s.disposed = false for (const unsub of s.subscriptions) { unsub() From 434ac095053cef4b3f95392ecca52c31b2e6541a Mon Sep 17 00:00:00 2001 From: Mark Henderson Date: Wed, 7 Jan 2026 23:14:04 -0700 Subject: [PATCH 14/15] fix: address race conditions by moving disposed flag reset to init() and adding abort checks --- packages/opencode/src/acp/agent.ts | 3 ++- packages/opencode/src/share/share-next.ts | 20 ++++++++++++-------- packages/opencode/src/share/share.ts | 6 ++++-- 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index 90549bc6975..0ef8b2e2dc3 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -92,7 +92,8 @@ export namespace ACP { .subscribe({ directory }, { signal: controller.signal }) .then(async (events) => { for await (const event of events.stream) { - // Check if subscription was aborted (belt and suspenders with signal) + // Belt-and-suspenders abort check: the signal passed to subscribe() should terminate + // the stream, but we also check here for clean loop exit in case of SDK variations if (controller.signal.aborted) { log.info("event subscription aborted", { sessionId }) break diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index ecd731c8960..933db6e1791 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -49,6 +49,8 @@ export namespace ShareNext { // Fully dispose existing state (subscriptions, queue, pending timeouts) to prevent duplicates on re-init dispose() const s = state() + // Reset disposed flag to allow operations after re-init + s.disposed = false s.subscriptions.push( Bus.subscribe(Session.Event.Updated, async (evt) => { await sync(evt.properties.info.id, [ @@ -116,8 +118,6 @@ export namespace ShareNext { entry.abortController.abort() } s.queue.clear() - // Reset disposed flag to allow operations after re-init via init() - s.disposed = false log.info("disposed share-next subscriptions") } @@ -129,7 +129,7 @@ export namespace ShareNext { /** @internal Test helper to add items to queue for testing dispose cleanup */ export function _addToQueueForTesting(sessionID: string) { const s = state() - const timeout = setTimeout(() => {}, 10000) + const timeout = setTimeout(() => {}, 100) s.queue.set(sessionID, { timeout, data: new Map(), abortController: new AbortController() }) } @@ -199,12 +199,16 @@ export namespace ShareNext { const abortController = new AbortController() const timeout = setTimeout(async () => { const queued = s.queue.get(sessionID) - if (!queued) return - // Check if aborted before starting fetch - if (queued.abortController.signal.aborted) return + // Check both existence and abort status atomically + if (!queued || queued.abortController.signal.aborted) return + // Store local references before any async operations to avoid race conditions + const queuedData = queued.data + const queuedSignal = queued.abortController.signal s.queue.delete(sessionID) const share = await get(sessionID).catch(() => undefined) if (!share) return + // Re-check abort after async operation + if (queuedSignal.aborted) return await fetch(`${await url()}/api/share/${share.id}/sync`, { method: "POST", @@ -213,9 +217,9 @@ export namespace ShareNext { }, body: JSON.stringify({ secret: share.secret, - data: Array.from(queued.data.values()), + data: Array.from(queuedData.values()), }), - signal: queued.abortController.signal, + signal: queuedSignal, }).catch((err) => { // Ignore abort errors during disposal if (err.name === "AbortError") return diff --git a/packages/opencode/src/share/share.ts b/packages/opencode/src/share/share.ts index feaecacedd7..5fc63d82935 100644 --- a/packages/opencode/src/share/share.ts +++ b/packages/opencode/src/share/share.ts @@ -58,6 +58,8 @@ export namespace Share { if (signal.aborted) return const content = s.pending.get(key) if (content === undefined) return + // Re-check abort in case disposal occurred after reading from pending + if (signal.aborted) return s.pending.delete(key) return fetch(`${URL}/share_sync`, { @@ -91,6 +93,8 @@ export namespace Share { // before re-init to prevent duplicates and orphaned requests dispose() const s = state() + // Reset disposed flag to allow operations after re-init + s.disposed = false s.subscriptions.push( Bus.subscribe(Session.Event.Updated, async (evt) => { await sync("session/info/" + evt.properties.info.id, evt.properties.info) @@ -127,8 +131,6 @@ export namespace Share { s.abortController.abort() // Create a new controller for potential re-init s.abortController = new AbortController() - // Reset disposed flag to allow operations after re-init via init() - s.disposed = false for (const unsub of s.subscriptions) { unsub() } From 0bcf7bbb1c02fb6be2ed71787283663bb0638c7c Mon Sep 17 00:00:00 2001 From: Mark Henderson Date: Thu, 8 Jan 2026 21:14:55 -0700 Subject: [PATCH 15/15] docs: clarify inline cleanup comment per review feedback --- packages/opencode/src/share/share-next.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index 933db6e1791..2c8206f66f1 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -25,8 +25,9 @@ export namespace ShareNext { disposed: false, }), async (s) => { - // Perform cleanup inline to avoid calling state() during Instance disposal. - // Calling state() here could reinitialize after the Instance has been disposed. + // Perform cleanup inline using the provided state object. + // We cannot call the exported dispose() function here because it calls state(), + // which could reinitialize after the Instance has been disposed. s.disposed = true for (const unsub of s.subscriptions) { unsub()