Skip to content
526 changes: 285 additions & 241 deletions packages/opencode/src/acp/agent.ts

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions packages/opencode/src/bus/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,20 @@ export namespace Bus {
match.splice(index, 1)
}
}

/** @internal Test helper to get subscription count for a specific event type */
export function _getSubscriptionCount(type: string): number {
const match = state().subscriptions.get(type)
return match?.length ?? 0
}

/** @internal Test helper to get total subscription count across all event types */
export function _getTotalSubscriptionCount(): number {
let total = 0
// Use Array.from to snapshot values in case of concurrent modification
for (const subs of Array.from(state().subscriptions.values())) {
total += subs.length
}
return total
}
}
20 changes: 19 additions & 1 deletion packages/opencode/src/format/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,14 @@ export namespace Format {
return result
}

// Store unsubscribe functions for cleanup
const unsubscribers: Array<() => void> = []

export function init() {
log.info("init")
Bus.subscribe(File.Event.Edited, async (payload) => {
// Clean up any existing subscriptions before adding new ones
dispose()
const unsub = Bus.subscribe(File.Event.Edited, async (payload) => {
const file = payload.properties.file
log.info("formatting", { file })
const ext = path.extname(file)
Expand Down Expand Up @@ -133,5 +138,18 @@ export namespace Format {
}
}
})
unsubscribers.push(unsub)
}

export function dispose() {
const toUnsubscribe = unsubscribers.splice(0)
for (const unsub of toUnsubscribe) {
try {
unsub()
} catch (error) {
log.error("failed to unsubscribe format handler", { error })
}
}
log.info("disposed format subscriptions")
}
}
20 changes: 19 additions & 1 deletion packages/opencode/src/plugin/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,38 @@ export namespace Plugin {
return state().then((x) => x.hooks)
}

// Store unsubscribe functions for cleanup
const unsubscribers: Array<() => void> = []

export async function init() {
// Clean up any existing subscriptions before adding new ones
dispose()
const hooks = await state().then((x) => x.hooks)
const config = await Config.get()
for (const hook of hooks) {
// @ts-expect-error this is because we haven't moved plugin to sdk v2
await hook.config?.(config)
}
Bus.subscribeAll(async (input) => {
const unsub = Bus.subscribeAll(async (input) => {
const hooks = await state().then((x) => x.hooks)
for (const hook of hooks) {
hook["event"]?.({
event: input,
})
}
})
unsubscribers.push(unsub)
}

export function dispose() {
const toUnsubscribe = unsubscribers.splice(0)
for (const unsub of toUnsubscribe) {
try {
unsub()
} catch (error) {
log.error("failed to unsubscribe plugin handler", { error })
}
}
log.info("disposed plugin subscriptions")
}
}
85 changes: 74 additions & 11 deletions packages/opencode/src/share/share-next.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,44 @@ import type * as SDK from "@opencode-ai/sdk/v2"

export namespace ShareNext {
const log = Log.create({ service: "share-next" })
// Generation counter to invalidate in-flight operations from previous init cycles
let generation = 0
let disposed = false

// Store unsubscribe functions for cleanup
const unsubscribers: Array<() => void> = []

async function url() {
return Config.get().then((x) => x.enterprise?.url ?? "https://opncd.ai")
}

export async function init() {
Bus.subscribe(Session.Event.Updated, async (evt) => {
await sync(evt.properties.info.id, [
// Clean up any existing subscriptions before adding new ones
dispose()
disposed = false
// Increment generation so in-flight operations from previous cycle are invalidated
const gen = ++generation

const unsub1 = Bus.subscribe(Session.Event.Updated, async (evt) => {
if (disposed || gen !== generation) return
await sync(gen, evt.properties.info.id, [
{
type: "session",
data: evt.properties.info,
},
])
})
Bus.subscribe(MessageV2.Event.Updated, async (evt) => {
await sync(evt.properties.info.sessionID, [
const unsub2 = Bus.subscribe(MessageV2.Event.Updated, async (evt) => {
if (disposed || gen !== generation) return
await sync(gen, evt.properties.info.sessionID, [
{
type: "message",
data: evt.properties.info,
},
])
if (gen !== generation) return
if (evt.properties.info.role === "user") {
await sync(evt.properties.info.sessionID, [
await sync(gen, evt.properties.info.sessionID, [
{
type: "model",
data: [
Expand All @@ -44,22 +59,44 @@ export namespace ShareNext {
])
}
})
Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
await sync(evt.properties.part.sessionID, [
const unsub3 = Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
if (disposed || gen !== generation) return
await sync(gen, evt.properties.part.sessionID, [
{
type: "part",
data: evt.properties.part,
},
])
})
Bus.subscribe(Session.Event.Diff, async (evt) => {
await sync(evt.properties.sessionID, [
const unsub4 = Bus.subscribe(Session.Event.Diff, async (evt) => {
if (disposed || gen !== generation) return
await sync(gen, evt.properties.sessionID, [
{
type: "session_diff",
data: evt.properties.diff,
},
])
})
unsubscribers.push(unsub1, unsub2, unsub3, unsub4)
}

export function dispose() {
disposed = true
const toUnsubscribe = unsubscribers.splice(0)
for (const unsub of toUnsubscribe) {
try {
unsub()
} catch (error) {
log.error("failed to unsubscribe", { error })
}
}
// Hardened: snapshot and clear atomically to avoid race during iteration
const pending = Array.from(queue.values())
queue.clear()
for (const entry of pending) {
clearTimeout(entry.timeout)
}
log.info("disposed share-next subscriptions")
}

export async function create(sessionID: string) {
Expand Down Expand Up @@ -109,7 +146,10 @@ export namespace ShareNext {
}

const queue = new Map<string, { timeout: NodeJS.Timeout; data: Map<string, Data> }>()
async function sync(sessionID: string, data: Data[]) {
async function sync(gen: number, sessionID: string, data: Data[]) {
// Check generation before any work
if (gen !== generation) return

const existing = queue.get(sessionID)
if (existing) {
for (const item of data) {
Expand All @@ -124,11 +164,15 @@ export namespace ShareNext {
}

const timeout = setTimeout(async () => {
// Check generation before processing queued data
if (gen !== generation) return
const queued = queue.get(sessionID)
if (!queued) return
queue.delete(sessionID)
const share = await get(sessionID).catch(() => undefined)
if (!share) return
// Check generation after async operation
if (gen !== generation) return

await fetch(`${await url()}/api/share/${share.id}/sync`, {
method: "POST",
Expand Down Expand Up @@ -161,17 +205,23 @@ export namespace ShareNext {
}

async function fullSync(sessionID: string) {
// Capture current generation for this sync operation
const gen = generation
log.info("full sync", { sessionID })
const session = await Session.get(sessionID)
if (gen !== generation) return
const diffs = await Session.diff(sessionID)
if (gen !== generation) return
const messages = await Array.fromAsync(MessageV2.stream(sessionID))
if (gen !== generation) return
const models = await Promise.all(
messages
.filter((m) => m.info.role === "user")
.map((m) => (m.info as SDK.UserMessage).model)
.map((m) => Provider.getModel(m.providerID, m.modelID).then((m) => m)),
)
await sync(sessionID, [
if (gen !== generation) return
await sync(gen, sessionID, [
{
type: "session",
data: session,
Expand All @@ -191,4 +241,17 @@ export namespace ShareNext {
},
])
}

/** @internal Test helper to get queue size */
export function _getQueueSize(): number {
return queue.size
}

/** @internal Test helper to add items to queue for testing dispose cleanup */
export function _addToQueueForTesting(sessionID: string) {
const dataMap = new Map<string, Data>()
// Use short timeout for tests - this is a no-op callback that won't cause issues if it fires
const timeout = setTimeout(() => {}, 100)
queue.set(sessionID, { timeout, data: dataMap })
}
}
53 changes: 47 additions & 6 deletions packages/opencode/src/share/share.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,35 @@ export namespace Share {

let queue: Promise<void> = Promise.resolve()
const pending = new Map<string, any>()
// Generation counter to invalidate in-flight operations from previous init cycles
let generation = 0
let disposed = false

export async function sync(key: string, content: any) {
// Store unsubscribe functions for cleanup
const unsubscribers: Array<() => void> = []

export async function sync(gen: number, key: string, content: any) {
// Skip if disposed or wrong generation
if (disposed || gen !== generation) return
const [root, ...splits] = key.split("/")
if (root !== "session") return
const [sub, sessionID] = splits
if (sub === "share") return
const share = await Session.getShare(sessionID).catch(() => {})
if (!share) return
// Re-check after async operation
if (disposed || gen !== generation) return
const { secret } = share
pending.set(key, content)
queue = queue
.then(async () => {
// Check at start of queued operation
if (disposed || gen !== generation) return
const content = pending.get(key)
if (content === undefined) return
pending.delete(key)
// Final check before network request
if (disposed || gen !== generation) return

return fetch(`${URL}/share_sync`, {
method: "POST",
Expand All @@ -46,14 +60,25 @@ export namespace Share {
}

export function init() {
Bus.subscribe(Session.Event.Updated, async (evt) => {
await sync("session/info/" + evt.properties.info.id, evt.properties.info)
// Clean up any existing subscriptions before adding new ones
dispose()
disposed = false
// Increment generation so in-flight operations from previous cycle are invalidated
const gen = ++generation

const unsub1 = Bus.subscribe(Session.Event.Updated, async (evt) => {
await sync(gen, "session/info/" + evt.properties.info.id, evt.properties.info)
})
Bus.subscribe(MessageV2.Event.Updated, async (evt) => {
await sync("session/message/" + evt.properties.info.sessionID + "/" + evt.properties.info.id, evt.properties.info)
const unsub2 = Bus.subscribe(MessageV2.Event.Updated, async (evt) => {
await sync(
gen,
"session/message/" + evt.properties.info.sessionID + "/" + evt.properties.info.id,
evt.properties.info,
)
})
Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
const unsub3 = Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
await sync(
gen,
"session/part/" +
evt.properties.part.sessionID +
"/" +
Expand All @@ -63,6 +88,22 @@ export namespace Share {
evt.properties.part,
)
})
unsubscribers.push(unsub1, unsub2, unsub3)
}

export function dispose() {
disposed = true
const toUnsubscribe = unsubscribers.splice(0)
for (const unsub of toUnsubscribe) {
try {
unsub()
} catch (error) {
log.error("failed to unsubscribe", { error })
}
}
pending.clear()
queue = Promise.resolve()
log.info("disposed share subscriptions")
}

export const URL =
Expand Down
Loading
Loading