-
Notifications
You must be signed in to change notification settings - Fork 7.7k
fix(core): add dispose functions to prevent subscription memory leaks #7914
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from 3 commits
8142552
3fb83f7
9bcdac9
520ca09
663546a
48a68e6
d1b5d6e
7b271ab
76f0ea6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -10,21 +10,31 @@ import type * as SDK from "@opencode-ai/sdk/v2" | |||||||||||||||
|
|
||||||||||||||||
| export namespace ShareNext { | ||||||||||||||||
| const log = Log.create({ service: "share-next" }) | ||||||||||||||||
| let disposed = false | ||||||||||||||||
|
|
||||||||||||||||
| // Store unsubscribe functions for cleanup | ||||||||||||||||
| const unsubscribers: Array<() => void> = [] | ||||||||||||||||
|
|
||||||||||||||||
| async function url() { | ||||||||||||||||
| return Config.get().then((x) => x.enterprise?.url ?? "https://opncd.ai") | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| export async function init() { | ||||||||||||||||
| Bus.subscribe(Session.Event.Updated, async (evt) => { | ||||||||||||||||
| // Clean up any existing subscriptions before adding new ones | ||||||||||||||||
| dispose() | ||||||||||||||||
| disposed = false | ||||||||||||||||
hendem marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||
|
|
||||||||||||||||
| const unsub1 = Bus.subscribe(Session.Event.Updated, async (evt) => { | ||||||||||||||||
| if (disposed) return | ||||||||||||||||
| await sync(evt.properties.info.id, [ | ||||||||||||||||
| { | ||||||||||||||||
| type: "session", | ||||||||||||||||
| data: evt.properties.info, | ||||||||||||||||
| }, | ||||||||||||||||
| ]) | ||||||||||||||||
| }) | ||||||||||||||||
| Bus.subscribe(MessageV2.Event.Updated, async (evt) => { | ||||||||||||||||
| const unsub2 = Bus.subscribe(MessageV2.Event.Updated, async (evt) => { | ||||||||||||||||
| if (disposed) return | ||||||||||||||||
| await sync(evt.properties.info.sessionID, [ | ||||||||||||||||
| { | ||||||||||||||||
| type: "message", | ||||||||||||||||
|
|
@@ -44,22 +54,39 @@ export namespace ShareNext { | |||||||||||||||
| ]) | ||||||||||||||||
| } | ||||||||||||||||
| }) | ||||||||||||||||
| Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { | ||||||||||||||||
| const unsub3 = Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { | ||||||||||||||||
| if (disposed) return | ||||||||||||||||
| await sync(evt.properties.part.sessionID, [ | ||||||||||||||||
| { | ||||||||||||||||
| type: "part", | ||||||||||||||||
| data: evt.properties.part, | ||||||||||||||||
| }, | ||||||||||||||||
| ]) | ||||||||||||||||
| }) | ||||||||||||||||
| Bus.subscribe(Session.Event.Diff, async (evt) => { | ||||||||||||||||
| const unsub4 = Bus.subscribe(Session.Event.Diff, async (evt) => { | ||||||||||||||||
| if (disposed) return | ||||||||||||||||
| await sync(evt.properties.sessionID, [ | ||||||||||||||||
| { | ||||||||||||||||
| type: "session_diff", | ||||||||||||||||
| data: evt.properties.diff, | ||||||||||||||||
| }, | ||||||||||||||||
| ]) | ||||||||||||||||
| }) | ||||||||||||||||
| unsubscribers.push(unsub1, unsub2, unsub3, unsub4) | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| export function dispose() { | ||||||||||||||||
| disposed = true | ||||||||||||||||
| for (const unsub of unsubscribers) { | ||||||||||||||||
| unsub() | ||||||||||||||||
| } | ||||||||||||||||
| unsubscribers.splice(0) | ||||||||||||||||
hendem marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||||||||||
| // Clear pending timeouts | ||||||||||||||||
| for (const entry of queue.values()) { | ||||||||||||||||
| clearTimeout(entry.timeout) | ||||||||||||||||
| } | ||||||||||||||||
| queue.clear() | ||||||||||||||||
| log.info("disposed share-next subscriptions") | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| export async function create(sessionID: string) { | ||||||||||||||||
|
|
@@ -191,4 +218,17 @@ export namespace ShareNext { | |||||||||||||||
| }, | ||||||||||||||||
| ]) | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| /** @internal Test helper to get queue size */ | ||||||||||||||||
| export function _getQueueSize(): number { | ||||||||||||||||
| return queue.size | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| /** @internal Test helper to add items to queue for testing dispose cleanup */ | ||||||||||||||||
| export function _addToQueueForTesting(sessionID: string) { | ||||||||||||||||
| const dataMap = new Map<string, Data>() | ||||||||||||||||
| // Use short timeout for tests - this is a no-op callback that won't cause issues if it fires | ||||||||||||||||
| const timeout = setTimeout(() => {}, 100) | ||||||||||||||||
| queue.set(sessionID, { timeout, data: dataMap }) | ||||||||||||||||
|
Comment on lines
+253
to
+255
|
||||||||||||||||
| // Use short timeout for tests - this is a no-op callback that won't cause issues if it fires | |
| const timeout = setTimeout(() => {}, 100) | |
| queue.set(sessionID, { timeout, data: dataMap }) | |
| // Use timeout handle for testing dispose cleanup, but clear it immediately so it never fires | |
| const timeout = setTimeout(() => {}, 100) | |
| queue.set(sessionID, { timeout, data: dataMap }) | |
| clearTimeout(timeout) |
hendem marked this conversation as resolved.
Show resolved
Hide resolved
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,21 +9,33 @@ export namespace Share { | |
|
|
||
| let queue: Promise<void> = Promise.resolve() | ||
| const pending = new Map<string, any>() | ||
| let disposed = false | ||
|
|
||
| // Store unsubscribe functions for cleanup | ||
| const unsubscribers: Array<() => void> = [] | ||
|
|
||
| export async function sync(key: string, content: any) { | ||
| // Skip if disposed - check at entry point | ||
| if (disposed) return | ||
| const [root, ...splits] = key.split("/") | ||
| if (root !== "session") return | ||
| const [sub, sessionID] = splits | ||
| if (sub === "share") return | ||
| const share = await Session.getShare(sessionID).catch(() => {}) | ||
| if (!share) return | ||
| // Re-check disposed after async operation | ||
| if (disposed) return | ||
| const { secret } = share | ||
| pending.set(key, content) | ||
| queue = queue | ||
| .then(async () => { | ||
| // Check disposed at start of queued operation | ||
| if (disposed) return | ||
| const content = pending.get(key) | ||
| if (content === undefined) return | ||
| pending.delete(key) | ||
| // Final check before network request | ||
| if (disposed) return | ||
hendem marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| return fetch(`${URL}/share_sync`, { | ||
| method: "POST", | ||
|
|
@@ -46,13 +58,17 @@ export namespace Share { | |
| } | ||
|
|
||
| export function init() { | ||
| Bus.subscribe(Session.Event.Updated, async (evt) => { | ||
| // Clean up any existing subscriptions before adding new ones | ||
| dispose() | ||
| disposed = false | ||
hendem marked this conversation as resolved.
Show resolved
Hide resolved
Comment on lines
+63
to
+65
|
||
|
|
||
| const unsub1 = Bus.subscribe(Session.Event.Updated, async (evt) => { | ||
| await sync("session/info/" + evt.properties.info.id, evt.properties.info) | ||
| }) | ||
| Bus.subscribe(MessageV2.Event.Updated, async (evt) => { | ||
| const unsub2 = Bus.subscribe(MessageV2.Event.Updated, async (evt) => { | ||
| await sync("session/message/" + evt.properties.info.sessionID + "/" + evt.properties.info.id, evt.properties.info) | ||
| }) | ||
| Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { | ||
| const unsub3 = Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { | ||
| await sync( | ||
| "session/part/" + | ||
| evt.properties.part.sessionID + | ||
|
|
@@ -63,6 +79,18 @@ export namespace Share { | |
| evt.properties.part, | ||
| ) | ||
| }) | ||
| unsubscribers.push(unsub1, unsub2, unsub3) | ||
| } | ||
|
|
||
| export function dispose() { | ||
| disposed = true | ||
| for (const unsub of unsubscribers) { | ||
| unsub() | ||
| } | ||
| unsubscribers.splice(0) | ||
hendem marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| pending.clear() | ||
| queue = Promise.resolve() | ||
hendem marked this conversation as resolved.
Show resolved
Hide resolved
hendem marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| log.info("disposed share subscriptions") | ||
| } | ||
|
|
||
| export const URL = | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent array clearing method compared to other dispose implementations. Lines 87-90 in share.ts and lines 80-83 in share-next.ts use splice(0), while this uses length = 0. For consistency across the codebase, consider using the same approach. Additionally, if an unsubscriber function throws an error during the loop on lines 145-147, subsequent unsubscribers won't be called. Consider wrapping each unsub() call in a try-catch to ensure all subscriptions are cleaned up even if some fail.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✅ Fixed. Now uses
splice(0)for consistency with other modules and wraps each unsubscribe in try-catch for error resilience.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✅ Fixed. Now uses
splice(0)for consistency with other modules and wraps each unsubscribe in try-catch for error resilience.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed! Updated Format.dispose() to use the same pattern:
splice(0)before iteration for safe array clearing, plus try-catch around each unsubscribe call for robustness.