-
Notifications
You must be signed in to change notification settings - Fork 7.7k
fix(core): add dispose functions to prevent subscription memory leaks #7914
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from 4 commits
8142552
3fb83f7
9bcdac9
520ca09
663546a
48a68e6
d1b5d6e
7b271ab
76f0ea6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -10,21 +10,31 @@ import type * as SDK from "@opencode-ai/sdk/v2" | |||||||||||||||
|
|
||||||||||||||||
| export namespace ShareNext { | ||||||||||||||||
| const log = Log.create({ service: "share-next" }) | ||||||||||||||||
| let disposed = false | ||||||||||||||||
|
|
||||||||||||||||
| // Store unsubscribe functions for cleanup | ||||||||||||||||
| const unsubscribers: Array<() => void> = [] | ||||||||||||||||
|
|
||||||||||||||||
| async function url() { | ||||||||||||||||
| return Config.get().then((x) => x.enterprise?.url ?? "https://opncd.ai") | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| export async function init() { | ||||||||||||||||
| Bus.subscribe(Session.Event.Updated, async (evt) => { | ||||||||||||||||
| // Clean up any existing subscriptions before adding new ones | ||||||||||||||||
| dispose() | ||||||||||||||||
| disposed = false | ||||||||||||||||
|
|
||||||||||||||||
| const unsub1 = Bus.subscribe(Session.Event.Updated, async (evt) => { | ||||||||||||||||
| if (disposed) return | ||||||||||||||||
| await sync(evt.properties.info.id, [ | ||||||||||||||||
| { | ||||||||||||||||
| type: "session", | ||||||||||||||||
| data: evt.properties.info, | ||||||||||||||||
| }, | ||||||||||||||||
| ]) | ||||||||||||||||
| }) | ||||||||||||||||
| Bus.subscribe(MessageV2.Event.Updated, async (evt) => { | ||||||||||||||||
| const unsub2 = Bus.subscribe(MessageV2.Event.Updated, async (evt) => { | ||||||||||||||||
| if (disposed) return | ||||||||||||||||
| await sync(evt.properties.info.sessionID, [ | ||||||||||||||||
| { | ||||||||||||||||
| type: "message", | ||||||||||||||||
|
|
@@ -44,22 +54,43 @@ export namespace ShareNext { | |||||||||||||||
| ]) | ||||||||||||||||
| } | ||||||||||||||||
| }) | ||||||||||||||||
| Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { | ||||||||||||||||
| const unsub3 = Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { | ||||||||||||||||
| if (disposed) return | ||||||||||||||||
| await sync(evt.properties.part.sessionID, [ | ||||||||||||||||
| { | ||||||||||||||||
| type: "part", | ||||||||||||||||
| data: evt.properties.part, | ||||||||||||||||
| }, | ||||||||||||||||
| ]) | ||||||||||||||||
| }) | ||||||||||||||||
| Bus.subscribe(Session.Event.Diff, async (evt) => { | ||||||||||||||||
| const unsub4 = Bus.subscribe(Session.Event.Diff, async (evt) => { | ||||||||||||||||
| if (disposed) return | ||||||||||||||||
| await sync(evt.properties.sessionID, [ | ||||||||||||||||
| { | ||||||||||||||||
| type: "session_diff", | ||||||||||||||||
| data: evt.properties.diff, | ||||||||||||||||
| }, | ||||||||||||||||
| ]) | ||||||||||||||||
| }) | ||||||||||||||||
| unsubscribers.push(unsub1, unsub2, unsub3, unsub4) | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| export function dispose() { | ||||||||||||||||
| disposed = true | ||||||||||||||||
| const toUnsubscribe = unsubscribers.splice(0) | ||||||||||||||||
| for (const unsub of toUnsubscribe) { | ||||||||||||||||
| try { | ||||||||||||||||
| unsub() | ||||||||||||||||
| } catch (error) { | ||||||||||||||||
| log.error("failed to unsubscribe", { error }) | ||||||||||||||||
| } | ||||||||||||||||
| } | ||||||||||||||||
| // Clear pending timeouts | ||||||||||||||||
| for (const entry of queue.values()) { | ||||||||||||||||
| clearTimeout(entry.timeout) | ||||||||||||||||
| } | ||||||||||||||||
| queue.clear() | ||||||||||||||||
| log.info("disposed share-next subscriptions") | ||||||||||||||||
| } | ||||||||||||||||
hendem marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||
|
|
||||||||||||||||
| export async function create(sessionID: string) { | ||||||||||||||||
|
|
@@ -191,4 +222,17 @@ export namespace ShareNext { | |||||||||||||||
| }, | ||||||||||||||||
| ]) | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| /** @internal Test helper to get queue size */ | ||||||||||||||||
| export function _getQueueSize(): number { | ||||||||||||||||
| return queue.size | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| /** @internal Test helper to add items to queue for testing dispose cleanup */ | ||||||||||||||||
| export function _addToQueueForTesting(sessionID: string) { | ||||||||||||||||
| const dataMap = new Map<string, Data>() | ||||||||||||||||
| // Use short timeout for tests - this is a no-op callback that won't cause issues if it fires | ||||||||||||||||
| const timeout = setTimeout(() => {}, 100) | ||||||||||||||||
| queue.set(sessionID, { timeout, data: dataMap }) | ||||||||||||||||
|
Comment on lines
+253
to
+255
|
||||||||||||||||
| // Use short timeout for tests - this is a no-op callback that won't cause issues if it fires | |
| const timeout = setTimeout(() => {}, 100) | |
| queue.set(sessionID, { timeout, data: dataMap }) | |
| // Use timeout handle for testing dispose cleanup, but clear it immediately so it never fires | |
| const timeout = setTimeout(() => {}, 100) | |
| queue.set(sessionID, { timeout, data: dataMap }) | |
| clearTimeout(timeout) |
hendem marked this conversation as resolved.
Show resolved
Hide resolved
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,21 +9,33 @@ export namespace Share { | |
|
|
||
| let queue: Promise<void> = Promise.resolve() | ||
| const pending = new Map<string, any>() | ||
| let disposed = false | ||
|
|
||
| // Store unsubscribe functions for cleanup | ||
| const unsubscribers: Array<() => void> = [] | ||
|
|
||
| export async function sync(key: string, content: any) { | ||
| // Skip if disposed - check at entry point | ||
| if (disposed) return | ||
| const [root, ...splits] = key.split("/") | ||
| if (root !== "session") return | ||
| const [sub, sessionID] = splits | ||
| if (sub === "share") return | ||
| const share = await Session.getShare(sessionID).catch(() => {}) | ||
| if (!share) return | ||
| // Re-check disposed after async operation | ||
| if (disposed) return | ||
| const { secret } = share | ||
| pending.set(key, content) | ||
| queue = queue | ||
| .then(async () => { | ||
| // Check disposed at start of queued operation | ||
| if (disposed) return | ||
| const content = pending.get(key) | ||
| if (content === undefined) return | ||
| pending.delete(key) | ||
| // Final check before network request | ||
| if (disposed) return | ||
hendem marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| return fetch(`${URL}/share_sync`, { | ||
| method: "POST", | ||
|
|
@@ -46,13 +58,17 @@ export namespace Share { | |
| } | ||
|
|
||
| export function init() { | ||
| Bus.subscribe(Session.Event.Updated, async (evt) => { | ||
| // Clean up any existing subscriptions before adding new ones | ||
| dispose() | ||
| disposed = false | ||
hendem marked this conversation as resolved.
Show resolved
Hide resolved
Comment on lines
+63
to
+65
|
||
|
|
||
| const unsub1 = Bus.subscribe(Session.Event.Updated, async (evt) => { | ||
| await sync("session/info/" + evt.properties.info.id, evt.properties.info) | ||
| }) | ||
| Bus.subscribe(MessageV2.Event.Updated, async (evt) => { | ||
| const unsub2 = Bus.subscribe(MessageV2.Event.Updated, async (evt) => { | ||
| await sync("session/message/" + evt.properties.info.sessionID + "/" + evt.properties.info.id, evt.properties.info) | ||
| }) | ||
| Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { | ||
| const unsub3 = Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { | ||
| await sync( | ||
| "session/part/" + | ||
| evt.properties.part.sessionID + | ||
|
|
@@ -63,6 +79,22 @@ export namespace Share { | |
| evt.properties.part, | ||
| ) | ||
| }) | ||
| unsubscribers.push(unsub1, unsub2, unsub3) | ||
| } | ||
|
|
||
| export function dispose() { | ||
| disposed = true | ||
| const toUnsubscribe = unsubscribers.splice(0) | ||
|
||
| for (const unsub of toUnsubscribe) { | ||
| try { | ||
| unsub() | ||
| } catch (error) { | ||
| log.error("failed to unsubscribe", { error }) | ||
| } | ||
| } | ||
| pending.clear() | ||
| queue = Promise.resolve() | ||
hendem marked this conversation as resolved.
Show resolved
Hide resolved
hendem marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| log.info("disposed share subscriptions") | ||
| } | ||
|
|
||
| export const URL = | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.