Skip to content
526 changes: 285 additions & 241 deletions packages/opencode/src/acp/agent.ts

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions packages/opencode/src/bus/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,20 @@ export namespace Bus {
match.splice(index, 1)
}
}

/** @internal Test helper to get subscription count for a specific event type */
export function _getSubscriptionCount(type: string): number {
const match = state().subscriptions.get(type)
return match?.length ?? 0
}

/** @internal Test helper to get total subscription count across all event types */
export function _getTotalSubscriptionCount(): number {
let total = 0
// Use Array.from to snapshot values in case of concurrent modification
for (const subs of Array.from(state().subscriptions.values())) {
total += subs.length
}
return total
}
}
20 changes: 19 additions & 1 deletion packages/opencode/src/format/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,14 @@ export namespace Format {
return result
}

// Store unsubscribe functions for cleanup
const unsubscribers: Array<() => void> = []

export function init() {
log.info("init")
Bus.subscribe(File.Event.Edited, async (payload) => {
// Clean up any existing subscriptions before adding new ones
dispose()
const unsub = Bus.subscribe(File.Event.Edited, async (payload) => {
const file = payload.properties.file
log.info("formatting", { file })
const ext = path.extname(file)
Expand Down Expand Up @@ -133,5 +138,18 @@ export namespace Format {
}
}
})
unsubscribers.push(unsub)
}

export function dispose() {
const toUnsubscribe = unsubscribers.splice(0)
for (const unsub of toUnsubscribe) {
try {
unsub()
} catch (error) {
log.error("failed to unsubscribe format handler", { error })
}
}
log.info("disposed format subscriptions")
}
}
20 changes: 19 additions & 1 deletion packages/opencode/src/plugin/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,38 @@ export namespace Plugin {
return state().then((x) => x.hooks)
}

// Store unsubscribe functions for cleanup
const unsubscribers: Array<() => void> = []

export async function init() {
// Clean up any existing subscriptions before adding new ones
dispose()
const hooks = await state().then((x) => x.hooks)
const config = await Config.get()
for (const hook of hooks) {
// @ts-expect-error this is because we haven't moved plugin to sdk v2
await hook.config?.(config)
}
Bus.subscribeAll(async (input) => {
const unsub = Bus.subscribeAll(async (input) => {
const hooks = await state().then((x) => x.hooks)
for (const hook of hooks) {
hook["event"]?.({
event: input,
})
}
})
unsubscribers.push(unsub)
}

export function dispose() {
const toUnsubscribe = unsubscribers.splice(0)
for (const unsub of toUnsubscribe) {
try {
unsub()
} catch (error) {
log.error("failed to unsubscribe plugin handler", { error })
}
}
log.info("disposed plugin subscriptions")
}
}
85 changes: 74 additions & 11 deletions packages/opencode/src/share/share-next.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,44 @@ import type * as SDK from "@opencode-ai/sdk/v2"

export namespace ShareNext {
const log = Log.create({ service: "share-next" })
// Generation counter to invalidate in-flight operations from previous init cycles
let generation = 0
let disposed = false

// Store unsubscribe functions for cleanup
const unsubscribers: Array<() => void> = []

async function url() {
return Config.get().then((x) => x.enterprise?.url ?? "https://opncd.ai")
}

export async function init() {
Bus.subscribe(Session.Event.Updated, async (evt) => {
await sync(evt.properties.info.id, [
// Clean up any existing subscriptions before adding new ones
dispose()
disposed = false
// Increment generation so in-flight operations from previous cycle are invalidated
const gen = ++generation
Comment on lines 18 to +29
Copy link

Copilot AI Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The async init() function calls dispose() synchronously at the start, but then continues with async operations. If multiple init() calls happen concurrently, they could all pass the dispose() gate and create duplicate subscriptions. The same issue exists in Share.ts but is more critical here since init() is async and takes longer to complete.

Copilot uses AI. Check for mistakes.

const unsub1 = Bus.subscribe(Session.Event.Updated, async (evt) => {
if (disposed || gen !== generation) return
await sync(gen, evt.properties.info.id, [
{
type: "session",
data: evt.properties.info,
},
])
})
Bus.subscribe(MessageV2.Event.Updated, async (evt) => {
await sync(evt.properties.info.sessionID, [
const unsub2 = Bus.subscribe(MessageV2.Event.Updated, async (evt) => {
if (disposed || gen !== generation) return
await sync(gen, evt.properties.info.sessionID, [
{
type: "message",
data: evt.properties.info,
},
])
if (gen !== generation) return
Copy link

Copilot AI Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an inconsistency in generation checks. Line 48 checks 'if (gen !== generation) return' after the first sync call, but line 32 checks both 'disposed' and 'gen !== generation' at the start of the subscription handler. For consistency and safety, all early returns should check both conditions or have a clear reason for checking only one.

Suggested change
if (gen !== generation) return
if (disposed || gen !== generation) return

Copilot uses AI. Check for mistakes.
if (evt.properties.info.role === "user") {
await sync(evt.properties.info.sessionID, [
await sync(gen, evt.properties.info.sessionID, [
{
type: "model",
data: [
Expand All @@ -44,22 +59,44 @@ export namespace ShareNext {
])
}
})
Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
await sync(evt.properties.part.sessionID, [
const unsub3 = Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
if (disposed || gen !== generation) return
await sync(gen, evt.properties.part.sessionID, [
{
type: "part",
data: evt.properties.part,
},
])
})
Bus.subscribe(Session.Event.Diff, async (evt) => {
await sync(evt.properties.sessionID, [
const unsub4 = Bus.subscribe(Session.Event.Diff, async (evt) => {
if (disposed || gen !== generation) return
await sync(gen, evt.properties.sessionID, [
{
type: "session_diff",
data: evt.properties.diff,
},
])
})
unsubscribers.push(unsub1, unsub2, unsub3, unsub4)
}

export function dispose() {
disposed = true
const toUnsubscribe = unsubscribers.splice(0)
for (const unsub of toUnsubscribe) {
try {
unsub()
} catch (error) {
log.error("failed to unsubscribe", { error })
}
}
// Hardened: snapshot and clear atomically to avoid race during iteration
const pending = Array.from(queue.values())
queue.clear()
for (const entry of pending) {
clearTimeout(entry.timeout)
}
Comment on lines +85 to +98
Copy link

Copilot AI Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same concurrency issue exists here as in Share.ts. The unsubscribers.splice(0) operation could race with init() adding new subscribers. Additionally, the queue clearing logic snapshots the queue after clearing it, which means timeouts being added during this operation could be missed.

Suggested change
const toUnsubscribe = unsubscribers.splice(0)
for (const unsub of toUnsubscribe) {
try {
unsub()
} catch (error) {
log.error("failed to unsubscribe", { error })
}
}
// Hardened: snapshot and clear atomically to avoid race during iteration
const pending = Array.from(queue.values())
queue.clear()
for (const entry of pending) {
clearTimeout(entry.timeout)
}
// Drain the live unsubscribers array so handlers added during disposal
// are also cleaned up before we return.
while (unsubscribers.length > 0) {
const unsub = unsubscribers.pop()
if (!unsub) {
continue
}
try {
unsub()
} catch (error) {
log.error("failed to unsubscribe", { error })
}
}
// Clear all pending timeouts from the live queue and then empty it.
for (const entry of queue.values()) {
clearTimeout(entry.timeout)
}
queue.clear()

Copilot uses AI. Check for mistakes.
log.info("disposed share-next subscriptions")
}

export async function create(sessionID: string) {
Expand Down Expand Up @@ -109,7 +146,10 @@ export namespace ShareNext {
}

const queue = new Map<string, { timeout: NodeJS.Timeout; data: Map<string, Data> }>()
async function sync(sessionID: string, data: Data[]) {
async function sync(gen: number, sessionID: string, data: Data[]) {
// Check generation before any work
if (gen !== generation) return

const existing = queue.get(sessionID)
if (existing) {
for (const item of data) {
Expand All @@ -124,11 +164,15 @@ export namespace ShareNext {
}

const timeout = setTimeout(async () => {
// Check generation before processing queued data
if (gen !== generation) return
const queued = queue.get(sessionID)
if (!queued) return
queue.delete(sessionID)
const share = await get(sessionID).catch(() => undefined)
if (!share) return
// Check generation after async operation
if (gen !== generation) return

await fetch(`${await url()}/api/share/${share.id}/sync`, {
method: "POST",
Expand Down Expand Up @@ -161,17 +205,23 @@ export namespace ShareNext {
}

async function fullSync(sessionID: string) {
// Capture current generation for this sync operation
const gen = generation
log.info("full sync", { sessionID })
const session = await Session.get(sessionID)
if (gen !== generation) return
const diffs = await Session.diff(sessionID)
if (gen !== generation) return
const messages = await Array.fromAsync(MessageV2.stream(sessionID))
if (gen !== generation) return
const models = await Promise.all(
messages
.filter((m) => m.info.role === "user")
.map((m) => (m.info as SDK.UserMessage).model)
.map((m) => Provider.getModel(m.providerID, m.modelID).then((m) => m)),
)
await sync(sessionID, [
if (gen !== generation) return
await sync(gen, sessionID, [
{
type: "session",
data: session,
Expand All @@ -191,4 +241,17 @@ export namespace ShareNext {
},
])
}

/** @internal Test helper to get queue size */
export function _getQueueSize(): number {
return queue.size
}

/** @internal Test helper to add items to queue for testing dispose cleanup */
export function _addToQueueForTesting(sessionID: string) {
const dataMap = new Map<string, Data>()
// Use short timeout for tests - this is a no-op callback that won't cause issues if it fires
const timeout = setTimeout(() => {}, 100)
queue.set(sessionID, { timeout, data: dataMap })
Comment on lines +253 to +255
Copy link

Copilot AI Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test helper function _addToQueueForTesting creates a timeout with an empty callback that fires after 100ms. While the comment says it's a "no-op callback that won't cause issues if it fires," the timeout will still consume resources and fire during test execution. Consider using a much longer timeout (e.g., 10000ms) or clearTimeout immediately after adding to the queue if the goal is just to test disposal without the timeout actually firing.

Suggested change
// Use short timeout for tests - this is a no-op callback that won't cause issues if it fires
const timeout = setTimeout(() => {}, 100)
queue.set(sessionID, { timeout, data: dataMap })
// Use timeout handle for testing dispose cleanup, but clear it immediately so it never fires
const timeout = setTimeout(() => {}, 100)
queue.set(sessionID, { timeout, data: dataMap })
clearTimeout(timeout)

Copilot uses AI. Check for mistakes.
}
}
53 changes: 47 additions & 6 deletions packages/opencode/src/share/share.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,35 @@ export namespace Share {

let queue: Promise<void> = Promise.resolve()
const pending = new Map<string, any>()
// Generation counter to invalidate in-flight operations from previous init cycles
let generation = 0
let disposed = false

export async function sync(key: string, content: any) {
// Store unsubscribe functions for cleanup
const unsubscribers: Array<() => void> = []

export async function sync(gen: number, key: string, content: any) {
// Skip if disposed or wrong generation
if (disposed || gen !== generation) return
const [root, ...splits] = key.split("/")
if (root !== "session") return
const [sub, sessionID] = splits
if (sub === "share") return
const share = await Session.getShare(sessionID).catch(() => {})
if (!share) return
// Re-check after async operation
if (disposed || gen !== generation) return
const { secret } = share
pending.set(key, content)
queue = queue
.then(async () => {
// Check at start of queued operation
if (disposed || gen !== generation) return
const content = pending.get(key)
if (content === undefined) return
pending.delete(key)
// Final check before network request
if (disposed || gen !== generation) return

return fetch(`${URL}/share_sync`, {
method: "POST",
Expand All @@ -46,14 +60,25 @@ export namespace Share {
}

export function init() {
Bus.subscribe(Session.Event.Updated, async (evt) => {
await sync("session/info/" + evt.properties.info.id, evt.properties.info)
// Clean up any existing subscriptions before adding new ones
dispose()
disposed = false
Comment on lines +63 to +65
Copy link

Copilot AI Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The init() function calls dispose() at the start (line 62) which sets disposed = false on line 63. However, if there are any in-flight async operations from a previous init() that are checking the disposed flag, they might continue executing because disposed is set back to false. This creates a potential issue where operations from a previous initialization cycle could interfere with the new initialization. Consider using a unique token or generation counter to ensure operations from different initialization cycles don't interfere with each other.

Copilot uses AI. Check for mistakes.
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion for a more robust solution. Current approach prevents the primary issue (subscription accumulation). Generation counter could be added as future enhancement if race conditions prove problematic in practice.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion for a more robust solution. Current approach prevents the primary issue (subscription accumulation). Generation counter could be added as future enhancement if race conditions prove problematic in practice.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented generation counter in both share.ts and share-next.ts. Each init() increments the generation and all async callbacks capture and check it after every await. This ensures operations from a previous initialization cycle are properly invalidated even if they were in-flight during re-init.

// Increment generation so in-flight operations from previous cycle are invalidated
const gen = ++generation
Comment on lines +63 to +67
Copy link

Copilot AI Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The init() function calls dispose() at the start but then immediately sets disposed=false and increments generation before creating new subscriptions. However, there's no protection against concurrent init() calls. If init() is called twice simultaneously, both calls will increment generation and create duplicate subscriptions that won't all be tracked for cleanup. Consider adding a guard to prevent concurrent initialization.

Copilot uses AI. Check for mistakes.

const unsub1 = Bus.subscribe(Session.Event.Updated, async (evt) => {
await sync(gen, "session/info/" + evt.properties.info.id, evt.properties.info)
})
Bus.subscribe(MessageV2.Event.Updated, async (evt) => {
await sync("session/message/" + evt.properties.info.sessionID + "/" + evt.properties.info.id, evt.properties.info)
const unsub2 = Bus.subscribe(MessageV2.Event.Updated, async (evt) => {
await sync(
gen,
"session/message/" + evt.properties.info.sessionID + "/" + evt.properties.info.id,
evt.properties.info,
)
})
Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
const unsub3 = Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
await sync(
gen,
"session/part/" +
evt.properties.part.sessionID +
"/" +
Expand All @@ -63,6 +88,22 @@ export namespace Share {
evt.properties.part,
)
})
unsubscribers.push(unsub1, unsub2, unsub3)
}

export function dispose() {
disposed = true
const toUnsubscribe = unsubscribers.splice(0)
Copy link

Copilot AI Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The unsubscribers array is shared across multiple init() calls, but the splice(0) operation in dispose() may not be thread-safe if init() is called concurrently during dispose(). While JavaScript is single-threaded, async operations could interleave. Consider saving the reference before splicing to avoid potential race conditions where new subscribers are added during disposal.

Copilot uses AI. Check for mistakes.
for (const unsub of toUnsubscribe) {
try {
unsub()
} catch (error) {
log.error("failed to unsubscribe", { error })
}
}
pending.clear()
queue = Promise.resolve()
log.info("disposed share subscriptions")
}

export const URL =
Expand Down
Loading
Loading