diff --git a/packages/web/src/hooks/__tests__/useAgentMessages-late-callback-dedup.test.ts b/packages/web/src/hooks/__tests__/useAgentMessages-late-callback-dedup.test.ts new file mode 100644 index 000000000..7a0452673 --- /dev/null +++ b/packages/web/src/hooks/__tests__/useAgentMessages-late-callback-dedup.test.ts @@ -0,0 +1,1091 @@ +import React, { act } from 'react'; +import { createRoot, type Root } from 'react-dom/client'; +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'; +import { useAgentMessages } from '@/hooks/useAgentMessages'; + +const mockAddMessage = vi.fn(); +const mockAppendToMessage = vi.fn(); +const mockAppendToolEvent = vi.fn(); +const mockAppendRichBlock = vi.fn(); +const mockSetStreaming = vi.fn((id: string, streaming: boolean) => { + storeState.messages = storeState.messages.map((m) => (m.id === id ? { ...m, isStreaming: streaming } : m)); +}); +const mockSetLoading = vi.fn(); +const mockSetHasActiveInvocation = vi.fn(); +const mockSetIntentMode = vi.fn(); +const mockSetCatStatus = vi.fn(); +const mockClearCatStatuses = vi.fn(); +const mockRemoveActiveInvocation = vi.fn(); +const mockClearAllActiveInvocations = vi.fn(); +const mockSetCatInvocation = vi.fn((catId: string, info: Record) => { + storeState.catInvocations = { + ...storeState.catInvocations, + [catId]: { ...storeState.catInvocations[catId], ...info }, + }; +}); +const mockSetMessageUsage = vi.fn(); +const mockSetMessageMetadata = vi.fn(); +const mockSetMessageThinking = vi.fn(); +const mockRequestStreamCatchUp = vi.fn(); +const mockReplaceMessageId = vi.fn(); +const mockPatchMessage = vi.fn(); +const mockRemoveMessage = vi.fn(); + +const mockAddMessageToThread = vi.fn(); +const mockClearThreadActiveInvocation = vi.fn(); +const mockResetThreadInvocationState = vi.fn(); +const mockSetThreadMessageStreaming = vi.fn(); +const mockGetThreadState = vi.fn(() => ({ messages: [] })); + +const storeState = { + messages: [] as Array<{ + id: string; + type: string; + catId?: string; + content: string; + isStreaming?: boolean; + origin?: string; + extra?: { stream?: { invocationId?: string }; callbackBridge?: { skipDedup?: boolean } }; + replyTo?: string; + replyPreview?: { senderCatId: string | null; content: string; deleted?: true }; + timestamp: number; + }>, + addMessage: mockAddMessage, + appendToMessage: mockAppendToMessage, + appendToolEvent: mockAppendToolEvent, + appendRichBlock: mockAppendRichBlock, + setStreaming: mockSetStreaming, + setLoading: mockSetLoading, + setHasActiveInvocation: mockSetHasActiveInvocation, + removeActiveInvocation: mockRemoveActiveInvocation, + clearAllActiveInvocations: mockClearAllActiveInvocations, + setIntentMode: mockSetIntentMode, + setCatStatus: mockSetCatStatus, + clearCatStatuses: mockClearCatStatuses, + setCatInvocation: mockSetCatInvocation, + setMessageUsage: mockSetMessageUsage, + requestStreamCatchUp: mockRequestStreamCatchUp, + setMessageMetadata: mockSetMessageMetadata, + setMessageThinking: mockSetMessageThinking, + replaceMessageId: mockReplaceMessageId, + patchMessage: mockPatchMessage, + removeMessage: mockRemoveMessage, + + addMessageToThread: mockAddMessageToThread, + clearThreadActiveInvocation: mockClearThreadActiveInvocation, + resetThreadInvocationState: mockResetThreadInvocationState, + setThreadMessageStreaming: mockSetThreadMessageStreaming, + getThreadState: mockGetThreadState, + currentThreadId: 'thread-1', + catInvocations: {} as Record, + activeInvocations: {} as Record, +}; + +let captured: ReturnType | undefined; + +vi.mock('@/stores/chatStore', () => { + const useChatStoreMock = Object.assign(() => storeState, { getState: () => storeState }); + return { + useChatStore: useChatStoreMock, + }; +}); + +function Harness() { + captured = useAgentMessages(); + return null; +} + +describe('useAgentMessages late callback dedup (finalizedStreamRef across invocation boundary)', () => { + let container: HTMLDivElement; + let root: Root; + + beforeAll(() => { + (globalThis as { React?: typeof React }).React = React; + (globalThis as { IS_REACT_ACT_ENVIRONMENT?: boolean }).IS_REACT_ACT_ENVIRONMENT = true; + }); + + afterAll(() => { + delete (globalThis as { React?: typeof React }).React; + delete (globalThis as { IS_REACT_ACT_ENVIRONMENT?: boolean }).IS_REACT_ACT_ENVIRONMENT; + }); + + beforeEach(() => { + container = document.createElement('div'); + document.body.appendChild(container); + root = createRoot(container); + captured = undefined; + storeState.messages = []; + storeState.catInvocations = {}; + storeState.activeInvocations = {}; + vi.clearAllMocks(); + }); + + afterEach(() => { + act(() => { + root.unmount(); + }); + container.remove(); + }); + + it('late callback with SAME content merges into finalized bubble after invocation_created', () => { + act(() => { + root.render(React.createElement(Harness)); + }); + + const responseText = 'Response from inv-1'; + + // Invocation 1: streaming message + const bubble1 = { + id: 'msg-inv1', + type: 'assistant', + catId: 'opus', + content: responseText, + isStreaming: true, + origin: 'stream', + extra: { stream: { invocationId: 'inv-1' } }, + timestamp: Date.now() - 3000, + }; + storeState.messages.push(bubble1); + storeState.catInvocations = { opus: { invocationId: 'inv-1' } }; + + // Stream text so activeRefs is set + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + content: responseText, + }); + }); + + // Invocation 1 done — finalizes the bubble, sets finalizedStreamRef + act(() => { + captured?.handleAgentMessage({ + type: 'done', + catId: 'opus', + isFinal: true, + }); + }); + + // Invocation 2 starts — invocation_created arrives (fences finalizedStreamRef) + act(() => { + captured?.handleAgentMessage({ + type: 'system_info', + catId: 'opus', + content: JSON.stringify({ + type: 'invocation_created', + catId: 'opus', + invocationId: 'inv-2', + }), + }); + }); + + vi.clearAllMocks(); + + // Late callback from inv-1 arrives WITHOUT msg.invocationId but with + // SAME content as the finalized bubble — should merge (true late dup) + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + origin: 'callback', + content: responseText, + }); + }); + + // Key assertion: should patch the finalized bubble (msg-inv1) + const patchCalls = mockPatchMessage.mock.calls.filter(([id]) => id === 'msg-inv1'); + expect(patchCalls.length).toBeGreaterThanOrEqual(1); + + // No new assistant bubble should be created + const newBubbleCalls = mockAddMessage.mock.calls.filter( + ([msg]) => msg.type === 'assistant' && msg.catId === 'opus', + ); + expect(newBubbleCalls).toHaveLength(0); + }); + + it('does not suppress the new invocation after a fenced late callback merged into the previous bubble', () => { + act(() => { + root.render(React.createElement(Harness)); + }); + + const responseText = 'Response from inv-1'; + + storeState.messages.push({ + id: 'msg-inv1', + type: 'assistant', + catId: 'opus', + content: responseText, + isStreaming: true, + origin: 'stream', + extra: { stream: { invocationId: 'inv-1' } }, + timestamp: Date.now() - 3000, + }); + storeState.catInvocations = { opus: { invocationId: 'inv-1' } }; + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + content: responseText, + }); + }); + act(() => { + captured?.handleAgentMessage({ + type: 'done', + catId: 'opus', + isFinal: true, + }); + }); + act(() => { + captured?.handleAgentMessage({ + type: 'system_info', + catId: 'opus', + content: JSON.stringify({ + type: 'invocation_created', + catId: 'opus', + invocationId: 'inv-2', + }), + }); + }); + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + origin: 'callback', + content: responseText, + }); + }); + + vi.clearAllMocks(); + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + content: 'Fresh stream from inv-2', + }); + }); + + expect(mockAppendToMessage).not.toHaveBeenCalled(); + const streamBubble = mockAddMessage.mock.calls.find(([msg]) => msg.origin === 'stream')?.[0]; + expect(streamBubble?.content).toBe('Fresh stream from inv-2'); + expect(streamBubble?.extra).toEqual({ stream: { invocationId: 'inv-2' } }); + }); + + it('P2 regression: suppresses late stream chunks from finalized_fallback invocation after callback merge', () => { + act(() => { + root.render(React.createElement(Harness)); + }); + + const responseText = 'Response from inv-1'; + + storeState.messages.push({ + id: 'msg-inv1', + type: 'assistant', + catId: 'opus', + content: responseText, + isStreaming: true, + origin: 'stream', + extra: { stream: { invocationId: 'inv-1' } }, + timestamp: Date.now() - 3000, + }); + storeState.catInvocations = { opus: { invocationId: 'inv-1' } }; + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + content: responseText, + invocationId: 'inv-1', + }); + }); + act(() => { + captured?.handleAgentMessage({ + type: 'done', + catId: 'opus', + isFinal: true, + invocationId: 'inv-1', + }); + }); + act(() => { + captured?.handleAgentMessage({ + type: 'system_info', + catId: 'opus', + content: JSON.stringify({ + type: 'invocation_created', + catId: 'opus', + invocationId: 'inv-2', + }), + }); + }); + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + origin: 'callback', + content: responseText, + }); + }); + + vi.clearAllMocks(); + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + content: 'late stream chunk from inv-1', + invocationId: 'inv-1', + }); + }); + + expect(mockAddMessage).not.toHaveBeenCalled(); + expect(mockAppendToMessage).not.toHaveBeenCalled(); + }); + + it('P2 regression: suppresses late stream chunks when finalized_fallback infers invocationId from current state', () => { + act(() => { + root.render(React.createElement(Harness)); + }); + + const responseText = 'Late-bound response'; + + storeState.messages.push({ + id: 'msg-late-bound', + type: 'assistant', + catId: 'opus', + content: responseText, + isStreaming: true, + origin: 'stream', + timestamp: Date.now() - 3000, + }); + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + content: responseText, + }); + }); + act(() => { + captured?.handleAgentMessage({ + type: 'done', + catId: 'opus', + isFinal: true, + }); + }); + + // Late invocation binding arrives after the stream bubble finalized. + storeState.catInvocations = { opus: { invocationId: 'inv-late-bound' } }; + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + origin: 'callback', + content: responseText, + }); + }); + + vi.clearAllMocks(); + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + content: 'late stream chunk from same late-bound invocation', + invocationId: 'inv-late-bound', + }); + }); + + expect(mockAddMessage).not.toHaveBeenCalled(); + expect(mockAppendToMessage).not.toHaveBeenCalled(); + }); + + it('P2 regression: fenced finalized_fallback must not suppress the new invocation stream', () => { + act(() => { + root.render(React.createElement(Harness)); + }); + + const responseText = 'Late-bound response before fence'; + + storeState.messages.push({ + id: 'msg-fenced-late-bound', + type: 'assistant', + catId: 'opus', + content: responseText, + isStreaming: true, + origin: 'stream', + timestamp: Date.now() - 3000, + }); + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + content: responseText, + }); + }); + act(() => { + captured?.handleAgentMessage({ + type: 'done', + catId: 'opus', + isFinal: true, + }); + }); + + storeState.catInvocations = { opus: { invocationId: 'inv-new' } }; + act(() => { + captured?.handleAgentMessage({ + type: 'system_info', + catId: 'opus', + content: JSON.stringify({ + type: 'invocation_created', + catId: 'opus', + invocationId: 'inv-new', + }), + }); + }); + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + origin: 'callback', + content: responseText, + }); + }); + + vi.clearAllMocks(); + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + content: 'Fresh stream from inv-new', + invocationId: 'inv-new', + }); + }); + + expect(mockAppendToMessage).not.toHaveBeenCalled(); + const streamBubble = mockAddMessage.mock.calls.find(([msg]) => msg.origin === 'stream')?.[0]; + expect(streamBubble?.content).toBe('Fresh stream from inv-new'); + expect(streamBubble?.extra).toEqual({ stream: { invocationId: 'inv-new' } }); + }); + + it('P2 regression: finalized_fallback does not merge callback onto a different reply target', () => { + act(() => { + root.render(React.createElement(Harness)); + }); + + const responseText = 'Same text, different parent'; + + storeState.messages.push({ + id: 'msg-inv1', + type: 'assistant', + catId: 'opus', + content: responseText, + isStreaming: true, + origin: 'stream', + extra: { stream: { invocationId: 'inv-1' } }, + replyTo: 'msg-parent-old', + replyPreview: { senderCatId: 'user', content: 'old parent' }, + timestamp: Date.now() - 3000, + }); + storeState.catInvocations = { opus: { invocationId: 'inv-1' } }; + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + content: responseText, + replyTo: 'msg-parent-old', + replyPreview: { senderCatId: 'user', content: 'old parent' }, + }); + }); + act(() => { + captured?.handleAgentMessage({ + type: 'done', + catId: 'opus', + isFinal: true, + invocationId: 'inv-1', + }); + }); + act(() => { + captured?.handleAgentMessage({ + type: 'system_info', + catId: 'opus', + content: JSON.stringify({ + type: 'invocation_created', + catId: 'opus', + invocationId: 'inv-2', + }), + }); + }); + + vi.clearAllMocks(); + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + origin: 'callback', + content: responseText, + messageId: 'cb-reply-mismatch', + replyTo: 'msg-parent-new', + replyPreview: { senderCatId: 'user', content: 'new parent' }, + }); + }); + + expect(mockPatchMessage).not.toHaveBeenCalled(); + expect(mockAddMessage).toHaveBeenCalledWith( + expect.objectContaining({ + id: 'cb-reply-mismatch', + type: 'assistant', + catId: 'opus', + content: responseText, + origin: 'callback', + replyTo: 'msg-parent-new', + replyPreview: { senderCatId: 'user', content: 'new parent' }, + }), + ); + }); + + it('P2 regression: finalized_fallback still merges when the finalized stream bubble lacks reply metadata', () => { + act(() => { + root.render(React.createElement(Harness)); + }); + + const responseText = 'Same text, reply metadata arrives late'; + + storeState.messages.push({ + id: 'msg-inv1-no-reply', + type: 'assistant', + catId: 'opus', + content: responseText, + isStreaming: true, + origin: 'stream', + extra: { stream: { invocationId: 'inv-1' } }, + timestamp: Date.now() - 3000, + }); + storeState.catInvocations = { opus: { invocationId: 'inv-1' } }; + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + content: responseText, + }); + }); + act(() => { + captured?.handleAgentMessage({ + type: 'done', + catId: 'opus', + isFinal: true, + invocationId: 'inv-1', + }); + }); + act(() => { + captured?.handleAgentMessage({ + type: 'system_info', + catId: 'opus', + content: JSON.stringify({ + type: 'invocation_created', + catId: 'opus', + invocationId: 'inv-2', + }), + }); + }); + + vi.clearAllMocks(); + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + origin: 'callback', + content: responseText, + messageId: 'cb-late-reply-known', + replyTo: 'msg-parent-known-late', + replyPreview: { senderCatId: 'user', content: 'late-known parent' }, + }); + }); + + expect(mockAddMessage).not.toHaveBeenCalled(); + expect(mockPatchMessage).toHaveBeenCalledWith( + 'cb-late-reply-known', + expect.objectContaining({ + content: responseText, + origin: 'callback', + isStreaming: false, + replyTo: 'msg-parent-known-late', + replyPreview: { senderCatId: 'user', content: 'late-known parent' }, + }), + ); + }); + + it('P2 regression: finalized_fallback still merges when the callback reply target is missing', () => { + act(() => { + root.render(React.createElement(Harness)); + }); + + const responseText = 'Same text, callback reply metadata missing'; + + storeState.messages.push({ + id: 'msg-inv1-known-reply', + type: 'assistant', + catId: 'opus', + content: responseText, + isStreaming: true, + origin: 'stream', + extra: { stream: { invocationId: 'inv-1' } }, + replyTo: 'msg-parent-known', + replyPreview: { senderCatId: 'user', content: 'known parent' }, + timestamp: Date.now() - 3000, + }); + storeState.catInvocations = { opus: { invocationId: 'inv-1' } }; + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + content: responseText, + replyTo: 'msg-parent-known', + replyPreview: { senderCatId: 'user', content: 'known parent' }, + }); + }); + act(() => { + captured?.handleAgentMessage({ + type: 'done', + catId: 'opus', + isFinal: true, + invocationId: 'inv-1', + }); + }); + act(() => { + captured?.handleAgentMessage({ + type: 'system_info', + catId: 'opus', + content: JSON.stringify({ + type: 'invocation_created', + catId: 'opus', + invocationId: 'inv-2', + }), + }); + }); + + vi.clearAllMocks(); + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + origin: 'callback', + content: responseText, + messageId: 'cb-missing-reply-evidence', + }); + }); + + expect(mockAddMessage).not.toHaveBeenCalled(); + expect(mockPatchMessage).toHaveBeenCalledWith( + 'cb-missing-reply-evidence', + expect.objectContaining({ + content: responseText, + origin: 'callback', + isStreaming: false, + }), + ); + }); + + it('P1 regression: callback with DIFFERENT content does NOT merge across invocation boundary', () => { + act(() => { + root.render(React.createElement(Harness)); + }); + + // Invocation 1: streaming + finalize + const bubble1 = { + id: 'msg-inv1', + type: 'assistant', + catId: 'opus', + content: 'Response from inv-1', + isStreaming: true, + origin: 'stream', + extra: { stream: { invocationId: 'inv-1' } }, + timestamp: Date.now() - 3000, + }; + storeState.messages.push(bubble1); + storeState.catInvocations = { opus: { invocationId: 'inv-1' } }; + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + content: 'Response from inv-1', + }); + }); + + act(() => { + captured?.handleAgentMessage({ + type: 'done', + catId: 'opus', + isFinal: true, + }); + }); + + // Invocation 2: callback-only (e.g. post_message) — invocation_created fences the ref + act(() => { + captured?.handleAgentMessage({ + type: 'system_info', + catId: 'opus', + content: JSON.stringify({ + type: 'invocation_created', + catId: 'opus', + invocationId: 'inv-2', + }), + }); + }); + + vi.clearAllMocks(); + + // Inv-2's callback with DIFFERENT content — must NOT overwrite inv-1's bubble + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + origin: 'callback', + content: 'Scheduled task created', + }); + }); + + // Should NOT patch the finalized bubble + const patchCalls = mockPatchMessage.mock.calls.filter(([id]) => id === 'msg-inv1'); + expect(patchCalls).toHaveLength(0); + + // Should create a new bubble for inv-2's output + const newBubbleCalls = mockAddMessage.mock.calls.filter( + ([msg]) => msg.type === 'assistant' && msg.catId === 'opus', + ); + expect(newBubbleCalls).toHaveLength(1); + expect(newBubbleCalls[0][0].content).toBe('Scheduled task created'); + }); + + it('invalidates a fenced finalized ref when a second invocation boundary arrives', () => { + act(() => { + root.render(React.createElement(Harness)); + }); + + storeState.messages.push({ + id: 'msg-inv1', + type: 'assistant', + catId: 'opus', + content: 'Shared content', + isStreaming: true, + origin: 'stream', + extra: { stream: { invocationId: 'inv-1' } }, + timestamp: Date.now() - 4000, + }); + storeState.catInvocations = { opus: { invocationId: 'inv-1' } }; + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + content: 'Shared content', + }); + }); + act(() => { + captured?.handleAgentMessage({ + type: 'done', + catId: 'opus', + isFinal: true, + }); + }); + + // First boundary fences the inv-1 finalized ref. + act(() => { + captured?.handleAgentMessage({ + type: 'system_info', + catId: 'opus', + content: JSON.stringify({ + type: 'invocation_created', + catId: 'opus', + invocationId: 'inv-2', + }), + }); + }); + + // inv-2 is callback-only with different content, so the fenced inv-1 ref survives. + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + origin: 'callback', + content: 'Different inv-2 content', + }); + }); + + vi.clearAllMocks(); + + // Second boundary must invalidate the stale inv-1 ref. + act(() => { + captured?.handleAgentMessage({ + type: 'system_info', + catId: 'opus', + content: JSON.stringify({ + type: 'invocation_created', + catId: 'opus', + invocationId: 'inv-3', + }), + }); + }); + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + origin: 'callback', + content: 'Shared content', + }); + }); + + const patchCalls = mockPatchMessage.mock.calls.filter(([id]) => id === 'msg-inv1'); + expect(patchCalls).toHaveLength(0); + + const newBubbleCalls = mockAddMessage.mock.calls.filter( + ([msg]) => msg.type === 'assistant' && msg.catId === 'opus', + ); + expect(newBubbleCalls).toHaveLength(1); + expect(newBubbleCalls[0][0].content).toBe('Shared content'); + expect(newBubbleCalls[0][0].extra).toEqual({ callbackBridge: { skipDedup: true } }); + }); + + it('keeps suppressing late stream chunks when callback creates a bubble after a non-matchable finalized ref', () => { + act(() => { + root.render(React.createElement(Harness)); + }); + + storeState.messages.push({ + id: 'msg-inv1', + type: 'assistant', + catId: 'opus', + content: 'Response from inv-1', + isStreaming: true, + origin: 'stream', + extra: { stream: { invocationId: 'inv-1' } }, + timestamp: Date.now() - 4000, + }); + storeState.catInvocations = { opus: { invocationId: 'inv-1' } }; + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + content: 'Response from inv-1', + }); + }); + act(() => { + captured?.handleAgentMessage({ + type: 'done', + catId: 'opus', + isFinal: true, + }); + }); + + act(() => { + captured?.handleAgentMessage({ + type: 'system_info', + catId: 'opus', + content: JSON.stringify({ + type: 'invocation_created', + catId: 'opus', + invocationId: 'inv-2', + }), + }); + }); + + vi.clearAllMocks(); + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + origin: 'callback', + content: 'Delayed callback from inv-1', + }); + }); + + expect(mockAddMessage).toHaveBeenCalledTimes(1); + const callbackBubble = mockAddMessage.mock.calls[0]?.[0]; + expect(callbackBubble?.origin).toBe('callback'); + expect(callbackBubble?.extra?.stream?.invocationId).toBeUndefined(); + expect(callbackBubble?.extra?.callbackBridge).toEqual({ skipDedup: true }); + + storeState.messages.push({ + ...callbackBubble, + timestamp: Date.now(), + }); + + vi.clearAllMocks(); + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + content: 'Fresh stream from inv-2', + }); + }); + + expect(mockAppendToMessage).not.toHaveBeenCalled(); + expect(mockAddMessage).not.toHaveBeenCalled(); + }); + + it('keeps suppressing late stream chunks for callback-first invocations without explicit invocationId', () => { + act(() => { + root.render(React.createElement(Harness)); + }); + + storeState.catInvocations = { opus: { invocationId: 'inv-callback-first' } }; + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + origin: 'callback', + content: 'Callback arrived before stream', + }); + }); + + expect(mockAddMessage).toHaveBeenCalledTimes(1); + expect(mockAddMessage.mock.calls[0]?.[0].extra).toEqual({ callbackBridge: { skipDedup: true } }); + + vi.clearAllMocks(); + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + content: 'late stream chunk from same invocation', + }); + }); + + expect(mockAddMessage).not.toHaveBeenCalled(); + expect(mockAppendToMessage).not.toHaveBeenCalled(); + }); + + it('keeps suppressing late stream chunks after callback replaces an invocationless placeholder', () => { + act(() => { + root.render(React.createElement(Harness)); + }); + + storeState.catInvocations = { opus: { invocationId: 'inv-invocationless-placeholder' } }; + storeState.messages.push({ + id: 'msg-invocationless-placeholder', + type: 'assistant', + catId: 'opus', + content: 'thinking...', + isStreaming: true, + origin: 'stream', + timestamp: Date.now() - 1000, + }); + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + content: 'thinking...', + }); + }); + + vi.clearAllMocks(); + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + origin: 'callback', + content: 'final answer', + }); + }); + + expect(mockPatchMessage).toHaveBeenCalledWith( + 'msg-invocationless-placeholder', + expect.objectContaining({ + content: 'final answer', + origin: 'callback', + isStreaming: false, + }), + ); + + vi.clearAllMocks(); + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + content: ' late chunk from same invocation', + }); + }); + + expect(mockAddMessage).not.toHaveBeenCalled(); + expect(mockAppendToMessage).not.toHaveBeenCalled(); + }); + + it('thread switch clears finalizedStreamRef (resetRefs still works)', () => { + act(() => { + root.render(React.createElement(Harness)); + }); + + // Invocation 1: streaming + done + storeState.messages.push({ + id: 'msg-finalized', + type: 'assistant', + catId: 'opus', + content: 'Done', + isStreaming: true, + origin: 'stream', + extra: { stream: { invocationId: 'inv-1' } }, + timestamp: Date.now() - 2000, + }); + storeState.catInvocations = { opus: { invocationId: 'inv-1' } }; + + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + content: 'Done', + }); + }); + + act(() => { + captured?.handleAgentMessage({ + type: 'done', + catId: 'opus', + isFinal: true, + }); + }); + + // Thread switch — resetRefs should clear finalizedStreamRef + act(() => { + captured?.resetRefs(); + }); + + vi.clearAllMocks(); + + // Late callback arrives after thread switch — should NOT find finalized bubble + act(() => { + captured?.handleAgentMessage({ + type: 'text', + catId: 'opus', + origin: 'callback', + content: 'Late callback after thread switch', + }); + }); + + // Should create a new bubble (no finalized ref available after reset) + const newBubbleCalls = mockAddMessage.mock.calls.filter( + ([msg]) => msg.type === 'assistant' && msg.catId === 'opus', + ); + expect(newBubbleCalls).toHaveLength(1); + }); +}); diff --git a/packages/web/src/hooks/__tests__/useSocket-background.test.ts b/packages/web/src/hooks/__tests__/useSocket-background.test.ts index e5b347b68..abfd780ab 100644 --- a/packages/web/src/hooks/__tests__/useSocket-background.test.ts +++ b/packages/web/src/hooks/__tests__/useSocket-background.test.ts @@ -21,7 +21,7 @@ import { let testBgSeq = 0; const testBgStreamRefs = new Map(); const testBgReplacedInvocations = new Map(); -const testBgFinalizedRefs = new Map(); +const testBgFinalizedRefs = new Map(); /** #80 fix-C: Track clearDoneTimeout calls */ let clearDoneTimeoutCalls: Array = []; @@ -257,6 +257,60 @@ describe('background thread socket handling', () => { expect(ts.messages[0]?.origin).toBe('callback'); }); + it('P1 regression: stale explicit callback bubble must NOT suppress live background stream chunks', () => { + const now = Date.now(); + + useChatStore.getState().addMessageToThread('thread-bg', { + id: 'bg-live', + type: 'assistant', + catId: 'opus', + content: 'Live response', + origin: 'stream', + isStreaming: true, + extra: { stream: {} }, + timestamp: now, + }); + testBgStreamRefs.set('thread-bg::opus', { id: 'bg-live', threadId: 'thread-bg', catId: 'opus' }); + useChatStore.getState().setThreadCatInvocation('thread-bg', 'opus', { invocationId: undefined }); + + simulateBackgroundMessage({ + type: 'text', + catId: 'opus', + threadId: 'thread-bg', + origin: 'callback', + content: 'Old callback', + invocationId: 'inv-stale', + messageId: 'bg-stale-cb', + timestamp: now + 1, + }); + + simulateBackgroundMessage({ + type: 'text', + catId: 'opus', + threadId: 'thread-bg', + content: ' more live text', + timestamp: now + 2, + }); + + const ts = useChatStore.getState().getThreadState('thread-bg'); + expect(ts.messages).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + id: 'bg-live', + content: 'Live response more live text', + origin: 'stream', + isStreaming: true, + }), + expect.objectContaining({ + id: 'bg-stale-cb', + content: 'Old callback', + origin: 'callback', + extra: { stream: { invocationId: 'inv-stale' } }, + }), + ]), + ); + }); + it('callback-origin text replaces overlapping background stream bubble from the same invocation', () => { const now = Date.now(); useChatStore.getState().setThreadCatInvocation('thread-bg', 'opus', { invocationId: 'inv-bg-1' }); @@ -331,6 +385,889 @@ describe('background thread socket handling', () => { ]); }); + it('P1 regression: callback with DIFFERENT content does NOT merge across bg invocation boundary', () => { + const now = Date.now(); + useChatStore.getState().setThreadCatInvocation('thread-bg', 'opus', { invocationId: 'inv-bg-p1' }); + + // Stream bubble finalized — stopTrackedStream will set finalizedBgRefs + useChatStore.getState().addMessageToThread('thread-bg', { + id: 'bg-stream-p1', + type: 'assistant', + catId: 'opus', + content: 'Response from inv-1', + origin: 'stream', + isStreaming: true, + extra: { stream: { invocationId: 'inv-bg-p1' } }, + timestamp: now, + }); + testBgStreamRefs.set('thread-bg::opus', { id: 'bg-stream-p1', threadId: 'thread-bg', catId: 'opus' }); + + // Done event → finalizes and sets finalizedBgRefs + simulateBackgroundMessage({ + type: 'done', + catId: 'opus', + threadId: 'thread-bg', + isFinal: true, + timestamp: now + 1, + }); + + // invocation_created for inv-2 → fences finalizedBgRefs + simulateBackgroundMessage({ + type: 'system_info', + catId: 'opus', + threadId: 'thread-bg', + content: JSON.stringify({ + type: 'invocation_created', + catId: 'opus', + invocationId: 'inv-bg-p1-v2', + }), + timestamp: now + 2, + }); + + // Callback from inv-2 with DIFFERENT content + simulateBackgroundMessage({ + type: 'text', + catId: 'opus', + threadId: 'thread-bg', + origin: 'callback', + content: 'Scheduled task created', + messageId: 'bg-cb-p1-new', + timestamp: now + 3, + }); + + const ts = useChatStore.getState().getThreadState('thread-bg'); + // Both messages should exist — inv-2 callback must NOT overwrite inv-1's bubble + expect(ts.messages).toHaveLength(2); + expect(ts.messages[0]!.content).toBe('Response from inv-1'); + expect(ts.messages[1]!.content).toBe('Scheduled task created'); + }); + + it('P2 parity: late callback with SAME content merges across bg invocation boundary', () => { + const now = Date.now(); + const responseText = 'Response from inv-1'; + useChatStore.getState().setThreadCatInvocation('thread-bg', 'opus', { invocationId: 'inv-bg-p2' }); + + useChatStore.getState().addMessageToThread('thread-bg', { + id: 'bg-stream-p2', + type: 'assistant', + catId: 'opus', + content: responseText, + origin: 'stream', + isStreaming: true, + extra: { stream: { invocationId: 'inv-bg-p2' } }, + timestamp: now, + }); + testBgStreamRefs.set('thread-bg::opus', { id: 'bg-stream-p2', threadId: 'thread-bg', catId: 'opus' }); + + simulateBackgroundMessage({ + type: 'done', + catId: 'opus', + threadId: 'thread-bg', + isFinal: true, + timestamp: now + 1, + }); + + simulateBackgroundMessage({ + type: 'system_info', + catId: 'opus', + threadId: 'thread-bg', + content: JSON.stringify({ + type: 'invocation_created', + catId: 'opus', + invocationId: 'inv-bg-p2-v2', + }), + timestamp: now + 2, + }); + + // Late callback with SAME content — should merge (true late dup) + simulateBackgroundMessage({ + type: 'text', + catId: 'opus', + threadId: 'thread-bg', + origin: 'callback', + content: responseText, + messageId: 'bg-cb-p2-dup', + timestamp: now + 3, + }); + + const ts = useChatStore.getState().getThreadState('thread-bg'); + // Should merge — only 1 message + expect(ts.messages).toHaveLength(1); + expect(ts.messages[0]!.id).toBe('bg-cb-p2-dup'); + expect(ts.messages[0]!.origin).toBe('callback'); + }); + + it('does not suppress the new bg invocation after a fenced late callback merged into the previous bubble', () => { + const now = Date.now(); + const responseText = 'Response from inv-1'; + useChatStore.getState().setThreadCatInvocation('thread-bg', 'opus', { invocationId: 'inv-bg-p2' }); + + useChatStore.getState().addMessageToThread('thread-bg', { + id: 'bg-stream-p2', + type: 'assistant', + catId: 'opus', + content: responseText, + origin: 'stream', + isStreaming: true, + extra: { stream: { invocationId: 'inv-bg-p2' } }, + timestamp: now, + }); + testBgStreamRefs.set('thread-bg::opus', { id: 'bg-stream-p2', threadId: 'thread-bg', catId: 'opus' }); + + simulateBackgroundMessage({ + type: 'done', + catId: 'opus', + threadId: 'thread-bg', + isFinal: true, + timestamp: now + 1, + }); + + simulateBackgroundMessage({ + type: 'system_info', + catId: 'opus', + threadId: 'thread-bg', + content: JSON.stringify({ + type: 'invocation_created', + catId: 'opus', + invocationId: 'inv-bg-p2-v2', + }), + timestamp: now + 2, + }); + + simulateBackgroundMessage({ + type: 'text', + catId: 'opus', + threadId: 'thread-bg', + origin: 'callback', + content: responseText, + messageId: 'bg-cb-p2-dup', + timestamp: now + 3, + }); + + simulateBackgroundMessage({ + type: 'text', + catId: 'opus', + threadId: 'thread-bg', + content: 'Fresh stream from inv-bg-p2-v2', + timestamp: now + 4, + }); + + const ts = useChatStore.getState().getThreadState('thread-bg'); + expect(ts.messages).toHaveLength(2); + expect(ts.messages[0]!.id).toBe('bg-cb-p2-dup'); + expect(ts.messages[1]).toEqual( + expect.objectContaining({ + type: 'assistant', + catId: 'opus', + content: 'Fresh stream from inv-bg-p2-v2', + origin: 'stream', + isStreaming: true, + extra: { stream: { invocationId: 'inv-bg-p2-v2' } }, + }), + ); + }); + + it('P2 regression: suppresses late background stream chunks from finalized_fallback invocation after callback merge', () => { + const now = Date.now(); + const responseText = 'Response from inv-1'; + useChatStore.getState().setThreadCatInvocation('thread-bg', 'opus', { invocationId: 'inv-bg-old' }); + + useChatStore.getState().addMessageToThread('thread-bg', { + id: 'bg-stream-old', + type: 'assistant', + catId: 'opus', + content: responseText, + origin: 'stream', + isStreaming: true, + extra: { stream: { invocationId: 'inv-bg-old' } }, + timestamp: now, + }); + testBgStreamRefs.set('thread-bg::opus', { id: 'bg-stream-old', threadId: 'thread-bg', catId: 'opus' }); + + simulateBackgroundMessage({ + type: 'done', + catId: 'opus', + threadId: 'thread-bg', + isFinal: true, + invocationId: 'inv-bg-old', + timestamp: now + 1, + }); + + simulateBackgroundMessage({ + type: 'system_info', + catId: 'opus', + threadId: 'thread-bg', + content: JSON.stringify({ + type: 'invocation_created', + catId: 'opus', + invocationId: 'inv-bg-new', + }), + timestamp: now + 2, + }); + + simulateBackgroundMessage({ + type: 'text', + catId: 'opus', + threadId: 'thread-bg', + origin: 'callback', + content: responseText, + messageId: 'bg-callback-old', + timestamp: now + 3, + }); + + simulateBackgroundMessage({ + type: 'text', + catId: 'opus', + threadId: 'thread-bg', + content: 'late stream chunk from old invocation', + invocationId: 'inv-bg-old', + timestamp: now + 4, + }); + + const ts = useChatStore.getState().getThreadState('thread-bg'); + expect(ts.messages).toHaveLength(1); + expect(ts.messages[0]).toEqual( + expect.objectContaining({ + id: 'bg-callback-old', + catId: 'opus', + content: responseText, + origin: 'callback', + isStreaming: false, + }), + ); + }); + + it('P2 regression: suppresses late bg stream chunks when finalized_fallback infers invocationId from thread state', () => { + const now = Date.now(); + const responseText = 'Late-bound bg response'; + + useChatStore.getState().addMessageToThread('thread-bg', { + id: 'bg-stream-late-bound', + type: 'assistant', + catId: 'opus', + content: responseText, + origin: 'stream', + isStreaming: true, + timestamp: now, + }); + testBgStreamRefs.set('thread-bg::opus', { + id: 'bg-stream-late-bound', + threadId: 'thread-bg', + catId: 'opus', + }); + + simulateBackgroundMessage({ + type: 'done', + catId: 'opus', + threadId: 'thread-bg', + isFinal: true, + timestamp: now + 1, + }); + + // Late invocation binding arrives after finalization, before the callback. + useChatStore.getState().setThreadCatInvocation('thread-bg', 'opus', { invocationId: 'inv-bg-late-bound' }); + + simulateBackgroundMessage({ + type: 'text', + catId: 'opus', + threadId: 'thread-bg', + origin: 'callback', + content: responseText, + messageId: 'bg-callback-late-bound', + timestamp: now + 2, + }); + + simulateBackgroundMessage({ + type: 'text', + catId: 'opus', + threadId: 'thread-bg', + content: 'late stream chunk from same late-bound invocation', + invocationId: 'inv-bg-late-bound', + timestamp: now + 3, + }); + + const ts = useChatStore.getState().getThreadState('thread-bg'); + expect(ts.messages).toHaveLength(1); + expect(ts.messages[0]).toEqual( + expect.objectContaining({ + id: 'bg-callback-late-bound', + catId: 'opus', + content: responseText, + origin: 'callback', + isStreaming: false, + }), + ); + }); + + it('P2 regression: fenced background finalized_fallback must not suppress the new invocation stream', () => { + const now = Date.now(); + const responseText = 'Late-bound bg response before fence'; + + useChatStore.getState().addMessageToThread('thread-bg', { + id: 'bg-stream-fenced-late-bound', + type: 'assistant', + catId: 'opus', + content: responseText, + origin: 'stream', + isStreaming: true, + timestamp: now, + }); + testBgStreamRefs.set('thread-bg::opus', { + id: 'bg-stream-fenced-late-bound', + threadId: 'thread-bg', + catId: 'opus', + }); + + simulateBackgroundMessage({ + type: 'done', + catId: 'opus', + threadId: 'thread-bg', + isFinal: true, + timestamp: now + 1, + }); + + useChatStore.getState().setThreadCatInvocation('thread-bg', 'opus', { invocationId: 'inv-bg-new' }); + simulateBackgroundMessage({ + type: 'system_info', + catId: 'opus', + threadId: 'thread-bg', + content: JSON.stringify({ + type: 'invocation_created', + catId: 'opus', + invocationId: 'inv-bg-new', + }), + timestamp: now + 2, + }); + + simulateBackgroundMessage({ + type: 'text', + catId: 'opus', + threadId: 'thread-bg', + origin: 'callback', + content: responseText, + messageId: 'bg-callback-fenced-late-bound', + timestamp: now + 3, + }); + + simulateBackgroundMessage({ + type: 'text', + catId: 'opus', + threadId: 'thread-bg', + content: 'Fresh bg stream from inv-bg-new', + invocationId: 'inv-bg-new', + timestamp: now + 4, + }); + + const ts = useChatStore.getState().getThreadState('thread-bg'); + expect(ts.messages).toHaveLength(2); + expect(ts.messages[0]).toEqual( + expect.objectContaining({ + id: 'bg-callback-fenced-late-bound', + catId: 'opus', + content: responseText, + origin: 'callback', + isStreaming: false, + }), + ); + expect(ts.messages[1]).toEqual( + expect.objectContaining({ + type: 'assistant', + catId: 'opus', + content: 'Fresh bg stream from inv-bg-new', + origin: 'stream', + isStreaming: true, + extra: { stream: { invocationId: 'inv-bg-new' } }, + }), + ); + }); + + it('P2 regression: background finalized_fallback does not merge callback onto a different reply target', () => { + const now = Date.now(); + const responseText = 'Same text, different parent'; + useChatStore.getState().setThreadCatInvocation('thread-bg', 'opus', { invocationId: 'inv-bg-old' }); + + useChatStore.getState().addMessageToThread('thread-bg', { + id: 'bg-stream-old', + type: 'assistant', + catId: 'opus', + content: responseText, + origin: 'stream', + isStreaming: true, + extra: { stream: { invocationId: 'inv-bg-old' } }, + replyTo: 'msg-parent-old', + replyPreview: { senderCatId: 'user', content: 'old parent' }, + timestamp: now, + }); + testBgStreamRefs.set('thread-bg::opus', { id: 'bg-stream-old', threadId: 'thread-bg', catId: 'opus' }); + + simulateBackgroundMessage({ + type: 'done', + catId: 'opus', + threadId: 'thread-bg', + isFinal: true, + invocationId: 'inv-bg-old', + timestamp: now + 1, + }); + + simulateBackgroundMessage({ + type: 'system_info', + catId: 'opus', + threadId: 'thread-bg', + content: JSON.stringify({ + type: 'invocation_created', + catId: 'opus', + invocationId: 'inv-bg-new', + }), + timestamp: now + 2, + }); + + simulateBackgroundMessage({ + type: 'text', + catId: 'opus', + threadId: 'thread-bg', + origin: 'callback', + content: responseText, + messageId: 'bg-callback-reply-mismatch', + replyTo: 'msg-parent-new', + replyPreview: { senderCatId: 'user', content: 'new parent' }, + timestamp: now + 3, + }); + + const ts = useChatStore.getState().getThreadState('thread-bg'); + expect(ts.messages).toHaveLength(2); + expect(ts.messages[0]).toEqual( + expect.objectContaining({ + id: 'bg-stream-old', + catId: 'opus', + content: responseText, + origin: 'stream', + replyTo: 'msg-parent-old', + replyPreview: { senderCatId: 'user', content: 'old parent' }, + }), + ); + expect(ts.messages[1]).toEqual( + expect.objectContaining({ + id: 'bg-callback-reply-mismatch', + catId: 'opus', + content: responseText, + origin: 'callback', + replyTo: 'msg-parent-new', + replyPreview: { senderCatId: 'user', content: 'new parent' }, + }), + ); + }); + + it('P2 regression: background finalized_fallback still merges when stream reply target is unknown', () => { + const now = Date.now(); + const responseText = 'Same text, reply metadata arrives late'; + useChatStore.getState().setThreadCatInvocation('thread-bg', 'opus', { invocationId: 'inv-bg-old' }); + + useChatStore.getState().addMessageToThread('thread-bg', { + id: 'bg-stream-no-reply', + type: 'assistant', + catId: 'opus', + content: responseText, + origin: 'stream', + isStreaming: true, + extra: { stream: { invocationId: 'inv-bg-old' } }, + timestamp: now, + }); + testBgStreamRefs.set('thread-bg::opus', { id: 'bg-stream-no-reply', threadId: 'thread-bg', catId: 'opus' }); + + simulateBackgroundMessage({ + type: 'done', + catId: 'opus', + threadId: 'thread-bg', + isFinal: true, + invocationId: 'inv-bg-old', + timestamp: now + 1, + }); + + simulateBackgroundMessage({ + type: 'system_info', + catId: 'opus', + threadId: 'thread-bg', + content: JSON.stringify({ + type: 'invocation_created', + catId: 'opus', + invocationId: 'inv-bg-new', + }), + timestamp: now + 2, + }); + + simulateBackgroundMessage({ + type: 'text', + catId: 'opus', + threadId: 'thread-bg', + origin: 'callback', + content: responseText, + messageId: 'bg-callback-reply-known', + replyTo: 'msg-parent-known-late', + replyPreview: { senderCatId: 'user', content: 'late-known parent' }, + timestamp: now + 3, + }); + + const ts = useChatStore.getState().getThreadState('thread-bg'); + expect(ts.messages).toHaveLength(1); + expect(ts.messages[0]).toEqual( + expect.objectContaining({ + id: 'bg-callback-reply-known', + catId: 'opus', + content: responseText, + origin: 'callback', + isStreaming: false, + replyTo: 'msg-parent-known-late', + replyPreview: { senderCatId: 'user', content: 'late-known parent' }, + }), + ); + }); + + it('P2 regression: background finalized_fallback still merges when callback reply target is missing', () => { + const now = Date.now(); + const responseText = 'Same text, callback reply metadata missing'; + useChatStore.getState().setThreadCatInvocation('thread-bg', 'opus', { invocationId: 'inv-bg-old' }); + + useChatStore.getState().addMessageToThread('thread-bg', { + id: 'bg-stream-known-reply', + type: 'assistant', + catId: 'opus', + content: responseText, + origin: 'stream', + isStreaming: true, + extra: { stream: { invocationId: 'inv-bg-old' } }, + replyTo: 'msg-parent-known', + replyPreview: { senderCatId: 'user', content: 'known parent' }, + timestamp: now, + }); + testBgStreamRefs.set('thread-bg::opus', { id: 'bg-stream-known-reply', threadId: 'thread-bg', catId: 'opus' }); + + simulateBackgroundMessage({ + type: 'done', + catId: 'opus', + threadId: 'thread-bg', + isFinal: true, + invocationId: 'inv-bg-old', + timestamp: now + 1, + }); + + simulateBackgroundMessage({ + type: 'system_info', + catId: 'opus', + threadId: 'thread-bg', + content: JSON.stringify({ + type: 'invocation_created', + catId: 'opus', + invocationId: 'inv-bg-new', + }), + timestamp: now + 2, + }); + + simulateBackgroundMessage({ + type: 'text', + catId: 'opus', + threadId: 'thread-bg', + origin: 'callback', + content: responseText, + messageId: 'bg-callback-missing-reply', + timestamp: now + 3, + }); + + const ts = useChatStore.getState().getThreadState('thread-bg'); + expect(ts.messages).toHaveLength(1); + expect(ts.messages[0]).toEqual( + expect.objectContaining({ + id: 'bg-callback-missing-reply', + catId: 'opus', + content: responseText, + origin: 'callback', + isStreaming: false, + replyTo: 'msg-parent-known', + replyPreview: { senderCatId: 'user', content: 'known parent' }, + }), + ); + }); + + it('invalidates a fenced bg finalized ref after a second invocation boundary', () => { + const now = Date.now(); + useChatStore.getState().setThreadCatInvocation('thread-bg', 'opus', { invocationId: 'inv-bg-stale-1' }); + + useChatStore.getState().addMessageToThread('thread-bg', { + id: 'bg-stream-stale', + type: 'assistant', + catId: 'opus', + content: 'Shared content', + origin: 'stream', + isStreaming: true, + extra: { stream: { invocationId: 'inv-bg-stale-1' } }, + timestamp: now, + }); + testBgStreamRefs.set('thread-bg::opus', { id: 'bg-stream-stale', threadId: 'thread-bg', catId: 'opus' }); + + simulateBackgroundMessage({ + type: 'done', + catId: 'opus', + threadId: 'thread-bg', + isFinal: true, + timestamp: now + 1, + }); + + simulateBackgroundMessage({ + type: 'system_info', + catId: 'opus', + threadId: 'thread-bg', + content: JSON.stringify({ + type: 'invocation_created', + catId: 'opus', + invocationId: 'inv-bg-stale-2', + }), + timestamp: now + 2, + }); + + // Callback-only inv-2 leaves the inv-1 finalized ref fenced but unconsumed. + simulateBackgroundMessage({ + type: 'text', + catId: 'opus', + threadId: 'thread-bg', + origin: 'callback', + content: 'Different inv-2 content', + messageId: 'bg-cb-stale-2', + timestamp: now + 3, + }); + + simulateBackgroundMessage({ + type: 'system_info', + catId: 'opus', + threadId: 'thread-bg', + content: JSON.stringify({ + type: 'invocation_created', + catId: 'opus', + invocationId: 'inv-bg-stale-3', + }), + timestamp: now + 4, + }); + + simulateBackgroundMessage({ + type: 'text', + catId: 'opus', + threadId: 'thread-bg', + origin: 'callback', + content: 'Shared content', + messageId: 'bg-cb-stale-3', + timestamp: now + 5, + }); + + const ts = useChatStore.getState().getThreadState('thread-bg'); + expect(ts.messages).toHaveLength(3); + expect(ts.messages.map((m) => m.content)).toEqual([ + 'Shared content', + 'Different inv-2 content', + 'Shared content', + ]); + }); + + it('keeps suppressing late background stream chunks when callback creates a bubble after a non-matchable finalized ref', () => { + const now = Date.now(); + useChatStore.getState().setThreadCatInvocation('thread-bg', 'opus', { invocationId: 'inv-bg-old' }); + + useChatStore.getState().addMessageToThread('thread-bg', { + id: 'bg-stream-old-final', + type: 'assistant', + catId: 'opus', + content: 'Old finalized content', + origin: 'stream', + isStreaming: true, + extra: { stream: { invocationId: 'inv-bg-old' } }, + timestamp: now, + }); + testBgStreamRefs.set('thread-bg::opus', { id: 'bg-stream-old-final', threadId: 'thread-bg', catId: 'opus' }); + + simulateBackgroundMessage({ + type: 'done', + catId: 'opus', + threadId: 'thread-bg', + isFinal: true, + timestamp: now + 1, + }); + + simulateBackgroundMessage({ + type: 'system_info', + catId: 'opus', + threadId: 'thread-bg', + content: JSON.stringify({ + type: 'invocation_created', + catId: 'opus', + invocationId: 'inv-bg-new', + }), + timestamp: now + 2, + }); + + simulateBackgroundMessage({ + type: 'text', + catId: 'opus', + threadId: 'thread-bg', + origin: 'callback', + content: 'Delayed callback from old invocation', + messageId: 'bg-callback-unmatched', + timestamp: now + 3, + }); + + let ts = useChatStore.getState().getThreadState('thread-bg'); + expect(ts.messages).toHaveLength(2); + expect(ts.messages[1]?.origin).toBe('callback'); + expect(ts.messages[1]?.extra?.stream?.invocationId).toBeUndefined(); + expect(ts.messages[1]?.extra?.callbackBridge).toEqual({ skipDedup: true }); + + simulateBackgroundMessage({ + type: 'text', + catId: 'opus', + threadId: 'thread-bg', + content: 'Fresh stream from new invocation', + timestamp: now + 4, + }); + + ts = useChatStore.getState().getThreadState('thread-bg'); + expect(ts.messages).toHaveLength(2); + expect(ts.messages[1]).toEqual( + expect.objectContaining({ + id: 'bg-callback-unmatched', + type: 'assistant', + catId: 'opus', + content: 'Delayed callback from old invocation', + origin: 'callback', + extra: { callbackBridge: { skipDedup: true } }, + }), + ); + }); + + it('keeps suppressing late background stream chunks for callback-first invocations without explicit invocationId', () => { + const now = Date.now(); + useChatStore.getState().setThreadCatInvocation('thread-bg', 'opus', { invocationId: 'inv-bg-callback-first' }); + + simulateBackgroundMessage({ + type: 'text', + catId: 'opus', + threadId: 'thread-bg', + origin: 'callback', + content: 'Callback arrived before stream', + messageId: 'bg-callback-first', + timestamp: now, + }); + + let ts = useChatStore.getState().getThreadState('thread-bg'); + expect(ts.messages).toHaveLength(1); + expect(ts.messages[0]?.extra).toEqual({ callbackBridge: { skipDedup: true } }); + + simulateBackgroundMessage({ + type: 'text', + catId: 'opus', + threadId: 'thread-bg', + content: 'late stream chunk from same invocation', + timestamp: now + 1, + }); + + ts = useChatStore.getState().getThreadState('thread-bg'); + expect(ts.messages).toHaveLength(1); + expect(ts.messages[0]?.origin).toBe('callback'); + expect(ts.messages[0]?.content).toBe('Callback arrived before stream'); + }); + + it('keeps suppressing late background stream chunks after callback replaces an invocationless placeholder', () => { + const now = Date.now(); + useChatStore.getState().setThreadCatInvocation('thread-bg', 'opus', { + invocationId: 'inv-bg-invocationless-placeholder', + }); + + useChatStore.getState().addMessageToThread('thread-bg', { + id: 'bg-stream-invocationless-placeholder', + type: 'assistant', + catId: 'opus', + content: 'thinking...', + origin: 'stream', + isStreaming: true, + timestamp: now, + }); + testBgStreamRefs.set('thread-bg::opus', { + id: 'bg-stream-invocationless-placeholder', + threadId: 'thread-bg', + catId: 'opus', + }); + + simulateBackgroundMessage({ + type: 'text', + catId: 'opus', + threadId: 'thread-bg', + origin: 'callback', + content: 'final answer', + messageId: 'bg-callback-invocationless-placeholder', + timestamp: now + 1, + }); + + simulateBackgroundMessage({ + type: 'text', + catId: 'opus', + threadId: 'thread-bg', + content: ' late chunk from same invocation', + timestamp: now + 2, + }); + + expect(useChatStore.getState().getThreadState('thread-bg').messages).toEqual([ + expect.objectContaining({ + id: 'bg-callback-invocationless-placeholder', + catId: 'opus', + content: 'final answer', + origin: 'callback', + isStreaming: false, + }), + ]); + }); + + it('does not fallback-match an invocationless placeholder when callback has explicit invocationId', () => { + const now = Date.now(); + + useChatStore.getState().addMessageToThread('thread-bg', { + id: 'bg-stream-explicit-fallback-guard', + type: 'assistant', + catId: 'opus', + content: 'thinking...', + origin: 'stream', + isStreaming: true, + timestamp: now, + }); + testBgStreamRefs.set('thread-bg::opus', { + id: 'bg-stream-explicit-fallback-guard', + threadId: 'thread-bg', + catId: 'opus', + }); + + simulateBackgroundMessage({ + type: 'text', + catId: 'opus', + threadId: 'thread-bg', + origin: 'callback', + content: 'explicit callback result', + invocationId: 'inv-bg-explicit-callback', + messageId: 'bg-callback-explicit-fallback-guard', + timestamp: now + 1, + }); + + expect(useChatStore.getState().getThreadState('thread-bg').messages).toEqual([ + expect.objectContaining({ + id: 'bg-stream-explicit-fallback-guard', + catId: 'opus', + content: 'thinking...', + origin: 'stream', + isStreaming: true, + }), + expect.objectContaining({ + id: 'bg-callback-explicit-fallback-guard', + catId: 'opus', + content: 'explicit callback result', + origin: 'callback', + extra: { stream: { invocationId: 'inv-bg-explicit-callback' } }, + }), + ]); + }); + it('drops late background stream chunks after callback replacement', () => { configureDebug({ enabled: true }); ensureWindowDebugApi(); diff --git a/packages/web/src/hooks/reply-target-compat.ts b/packages/web/src/hooks/reply-target-compat.ts new file mode 100644 index 000000000..f39f7cf75 --- /dev/null +++ b/packages/web/src/hooks/reply-target-compat.ts @@ -0,0 +1,6 @@ +export function isReplyTargetCompatible(existingReplyTo?: string, incomingReplyTo?: string): boolean { + if (existingReplyTo == null || incomingReplyTo == null) { + return true; + } + return existingReplyTo === incomingReplyTo; +} diff --git a/packages/web/src/hooks/useAgentMessages.ts b/packages/web/src/hooks/useAgentMessages.ts index 581b20a56..58f1bf9c7 100644 --- a/packages/web/src/hooks/useAgentMessages.ts +++ b/packages/web/src/hooks/useAgentMessages.ts @@ -4,6 +4,7 @@ import { useCallback, useEffect, useRef } from 'react'; import { recordDebugEvent } from '@/debug/invocationEventDebug'; import { useChatStore } from '@/stores/chatStore'; import { compactToolResultDetail } from '@/utils/toolPreview'; +import { isReplyTargetCompatible } from './reply-target-compat'; /** Timeout for done(isFinal) - 5 minutes */ const DONE_TIMEOUT_MS = 5 * 60 * 1000; @@ -106,8 +107,12 @@ export function useAgentMessages() { /** #586 follow-up: Track just-finalized stream bubble per cat. Set on done when * activeRefs entry existed, consumed by callback replacement or next invocation start. - * Prevents the greedy scan from matching arbitrary historical messages. */ - const finalizedStreamRef = useRef>(new Map()); + * Prevents the greedy scan from matching arbitrary historical messages. + * fencedAt: set by invocation_created to mark the boundary — allows late callbacks + * within a grace period but blocks new-invocation callbacks (P1 guard). */ + const finalizedStreamRef = useRef>( + new Map(), + ); /** Bug C P2: Track whether stream data was received per cat (avoids false catch-up on callback-only flows) */ const sawStreamDataRef = useRef>(new Set()); @@ -256,69 +261,97 @@ export function useAgentMessages() { [getCurrentInvocationIdForCat], ); - const findCallbackReplacementTarget = useCallback((catId: string, invocationId: string): { id: string } | null => { - const currentMessages = useChatStore.getState().messages; - for (let i = currentMessages.length - 1; i >= 0; i -= 1) { - const msg = currentMessages[i]; - if ( - msg?.type === 'assistant' && - msg.catId === catId && - msg.origin === 'stream' && - msg.extra?.stream?.invocationId === invocationId - ) { - return { id: msg.id }; + const findCallbackReplacementTarget = useCallback( + (catId: string, invocationId: string): { id: string; matchKind: 'exact' } | null => { + const currentMessages = useChatStore.getState().messages; + for (let i = currentMessages.length - 1; i >= 0; i -= 1) { + const msg = currentMessages[i]; + if ( + msg?.type === 'assistant' && + msg.catId === catId && + msg.origin === 'stream' && + msg.extra?.stream?.invocationId === invocationId + ) { + return { id: msg.id, matchKind: 'exact' }; + } } - } - return null; - }, []); + return null; + }, + [], + ); - const findInvocationlessStreamPlaceholder = useCallback((catId: string): { id: string } | null => { - const currentMessages = useChatStore.getState().messages; - const activeId = activeRefs.current.get(catId)?.id; + const findInvocationlessStreamPlaceholder = useCallback( + ( + catId: string, + callbackContent?: string, + callbackReplyTo?: string, + ): + | { id: string; matchKind: 'active_invocationless' } + | { id: string; matchKind: 'finalized_fallback'; replacementInvocationId?: string; fencedAt?: number } + | null => { + const currentMessages = useChatStore.getState().messages; + const activeId = activeRefs.current.get(catId)?.id; - if (activeId) { - const activeMessage = currentMessages.find( - (msg) => - msg.id === activeId && - msg.type === 'assistant' && - msg.catId === catId && - msg.origin === 'stream' && - !msg.extra?.stream?.invocationId, - ); - if (activeMessage) { - return { id: activeMessage.id }; + if (activeId) { + const activeMessage = currentMessages.find( + (msg) => + msg.id === activeId && + msg.type === 'assistant' && + msg.catId === catId && + msg.origin === 'stream' && + !msg.extra?.stream?.invocationId, + ); + if (activeMessage) { + return { id: activeMessage.id, matchKind: 'active_invocationless' }; + } } - } - // First pass: find actively-streaming invocationless bubble - for (let i = currentMessages.length - 1; i >= 0; i -= 1) { - const msg = currentMessages[i]; - if ( - msg?.type === 'assistant' && - msg.catId === catId && - msg.origin === 'stream' && - msg.isStreaming && - !msg.extra?.stream?.invocationId - ) { - return { id: msg.id }; + // First pass: find actively-streaming invocationless bubble + for (let i = currentMessages.length - 1; i >= 0; i -= 1) { + const msg = currentMessages[i]; + if ( + msg?.type === 'assistant' && + msg.catId === catId && + msg.origin === 'stream' && + msg.isStreaming && + !msg.extra?.stream?.invocationId + ) { + return { id: msg.id, matchKind: 'active_invocationless' }; + } } - } - // #586 follow-up: Check finalizedStreamRef — the done handler records the - // exact message ID of the just-finalized stream bubble. This avoids the - // greedy scan that could match arbitrary historical messages (P1 from review). - const finalizedId = finalizedStreamRef.current.get(catId); - if (finalizedId) { - const finalized = currentMessages.find( - (m) => m.id === finalizedId && m.type === 'assistant' && m.catId === catId && m.origin === 'stream', - ); - if (finalized) { - return { id: finalized.id }; + // #586 follow-up: Check finalizedStreamRef — the done handler records the + // exact message ID of the just-finalized stream bubble. This avoids the + // greedy scan that could match arbitrary historical messages (P1 from review). + const finalizedEntry = finalizedStreamRef.current.get(catId); + if (finalizedEntry) { + const finalized = currentMessages.find( + (m) => + m.id === finalizedEntry.messageId && m.type === 'assistant' && m.catId === catId && m.origin === 'stream', + ); + if (finalized) { + // #586 P1 guard: if invocation_created has fired since finalization + // (fencedAt is set), apply time + content fence to prevent a callback-only + // inv-2 from silently overwriting inv-1's finalized bubble. + if (finalizedEntry.fencedAt) { + const LATE_CALLBACK_GRACE_MS = 5_000; + if (Date.now() - finalizedEntry.fencedAt > LATE_CALLBACK_GRACE_MS) return null; + if (callbackContent !== undefined && finalized.content !== callbackContent) return null; + } + if (!isReplyTargetCompatible(finalized.replyTo, callbackReplyTo)) return null; + return { + id: finalized.id, + matchKind: 'finalized_fallback', + replacementInvocationId: finalizedEntry.invocationId ?? finalized.extra?.stream?.invocationId, + fencedAt: finalizedEntry.fencedAt, + }; + } } - } - return null; - }, []); + return null; + }, + [], + ); const getOrRecoverActiveAssistantMessageId = useCallback( (catId: string, metadata?: AgentMsg['metadata'], options?: { ensureStreaming?: boolean }): string | null => { @@ -426,12 +459,15 @@ export function useAgentMessages() { } if (msg.origin === 'callback') { - const invocationId = msg.invocationId ?? getCurrentInvocationIdForCat(msg.catId); - const hasExplicitInvocationId = !!msg.invocationId; - const replacementTarget = invocationId - ? (findCallbackReplacementTarget(msg.catId, invocationId) ?? - (hasExplicitInvocationId ? null : findInvocationlessStreamPlaceholder(msg.catId))) - : findInvocationlessStreamPlaceholder(msg.catId); + const explicitInvocationId = msg.invocationId; + const inferredInvocationId = getCurrentInvocationIdForCat(msg.catId); + const invocationId = explicitInvocationId ?? inferredInvocationId; + const hasExplicitInvocationId = !!explicitInvocationId; + const exactReplacementTarget = invocationId ? findCallbackReplacementTarget(msg.catId, invocationId) : null; + const fallbackReplacementTarget = hasExplicitInvocationId + ? null + : findInvocationlessStreamPlaceholder(msg.catId, msg.content, msg.replyTo); + const replacementTarget = exactReplacementTarget ?? fallbackReplacementTarget; if (replacementTarget) { const finalId = msg.messageId ?? replacementTarget.id; @@ -451,15 +487,32 @@ export function useAgentMessages() { activeRefs.current.delete(msg.catId); // Consume the finalized ref — callback successfully replaced the bubble finalizedStreamRef.current.delete(msg.catId); - if (invocationId) { - replacedInvocationsRef.current.set(msg.catId, invocationId); + if (explicitInvocationId) { + replacedInvocationsRef.current.set(msg.catId, explicitInvocationId); + } else if (replacementTarget?.matchKind === 'finalized_fallback') { + const suppressionInvocationId = + replacementTarget.replacementInvocationId ?? + (replacementTarget.fencedAt ? undefined : inferredInvocationId); + if (suppressionInvocationId) { + // finalized_fallback prefers the replaced bubble's own invocationId. + // Only unfenced late-bind fallback may borrow the current + // invocationId; once invocation_created fenced the ref, the + // current invocation belongs to the next run and must not be + // suppressed. + replacedInvocationsRef.current.set(msg.catId, suppressionInvocationId); + } + } else if (inferredInvocationId) { + // Exact or active invocationless placeholder matches still belong to + // the current invocation, so later stream chunks should be suppressed. + replacedInvocationsRef.current.set(msg.catId, inferredInvocationId); } } else { // Use backend messageId when available for rich_block correlation (#83 P2) const id = msg.messageId ?? `msg-${Date.now()}-${msg.catId}-cb-${++cbSeq}`; const extraForAdd = { ...(msg.extra?.crossPost ? { crossPost: msg.extra.crossPost } : {}), - ...(hasExplicitInvocationId && msg.invocationId ? { stream: { invocationId: msg.invocationId } } : {}), + ...(explicitInvocationId ? { stream: { invocationId: explicitInvocationId } } : {}), + ...(!explicitInvocationId ? { callbackBridge: { skipDedup: true } } : {}), }; addMessage({ id, @@ -477,12 +530,19 @@ export function useAgentMessages() { // #586 Bug 1 (TD112): Callback created a new bubble because no stream // placeholder existed yet. Mark the invocation as replaced so that // late-arriving stream chunks for the same invocation are suppressed - // instead of spawning a second bubble. - const shouldLockReplacement = - invocationId && - (!hasExplicitInvocationId || getCurrentInvocationIdForCat(msg.catId) === msg.invocationId); - if (shouldLockReplacement) { - replacedInvocationsRef.current.set(msg.catId, invocationId); + // instead of spawning a second bubble. Explicit callback invocationIds + // are only safe when the current invocation is already bound to the + // same ID; otherwise a stale callback can suppress live stream chunks + // before invocation_created/catInvocations catches up. + if (explicitInvocationId) { + if (inferredInvocationId && inferredInvocationId === explicitInvocationId) { + replacedInvocationsRef.current.set(msg.catId, explicitInvocationId); + } + } else if (inferredInvocationId) { + // Invocationless callback-only bubbles still need the inferred lock + // even when an older finalized ref was not matchable; otherwise the + // current invocation can still spawn a duplicate late stream bubble. + replacedInvocationsRef.current.set(msg.catId, inferredInvocationId); } } } else { @@ -588,7 +648,11 @@ export function useAgentMessages() { // #586 follow-up: Record the finalized bubble so callback can find it // even after isStreaming=false + activeRefs cleared. Unlike a greedy // scan, this is scoped to the exact just-finalized message only. - finalizedStreamRef.current.set(msg.catId, messageId); + const finalizedMsg = useChatStore.getState().messages.find((m) => m.id === messageId); + finalizedStreamRef.current.set(msg.catId, { + messageId, + invocationId: finalizedMsg?.extra?.stream?.invocationId, + }); activeRefs.current.delete(msg.catId); } // Bugfix: clear stale invocationId so findRecoverableAssistantMessage @@ -685,11 +749,19 @@ export function useAgentMessages() { sysContent = mentions.map((m) => `${m.mentionedBy} @了 ${m.catId}`).join('、'); sysVariant = 'a2a_followup'; } else if (parsed?.type === 'invocation_created') { - // New invocation boundary: clear stale task snapshot + finalized ref for this cat. - // #586: Without clearing finalizedStreamRef here, a stale ref from the - // previous invocation could cause the next callback to overwrite the old message. + // #586 P3+P1: First invocation boundary fences finalizedStreamRef so + // immediate late callbacks from the previous invocation can still merge. + // If a second boundary arrives before the ref was consumed, the ref is + // stale across multiple invocations and must be invalidated. const targetCatId = parsed.catId ?? msg.catId; - finalizedStreamRef.current.delete(targetCatId); + const existingEntry = finalizedStreamRef.current.get(targetCatId); + if (existingEntry) { + if (existingEntry.fencedAt) { + finalizedStreamRef.current.delete(targetCatId); + } else { + existingEntry.fencedAt = Date.now(); + } + } const invocationId = typeof parsed.invocationId === 'string' ? parsed.invocationId : undefined; if (targetCatId && invocationId) { setCatInvocation(targetCatId, { diff --git a/packages/web/src/hooks/useSocket-background-system-info.ts b/packages/web/src/hooks/useSocket-background-system-info.ts index 2535ada7a..0dee14da7 100644 --- a/packages/web/src/hooks/useSocket-background-system-info.ts +++ b/packages/web/src/hooks/useSocket-background-system-info.ts @@ -44,10 +44,18 @@ export function consumeBackgroundSystemInfo( if (parsed?.type === 'invocation_created') { const targetCatId = parsed.catId ?? msg.catId; const invocationId = typeof parsed.invocationId === 'string' ? parsed.invocationId : undefined; - // #586: Clear stale finalizedBgRef so previous invocation's finalized bubble - // can't be overwritten by the next invocation's callback. + // #586 P1: First boundary fences finalizedBgRefs so immediate late callbacks + // can still merge; a second boundary means the ref is stale and must be + // invalidated before another callback-only invocation reuses it. const bgStreamKey = `${msg.threadId}::${targetCatId}`; - options.finalizedBgRefs.delete(bgStreamKey); + const existingBgEntry = options.finalizedBgRefs.get(bgStreamKey); + if (existingBgEntry) { + if (existingBgEntry.fencedAt) { + options.finalizedBgRefs.delete(bgStreamKey); + } else { + existingBgEntry.fencedAt = Date.now(); + } + } if (targetCatId && invocationId) { options.store.setThreadCatInvocation(msg.threadId, targetCatId, { invocationId, diff --git a/packages/web/src/hooks/useSocket-background.ts b/packages/web/src/hooks/useSocket-background.ts index 1e664659c..a2ceefa5e 100644 --- a/packages/web/src/hooks/useSocket-background.ts +++ b/packages/web/src/hooks/useSocket-background.ts @@ -1,6 +1,7 @@ import { recordDebugEvent } from '@/debug/invocationEventDebug'; import type { CatStatusType } from '@/stores/chat-types'; import { compactToolResultDetail } from '@/utils/toolPreview'; +import { isReplyTargetCompatible } from './reply-target-compat'; import type { ActiveRoutedAgentMessage, BackgroundAgentMessage, @@ -76,9 +77,14 @@ function stopTrackedStream( const existing = options.bgStreamRefs.get(streamKey); if (!existing) return undefined; options.store.setThreadMessageStreaming(msg.threadId, existing.id, false); - // #586 follow-up: Record finalized bubble ID so callback can find it + // #586 follow-up: Record finalized bubble so callback can find it // after bgStreamRefs is cleared and isStreaming is false. - options.finalizedBgRefs.set(streamKey, existing.id); + const threadMsgs = options.store.getThreadState(msg.threadId).messages; + const finalizedMsg = threadMsgs.find((m) => m.id === existing.id); + options.finalizedBgRefs.set(streamKey, { + messageId: existing.id, + invocationId: finalizedMsg?.extra?.stream?.invocationId, + }); options.bgStreamRefs.delete(streamKey); return existing; } @@ -148,8 +154,14 @@ function recoverStreamingMessage( function findBackgroundCallbackReplacementTarget( msg: BackgroundAgentMessage, options: HandleBackgroundMessageOptions, -): { id: string; invocationId: string | null } | null { - const invocationId = msg.invocationId ?? getThreadInvocationId(msg, options); +): { + id: string; + invocationId: string | null; + matchKind: 'exact' | 'active_invocationless' | 'finalized_fallback'; + fencedAt?: number; +} | null { + const explicitInvocationId = msg.invocationId; + const invocationId = explicitInvocationId ?? getThreadInvocationId(msg, options); const threadMessages = options.store.getThreadState(msg.threadId).messages; @@ -163,11 +175,15 @@ function findBackgroundCallbackReplacementTarget( m.origin === 'stream' && m.extra?.stream?.invocationId === invocationId ) { - return { id: m.id, invocationId }; + return { id: m.id, invocationId, matchKind: 'exact' }; } } } + if (explicitInvocationId) { + return null; + } + // #586 Bug 1: Fallback — find invocationless stream placeholder from the same cat. // Background system-info creates bg-rich/bg-think placeholders without invocationId; // without this fallback, callback creates a duplicate bubble alongside the placeholder. @@ -185,20 +201,34 @@ function findBackgroundCallbackReplacementTarget( m.isStreaming && !m.extra?.stream?.invocationId ) { - return { id: m.id, invocationId: invocationId ?? null }; + return { id: m.id, invocationId: invocationId ?? null, matchKind: 'active_invocationless' }; } } // #586 follow-up: Check finalizedBgRefs — the done handler records the exact // message ID of the just-finalized stream bubble. This avoids the greedy scan // that could match arbitrary historical messages (P1 from review). const streamKey = `${msg.threadId}::${msg.catId}`; - const finalizedId = options.finalizedBgRefs.get(streamKey); - if (finalizedId) { + const finalizedEntry = options.finalizedBgRefs.get(streamKey); + if (finalizedEntry) { const finalized = threadMessages.find( - (m) => m.id === finalizedId && m.type === 'assistant' && m.catId === msg.catId && m.origin === 'stream', + (m) => + m.id === finalizedEntry.messageId && m.type === 'assistant' && m.catId === msg.catId && m.origin === 'stream', ); if (finalized) { - return { id: finalized.id, invocationId: invocationId ?? null }; + // #586 P1 guard: time + content fence across invocation boundary + if (finalizedEntry.fencedAt) { + const LATE_CALLBACK_GRACE_MS = 5_000; + if (Date.now() - finalizedEntry.fencedAt > LATE_CALLBACK_GRACE_MS) return null; + if (msg.content !== undefined && finalized.content !== undefined && finalized.content !== msg.content) + return null; + } + if (!isReplyTargetCompatible(finalized.replyTo, msg.replyTo)) return null; + return { + id: finalized.id, + invocationId: finalizedEntry.invocationId ?? finalized.extra?.stream?.invocationId ?? null, + matchKind: 'finalized_fallback', + fencedAt: finalizedEntry.fencedAt, + }; } } @@ -334,6 +364,7 @@ export function handleBackgroundAgentMessage( let finalMsgId: string | undefined; if (msg.origin === 'callback') { + const explicitInvocationId = msg.invocationId; const replacementTarget = findBackgroundCallbackReplacementTarget(msg, options); if (replacementTarget) { const cbId = msg.messageId ?? replacementTarget.id; @@ -353,22 +384,37 @@ export function handleBackgroundAgentMessage( options.bgStreamRefs.delete(streamKey); // Consume finalized ref — callback successfully replaced options.finalizedBgRefs.delete(streamKey); - // #586 P1-2 fix: Only set replacedInvocations when we have a real invocationId. - // Fallback matches return null — writing a pseudo ID would permanently suppress - // future invocationless stream chunks via shouldSuppressLateBackgroundStreamChunk. - if (replacementTarget.invocationId) { - options.replacedInvocations.set(streamKey, replacementTarget.invocationId); + if (explicitInvocationId) { + options.replacedInvocations.set(streamKey, explicitInvocationId); + } else { + const suppressionInvocationId = + replacementTarget.invocationId ?? + (replacementTarget.matchKind === 'finalized_fallback' && !replacementTarget.fencedAt + ? getThreadInvocationId(msg, options) + : undefined); + if (suppressionInvocationId) { + // finalized_fallback prefers the replaced bubble's own invocationId. + // Only unfenced late-bind fallback may reuse the current thread + // invocation; once invocation_created fenced the ref, current state + // belongs to the next invocation and must not be suppressed. + options.replacedInvocations.set(streamKey, suppressionInvocationId); + } } finalMsgId = cbId; } else { const cbId = msg.messageId ?? `bg-cb-${msg.timestamp}-${msg.catId}-${options.nextBgSeq()}`; + const extraForAdd = { + ...(msg.extra?.crossPost ? { crossPost: msg.extra.crossPost } : {}), + ...(explicitInvocationId ? { stream: { invocationId: explicitInvocationId } } : {}), + ...(!explicitInvocationId ? { callbackBridge: { skipDedup: true } } : {}), + }; options.store.addMessageToThread(msg.threadId, { id: cbId, type: 'assistant', catId: msg.catId, content: msg.content, ...(msg.metadata ? { metadata: msg.metadata } : {}), - ...(msg.extra?.crossPost ? { extra: { crossPost: msg.extra.crossPost } } : {}), + ...(Object.keys(extraForAdd).length > 0 ? { extra: extraForAdd } : {}), ...(msg.mentionsUser ? { mentionsUser: true } : {}), ...(msg.replyTo ? { replyTo: msg.replyTo } : {}), ...(msg.replyPreview ? { replyPreview: msg.replyPreview } : {}), @@ -377,10 +423,20 @@ export function handleBackgroundAgentMessage( }); // #586 Bug 1 (TD112): Callback created new bubble without finding a stream // placeholder. Mark invocation as replaced so late background stream chunks - // are suppressed instead of spawning a duplicate bubble. - const bgInvocationId = msg.invocationId ?? getThreadInvocationId(msg, options); - if (bgInvocationId) { - options.replacedInvocations.set(streamKey, bgInvocationId); + // are suppressed instead of spawning a duplicate bubble. Explicit + // callback invocationIds are only safe when the current thread + // invocation is already bound to the same ID; otherwise a stale callback + // can suppress live background stream chunks before binding catches up. + const inferredInvocationId = getThreadInvocationId(msg, options); + if (explicitInvocationId) { + if (inferredInvocationId && inferredInvocationId === explicitInvocationId) { + options.replacedInvocations.set(streamKey, explicitInvocationId); + } + } else if (inferredInvocationId) { + // Invocationless callback-only bubbles still need the inferred lock + // even when an older finalized ref was not matchable for this callback; + // otherwise the current invocation can still spawn a duplicate bubble. + options.replacedInvocations.set(streamKey, inferredInvocationId); } finalMsgId = cbId; } diff --git a/packages/web/src/hooks/useSocket-background.types.ts b/packages/web/src/hooks/useSocket-background.types.ts index b33428e71..4b822d473 100644 --- a/packages/web/src/hooks/useSocket-background.types.ts +++ b/packages/web/src/hooks/useSocket-background.types.ts @@ -95,8 +95,9 @@ export interface HandleBackgroundMessageOptions { addToast: (toast: BackgroundToastInput) => void; /** #80 fix-C: Clear the done-timeout guard when a background thread completes */ clearDoneTimeout?: (threadId?: string) => void; - /** #586 follow-up: Just-finalized stream bubble IDs keyed by streamKey */ - finalizedBgRefs: Map; + /** #586 follow-up: Just-finalized stream bubble IDs keyed by streamKey. + * fencedAt: set by invocation_created to mark the boundary (P1 guard). */ + finalizedBgRefs: Map; } export type ActiveRoutedAgentMessage = { diff --git a/packages/web/src/hooks/useSocket.ts b/packages/web/src/hooks/useSocket.ts index 37abba0c5..6799d90c0 100644 --- a/packages/web/src/hooks/useSocket.ts +++ b/packages/web/src/hooks/useSocket.ts @@ -231,7 +231,9 @@ export function useSocket(callbacks: SocketCallbacks, threadId?: string) { const joinedRoomsRef = useRef>(new Set()); const bgStreamRefsRef = useRef>(new Map()); const bgReplacedInvocationsRef = useRef>(new Map()); - const bgFinalizedRefsRef = useRef>(new Map()); + const bgFinalizedRefsRef = useRef>( + new Map(), + ); const bgSeqRef = useRef(0); const userIdRef = useRef(getUserId()); const threadIdRef = useRef(threadId); diff --git a/packages/web/src/stores/__tests__/chatStore-multithread.test.ts b/packages/web/src/stores/__tests__/chatStore-multithread.test.ts index f113f4454..f10a22926 100644 --- a/packages/web/src/stores/__tests__/chatStore-multithread.test.ts +++ b/packages/web/src/stores/__tests__/chatStore-multithread.test.ts @@ -208,6 +208,153 @@ describe('chatStore multi-thread state', () => { ]); }); + it('TD112 Phase 3 P2: callback with SAME content does NOT merge without invocation evidence', () => { + const now = Date.now(); + const responseText = 'Stream output from inv-1'; + // Finalized stream bubble from a previous invocation (has invocationId, not streaming) + useChatStore.getState().addMessage({ + id: 'stream-inv1', + type: 'assistant', + catId: 'opus', + content: responseText, + origin: 'stream', + extra: { stream: { invocationId: 'inv-1' } }, + timestamp: now - 2000, + isStreaming: false, + }); + + // Late callback without invocationId, same content — should remain separate + // because the store no longer guesses cross-invocation identity. + useChatStore.getState().addMessage({ + id: 'late-callback', + type: 'assistant', + catId: 'opus', + content: responseText, + origin: 'callback', + timestamp: now, + }); + + expect(useChatStore.getState().messages).toHaveLength(2); + expect(useChatStore.getState().messages[0]!.id).toBe('stream-inv1'); + expect(useChatStore.getState().messages[1]!.id).toBe('late-callback'); + }); + + it('TD112 Phase 3 P1: callback with DIFFERENT content does NOT merge (cross-invocation guard)', () => { + const now = Date.now(); + // Finalized stream from inv-1 + useChatStore.getState().addMessage({ + id: 'stream-inv1', + type: 'assistant', + catId: 'opus', + content: 'Response from inv-1', + origin: 'stream', + extra: { stream: { invocationId: 'inv-1' } }, + timestamp: now - 2000, + isStreaming: false, + }); + + // Callback from inv-2 (different content) — must NOT overwrite inv-1 + useChatStore.getState().addMessage({ + id: 'inv2-callback', + type: 'assistant', + catId: 'opus', + content: 'Scheduled task created', + origin: 'callback', + timestamp: now, + }); + + // Content differs — Phase 3 should NOT merge + expect(useChatStore.getState().messages).toHaveLength(2); + expect(useChatStore.getState().messages[0]!.content).toBe('Response from inv-1'); + expect(useChatStore.getState().messages[1]!.content).toBe('Scheduled task created'); + }); + + it('TD112 Phase 3: does NOT merge callback into old (>30s) finalized stream', () => { + const now = Date.now(); + useChatStore.getState().addMessage({ + id: 'old-stream', + type: 'assistant', + catId: 'opus', + content: 'Old stream', + origin: 'stream', + extra: { stream: { invocationId: 'inv-old' } }, + timestamp: now - 60_000, // 60s ago — too old + isStreaming: false, + }); + + useChatStore.getState().addMessage({ + id: 'new-callback', + type: 'assistant', + catId: 'opus', + content: 'New callback', + origin: 'callback', + timestamp: now, + }); + + // Should NOT merge — time gap too large + expect(useChatStore.getState().messages).toHaveLength(2); + }); + + it('TD112 Phase 3: does NOT merge callback into still-streaming bubble (with invocationId)', () => { + const now = Date.now(); + // Bubble has invocationId (so Phase 2 soft rule is skipped) + // and is still streaming (Phase 3 should skip it) + useChatStore.getState().addMessage({ + id: 'active-stream', + type: 'assistant', + catId: 'opus', + content: 'Still streaming...', + origin: 'stream', + extra: { stream: { invocationId: 'inv-active' } }, + timestamp: now - 1000, + isStreaming: true, // Still active + }); + + useChatStore.getState().addMessage({ + id: 'cb-msg', + type: 'assistant', + catId: 'opus', + content: 'Callback from different source', + origin: 'callback', + timestamp: now, + }); + + // Phase 1: no match (callback has no invocationId) + // Phase 2: skipped (existing has invocationId) + // Phase 3: skipped (existing is still streaming) + expect(useChatStore.getState().messages).toHaveLength(2); + }); + + it('TD112 Phase 3 P2: background callback with SAME content does NOT merge without invocation evidence', () => { + const now = Date.now(); + const responseText = 'Stream output from inv-1'; + + useChatStore.getState().addMessageToThread('thread-b', { + id: 'bg-stream-inv1', + type: 'assistant', + catId: 'opus', + content: responseText, + origin: 'stream', + extra: { stream: { invocationId: 'inv-1' } }, + timestamp: now - 2000, + isStreaming: false, + }); + + useChatStore.getState().addMessageToThread('thread-b', { + id: 'bg-late-callback', + type: 'assistant', + catId: 'opus', + content: responseText, + origin: 'callback', + timestamp: now, + }); + + const messages = useChatStore.getState().threadStates['thread-b']?.messages ?? []; + expect(messages).toHaveLength(2); + expect(messages[0]!.id).toBe('bg-stream-inv1'); + expect(messages[1]!.id).toBe('bg-late-callback'); + }); + it('replaces an optimistic background-thread message id in place', () => { useChatStore.getState().addMessageToThread('thread-b', makeMsg('temp-user-2', 'background')); diff --git a/packages/web/src/stores/chat-types.ts b/packages/web/src/stores/chat-types.ts index 5dd29ab79..5709f844b 100644 --- a/packages/web/src/stores/chat-types.ts +++ b/packages/web/src/stores/chat-types.ts @@ -240,6 +240,8 @@ export interface ChatMessage { crossPost?: { sourceThreadId: string; sourceInvocationId?: string }; /** F081: Stream identity for continuity / hydration reconcile */ stream?: { invocationId?: string }; + /** #586: Hook already ruled out callback→stream bridge; skip store dedup bridge too. */ + callbackBridge?: { skipDedup?: boolean }; /** F098-C1: Explicit target cats from post_message API */ targetCats?: string[]; /** F118 AC-C3: Timeout diagnostics for enhanced error display */ diff --git a/packages/web/src/stores/chatStore.ts b/packages/web/src/stores/chatStore.ts index 72a1b7ae8..1080d624d 100644 --- a/packages/web/src/stores/chatStore.ts +++ b/packages/web/src/stores/chatStore.ts @@ -316,6 +316,7 @@ function findAssistantDuplicate(messages: ChatMessage[], incoming: ChatMessage): if (incoming.type !== 'assistant' || !incoming.catId) return -1; const incomingInvId = getBubbleInvocationId(incoming); + const skipCallbackBridge = incoming.extra?.callbackBridge?.skipDedup === true; // Phase 1: Hard rule — scan ALL same-cat assistants for exact invocationId match. // Must run first because bridge/soft rules on a newer message would mis-associate. @@ -335,6 +336,7 @@ function findAssistantDuplicate(messages: ChatMessage[], incoming: ChatMessage): // must not merge into an invocationless stream from a different invocation. if (incoming.origin !== 'callback') return -1; if (incomingInvId) return -1; + if (skipCallbackBridge) return -1; for (let i = messages.length - 1; i >= 0; i--) { const existing = messages[i]!; @@ -358,6 +360,11 @@ function findAssistantDuplicate(messages: ChatMessage[], incoming: ChatMessage): break; } + // Phase 3 intentionally disabled: when a callback arrives without invocation + // evidence, the store must not merge it into a finalized stream bubble based + // on cat/reply/visibility/content heuristics alone. The hook-level finalized + // refs provide the precise late-callback bridge; the store should fall back to + // showing a separate bubble instead of risking a false cross-invocation merge. return -1; }