Skip to content
526 changes: 285 additions & 241 deletions packages/opencode/src/acp/agent.ts

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions packages/opencode/src/bus/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,19 @@ export namespace Bus {
match.splice(index, 1)
}
}

/** @internal Test helper to get subscription count for a specific event type */
export function _getSubscriptionCount(type: string): number {
const match = state().subscriptions.get(type)
return match?.length ?? 0
}

/** @internal Test helper to get total subscription count across all event types */
export function _getTotalSubscriptionCount(): number {
let total = 0
for (const subs of state().subscriptions.values()) {
total += subs.length
}
return total
}
}
16 changes: 15 additions & 1 deletion packages/opencode/src/format/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,14 @@ export namespace Format {
return result
}

// Store unsubscribe functions for cleanup
const unsubscribers: Array<() => void> = []

export function init() {
log.info("init")
Bus.subscribe(File.Event.Edited, async (payload) => {
// Clean up any existing subscriptions before adding new ones
dispose()
const unsub = Bus.subscribe(File.Event.Edited, async (payload) => {
const file = payload.properties.file
log.info("formatting", { file })
const ext = path.extname(file)
Expand Down Expand Up @@ -133,5 +138,14 @@ export namespace Format {
}
}
})
unsubscribers.push(unsub)
}

export function dispose() {
for (const unsub of unsubscribers) {
unsub()
}
unsubscribers.length = 0
Copy link

Copilot AI Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent array clearing method compared to other dispose implementations. Lines 87-90 in share.ts and lines 80-83 in share-next.ts use splice(0), while this uses length = 0. For consistency across the codebase, consider using the same approach. Additionally, if an unsubscriber function throws an error during the loop on lines 145-147, subsequent unsubscribers won't be called. Consider wrapping each unsub() call in a try-catch to ensure all subscriptions are cleaned up even if some fail.

Suggested change
unsub()
}
unsubscribers.length = 0
try {
unsub()
} catch (error) {
log.error("failed to unsubscribe format handler", { error })
}
}
unsubscribers.splice(0)

Copilot uses AI. Check for mistakes.
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Fixed. Now uses splice(0) for consistency with other modules and wraps each unsubscribe in try-catch for error resilience.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Fixed. Now uses splice(0) for consistency with other modules and wraps each unsubscribe in try-catch for error resilience.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed! Updated Format.dispose() to use the same pattern: splice(0) before iteration for safe array clearing, plus try-catch around each unsubscribe call for robustness.

log.info("disposed format subscriptions")
}
}
16 changes: 15 additions & 1 deletion packages/opencode/src/plugin/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,34 @@ export namespace Plugin {
return state().then((x) => x.hooks)
}

// Store unsubscribe functions for cleanup
const unsubscribers: Array<() => void> = []

export async function init() {
// Clean up any existing subscriptions before adding new ones
dispose()
const hooks = await state().then((x) => x.hooks)
const config = await Config.get()
for (const hook of hooks) {
// @ts-expect-error this is because we haven't moved plugin to sdk v2
await hook.config?.(config)
}
Bus.subscribeAll(async (input) => {
const unsub = Bus.subscribeAll(async (input) => {
const hooks = await state().then((x) => x.hooks)
for (const hook of hooks) {
hook["event"]?.({
event: input,
})
}
})
unsubscribers.push(unsub)
}

export function dispose() {
for (const unsub of unsubscribers) {
unsub()
}
unsubscribers.length = 0
log.info("disposed plugin subscriptions")
}
}
47 changes: 43 additions & 4 deletions packages/opencode/src/share/share-next.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,31 @@ import type * as SDK from "@opencode-ai/sdk/v2"

export namespace ShareNext {
const log = Log.create({ service: "share-next" })
let disposed = false

// Store unsubscribe functions for cleanup
const unsubscribers: Array<() => void> = []

async function url() {
return Config.get().then((x) => x.enterprise?.url ?? "https://opncd.ai")
}

export async function init() {
Bus.subscribe(Session.Event.Updated, async (evt) => {
// Clean up any existing subscriptions before adding new ones
dispose()
disposed = false

const unsub1 = Bus.subscribe(Session.Event.Updated, async (evt) => {
if (disposed) return
await sync(evt.properties.info.id, [
{
type: "session",
data: evt.properties.info,
},
])
})
Bus.subscribe(MessageV2.Event.Updated, async (evt) => {
const unsub2 = Bus.subscribe(MessageV2.Event.Updated, async (evt) => {
if (disposed) return
await sync(evt.properties.info.sessionID, [
{
type: "message",
Expand All @@ -44,22 +54,39 @@ export namespace ShareNext {
])
}
})
Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
const unsub3 = Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
if (disposed) return
await sync(evt.properties.part.sessionID, [
{
type: "part",
data: evt.properties.part,
},
])
})
Bus.subscribe(Session.Event.Diff, async (evt) => {
const unsub4 = Bus.subscribe(Session.Event.Diff, async (evt) => {
if (disposed) return
await sync(evt.properties.sessionID, [
{
type: "session_diff",
data: evt.properties.diff,
},
])
})
unsubscribers.push(unsub1, unsub2, unsub3, unsub4)
}

export function dispose() {
disposed = true
for (const unsub of unsubscribers) {
unsub()
}
unsubscribers.length = 0
// Clear pending timeouts
for (const entry of queue.values()) {
clearTimeout(entry.timeout)
}
queue.clear()
log.info("disposed share-next subscriptions")
}

export async function create(sessionID: string) {
Expand Down Expand Up @@ -191,4 +218,16 @@ export namespace ShareNext {
},
])
}

/** @internal Test helper to get queue size */
export function _getQueueSize(): number {
return queue.size
}

/** @internal Test helper to add items to queue for testing dispose cleanup */
export function _addToQueueForTesting(sessionID: string) {
const dataMap = new Map<string, Data>()
const timeout = setTimeout(() => {}, 10000)
queue.set(sessionID, { timeout, data: dataMap })
}
}
30 changes: 27 additions & 3 deletions packages/opencode/src/share/share.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,14 @@ export namespace Share {

let queue: Promise<void> = Promise.resolve()
const pending = new Map<string, any>()
let disposed = false

// Store unsubscribe functions for cleanup
const unsubscribers: Array<() => void> = []

export async function sync(key: string, content: any) {
// Skip if disposed
if (disposed) return
const [root, ...splits] = key.split("/")
if (root !== "session") return
const [sub, sessionID] = splits
Expand All @@ -21,6 +27,8 @@ export namespace Share {
pending.set(key, content)
queue = queue
.then(async () => {
// Check if disposed before processing
if (disposed) return
const content = pending.get(key)
if (content === undefined) return
pending.delete(key)
Expand All @@ -46,13 +54,17 @@ export namespace Share {
}

export function init() {
Bus.subscribe(Session.Event.Updated, async (evt) => {
// Clean up any existing subscriptions before adding new ones
dispose()
disposed = false
Comment on lines +63 to +65
Copy link

Copilot AI Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The init() function calls dispose() at the start (line 62) which sets disposed = false on line 63. However, if there are any in-flight async operations from a previous init() that are checking the disposed flag, they might continue executing because disposed is set back to false. This creates a potential issue where operations from a previous initialization cycle could interfere with the new initialization. Consider using a unique token or generation counter to ensure operations from different initialization cycles don't interfere with each other.

Copilot uses AI. Check for mistakes.
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion for a more robust solution. Current approach prevents the primary issue (subscription accumulation). Generation counter could be added as future enhancement if race conditions prove problematic in practice.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion for a more robust solution. Current approach prevents the primary issue (subscription accumulation). Generation counter could be added as future enhancement if race conditions prove problematic in practice.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented generation counter in both share.ts and share-next.ts. Each init() increments the generation and all async callbacks capture and check it after every await. This ensures operations from a previous initialization cycle are properly invalidated even if they were in-flight during re-init.


const unsub1 = Bus.subscribe(Session.Event.Updated, async (evt) => {
await sync("session/info/" + evt.properties.info.id, evt.properties.info)
})
Bus.subscribe(MessageV2.Event.Updated, async (evt) => {
const unsub2 = Bus.subscribe(MessageV2.Event.Updated, async (evt) => {
await sync("session/message/" + evt.properties.info.sessionID + "/" + evt.properties.info.id, evt.properties.info)
})
Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
const unsub3 = Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
await sync(
"session/part/" +
evt.properties.part.sessionID +
Expand All @@ -63,6 +75,18 @@ export namespace Share {
evt.properties.part,
)
})
unsubscribers.push(unsub1, unsub2, unsub3)
}

export function dispose() {
disposed = true
for (const unsub of unsubscribers) {
unsub()
}
unsubscribers.length = 0
pending.clear()
queue = Promise.resolve()
log.info("disposed share subscriptions")
}

export const URL =
Expand Down
Loading