diff --git a/packages/opencode/src/acp/session.ts b/packages/opencode/src/acp/session.ts index 70b65834705..dc495981d50 100644 --- a/packages/opencode/src/acp/session.ts +++ b/packages/opencode/src/acp/session.ts @@ -98,4 +98,33 @@ export class ACPSessionManager { this.sessions.set(sessionId, session) return session } + + /** + * Remove a session from the manager to free memory. + * Should be called when a session is terminated or the connection closes. + */ + remove(sessionId: string): boolean { + const existed = this.sessions.has(sessionId) + if (existed) { + this.sessions.delete(sessionId) + log.info("removed_session", { sessionId }) + } + return existed + } + + /** + * Get the count of active sessions (useful for monitoring/debugging). + */ + size(): number { + return this.sessions.size + } + + /** + * Clear all sessions. Used during cleanup/dispose. + */ + clear(): void { + const count = this.sessions.size + this.sessions.clear() + log.info("cleared_all_sessions", { count }) + } } diff --git a/packages/opencode/src/file/time.ts b/packages/opencode/src/file/time.ts index 770427abe96..6a1c4a09dc7 100644 --- a/packages/opencode/src/file/time.ts +++ b/packages/opencode/src/file/time.ts @@ -31,6 +31,27 @@ export namespace FileTime { return state().read[sessionID]?.[file] } + /** + * Clear all read timestamps for a session to free memory. + * Should be called when a session ends. + */ + export function clearSession(sessionID: string): boolean { + const s = state() + if (sessionID in s.read) { + delete s.read[sessionID] + log.info("cleared session read times", { sessionID }) + return true + } + return false + } + + /** + * Get the count of tracked sessions (useful for monitoring/debugging). + */ + export function sessionCount(): number { + return Object.keys(state().read).length + } + export async function withLock(filepath: string, fn: () => Promise): Promise { const current = state() const currentLock = current.locks.get(filepath) ?? Promise.resolve() diff --git a/packages/opencode/src/mcp/index.ts b/packages/opencode/src/mcp/index.ts index aca0c663152..3631a60560b 100644 --- a/packages/opencode/src/mcp/index.ts +++ b/packages/opencode/src/mcp/index.ts @@ -141,8 +141,27 @@ export namespace MCP { } // Store transports for OAuth servers to allow finishing auth + // Each entry includes a timestamp for TTL-based cleanup type TransportWithAuth = StreamableHTTPClientTransport | SSEClientTransport - const pendingOAuthTransports = new Map() + type PendingOAuthEntry = { + transport: TransportWithAuth + createdAt: number + } + const pendingOAuthTransports = new Map() + + /** TTL for pending OAuth transports (10 minutes) */ + const OAUTH_TRANSPORT_TTL = 10 * 60 * 1000 + + /** Clean up expired OAuth transports */ + function cleanupExpiredOAuthTransports() { + const now = Date.now() + for (const [key, entry] of pendingOAuthTransports) { + if (now - entry.createdAt > OAUTH_TRANSPORT_TTL) { + log.info("cleaning up expired oauth transport", { key, age: now - entry.createdAt }) + pendingOAuthTransports.delete(key) + } + } + } // Prompt cache types type PromptInfo = Awaited>["prompts"][number] @@ -364,7 +383,8 @@ export namespace MCP { }).catch((e) => log.debug("failed to show toast", { error: e })) } else { // Store transport for later finishAuth call - pendingOAuthTransports.set(key, transport) + cleanupExpiredOAuthTransports() + pendingOAuthTransports.set(key, { transport, createdAt: Date.now() }) status = { status: "needs_auth" as const } // Show toast for needs_auth Bus.publish(TuiEvent.ToastShow, { @@ -739,7 +759,8 @@ export namespace MCP { } catch (error) { if (error instanceof UnauthorizedError && capturedUrl) { // Store transport for finishAuth - pendingOAuthTransports.set(mcpName, transport) + cleanupExpiredOAuthTransports() + pendingOAuthTransports.set(mcpName, { transport, createdAt: Date.now() }) return { authorizationUrl: capturedUrl.toString() } } throw error @@ -790,15 +811,15 @@ export namespace MCP { * Complete OAuth authentication with the authorization code. */ export async function finishAuth(mcpName: string, authorizationCode: string): Promise { - const transport = pendingOAuthTransports.get(mcpName) + const entry = pendingOAuthTransports.get(mcpName) - if (!transport) { + if (!entry) { throw new Error(`No pending OAuth flow for MCP server: ${mcpName}`) } try { // Call finishAuth on the transport - await transport.finishAuth(authorizationCode) + await entry.transport.finishAuth(authorizationCode) // Clear the code verifier after successful auth await McpAuth.clearCodeVerifier(mcpName) diff --git a/packages/opencode/src/util/rpc.ts b/packages/opencode/src/util/rpc.ts index 57c695c480e..f9fa7da242b 100644 --- a/packages/opencode/src/util/rpc.ts +++ b/packages/opencode/src/util/rpc.ts @@ -3,6 +3,9 @@ export namespace Rpc { [method: string]: (input: any) => any } + /** Default timeout for RPC calls in milliseconds (30 seconds) */ + const DEFAULT_TIMEOUT = 30_000 + export function listen(rpc: Definition) { onmessage = async (evt) => { const parsed = JSON.parse(evt.data) @@ -13,30 +16,67 @@ export namespace Rpc { } } - export function client(target: { - postMessage: (data: string) => void | null - onmessage: ((this: Worker, ev: MessageEvent) => any) | null - }) { - const pending = new Map void>() + type PendingRequest = { + resolve: (result: any) => void + reject: (error: Error) => void + timeout: ReturnType + } + + export function client( + target: { + postMessage: (data: string) => void | null + onmessage: ((this: Worker, ev: MessageEvent) => any) | null + }, + options?: { timeout?: number }, + ) { + const pending = new Map() + const timeout = options?.timeout ?? DEFAULT_TIMEOUT let id = 0 + target.onmessage = async (evt) => { const parsed = JSON.parse(evt.data) if (parsed.type === "rpc.result") { - const resolve = pending.get(parsed.id) - if (resolve) { - resolve(parsed.result) + const request = pending.get(parsed.id) + if (request) { + clearTimeout(request.timeout) pending.delete(parsed.id) + request.resolve(parsed.result) } } } + return { call(method: Method, input: Parameters[0]): Promise> { const requestId = id++ - return new Promise((resolve) => { - pending.set(requestId, resolve) + return new Promise((resolve, reject) => { + const timeoutHandle = setTimeout(() => { + const request = pending.get(requestId) + if (request) { + pending.delete(requestId) + reject(new Error(`RPC call '${String(method)}' timed out after ${timeout}ms`)) + } + }, timeout) + + pending.set(requestId, { + resolve, + reject, + timeout: timeoutHandle, + }) target.postMessage(JSON.stringify({ type: "rpc.request", method, input, id: requestId })) }) }, + /** Get count of pending requests (for testing/monitoring) */ + pendingCount(): number { + return pending.size + }, + /** Clear all pending requests (for cleanup) */ + dispose(): void { + for (const [requestId, request] of pending) { + clearTimeout(request.timeout) + request.reject(new Error("RPC client disposed")) + } + pending.clear() + }, } } } diff --git a/packages/opencode/test/acp/session.test.ts b/packages/opencode/test/acp/session.test.ts new file mode 100644 index 00000000000..fba73429a9f --- /dev/null +++ b/packages/opencode/test/acp/session.test.ts @@ -0,0 +1,89 @@ +import { describe, expect, test, beforeEach, afterEach } from "bun:test" +import { ACPSessionManager } from "../../src/acp/session" + +describe("ACPSessionManager", () => { + // Create a mock SDK + const mockSdk = { + session: { + create: async () => ({ data: { id: "test-session-1" } }), + get: async () => ({ data: { id: "test-session-1", time: { created: new Date().toISOString() } } }), + }, + } as any + + describe("remove", () => { + test("removes existing session and returns true", async () => { + const manager = new ACPSessionManager(mockSdk) + + // Create a session + await manager.create("/test/path", [], undefined) + expect(manager.size()).toBe(1) + + // Remove it + const result = manager.remove("test-session-1") + expect(result).toBe(true) + expect(manager.size()).toBe(0) + }) + + test("returns false for non-existent session", () => { + const manager = new ACPSessionManager(mockSdk) + + const result = manager.remove("non-existent") + expect(result).toBe(false) + }) + }) + + describe("size", () => { + test("returns correct count of sessions", async () => { + const manager = new ACPSessionManager(mockSdk) + expect(manager.size()).toBe(0) + + // Create sessions with different IDs + const sdk1 = { + session: { create: async () => ({ data: { id: "session-1" } }) }, + } as any + const sdk2 = { + session: { create: async () => ({ data: { id: "session-2" } }) }, + } as any + + const manager1 = new ACPSessionManager(sdk1) + await manager1.create("/path1", [], undefined) + expect(manager1.size()).toBe(1) + + // Create another in same manager via load + const loadSdk = { + session: { + create: async () => ({ data: { id: "session-a" } }), + get: async () => ({ data: { id: "session-b", time: { created: new Date().toISOString() } } }), + }, + } as any + const manager2 = new ACPSessionManager(loadSdk) + await manager2.create("/path", [], undefined) + await manager2.load("session-b", "/path", [], undefined) + expect(manager2.size()).toBe(2) + }) + }) + + describe("clear", () => { + test("removes all sessions", async () => { + const sdk = { + session: { + create: async () => ({ data: { id: `session-${Math.random()}` } }), + get: async (params: any) => ({ + data: { id: params.sessionID, time: { created: new Date().toISOString() } }, + }), + }, + } as any + + const manager = new ACPSessionManager(sdk) + + await manager.create("/path1", [], undefined) + await manager.create("/path2", [], undefined) + await manager.create("/path3", [], undefined) + + expect(manager.size()).toBe(3) + + manager.clear() + expect(manager.size()).toBe(0) + }) + }) +}) diff --git a/packages/opencode/test/file/time.test.ts b/packages/opencode/test/file/time.test.ts new file mode 100644 index 00000000000..aa16ef4a50f --- /dev/null +++ b/packages/opencode/test/file/time.test.ts @@ -0,0 +1,79 @@ +import { describe, expect, test } from "bun:test" +import { Instance } from "../../src/project/instance" +import { tmpdir } from "../fixture/fixture" + +describe("FileTime", () => { + describe("clearSession", () => { + test("clears read times for a session and returns true", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const { FileTime } = await import("../../src/file/time") + + // Record some reads + FileTime.read("session-1", "/path/to/file1.ts") + FileTime.read("session-1", "/path/to/file2.ts") + FileTime.read("session-2", "/path/to/file3.ts") + + expect(FileTime.sessionCount()).toBe(2) + expect(FileTime.get("session-1", "/path/to/file1.ts")).toBeDefined() + expect(FileTime.get("session-1", "/path/to/file2.ts")).toBeDefined() + expect(FileTime.get("session-2", "/path/to/file3.ts")).toBeDefined() + + // Clear session-1 + const result = FileTime.clearSession("session-1") + expect(result).toBe(true) + + // Verify session-1 data is gone + expect(FileTime.get("session-1", "/path/to/file1.ts")).toBeUndefined() + expect(FileTime.get("session-1", "/path/to/file2.ts")).toBeUndefined() + + // Verify session-2 data is still there + expect(FileTime.get("session-2", "/path/to/file3.ts")).toBeDefined() + + expect(FileTime.sessionCount()).toBe(1) + }, + }) + }) + + test("returns false for non-existent session", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const { FileTime } = await import("../../src/file/time") + + const result = FileTime.clearSession("non-existent-session") + expect(result).toBe(false) + }, + }) + }) + }) + + describe("sessionCount", () => { + test("returns correct count of tracked sessions", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const { FileTime } = await import("../../src/file/time") + + expect(FileTime.sessionCount()).toBe(0) + + FileTime.read("session-a", "/file1.ts") + expect(FileTime.sessionCount()).toBe(1) + + FileTime.read("session-b", "/file2.ts") + expect(FileTime.sessionCount()).toBe(2) + + FileTime.read("session-a", "/file3.ts") // Same session + expect(FileTime.sessionCount()).toBe(2) + + FileTime.clearSession("session-a") + expect(FileTime.sessionCount()).toBe(1) + }, + }) + }) + }) +}) diff --git a/packages/opencode/test/util/rpc.test.ts b/packages/opencode/test/util/rpc.test.ts new file mode 100644 index 00000000000..7e41ea7fa20 --- /dev/null +++ b/packages/opencode/test/util/rpc.test.ts @@ -0,0 +1,126 @@ +import { describe, expect, test, beforeEach, mock } from "bun:test" +import { Rpc } from "../../src/util/rpc" + +describe("Rpc", () => { + describe("client", () => { + test("call resolves when response is received", async () => { + const messages: string[] = [] + const target = { + postMessage: (data: string) => { + messages.push(data) + }, + onmessage: null as ((this: Worker, ev: MessageEvent) => any) | null, + } + + const client = Rpc.client<{ add: (input: { a: number; b: number }) => number }>(target) + + const promise = client.call("add", { a: 1, b: 2 }) + + // Simulate response + const request = JSON.parse(messages[0]) + target.onmessage?.call( + {} as Worker, + new MessageEvent("message", { + data: JSON.stringify({ type: "rpc.result", result: 3, id: request.id }), + }), + ) + + const result = await promise + expect(result).toBe(3) + expect(client.pendingCount()).toBe(0) + }) + + test("call rejects with timeout error when no response", async () => { + const target = { + postMessage: () => {}, + onmessage: null as ((this: Worker, ev: MessageEvent) => any) | null, + } + + const client = Rpc.client<{ slow: (input: {}) => void }>(target, { timeout: 50 }) + + const start = Date.now() + await expect(client.call("slow", {})).rejects.toThrow("timed out after 50ms") + const elapsed = Date.now() - start + expect(elapsed).toBeGreaterThanOrEqual(45) + expect(elapsed).toBeLessThan(200) + expect(client.pendingCount()).toBe(0) + }) + + test("pendingCount tracks pending requests", async () => { + const target = { + postMessage: () => {}, + onmessage: null as ((this: Worker, ev: MessageEvent) => any) | null, + } + + const client = Rpc.client<{ op: (input: {}) => void }>(target, { timeout: 1000 }) + + expect(client.pendingCount()).toBe(0) + + // Start multiple calls without resolving + const p1 = client.call("op", {}).catch(() => {}) + expect(client.pendingCount()).toBe(1) + + const p2 = client.call("op", {}).catch(() => {}) + expect(client.pendingCount()).toBe(2) + + // Dispose should clear all + client.dispose() + expect(client.pendingCount()).toBe(0) + + await Promise.all([p1, p2]) + }) + + test("dispose rejects pending requests", async () => { + const target = { + postMessage: () => {}, + onmessage: null as ((this: Worker, ev: MessageEvent) => any) | null, + } + + const client = Rpc.client<{ op: (input: {}) => void }>(target, { timeout: 10000 }) + + const promise = client.call("op", {}) + client.dispose() + + await expect(promise).rejects.toThrow("RPC client disposed") + }) + + test("multiple calls get correct responses", async () => { + const messages: string[] = [] + const target = { + postMessage: (data: string) => { + messages.push(data) + }, + onmessage: null as ((this: Worker, ev: MessageEvent) => any) | null, + } + + const client = Rpc.client<{ double: (input: { n: number }) => number }>(target) + + const p1 = client.call("double", { n: 5 }) + const p2 = client.call("double", { n: 10 }) + + expect(client.pendingCount()).toBe(2) + + // Respond in reverse order + const req1 = JSON.parse(messages[0]) + const req2 = JSON.parse(messages[1]) + + target.onmessage?.call( + {} as Worker, + new MessageEvent("message", { + data: JSON.stringify({ type: "rpc.result", result: 20, id: req2.id }), + }), + ) + + target.onmessage?.call( + {} as Worker, + new MessageEvent("message", { + data: JSON.stringify({ type: "rpc.result", result: 10, id: req1.id }), + }), + ) + + expect(await p1).toBe(10) + expect(await p2).toBe(20) + expect(client.pendingCount()).toBe(0) + }) + }) +})