-
Notifications
You must be signed in to change notification settings - Fork 6.5k
fix(core): add dispose functions to prevent subscription memory leaks #7914
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from 6 commits
8142552
3fb83f7
9bcdac9
520ca09
663546a
48a68e6
d1b5d6e
7b271ab
76f0ea6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -10,21 +10,31 @@ import type * as SDK from "@opencode-ai/sdk/v2" | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| export namespace ShareNext { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const log = Log.create({ service: "share-next" }) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let disposed = false | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Store unsubscribe functions for cleanup | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const unsubscribers: Array<() => void> = [] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async function url() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return Config.get().then((x) => x.enterprise?.url ?? "https://opncd.ai") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| export async function init() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Bus.subscribe(Session.Event.Updated, async (evt) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Clean up any existing subscriptions before adding new ones | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| dispose() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| disposed = false | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const unsub1 = Bus.subscribe(Session.Event.Updated, async (evt) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (disposed) return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await sync(evt.properties.info.id, [ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| type: "session", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| data: evt.properties.info, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ]) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Bus.subscribe(MessageV2.Event.Updated, async (evt) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const unsub2 = Bus.subscribe(MessageV2.Event.Updated, async (evt) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (disposed) return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await sync(evt.properties.info.sessionID, [ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| type: "message", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -44,22 +54,44 @@ export namespace ShareNext { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ]) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const unsub3 = Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (disposed) return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await sync(evt.properties.part.sessionID, [ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| type: "part", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| data: evt.properties.part, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ]) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Bus.subscribe(Session.Event.Diff, async (evt) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const unsub4 = Bus.subscribe(Session.Event.Diff, async (evt) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (disposed) return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await sync(evt.properties.sessionID, [ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| type: "session_diff", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| data: evt.properties.diff, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ]) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| unsubscribers.push(unsub1, unsub2, unsub3, unsub4) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| export function dispose() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| disposed = true | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const toUnsubscribe = unsubscribers.splice(0) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for (const unsub of toUnsubscribe) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| unsub() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (error) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log.error("failed to unsubscribe", { error }) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Hardened: snapshot and clear atomically to avoid race during iteration | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const pending = Array.from(queue.values()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| queue.clear() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for (const entry of pending) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| clearTimeout(entry.timeout) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+85
to
+98
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const toUnsubscribe = unsubscribers.splice(0) | |
| for (const unsub of toUnsubscribe) { | |
| try { | |
| unsub() | |
| } catch (error) { | |
| log.error("failed to unsubscribe", { error }) | |
| } | |
| } | |
| // Hardened: snapshot and clear atomically to avoid race during iteration | |
| const pending = Array.from(queue.values()) | |
| queue.clear() | |
| for (const entry of pending) { | |
| clearTimeout(entry.timeout) | |
| } | |
| // Drain the live unsubscribers array so handlers added during disposal | |
| // are also cleaned up before we return. | |
| while (unsubscribers.length > 0) { | |
| const unsub = unsubscribers.pop() | |
| if (!unsub) { | |
| continue | |
| } | |
| try { | |
| unsub() | |
| } catch (error) { | |
| log.error("failed to unsubscribe", { error }) | |
| } | |
| } | |
| // Clear all pending timeouts from the live queue and then empty it. | |
| for (const entry of queue.values()) { | |
| clearTimeout(entry.timeout) | |
| } | |
| queue.clear() |
Copilot
AI
Jan 12, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test helper function _addToQueueForTesting creates a timeout with an empty callback that fires after 100ms. While the comment says it's a "no-op callback that won't cause issues if it fires," the timeout will still consume resources and fire during test execution. Consider using a much longer timeout (e.g., 10000ms) or clearTimeout immediately after adding to the queue if the goal is just to test disposal without the timeout actually firing.
| // Use short timeout for tests - this is a no-op callback that won't cause issues if it fires | |
| const timeout = setTimeout(() => {}, 100) | |
| queue.set(sessionID, { timeout, data: dataMap }) | |
| // Use timeout handle for testing dispose cleanup, but clear it immediately so it never fires | |
| const timeout = setTimeout(() => {}, 100) | |
| queue.set(sessionID, { timeout, data: dataMap }) | |
| clearTimeout(timeout) |
hendem marked this conversation as resolved.
Show resolved
Hide resolved
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,21 +9,33 @@ export namespace Share { | |
|
|
||
| let queue: Promise<void> = Promise.resolve() | ||
| const pending = new Map<string, any>() | ||
| let disposed = false | ||
|
|
||
| // Store unsubscribe functions for cleanup | ||
| const unsubscribers: Array<() => void> = [] | ||
|
|
||
| export async function sync(key: string, content: any) { | ||
| // Skip if disposed - check at entry point | ||
| if (disposed) return | ||
| const [root, ...splits] = key.split("/") | ||
| if (root !== "session") return | ||
| const [sub, sessionID] = splits | ||
| if (sub === "share") return | ||
| const share = await Session.getShare(sessionID).catch(() => {}) | ||
| if (!share) return | ||
| // Re-check disposed after async operation | ||
| if (disposed) return | ||
| const { secret } = share | ||
| pending.set(key, content) | ||
| queue = queue | ||
| .then(async () => { | ||
| // Check disposed at start of queued operation | ||
| if (disposed) return | ||
| const content = pending.get(key) | ||
| if (content === undefined) return | ||
| pending.delete(key) | ||
| // Final check before network request | ||
| if (disposed) return | ||
hendem marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| return fetch(`${URL}/share_sync`, { | ||
| method: "POST", | ||
|
|
@@ -46,13 +58,17 @@ export namespace Share { | |
| } | ||
|
|
||
| export function init() { | ||
| Bus.subscribe(Session.Event.Updated, async (evt) => { | ||
| // Clean up any existing subscriptions before adding new ones | ||
| dispose() | ||
| disposed = false | ||
hendem marked this conversation as resolved.
Show resolved
Hide resolved
Comment on lines
+63
to
+65
|
||
|
|
||
| const unsub1 = Bus.subscribe(Session.Event.Updated, async (evt) => { | ||
| await sync("session/info/" + evt.properties.info.id, evt.properties.info) | ||
| }) | ||
| Bus.subscribe(MessageV2.Event.Updated, async (evt) => { | ||
| const unsub2 = Bus.subscribe(MessageV2.Event.Updated, async (evt) => { | ||
| await sync("session/message/" + evt.properties.info.sessionID + "/" + evt.properties.info.id, evt.properties.info) | ||
| }) | ||
| Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { | ||
| const unsub3 = Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { | ||
| await sync( | ||
| "session/part/" + | ||
| evt.properties.part.sessionID + | ||
|
|
@@ -63,6 +79,22 @@ export namespace Share { | |
| evt.properties.part, | ||
| ) | ||
| }) | ||
| unsubscribers.push(unsub1, unsub2, unsub3) | ||
| } | ||
|
|
||
| export function dispose() { | ||
| disposed = true | ||
| const toUnsubscribe = unsubscribers.splice(0) | ||
|
||
| for (const unsub of toUnsubscribe) { | ||
| try { | ||
| unsub() | ||
| } catch (error) { | ||
| log.error("failed to unsubscribe", { error }) | ||
| } | ||
| } | ||
| pending.clear() | ||
| queue = Promise.resolve() | ||
hendem marked this conversation as resolved.
Show resolved
Hide resolved
hendem marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| log.info("disposed share subscriptions") | ||
| } | ||
|
|
||
| export const URL = | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.