diff --git a/components/frontend/src/hooks/agui/event-handlers.ts b/components/frontend/src/hooks/agui/event-handlers.ts index f0d717c9b..2d63267ca 100644 --- a/components/frontend/src/hooks/agui/event-handlers.ts +++ b/components/frontend/src/hooks/agui/event-handlers.ts @@ -44,16 +44,64 @@ import type { ReasoningMessageStartEvent, ReasoningMessageContentEvent, ReasoningMessageEndEvent, + PendingToolCall, + MessageFeedback, } from '@/types/agui' import { normalizeSnapshotMessages } from './normalize-snapshot' +import { MAX_MESSAGES } from './types' + +/** + * Trim messages array to MAX_MESSAGES limit to prevent unbounded memory growth. + * Keeps most recent messages based on timestamp order. + */ +function trimMessages(messages: PlatformMessage[]): PlatformMessage[] { + if (messages.length <= MAX_MESSAGES) return messages + + // Keep the most recent MAX_MESSAGES + return messages.slice(-MAX_MESSAGES) +} + +/** + * Clean up stale entries from pendingToolCalls Map. + * Removes tool calls that are no longer referenced in messages. + * Prevents memory leaks from abandoned tool calls. + */ +function cleanupPendingToolCalls( + pendingToolCalls: Map, + messages: PlatformMessage[] +): Map { + // Collect all tool call IDs that are currently referenced in messages + const activeToolCallIds = new Set() + for (const msg of messages) { + if (msg.toolCalls) { + for (const tc of msg.toolCalls) { + activeToolCallIds.add(tc.id) + } + } + } + + // Keep only tool calls that are still referenced in messages + const cleaned = new Map() + for (const [id, toolCall] of pendingToolCalls) { + if (activeToolCallIds.has(id)) { + cleaned.set(id, toolCall) + } + } + + return cleaned +} /** * Insert a message into the list in timestamp order. * Messages without timestamps are appended to the end. + * Automatically trims to MAX_MESSAGES to prevent memory leaks. */ function insertByTimestamp(messages: PlatformMessage[], msg: PlatformMessage): PlatformMessage[] { const msgTime = msg.timestamp ? new Date(msg.timestamp).getTime() : null - if (msgTime == null) return [...messages, msg] + + if (msgTime == null) { + return trimMessages([...messages, msg]) + } // Find the first message with a later timestamp and insert before it. for (let i = messages.length - 1; i >= 0; i--) { @@ -61,11 +109,12 @@ function insertByTimestamp(messages: PlatformMessage[], msg: PlatformMessage): P if (t != null && t <= msgTime) { const copy = [...messages] copy.splice(i + 1, 0, msg) - return copy + return trimMessages(copy) } } + // All existing messages are later (or have no timestamp) — prepend. - return [msg, ...messages] + return trimMessages([msg, ...messages]) } /** Callbacks that event handlers may invoke for side effects */ @@ -730,9 +779,12 @@ function handleMessagesSnapshot( // Normalize snapshot: reconstruct parent-child tool call hierarchy const normalizedMessages = normalizeSnapshotMessages(visibleMessages) + // Trim normalized snapshot to MAX_MESSAGES before processing + const trimmedNormalized = trimMessages(normalizedMessages) + // Merge normalized snapshot into existing messages while preserving // chronological order. - const snapshotMap = new Map(normalizedMessages.map(m => [m.id, m])) + const snapshotMap = new Map(trimmedNormalized.map(m => [m.id, m])) const existingIds = new Set(state.messages.map(m => m.id)) // Update existing messages in-place with snapshot data. @@ -773,8 +825,8 @@ function handleMessagesSnapshot( }) // Insert new snapshot messages at the correct position - for (let i = 0; i < normalizedMessages.length; i++) { - const msg = normalizedMessages[i] + for (let i = 0; i < trimmedNormalized.length; i++) { + const msg = trimmedNormalized[i] if (existingIds.has(msg.id)) continue let insertBeforeId: string | null = null @@ -798,9 +850,12 @@ function handleMessagesSnapshot( existingIds.add(msg.id) } + // Apply MAX_MESSAGES limit after merge to prevent unbounded growth + const trimmedMerged = trimMessages(merged) + // Recover tool names from streaming state before cleanup const toolNameMap = new Map() - for (const msg of merged) { + for (const msg of trimmedMerged) { if (msg.role === 'tool' && msg.toolCalls) { for (const tc of msg.toolCalls) { if (tc.id && tc.function.name && tc.function.name !== 'tool' && tc.function.name !== 'unknown_tool') { @@ -815,7 +870,7 @@ function handleMessagesSnapshot( } } // Apply recovered names - for (const msg of merged) { + for (const msg of trimmedMerged) { if (msg.role === 'assistant' && msg.toolCalls) { for (const tc of msg.toolCalls) { if ((!tc.function.name || tc.function.name === 'tool' || tc.function.name === 'unknown_tool') && @@ -831,14 +886,14 @@ function handleMessagesSnapshot( // Remove redundant standalone role=tool messages that are now nested const nestedToolCallIds = new Set() - for (const msg of merged) { + for (const msg of trimmedMerged) { if (msg.role === 'assistant' && msg.toolCalls) { for (const tc of msg.toolCalls) { nestedToolCallIds.add(tc.id) } } } - const filtered = merged.filter(msg => { + const filtered = trimmedMerged.filter(msg => { if (msg.role !== 'tool') return true if ('toolCallId' in msg && msg.toolCallId && nestedToolCallIds.has(msg.toolCallId)) return false if (msg.toolCalls?.some(tc => nestedToolCallIds.has(tc.id))) return false @@ -868,6 +923,20 @@ function handleMessagesSnapshot( // Clear pendingChildren -- the normalized snapshot subsumes any // pending child data from streaming state.pendingChildren = new Map() + + // Clean up stale pendingToolCalls entries + state.pendingToolCalls = cleanupPendingToolCalls(state.pendingToolCalls, state.messages) + + // Clean up stale messageFeedback entries (keep only for messages that still exist) + const existingMessageIds = new Set(state.messages.map(m => m.id)) + const cleanedFeedback = new Map() + for (const [msgId, feedback] of state.messageFeedback) { + if (existingMessageIds.has(msgId)) { + cleanedFeedback.set(msgId, feedback) + } + } + state.messageFeedback = cleanedFeedback + return state } diff --git a/components/frontend/src/hooks/agui/types.ts b/components/frontend/src/hooks/agui/types.ts index ad3bd458f..f345e5803 100644 --- a/components/frontend/src/hooks/agui/types.ts +++ b/components/frontend/src/hooks/agui/types.ts @@ -8,6 +8,13 @@ import type { PlatformMessage, } from '@/types/agui' +/** + * Maximum number of messages to retain in memory for long-running sessions. + * Prevents unbounded memory growth while maintaining sufficient context. + * Matches the pattern used in use-session-queue.ts. + */ +export const MAX_MESSAGES = 500 + export type UseAGUIStreamOptions = { projectName: string sessionName: string diff --git a/components/frontend/src/hooks/use-agui-stream.ts b/components/frontend/src/hooks/use-agui-stream.ts index 01f18508e..effa1a0df 100644 --- a/components/frontend/src/hooks/use-agui-stream.ts +++ b/components/frontend/src/hooks/use-agui-stream.ts @@ -23,7 +23,9 @@ export type { UseAGUIStreamOptions, UseAGUIStreamReturn } from './agui/types' export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamReturn { // Track hidden message IDs (auto-sent initial/workflow prompts) + // Periodically cleaned up to prevent unbounded growth const hiddenMessageIdsRef = useRef>(new Set()) + const hiddenMessageCleanupTimerRef = useRef(null) const { projectName, sessionName, @@ -57,6 +59,27 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur } }, []) + // Periodic cleanup of hidden message IDs to prevent unbounded growth + // Clean up every 5 minutes during long sessions + useEffect(() => { + const CLEANUP_INTERVAL = 5 * 60 * 1000 // 5 minutes + const MAX_HIDDEN_IDS = 200 // Keep most recent hidden IDs + + hiddenMessageCleanupTimerRef.current = setInterval(() => { + if (hiddenMessageIdsRef.current.size > MAX_HIDDEN_IDS) { + // Convert to array, keep most recent, convert back to Set + const idsArray = Array.from(hiddenMessageIdsRef.current) + hiddenMessageIdsRef.current = new Set(idsArray.slice(-MAX_HIDDEN_IDS)) + } + }, CLEANUP_INTERVAL) + + return () => { + if (hiddenMessageCleanupTimerRef.current) { + clearInterval(hiddenMessageCleanupTimerRef.current) + } + } + }, []) + // Process incoming AG-UI events const processEvent = useCallback( (event: PlatformEvent) => { @@ -169,20 +192,30 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur // Disconnect from the event stream const disconnect = useCallback(() => { + // Clear reconnect timeout if (reconnectTimeoutRef.current) { clearTimeout(reconnectTimeoutRef.current) reconnectTimeoutRef.current = null } + // Close EventSource connection if (eventSourceRef.current) { eventSourceRef.current.close() eventSourceRef.current = null } + // Clear periodic cleanup timer + if (hiddenMessageCleanupTimerRef.current) { + clearInterval(hiddenMessageCleanupTimerRef.current) + hiddenMessageCleanupTimerRef.current = null + } + // Reset state setState((prev) => ({ ...prev, status: 'idle', })) setIsRunActive(false) currentRunIdRef.current = null + // Reset reconnect attempts counter + reconnectAttemptsRef.current = 0 onDisconnected?.() }, [onDisconnected]) @@ -239,12 +272,20 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur ...userMessage, timestamp: new Date().toISOString(), } as PlatformMessage - setState((prev) => ({ - ...prev, - status: 'connected', - error: null, - messages: [...prev.messages, userMsgWithTimestamp], - })) + setState((prev) => { + // Apply MAX_MESSAGES limit to prevent unbounded growth + const updatedMessages = [...prev.messages, userMsgWithTimestamp] + const trimmedMessages = updatedMessages.length > 500 + ? updatedMessages.slice(-500) + : updatedMessages + + return { + ...prev, + status: 'connected', + error: null, + messages: trimmedMessages, + } + }) try { const response = await fetch(runUrl, {