diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index 6d8a64b7d02..0ef8b2e2dc3 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -47,287 +47,324 @@ export namespace ACP { private config: ACPConfig private sdk: OpencodeClient private sessionManager + private sessionAbortControllers = new Map() constructor(connection: AgentSideConnection, config: ACPConfig) { this.connection = connection this.config = config this.sdk = config.sdk this.sessionManager = new ACPSessionManager(this.sdk) + + // Cleanup all subscriptions when connection closes + this.connection.closed.then(() => { + log.info("connection closed, disposing agent") + this.dispose() + }) + } + + private cleanupSession(sessionId: string) { + const controller = this.sessionAbortControllers.get(sessionId) + if (controller) { + controller.abort() + this.sessionAbortControllers.delete(sessionId) + log.info("cleaned up event subscription", { sessionId }) + } } private setupEventSubscriptions(session: ACPSessionState) { const sessionId = session.id const directory = session.cwd + // Cleanup any existing subscription for this session + this.cleanupSession(sessionId) + + // Create abort controller for this session's event subscription + const controller = new AbortController() + this.sessionAbortControllers.set(sessionId, controller) + const options: PermissionOption[] = [ { optionId: "once", kind: "allow_once", name: "Allow once" }, { optionId: "always", kind: "allow_always", name: "Always allow" }, { optionId: "reject", kind: "reject_once", name: "Reject" }, ] - this.config.sdk.event.subscribe({ directory }).then(async (events) => { - for await (const event of events.stream) { - switch (event.type) { - case "permission.asked": - try { - const permission = event.properties - const res = await this.connection - .requestPermission({ - sessionId, - toolCall: { - toolCallId: permission.tool?.callID ?? permission.id, - status: "pending", - title: permission.permission, - rawInput: permission.metadata, - kind: toToolKind(permission.permission), - locations: toLocations(permission.permission, permission.metadata), - }, - options, - }) - .catch(async (error) => { - log.error("failed to request permission from ACP", { - error, - permissionID: permission.id, - sessionID: permission.sessionID, + // Pass the abort signal to the SDK so it can cancel the underlying HTTP connection + this.config.sdk.event + .subscribe({ directory }, { signal: controller.signal }) + .then(async (events) => { + for await (const event of events.stream) { + // Belt-and-suspenders abort check: the signal passed to subscribe() should terminate + // the stream, but we also check here for clean loop exit in case of SDK variations + if (controller.signal.aborted) { + log.info("event subscription aborted", { sessionId }) + break + } + switch (event.type) { + case "permission.asked": + try { + const permission = event.properties + const res = await this.connection + .requestPermission({ + sessionId, + toolCall: { + toolCallId: permission.tool?.callID ?? permission.id, + status: "pending", + title: permission.permission, + rawInput: permission.metadata, + kind: toToolKind(permission.permission), + locations: toLocations(permission.permission, permission.metadata), + }, + options, + }) + .catch(async (error) => { + log.error("failed to request permission from ACP", { + error, + permissionID: permission.id, + sessionID: permission.sessionID, + }) + await this.config.sdk.permission.reply({ + requestID: permission.id, + reply: "reject", + directory, + }) + return }) + if (!res) return + if (res.outcome.outcome !== "selected") { await this.config.sdk.permission.reply({ requestID: permission.id, reply: "reject", directory, }) return - }) - if (!res) return - if (res.outcome.outcome !== "selected") { + } await this.config.sdk.permission.reply({ requestID: permission.id, - reply: "reject", + reply: res.outcome.optionId as "once" | "always" | "reject", directory, }) - return + } catch (err) { + log.error("unexpected error when handling permission", { error: err }) + } finally { + break } - await this.config.sdk.permission.reply({ - requestID: permission.id, - reply: res.outcome.optionId as "once" | "always" | "reject", - directory, - }) - } catch (err) { - log.error("unexpected error when handling permission", { error: err }) - } finally { - break - } - case "message.part.updated": - log.info("message part updated", { event: event.properties }) - try { - const props = event.properties - const { part } = props - - const message = await this.config.sdk.session - .message( - { - sessionID: part.sessionID, - messageID: part.messageID, - directory, - }, - { throwOnError: true }, - ) - .then((x) => x.data) - .catch((err) => { - log.error("unexpected error when fetching message", { error: err }) - return undefined - }) + case "message.part.updated": + log.info("message part updated", { event: event.properties }) + try { + const props = event.properties + const { part } = props - if (!message || message.info.role !== "assistant") return + const message = await this.config.sdk.session + .message( + { + sessionID: part.sessionID, + messageID: part.messageID, + directory, + }, + { throwOnError: true }, + ) + .then((x) => x.data) + .catch((err) => { + log.error("unexpected error when fetching message", { error: err }) + return undefined + }) - if (part.type === "tool") { - switch (part.state.status) { - case "pending": - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call", - toolCallId: part.callID, - title: part.tool, - kind: toToolKind(part.tool), - status: "pending", - locations: [], - rawInput: {}, - }, - }) - .catch((err) => { - log.error("failed to send tool pending to ACP", { error: err }) - }) - break - case "running": - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "in_progress", - kind: toToolKind(part.tool), - title: part.tool, - locations: toLocations(part.tool, part.state.input), - rawInput: part.state.input, - }, - }) - .catch((err) => { - log.error("failed to send tool in_progress to ACP", { error: err }) - }) - break - case "completed": - const kind = toToolKind(part.tool) - const content: ToolCallContent[] = [ - { - type: "content", - content: { - type: "text", - text: part.state.output, + if (!message || message.info.role !== "assistant") return + + if (part.type === "tool") { + switch (part.state.status) { + case "pending": + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call", + toolCallId: part.callID, + title: part.tool, + kind: toToolKind(part.tool), + status: "pending", + locations: [], + rawInput: {}, + }, + }) + .catch((err) => { + log.error("failed to send tool pending to ACP", { error: err }) + }) + break + case "running": + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "in_progress", + locations: toLocations(part.tool, part.state.input), + rawInput: part.state.input, + }, + }) + .catch((err) => { + log.error("failed to send tool in_progress to ACP", { error: err }) + }) + break + case "completed": + const kind = toToolKind(part.tool) + const content: ToolCallContent[] = [ + { + type: "content", + content: { + type: "text", + text: part.state.output, + }, }, - }, - ] - - if (kind === "edit") { - const input = part.state.input - const filePath = typeof input["filePath"] === "string" ? input["filePath"] : "" - const oldText = typeof input["oldString"] === "string" ? input["oldString"] : "" - const newText = - typeof input["newString"] === "string" - ? input["newString"] - : typeof input["content"] === "string" - ? input["content"] - : "" - content.push({ - type: "diff", - path: filePath, - oldText, - newText, - }) - } - - if (part.tool === "todowrite") { - const parsedTodos = z.array(Todo.Info).safeParse(JSON.parse(part.state.output)) - if (parsedTodos.success) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "plan", - entries: parsedTodos.data.map((todo) => { - const status: PlanEntry["status"] = - todo.status === "cancelled" ? "completed" : (todo.status as PlanEntry["status"]) - return { - priority: "medium", - status, - content: todo.content, - } - }), - }, - }) - .catch((err) => { - log.error("failed to send session update for todo", { error: err }) - }) - } else { - log.error("failed to parse todo output", { error: parsedTodos.error }) + ] + + if (kind === "edit") { + const input = part.state.input + const filePath = typeof input["filePath"] === "string" ? input["filePath"] : "" + const oldText = typeof input["oldString"] === "string" ? input["oldString"] : "" + const newText = + typeof input["newString"] === "string" + ? input["newString"] + : typeof input["content"] === "string" + ? input["content"] + : "" + content.push({ + type: "diff", + path: filePath, + oldText, + newText, + }) } - } + if (part.tool === "todowrite") { + const parsedTodos = z.array(Todo.Info).safeParse(JSON.parse(part.state.output)) + if (parsedTodos.success) { + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "plan", + entries: parsedTodos.data.map((todo) => { + const status: PlanEntry["status"] = + todo.status === "cancelled" ? "completed" : (todo.status as PlanEntry["status"]) + return { + priority: "medium", + status, + content: todo.content, + } + }), + }, + }) + .catch((err) => { + log.error("failed to send session update for todo", { error: err }) + }) + } else { + log.error("failed to parse todo output", { error: parsedTodos.error }) + } + } + + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "completed", + kind, + content, + title: part.state.title, + rawOutput: { + output: part.state.output, + metadata: part.state.metadata, + }, + }, + }) + .catch((err) => { + log.error("failed to send tool completed to ACP", { error: err }) + }) + break + case "error": + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "failed", + content: [ + { + type: "content", + content: { + type: "text", + text: part.state.error, + }, + }, + ], + rawOutput: { + error: part.state.error, + }, + }, + }) + .catch((err) => { + log.error("failed to send tool error to ACP", { error: err }) + }) + break + } + } else if (part.type === "text") { + const delta = props.delta + if (delta && part.synthetic !== true) { await this.connection .sessionUpdate({ sessionId, update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "completed", - kind, - content, - title: part.state.title, - rawInput: part.state.input, - rawOutput: { - output: part.state.output, - metadata: part.state.metadata, + sessionUpdate: "agent_message_chunk", + content: { + type: "text", + text: delta, }, }, }) .catch((err) => { - log.error("failed to send tool completed to ACP", { error: err }) + log.error("failed to send text to ACP", { error: err }) }) - break - case "error": + } + } else if (part.type === "reasoning") { + const delta = props.delta + if (delta) { await this.connection .sessionUpdate({ sessionId, update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "failed", - kind: toToolKind(part.tool), - title: part.tool, - rawInput: part.state.input, - content: [ - { - type: "content", - content: { - type: "text", - text: part.state.error, - }, - }, - ], - rawOutput: { - error: part.state.error, + sessionUpdate: "agent_thought_chunk", + content: { + type: "text", + text: delta, }, }, }) .catch((err) => { - log.error("failed to send tool error to ACP", { error: err }) + log.error("failed to send reasoning to ACP", { error: err }) }) - break - } - } else if (part.type === "text") { - const delta = props.delta - if (delta && part.synthetic !== true) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text: delta, - }, - }, - }) - .catch((err) => { - log.error("failed to send text to ACP", { error: err }) - }) - } - } else if (part.type === "reasoning") { - const delta = props.delta - if (delta) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "agent_thought_chunk", - content: { - type: "text", - text: delta, - }, - }, - }) - .catch((err) => { - log.error("failed to send reasoning to ACP", { error: err }) - }) + } } + } finally { + break } - } finally { - break - } + } } - } - }) + }) + .catch((err) => { + log.error("error in event subscription", { sessionId, error: err }) + }) + .finally(() => { + // Cleanup controller reference when stream ends, but only if it's the same controller + // to prevent race condition with re-subscriptions + const current = this.sessionAbortControllers.get(sessionId) + if (current === controller) { + this.sessionAbortControllers.delete(sessionId) + } + }) } async initialize(params: InitializeRequest): Promise { @@ -497,8 +534,6 @@ export namespace ACP { sessionUpdate: "tool_call_update", toolCallId: part.callID, status: "in_progress", - kind: toToolKind(part.tool), - title: part.tool, locations: toLocations(part.tool, part.state.input), rawInput: part.state.input, }, @@ -574,7 +609,6 @@ export namespace ACP { kind, content, title: part.state.title, - rawInput: part.state.input, rawOutput: { output: part.state.output, metadata: part.state.metadata, @@ -593,9 +627,6 @@ export namespace ACP { sessionUpdate: "tool_call_update", toolCallId: part.callID, status: "failed", - kind: toToolKind(part.tool), - title: part.tool, - rawInput: part.state.input, content: [ { type: "content", @@ -934,14 +965,25 @@ export namespace ACP { async cancel(params: CancelNotification) { const session = this.sessionManager.get(params.sessionId) + // Cleanup event subscription for this session + this.cleanupSession(params.sessionId) + // Always call SDK abort - directory is optional, session.cwd provides optimization hint await this.config.sdk.session.abort( { sessionID: params.sessionId, - directory: session.cwd, + directory: session?.cwd, }, { throwOnError: true }, ) } + + dispose() { + // Cleanup all session event subscriptions + for (const sessionId of this.sessionAbortControllers.keys()) { + this.cleanupSession(sessionId) + } + log.info("disposed all event subscriptions") + } } function toToolKind(toolName: string): ToolKind { diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index edb093f1974..9d0b71410e8 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -102,4 +102,19 @@ export namespace Bus { match.splice(index, 1) } } + + /** @internal For testing purposes only - returns subscription count for a given event type */ + export function _getSubscriptionCount(type: string): number { + const subs = state().subscriptions.get(type) + return subs?.length ?? 0 + } + + /** @internal For testing purposes only - returns total subscription count across all event types */ + export function _getTotalSubscriptionCount(): number { + let total = 0 + for (const subs of state().subscriptions.values()) { + total += subs.length + } + return total + } } diff --git a/packages/opencode/src/format/index.ts b/packages/opencode/src/format/index.ts index bab758030b9..741696140b8 100644 --- a/packages/opencode/src/format/index.ts +++ b/packages/opencode/src/format/index.ts @@ -63,6 +63,15 @@ export namespace Format { } }) + // Separate state for subscriptions with dispose callback + const subscriptionState = Instance.state<(() => void)[]>( + () => [], + async () => { + // Delegate to the exported dispose function to keep cleanup logic centralized + dispose() + }, + ) + async function isEnabled(item: Formatter.Info) { const s = await state() let status = s.enabled[item.name] @@ -102,36 +111,50 @@ export namespace Format { export function init() { log.info("init") - Bus.subscribe(File.Event.Edited, async (payload) => { - const file = payload.properties.file - log.info("formatting", { file }) - const ext = path.extname(file) - - for (const item of await getFormatter(ext)) { - log.info("running", { command: item.command }) - try { - const proc = Bun.spawn({ - cmd: item.command.map((x) => x.replace("$FILE", file)), - cwd: Instance.directory, - env: { ...process.env, ...item.environment }, - stdout: "ignore", - stderr: "ignore", - }) - const exit = await proc.exited - if (exit !== 0) - log.error("failed", { + // Clean up any existing subscriptions to prevent duplicates on re-init + dispose() + const subscriptions = subscriptionState() + subscriptions.push( + Bus.subscribe(File.Event.Edited, async (payload) => { + const file = payload.properties.file + log.info("formatting", { file }) + const ext = path.extname(file) + + for (const item of await getFormatter(ext)) { + log.info("running", { command: item.command }) + try { + const proc = Bun.spawn({ + cmd: item.command.map((x) => x.replace("$FILE", file)), + cwd: Instance.directory, + env: { ...process.env, ...item.environment }, + stdout: "ignore", + stderr: "ignore", + }) + const exit = await proc.exited + if (exit !== 0) + log.error("failed", { + command: item.command, + ...item.environment, + }) + } catch (error) { + log.error("failed to format file", { + error, command: item.command, ...item.environment, + file, }) - } catch (error) { - log.error("failed to format file", { - error, - command: item.command, - ...item.environment, - file, - }) + } } - } - }) + }), + ) + } + + export function dispose() { + const subscriptions = subscriptionState() + for (const unsub of subscriptions) { + unsub() + } + subscriptions.length = 0 + log.info("disposed format subscriptions") } } diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts index 4912b8f74ba..6ab2a0814d3 100644 --- a/packages/opencode/src/plugin/index.ts +++ b/packages/opencode/src/plugin/index.ts @@ -81,6 +81,15 @@ export namespace Plugin { } }) + // Separate state for subscriptions with dispose callback + const subscriptionState = Instance.state<(() => void)[]>( + () => [], + async () => { + // Delegate to the exported dispose function to keep cleanup logic centralized + dispose() + }, + ) + export async function trigger< Name extends Exclude, "auth" | "event" | "tool">, Input = Parameters[Name]>[0], @@ -103,19 +112,33 @@ export namespace Plugin { } export async function init() { + // Clean up any existing subscriptions to prevent duplicates on re-init + dispose() + const subscriptions = subscriptionState() const hooks = await state().then((x) => x.hooks) const config = await Config.get() for (const hook of hooks) { // @ts-expect-error this is because we haven't moved plugin to sdk v2 await hook.config?.(config) } - Bus.subscribeAll(async (input) => { - const hooks = await state().then((x) => x.hooks) - for (const hook of hooks) { - hook["event"]?.({ - event: input, - }) - } - }) + subscriptions.push( + Bus.subscribeAll(async (input) => { + const hooks = await state().then((x) => x.hooks) + for (const hook of hooks) { + hook["event"]?.({ + event: input, + }) + } + }), + ) + } + + export function dispose() { + const subscriptions = subscriptionState() + for (const unsub of subscriptions) { + unsub() + } + subscriptions.length = 0 + log.info("disposed plugin subscriptions") } } diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index 95271f8c827..2c8206f66f1 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -2,6 +2,7 @@ import { Bus } from "@/bus" import { Config } from "@/config/config" import { ulid } from "ulid" import { Provider } from "@/provider/provider" +import { Instance } from "@/project/instance" import { Session } from "@/session" import { MessageV2 } from "@/session/message-v2" import { Storage } from "@/storage/storage" @@ -11,55 +12,126 @@ import type * as SDK from "@opencode-ai/sdk/v2" export namespace ShareNext { const log = Log.create({ service: "share-next" }) + interface ShareNextState { + subscriptions: (() => void)[] + queue: Map; abortController: AbortController }> + disposed: boolean + } + + const state = Instance.state( + () => ({ + subscriptions: [], + queue: new Map(), + disposed: false, + }), + async (s) => { + // Perform cleanup inline using the provided state object. + // We cannot call the exported dispose() function here because it calls state(), + // which could reinitialize after the Instance has been disposed. + s.disposed = true + for (const unsub of s.subscriptions) { + unsub() + } + s.subscriptions.length = 0 + for (const entry of s.queue.values()) { + clearTimeout(entry.timeout) + entry.abortController.abort() + } + s.queue.clear() + log.info("disposed share-next subscriptions (via Instance)") + }, + ) + async function url() { return Config.get().then((x) => x.enterprise?.url ?? "https://opncd.ai") } export async function init() { - Bus.subscribe(Session.Event.Updated, async (evt) => { - await sync(evt.properties.info.id, [ - { - type: "session", - data: evt.properties.info, - }, - ]) - }) - Bus.subscribe(MessageV2.Event.Updated, async (evt) => { - await sync(evt.properties.info.sessionID, [ - { - type: "message", - data: evt.properties.info, - }, - ]) - if (evt.properties.info.role === "user") { + // Fully dispose existing state (subscriptions, queue, pending timeouts) to prevent duplicates on re-init + dispose() + const s = state() + // Reset disposed flag to allow operations after re-init + s.disposed = false + s.subscriptions.push( + Bus.subscribe(Session.Event.Updated, async (evt) => { + await sync(evt.properties.info.id, [ + { + type: "session", + data: evt.properties.info, + }, + ]) + }), + ) + s.subscriptions.push( + Bus.subscribe(MessageV2.Event.Updated, async (evt) => { await sync(evt.properties.info.sessionID, [ { - type: "model", - data: [ - await Provider.getModel(evt.properties.info.model.providerID, evt.properties.info.model.modelID).then( - (m) => m, - ), - ], + type: "message", + data: evt.properties.info, }, ]) - } - }) - Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { - await sync(evt.properties.part.sessionID, [ - { - type: "part", - data: evt.properties.part, - }, - ]) - }) - Bus.subscribe(Session.Event.Diff, async (evt) => { - await sync(evt.properties.sessionID, [ - { - type: "session_diff", - data: evt.properties.diff, - }, - ]) - }) + if (evt.properties.info.role === "user") { + await sync(evt.properties.info.sessionID, [ + { + type: "model", + data: [ + await Provider.getModel(evt.properties.info.model.providerID, evt.properties.info.model.modelID).then( + (m) => m, + ), + ], + }, + ]) + } + }), + ) + s.subscriptions.push( + Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { + await sync(evt.properties.part.sessionID, [ + { + type: "part", + data: evt.properties.part, + }, + ]) + }), + ) + s.subscriptions.push( + Bus.subscribe(Session.Event.Diff, async (evt) => { + await sync(evt.properties.sessionID, [ + { + type: "session_diff", + data: evt.properties.diff, + }, + ]) + }), + ) + } + + export function dispose() { + const s = state() + // Mark as disposed to prevent new sync operations during cleanup + s.disposed = true + for (const unsub of s.subscriptions) { + unsub() + } + s.subscriptions.length = 0 + for (const entry of s.queue.values()) { + clearTimeout(entry.timeout) + entry.abortController.abort() + } + s.queue.clear() + log.info("disposed share-next subscriptions") + } + + /** @internal Test helper to get queue size */ + export function _getQueueSize() { + return state().queue.size + } + + /** @internal Test helper to add items to queue for testing dispose cleanup */ + export function _addToQueueForTesting(sessionID: string) { + const s = state() + const timeout = setTimeout(() => {}, 100) + s.queue.set(sessionID, { timeout, data: new Map(), abortController: new AbortController() }) } export async function create(sessionID: string) { @@ -108,9 +180,11 @@ export namespace ShareNext { data: SDK.Model[] } - const queue = new Map }>() async function sync(sessionID: string, data: Data[]) { - const existing = queue.get(sessionID) + const s = state() + // Skip if already disposed + if (s.disposed) return + const existing = s.queue.get(sessionID) if (existing) { for (const item of data) { existing.data.set("id" in item ? (item.id as string) : ulid(), item) @@ -123,12 +197,19 @@ export namespace ShareNext { dataMap.set("id" in item ? (item.id as string) : ulid(), item) } + const abortController = new AbortController() const timeout = setTimeout(async () => { - const queued = queue.get(sessionID) - if (!queued) return - queue.delete(sessionID) + const queued = s.queue.get(sessionID) + // Check both existence and abort status atomically + if (!queued || queued.abortController.signal.aborted) return + // Store local references before any async operations to avoid race conditions + const queuedData = queued.data + const queuedSignal = queued.abortController.signal + s.queue.delete(sessionID) const share = await get(sessionID).catch(() => undefined) if (!share) return + // Re-check abort after async operation + if (queuedSignal.aborted) return await fetch(`${await url()}/api/share/${share.id}/sync`, { method: "POST", @@ -137,11 +218,16 @@ export namespace ShareNext { }, body: JSON.stringify({ secret: share.secret, - data: Array.from(queued.data.values()), + data: Array.from(queuedData.values()), }), + signal: queuedSignal, + }).catch((err) => { + // Ignore abort errors during disposal + if (err.name === "AbortError") return + log.error("sync error", { sessionID, error: err }) }) }, 1000) - queue.set(sessionID, { timeout, data: dataMap }) + s.queue.set(sessionID, { timeout, data: dataMap, abortController }) } export async function remove(sessionID: string) { diff --git a/packages/opencode/src/share/share.ts b/packages/opencode/src/share/share.ts index 1006b23d556..5fc63d82935 100644 --- a/packages/opencode/src/share/share.ts +++ b/packages/opencode/src/share/share.ts @@ -1,5 +1,6 @@ import { Bus } from "../bus" import { Installation } from "../installation" +import { Instance } from "../project/instance" import { Session } from "../session" import { MessageV2 } from "../session/message-v2" import { Log } from "../util/log" @@ -7,10 +8,42 @@ import { Log } from "../util/log" export namespace Share { const log = Log.create({ service: "share" }) - let queue: Promise = Promise.resolve() - const pending = new Map() + interface ShareState { + queue: Promise + pending: Map + subscriptions: (() => void)[] + abortController: AbortController + disposed: boolean + } + + const state = Instance.state( + () => ({ + queue: Promise.resolve(), + pending: new Map(), + subscriptions: [], + abortController: new AbortController(), + disposed: false, + }), + async (s) => { + // Perform cleanup inline to avoid calling state() during Instance disposal. + // Calling state() here could reinitialize after the Instance has been disposed. + s.disposed = true + s.abortController.abort() + for (const unsub of s.subscriptions) { + unsub() + } + s.subscriptions.length = 0 + s.pending.clear() + s.queue = Promise.resolve() + log.info("disposed share subscriptions (via Instance)") + }, + ) export async function sync(key: string, content: any) { + const s = state() + // Skip if already disposed + if (s.disposed) return + const signal = s.abortController.signal const [root, ...splits] = key.split("/") if (root !== "session") return const [sub, sessionID] = splits @@ -18,12 +51,16 @@ export namespace Share { const share = await Session.getShare(sessionID).catch(() => {}) if (!share) return const { secret } = share - pending.set(key, content) - queue = queue + s.pending.set(key, content) + s.queue = s.queue .then(async () => { - const content = pending.get(key) + // Check if disposed before starting fetch + if (signal.aborted) return + const content = s.pending.get(key) if (content === undefined) return - pending.delete(key) + // Re-check abort in case disposal occurred after reading from pending + if (signal.aborted) return + s.pending.delete(key) return fetch(`${URL}/share_sync`, { method: "POST", @@ -33,6 +70,7 @@ export namespace Share { key: key, content, }), + signal, }) }) .then((x) => { @@ -43,26 +81,63 @@ export namespace Share { }) } }) + .catch((err) => { + // Ignore abort errors during disposal + if (err.name === "AbortError") return + log.error("sync error", { key, error: err }) + }) } export function init() { - Bus.subscribe(Session.Event.Updated, async (evt) => { - await sync("session/info/" + evt.properties.info.id, evt.properties.info) - }) - Bus.subscribe(MessageV2.Event.Updated, async (evt) => { - await sync("session/message/" + evt.properties.info.sessionID + "/" + evt.properties.info.id, evt.properties.info) - }) - Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { - await sync( - "session/part/" + - evt.properties.part.sessionID + - "/" + - evt.properties.part.messageID + - "/" + - evt.properties.part.id, - evt.properties.part, - ) - }) + // Fully dispose existing share state (subscriptions, pending map, queue, abort controller) + // before re-init to prevent duplicates and orphaned requests + dispose() + const s = state() + // Reset disposed flag to allow operations after re-init + s.disposed = false + s.subscriptions.push( + Bus.subscribe(Session.Event.Updated, async (evt) => { + await sync("session/info/" + evt.properties.info.id, evt.properties.info) + }), + ) + s.subscriptions.push( + Bus.subscribe(MessageV2.Event.Updated, async (evt) => { + await sync( + "session/message/" + evt.properties.info.sessionID + "/" + evt.properties.info.id, + evt.properties.info, + ) + }), + ) + s.subscriptions.push( + Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { + await sync( + "session/part/" + + evt.properties.part.sessionID + + "/" + + evt.properties.part.messageID + + "/" + + evt.properties.part.id, + evt.properties.part, + ) + }), + ) + } + + export function dispose() { + const s = state() + // Mark as disposed to prevent new sync operations during cleanup + s.disposed = true + // Abort any in-flight fetch requests + s.abortController.abort() + // Create a new controller for potential re-init + s.abortController = new AbortController() + for (const unsub of s.subscriptions) { + unsub() + } + s.subscriptions.length = 0 + s.pending.clear() + s.queue = Promise.resolve() + log.info("disposed share subscriptions") } export const URL = diff --git a/packages/opencode/test/memory/acp-cleanup.test.ts b/packages/opencode/test/memory/acp-cleanup.test.ts new file mode 100644 index 00000000000..3d976c850e9 --- /dev/null +++ b/packages/opencode/test/memory/acp-cleanup.test.ts @@ -0,0 +1,164 @@ +import { test, expect, describe } from "bun:test" +import { ACP } from "../../src/acp/agent" + +/** + * Tests for ACP Agent session cleanup. + * Verifies that session event subscriptions are properly cleaned up. + */ + +describe("ACP.Agent session cleanup", () => { + test("cleanupSession removes abort controller", () => { + // Create a mock connection and config + const mockConnection = { + requestPermission: async () => ({ outcome: { outcome: "selected", optionId: "once" } }), + sessionUpdate: async () => {}, + closed: new Promise(() => {}), // Never resolves during test + } + + const mockConfig = { + sdk: { + event: { + subscribe: async () => ({ + stream: (async function* () { + // Empty stream + })(), + }), + }, + permission: { + reply: async () => {}, + }, + session: { + message: async () => ({ data: null }), + abort: async () => {}, + }, + }, + } + + // @ts-expect-error - testing with mocks + const agent = new ACP.Agent(mockConnection, mockConfig) + + // Access private map for testing + // @ts-expect-error - accessing private for testing + const controllers = agent.sessionAbortControllers + + // Simulate adding a controller + const controller = new AbortController() + controllers.set("test-session-1", controller) + + expect(controllers.size).toBe(1) + expect(controller.signal.aborted).toBe(false) + + // Call cleanup + // @ts-expect-error - accessing private for testing + agent.cleanupSession("test-session-1") + + expect(controllers.size).toBe(0) + expect(controller.signal.aborted).toBe(true) + }) + + test("dispose cleans up all session controllers", () => { + const mockConnection = { + closed: new Promise(() => {}), // Never resolves during test + } + const mockConfig = { + sdk: { + event: { subscribe: async () => ({ stream: (async function* () {})() }) }, + permission: { reply: async () => {} }, + session: { message: async () => ({ data: null }), abort: async () => {} }, + }, + } + + // @ts-expect-error - testing with mocks + const agent = new ACP.Agent(mockConnection, mockConfig) + + // @ts-expect-error - accessing private for testing + const controllers = agent.sessionAbortControllers + + // Add multiple controllers + const controller1 = new AbortController() + const controller2 = new AbortController() + const controller3 = new AbortController() + + controllers.set("session-1", controller1) + controllers.set("session-2", controller2) + controllers.set("session-3", controller3) + + expect(controllers.size).toBe(3) + + // Dispose all + agent.dispose() + + expect(controllers.size).toBe(0) + expect(controller1.signal.aborted).toBe(true) + expect(controller2.signal.aborted).toBe(true) + expect(controller3.signal.aborted).toBe(true) + }) + + test("setupEventSubscriptions replaces existing subscription for same session", async () => { + const mockConnection = { + closed: new Promise(() => {}), // Never resolves during test + } + const subscribeCallCount = { count: 0 } + // Track active generators so we can verify they terminate + const activeGenerators = new Set() + const mockConfig = { + sdk: { + event: { + subscribe: async () => { + subscribeCallCount.count++ + // Create a signal to control this generator's lifecycle + const genController = new AbortController() + activeGenerators.add(genController) + return { + stream: (async function* () { + // Use finite loop with abort check to prevent background runaway + for (let i = 0; i < 100 && !genController.signal.aborted; i++) { + await new Promise((r) => setTimeout(r, 100)) + if (genController.signal.aborted) break + yield { type: "test" } + } + activeGenerators.delete(genController) + })(), + // Expose abort to allow cleanup + abort: () => genController.abort(), + } + }, + }, + permission: { reply: async () => {} }, + session: { message: async () => ({ data: null }), abort: async () => {} }, + }, + } + + // @ts-expect-error - testing with mocks + const agent = new ACP.Agent(mockConnection, mockConfig) + + // @ts-expect-error - accessing private for testing + const controllers = agent.sessionAbortControllers + + // Manually add an existing controller to simulate an existing subscription + const existingController = new AbortController() + controllers.set("session-1", existingController) + + // Setup event subscriptions for the same session + const mockSession = { id: "session-1", cwd: "/test" } + // @ts-expect-error - accessing private for testing + agent.setupEventSubscriptions(mockSession) + + // The existing controller should be aborted + expect(existingController.signal.aborted).toBe(true) + + // A new controller should exist + expect(controllers.has("session-1")).toBe(true) + const newController = controllers.get("session-1") + expect(newController).not.toBe(existingController) + expect(newController?.signal.aborted).toBe(false) + + // Cleanup - dispose the agent and abort all active generators + agent.dispose() + for (const genController of activeGenerators) { + genController.abort() + } + // Give generators time to exit cleanly + await new Promise((r) => setTimeout(r, 50)) + }) +}) diff --git a/packages/opencode/test/memory/profile.ts b/packages/opencode/test/memory/profile.ts new file mode 100644 index 00000000000..1797a35ad70 --- /dev/null +++ b/packages/opencode/test/memory/profile.ts @@ -0,0 +1,339 @@ +#!/usr/bin/env bun +/** + * Memory Profiling Script for OpenCode + * + * This script simulates subscription lifecycle patterns and monitors memory usage + * to verify that memory leaks have been fixed. + * + * Usage: + * bun run test/memory/profile.ts + * + * For heap snapshots (requires --expose-gc flag): + * bun --expose-gc run test/memory/profile.ts + */ + +import { Log } from "../../src/util/log" +import { Instance } from "../../src/project/instance" +import path from "path" +import os from "os" +import fs from "fs/promises" + +Log.init({ print: false, dev: false, level: "ERROR" }) + +// Create a temp directory for Instance.provide +let testDir: string + +async function setupTestDir() { + testDir = path.join(os.tmpdir(), `opencode-profile-${Date.now()}`) + await fs.mkdir(testDir, { recursive: true }) + await fs.writeFile(path.join(testDir, "opencode.json"), JSON.stringify({})) +} + +async function cleanupTestDir() { + if (testDir) { + await fs.rm(testDir, { recursive: true, force: true }).catch(() => {}) + } +} + +interface MemorySnapshot { + label: string + heapUsed: number + heapTotal: number + external: number + rss: number + timestamp: number +} + +function takeSnapshot(label: string): MemorySnapshot { + const mem = process.memoryUsage() + return { + label, + heapUsed: mem.heapUsed, + heapTotal: mem.heapTotal, + external: mem.external, + rss: mem.rss, + timestamp: Date.now(), + } +} + +function formatBytes(bytes: number): string { + if (bytes < 1024) return `${bytes} B` + if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(2)} KB` + return `${(bytes / 1024 / 1024).toFixed(2)} MB` +} + +function printSnapshot(snapshot: MemorySnapshot) { + console.log(`[${snapshot.label}]`) + console.log(` Heap Used: ${formatBytes(snapshot.heapUsed)}`) + console.log(` Heap Total: ${formatBytes(snapshot.heapTotal)}`) + console.log(` RSS: ${formatBytes(snapshot.rss)}`) +} + +function compareSnapshots(before: MemorySnapshot, after: MemorySnapshot) { + const heapDiff = after.heapUsed - before.heapUsed + const rssDiff = after.rss - before.rss + console.log(`\n[Delta: ${before.label} -> ${after.label}]`) + console.log(` Heap Used: ${heapDiff >= 0 ? "+" : ""}${formatBytes(heapDiff)}`) + console.log(` RSS: ${rssDiff >= 0 ? "+" : ""}${formatBytes(rssDiff)}`) + return { heapDiff, rssDiff } +} + +function forceGC() { + if (global.gc) { + global.gc() + console.log(" (GC forced)") + } +} + +async function sleep(ms: number) { + await new Promise((r) => setTimeout(r, ms)) +} + +// ============================================================================ +// Test Scenarios +// ============================================================================ + +async function testShareSubscriptionCycles() { + console.log("\n" + "=".repeat(60)) + console.log("TEST: Share subscription init/dispose cycles") + console.log("=".repeat(60)) + + return Instance.provide({ + directory: testDir, + fn: async () => { + const { Share } = await import("../../src/share/share") + + forceGC() + await sleep(100) + const baseline = takeSnapshot("Baseline") + printSnapshot(baseline) + + const iterations = 1000 + console.log(`\nRunning ${iterations} init/dispose cycles...`) + + for (let i = 0; i < iterations; i++) { + Share.init() + Share.dispose() + if (i % 100 === 0 && i > 0) { + process.stdout.write(` ${i} cycles completed\r`) + } + } + console.log(` ${iterations} cycles completed`) + + forceGC() + await sleep(100) + const afterCycles = takeSnapshot("After cycles") + printSnapshot(afterCycles) + + const result = compareSnapshots(baseline, afterCycles) + + // Check for leaks (should be less than 1MB growth for 1000 cycles) + if (result.heapDiff > 1024 * 1024) { + console.log("\n⚠️ WARNING: Potential memory leak detected!") + return false + } + console.log("\n✅ PASS: Memory stable after subscription cycles") + return true + }, + }) +} + +async function testShareNextSubscriptionCycles() { + console.log("\n" + "=".repeat(60)) + console.log("TEST: ShareNext subscription init/dispose cycles") + console.log("=".repeat(60)) + + return Instance.provide({ + directory: testDir, + fn: async () => { + const { ShareNext } = await import("../../src/share/share-next") + + forceGC() + await sleep(100) + const baseline = takeSnapshot("Baseline") + printSnapshot(baseline) + + const iterations = 1000 + console.log(`\nRunning ${iterations} init/dispose cycles...`) + + for (let i = 0; i < iterations; i++) { + await ShareNext.init() + ShareNext.dispose() + if (i % 100 === 0 && i > 0) { + process.stdout.write(` ${i} cycles completed\r`) + } + } + console.log(` ${iterations} cycles completed`) + + forceGC() + await sleep(100) + const afterCycles = takeSnapshot("After cycles") + printSnapshot(afterCycles) + + const result = compareSnapshots(baseline, afterCycles) + + if (result.heapDiff > 1024 * 1024) { + console.log("\n⚠️ WARNING: Potential memory leak detected!") + return false + } + console.log("\n✅ PASS: Memory stable after subscription cycles") + return true + }, + }) +} + +async function testFormatSubscriptionCycles() { + console.log("\n" + "=".repeat(60)) + console.log("TEST: Format subscription init/dispose cycles") + console.log("=".repeat(60)) + + return Instance.provide({ + directory: testDir, + fn: async () => { + const { Format } = await import("../../src/format") + + forceGC() + await sleep(100) + const baseline = takeSnapshot("Baseline") + printSnapshot(baseline) + + const iterations = 1000 + console.log(`\nRunning ${iterations} init/dispose cycles...`) + + for (let i = 0; i < iterations; i++) { + Format.init() + Format.dispose() + if (i % 100 === 0 && i > 0) { + process.stdout.write(` ${i} cycles completed\r`) + } + } + console.log(` ${iterations} cycles completed`) + + forceGC() + await sleep(100) + const afterCycles = takeSnapshot("After cycles") + printSnapshot(afterCycles) + + const result = compareSnapshots(baseline, afterCycles) + + if (result.heapDiff > 1024 * 1024) { + console.log("\n⚠️ WARNING: Potential memory leak detected!") + return false + } + console.log("\n✅ PASS: Memory stable after subscription cycles") + return true + }, + }) +} + +async function testACPControllerCleanup() { + console.log("\n" + "=".repeat(60)) + console.log("TEST: ACP Agent controller cleanup") + console.log("=".repeat(60)) + + const { ACP } = await import("../../src/acp/agent") + + const mockConnection = { + closed: new Promise(() => {}), // Never resolves during test + } + const mockConfig = { + sdk: { + event: { subscribe: async () => ({ stream: (async function* () {})() }) }, + permission: { reply: async () => {} }, + session: { message: async () => ({ data: null }), abort: async () => {} }, + }, + } + + forceGC() + await sleep(100) + const baseline = takeSnapshot("Baseline") + printSnapshot(baseline) + + const iterations = 1000 + console.log(`\nCreating and cleaning up ${iterations} sessions...`) + + // @ts-expect-error - testing with mocks + const agent = new ACP.Agent(mockConnection, mockConfig) + + for (let i = 0; i < iterations; i++) { + // Simulate session creation with abort controller + // @ts-expect-error - accessing private for testing + const controllers = agent.sessionAbortControllers + controllers.set(`session-${i}`, new AbortController()) + + // Clean it up + // @ts-expect-error - accessing private for testing + agent.cleanupSession(`session-${i}`) + + if (i % 100 === 0 && i > 0) { + process.stdout.write(` ${i} sessions cleaned\r`) + } + } + console.log(` ${iterations} sessions cleaned`) + + agent.dispose() + + forceGC() + await sleep(100) + const afterCycles = takeSnapshot("After cycles") + printSnapshot(afterCycles) + + const result = compareSnapshots(baseline, afterCycles) + + if (result.heapDiff > 1024 * 1024) { + console.log("\n⚠️ WARNING: Potential memory leak detected!") + return false + } + console.log("\n✅ PASS: Memory stable after controller cleanup cycles") + return true +} + +// ============================================================================ +// Main +// ============================================================================ + +async function main() { + console.log("OpenCode Memory Profiling") + console.log("=".repeat(60)) + console.log(`GC Available: ${global.gc ? "Yes" : "No (run with --expose-gc for accurate results)"}`) + console.log(`Platform: ${process.platform}`) + console.log(`Bun Version: ${Bun.version}`) + + // Setup test directory for Instance.provide + await setupTestDir() + + const results: boolean[] = [] + + try { + results.push(await testShareSubscriptionCycles()) + results.push(await testShareNextSubscriptionCycles()) + results.push(await testFormatSubscriptionCycles()) + results.push(await testACPControllerCleanup()) + } catch (err) { + console.error("\nError during profiling:", err) + await cleanupTestDir() + process.exit(1) + } + + // Cleanup test directory + await cleanupTestDir() + + console.log("\n" + "=".repeat(60)) + console.log("SUMMARY") + console.log("=".repeat(60)) + + const passed = results.filter(Boolean).length + const total = results.length + + console.log(`Tests passed: ${passed}/${total}`) + + if (passed === total) { + console.log("\n✅ All memory tests passed!") + process.exit(0) + } else { + console.log("\n❌ Some memory tests failed!") + process.exit(1) + } +} + +main() diff --git a/packages/opencode/test/memory/subscription-cleanup.test.ts b/packages/opencode/test/memory/subscription-cleanup.test.ts new file mode 100644 index 00000000000..4dac834bef3 --- /dev/null +++ b/packages/opencode/test/memory/subscription-cleanup.test.ts @@ -0,0 +1,284 @@ +import { test, expect, describe, beforeAll, afterAll } from "bun:test" +import { Bus } from "../../src/bus" +import { Session } from "../../src/session" +import { MessageV2 } from "../../src/session/message-v2" +import { File } from "../../src/file" +import { Instance } from "../../src/project/instance" +import path from "path" +import os from "os" +import fs from "fs/promises" + +/** + * Tests to verify that subscription cleanup functions work correctly. + * These tests verify that dispose() functions properly unsubscribe from the Bus, + * preventing memory leaks from accumulated event handlers. + */ + +let testDir: string + +beforeAll(async () => { + // Create a temp directory for the test instance + testDir = path.join(os.tmpdir(), `opencode-memory-test-${Date.now()}`) + await fs.mkdir(testDir, { recursive: true }) + await fs.writeFile(path.join(testDir, "opencode.json"), JSON.stringify({})) +}) + +afterAll(async () => { + // Clean up the temp directory to prevent test artifacts from accumulating + if (testDir) { + await fs.rm(testDir, { recursive: true, force: true }).catch(() => {}) + } +}) + +describe("subscription cleanup", () => { + describe("Share.dispose()", () => { + test("should unsubscribe from all events", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { Share } = await import("../../src/share/share") + + const beforeSession = Bus._getSubscriptionCount(Session.Event.Updated.type) + const beforeMessage = Bus._getSubscriptionCount(MessageV2.Event.Updated.type) + const beforePart = Bus._getSubscriptionCount(MessageV2.Event.PartUpdated.type) + + Share.init() + + const afterInitSession = Bus._getSubscriptionCount(Session.Event.Updated.type) + const afterInitMessage = Bus._getSubscriptionCount(MessageV2.Event.Updated.type) + const afterInitPart = Bus._getSubscriptionCount(MessageV2.Event.PartUpdated.type) + + // Verify subscriptions were added + expect(afterInitSession).toBe(beforeSession + 1) + expect(afterInitMessage).toBe(beforeMessage + 1) + expect(afterInitPart).toBe(beforePart + 1) + + Share.dispose() + + const afterDisposeSession = Bus._getSubscriptionCount(Session.Event.Updated.type) + const afterDisposeMessage = Bus._getSubscriptionCount(MessageV2.Event.Updated.type) + const afterDisposePart = Bus._getSubscriptionCount(MessageV2.Event.PartUpdated.type) + + // Verify subscriptions were removed + expect(afterDisposeSession).toBe(beforeSession) + expect(afterDisposeMessage).toBe(beforeMessage) + expect(afterDisposePart).toBe(beforePart) + }, + }) + }) + + test("multiple init/dispose cycles maintain correct count", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { Share } = await import("../../src/share/share") + + const baseline = Bus._getTotalSubscriptionCount() + + for (let i = 0; i < 10; i++) { + Share.init() + Share.dispose() + } + + const afterCycles = Bus._getTotalSubscriptionCount() + expect(afterCycles).toBe(baseline) + }, + }) + }) + }) + + describe("ShareNext.dispose()", () => { + test("should unsubscribe from all events and clear queue", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { ShareNext } = await import("../../src/share/share-next") + + const beforeSession = Bus._getSubscriptionCount(Session.Event.Updated.type) + const beforeMessage = Bus._getSubscriptionCount(MessageV2.Event.Updated.type) + const beforePart = Bus._getSubscriptionCount(MessageV2.Event.PartUpdated.type) + const beforeDiff = Bus._getSubscriptionCount(Session.Event.Diff.type) + + await ShareNext.init() + + const afterInitSession = Bus._getSubscriptionCount(Session.Event.Updated.type) + const afterInitMessage = Bus._getSubscriptionCount(MessageV2.Event.Updated.type) + const afterInitPart = Bus._getSubscriptionCount(MessageV2.Event.PartUpdated.type) + const afterInitDiff = Bus._getSubscriptionCount(Session.Event.Diff.type) + + expect(afterInitSession).toBe(beforeSession + 1) + expect(afterInitMessage).toBe(beforeMessage + 1) + expect(afterInitPart).toBe(beforePart + 1) + expect(afterInitDiff).toBe(beforeDiff + 1) + + ShareNext.dispose() + + const afterDisposeSession = Bus._getSubscriptionCount(Session.Event.Updated.type) + const afterDisposeMessage = Bus._getSubscriptionCount(MessageV2.Event.Updated.type) + const afterDisposePart = Bus._getSubscriptionCount(MessageV2.Event.PartUpdated.type) + const afterDisposeDiff = Bus._getSubscriptionCount(Session.Event.Diff.type) + + expect(afterDisposeSession).toBe(beforeSession) + expect(afterDisposeMessage).toBe(beforeMessage) + expect(afterDisposePart).toBe(beforePart) + expect(afterDisposeDiff).toBe(beforeDiff) + + // Verify queue is cleared + expect(ShareNext._getQueueSize()).toBe(0) + }, + }) + }) + + test("multiple init calls should not accumulate subscriptions", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { ShareNext } = await import("../../src/share/share-next") + + const baseline = Bus._getTotalSubscriptionCount() + + // Call init multiple times without dispose - should not accumulate + // because init() now calls dispose() at the start + await ShareNext.init() + const afterFirstInit = Bus._getTotalSubscriptionCount() + + await ShareNext.init() + const afterSecondInit = Bus._getTotalSubscriptionCount() + + await ShareNext.init() + const afterThirdInit = Bus._getTotalSubscriptionCount() + + // All counts should be the same - no accumulation + expect(afterSecondInit).toBe(afterFirstInit) + expect(afterThirdInit).toBe(afterFirstInit) + + ShareNext.dispose() + expect(Bus._getTotalSubscriptionCount()).toBe(baseline) + }, + }) + }) + + test("dispose should clear pending queue items and their timeouts", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { ShareNext } = await import("../../src/share/share-next") + + await ShareNext.init() + + // Verify queue starts empty + expect(ShareNext._getQueueSize()).toBe(0) + + // Add items to the queue using the test helper + ShareNext._addToQueueForTesting("session-1") + ShareNext._addToQueueForTesting("session-2") + ShareNext._addToQueueForTesting("session-3") + + // Verify items were added + expect(ShareNext._getQueueSize()).toBe(3) + + // dispose() should clear all queue items and their timeouts + ShareNext.dispose() + + // Verify queue is cleared + expect(ShareNext._getQueueSize()).toBe(0) + }, + }) + }) + }) + + describe("Format.dispose()", () => { + test("should unsubscribe from file edit events", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { Format } = await import("../../src/format") + + const before = Bus._getSubscriptionCount(File.Event.Edited.type) + + Format.init() + + const afterInit = Bus._getSubscriptionCount(File.Event.Edited.type) + expect(afterInit).toBe(before + 1) + + Format.dispose() + + const afterDispose = Bus._getSubscriptionCount(File.Event.Edited.type) + expect(afterDispose).toBe(before) + }, + }) + }) + }) + + describe("Plugin.dispose()", () => { + test("should unsubscribe from wildcard events", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + // Test the dispose pattern directly without Plugin.init() which is slow + // Plugin.init() loads npm packages which can timeout in tests + const before = Bus._getSubscriptionCount("*") + + // Simulate what Plugin.init() does - subscribe to all events + const unsub = Bus.subscribeAll(() => {}) + const afterSubscribe = Bus._getSubscriptionCount("*") + expect(afterSubscribe).toBe(before + 1) + + // Unsubscribe like dispose() would + unsub() + const afterUnsubscribe = Bus._getSubscriptionCount("*") + expect(afterUnsubscribe).toBe(before) + }, + }) + }) + }) +}) + +describe("memory stability", () => { + test("repeated init/dispose cycles should not leak subscriptions", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + const { Share } = await import("../../src/share/share") + + const baseline = Bus._getTotalSubscriptionCount() + const iterations = 100 + + for (let i = 0; i < iterations; i++) { + Share.init() + Share.dispose() + } + + const afterCycles = Bus._getTotalSubscriptionCount() + expect(afterCycles).toBe(baseline) + }, + }) + }) + + test("init without dispose should accumulate subscriptions (verifies test validity)", async () => { + await Instance.provide({ + directory: testDir, + fn: async () => { + // This test verifies that without dispose(), subscriptions DO accumulate + // This proves our dispose() tests are meaningful + const before = Bus._getTotalSubscriptionCount() + + // Manually subscribe without disposing + const unsub1 = Bus.subscribe(Session.Event.Updated, () => {}) + const unsub2 = Bus.subscribe(Session.Event.Updated, () => {}) + const unsub3 = Bus.subscribe(Session.Event.Updated, () => {}) + + const afterSubscribe = Bus._getTotalSubscriptionCount() + expect(afterSubscribe).toBe(before + 3) + + // Clean up manually + unsub1() + unsub2() + unsub3() + + const afterUnsubscribe = Bus._getTotalSubscriptionCount() + expect(afterUnsubscribe).toBe(before) + }, + }) + }) +})