Skip to content
Closed
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
0b74f10
fix(web): late callback dedup — preserve finalizedStreamRef across in…
mindfn Apr 9, 2026
28ad49d
fix(web): add cross-invocation content fence to prevent silent histor…
mindfn Apr 9, 2026
770a8a3
chore: drop unrelated pnpm-lock.yaml churn (gpt52 P3)
mindfn Apr 9, 2026
17cb488
fix(web): use !== undefined instead of truthy for content guard (gpt5…
mindfn Apr 10, 2026
3929d8b
fix(ci): satisfy biome formatting for content guards [砚砚/GPT-5.4🐾]
mindfn Apr 10, 2026
2c66ae8
fix(web): invalidate stale finalized refs across multiple invocation …
mindfn Apr 10, 2026
8f4204a
fix(ci): format IME guard test after main sync [砚砚/GPT-5.4🐾]
mindfn Apr 10, 2026
5c67618
fix(web): keep unmatched callback bubbles invocationless [砚砚/GPT-5.4🐾]
mindfn Apr 10, 2026
ad2f6b1
fix(web): restore callback-first late-stream suppression [砚砚/GPT-5.4🐾]
mindfn Apr 10, 2026
f4c01ad
fix(web): avoid inferred lock on fenced callback fallback [砚砚/GPT-5.4🐾]
mindfn Apr 10, 2026
13fb8ef
fix(web): preserve suppression for invocationless callback merges [砚砚…
mindfn Apr 10, 2026
f0b17dc
fix(web): skip bg fallback for explicit callback invocations [砚砚/GPT-…
mindfn Apr 10, 2026
859f552
fix(web): keep suppression after non-matchable callback bubbles [砚砚/G…
mindfn Apr 10, 2026
3c945b3
fix(web): drop store phase-3 callback bridge [砚砚/GPT-5.4🐾]
mindfn Apr 10, 2026
063ea0c
Merge branch 'main' into fix/late-callback-dedup
mindfn Apr 10, 2026
90c5868
Merge branch 'main' into fix/late-callback-dedup
mindfn Apr 10, 2026
1f66cdb
fix(web): keep finalized fallback late-stream lock [砚砚/GPT-5.4🐾]
mindfn Apr 10, 2026
166eeeb
Merge branch 'main' into fix/late-callback-dedup
mindfn Apr 10, 2026
ee95383
fix(web): restore late-bound finalized fallback suppression [砚砚/GPT-5…
mindfn Apr 10, 2026
d0c5d1a
fix(web): avoid stale explicit callback suppression [砚砚/GPT-5.4🐾]
mindfn Apr 10, 2026
1a81c8f
Merge branch 'main' into fix/late-callback-dedup
mindfn Apr 10, 2026
7bf1bb5
fix(web): gate finalized fallback by reply target [砚砚/GPT-5.4🐾]
mindfn Apr 10, 2026
ed28553
fix(web): tolerate unknown finalized reply target [砚砚/GPT-5.4🐾]
mindfn Apr 10, 2026
f9bcc66
fix(web): treat missing reply target as unknown evidence [砚砚/GPT-5.4🐾]
mindfn Apr 10, 2026
6914bb7
fix(web): fence finalized fallback suppression [砚砚/GPT-5.4🐾]
mindfn Apr 10, 2026
a033cd2
Merge branch 'main' into fix/late-callback-dedup
mindfn Apr 11, 2026
500393b
Merge branch 'main' into fix/late-callback-dedup
mindfn Apr 11, 2026
f8c6c79
fix(api): format observability coverage test [砚砚/GPT-5.4🐾]
mindfn Apr 11, 2026
58af3b4
Merge branch 'main' into fix/late-callback-dedup
mindfn Apr 11, 2026
7abae7f
Merge branch 'main' into fix/late-callback-dedup
mindfn Apr 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,091 changes: 1,091 additions & 0 deletions packages/web/src/hooks/__tests__/useAgentMessages-late-callback-dedup.test.ts

Large diffs are not rendered by default.

939 changes: 938 additions & 1 deletion packages/web/src/hooks/__tests__/useSocket-background.test.ts

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions packages/web/src/hooks/reply-target-compat.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/**
 * Decides whether an incoming reply target may be treated as matching an
 * existing one.
 *
 * A side that is `undefined` (or `null` at runtime) is treated as unknown
 * evidence and never blocks a match. Only when both sides are present are
 * they compared, and then they must be strictly equal.
 *
 * @param existingReplyTo - Reply target already recorded, if any.
 * @param incomingReplyTo - Reply target carried by the incoming message, if any.
 * @returns `true` when either side is missing or both are identical; `false` otherwise.
 */
export function isReplyTargetCompatible(existingReplyTo?: string, incomingReplyTo?: string): boolean {
  // `== null` deliberately matches both null and undefined.
  const eitherSideUnknown = existingReplyTo == null || incomingReplyTo == null;
  return eitherSideUnknown ? true : existingReplyTo === incomingReplyTo;
}
224 changes: 148 additions & 76 deletions packages/web/src/hooks/useAgentMessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { useCallback, useEffect, useRef } from 'react';
import { recordDebugEvent } from '@/debug/invocationEventDebug';
import { useChatStore } from '@/stores/chatStore';
import { compactToolResultDetail } from '@/utils/toolPreview';
import { isReplyTargetCompatible } from './reply-target-compat';

/** Timeout for done(isFinal) - 5 minutes */
const DONE_TIMEOUT_MS = 5 * 60 * 1000;
Expand Down Expand Up @@ -106,8 +107,12 @@ export function useAgentMessages() {

/** #586 follow-up: Track just-finalized stream bubble per cat. Set on done when
* activeRefs entry existed, consumed by callback replacement or next invocation start.
* Prevents the greedy scan from matching arbitrary historical messages. */
const finalizedStreamRef = useRef<Map<string, string>>(new Map());
* Prevents the greedy scan from matching arbitrary historical messages.
* fencedAt: set by invocation_created to mark the boundary — allows late callbacks
* within a grace period but blocks new-invocation callbacks (P1 guard). */
const finalizedStreamRef = useRef<Map<string, { messageId: string; invocationId?: string; fencedAt?: number }>>(
new Map(),
);

/** Bug C P2: Track whether stream data was received per cat (avoids false catch-up on callback-only flows) */
const sawStreamDataRef = useRef<Set<string>>(new Set());
Expand Down Expand Up @@ -256,69 +261,97 @@ export function useAgentMessages() {
[getCurrentInvocationIdForCat],
);

const findCallbackReplacementTarget = useCallback((catId: string, invocationId: string): { id: string } | null => {
const currentMessages = useChatStore.getState().messages;
for (let i = currentMessages.length - 1; i >= 0; i -= 1) {
const msg = currentMessages[i];
if (
msg?.type === 'assistant' &&
msg.catId === catId &&
msg.origin === 'stream' &&
msg.extra?.stream?.invocationId === invocationId
) {
return { id: msg.id };
const findCallbackReplacementTarget = useCallback(
(catId: string, invocationId: string): { id: string; matchKind: 'exact' } | null => {
const currentMessages = useChatStore.getState().messages;
for (let i = currentMessages.length - 1; i >= 0; i -= 1) {
const msg = currentMessages[i];
if (
msg?.type === 'assistant' &&
msg.catId === catId &&
msg.origin === 'stream' &&
msg.extra?.stream?.invocationId === invocationId
) {
return { id: msg.id, matchKind: 'exact' };
}
}
}
return null;
}, []);
return null;
},
[],
);

const findInvocationlessStreamPlaceholder = useCallback((catId: string): { id: string } | null => {
const currentMessages = useChatStore.getState().messages;
const activeId = activeRefs.current.get(catId)?.id;
const findInvocationlessStreamPlaceholder = useCallback(
(
catId: string,
callbackContent?: string,
callbackReplyTo?: string,
):
| { id: string; matchKind: 'active_invocationless' }
| { id: string; matchKind: 'finalized_fallback'; replacementInvocationId?: string; fencedAt?: number }
| null => {
const currentMessages = useChatStore.getState().messages;
const activeId = activeRefs.current.get(catId)?.id;

if (activeId) {
const activeMessage = currentMessages.find(
(msg) =>
msg.id === activeId &&
msg.type === 'assistant' &&
msg.catId === catId &&
msg.origin === 'stream' &&
!msg.extra?.stream?.invocationId,
);
if (activeMessage) {
return { id: activeMessage.id };
if (activeId) {
const activeMessage = currentMessages.find(
(msg) =>
msg.id === activeId &&
msg.type === 'assistant' &&
msg.catId === catId &&
msg.origin === 'stream' &&
!msg.extra?.stream?.invocationId,
);
if (activeMessage) {
return { id: activeMessage.id, matchKind: 'active_invocationless' };
}
}
}

// First pass: find actively-streaming invocationless bubble
for (let i = currentMessages.length - 1; i >= 0; i -= 1) {
const msg = currentMessages[i];
if (
msg?.type === 'assistant' &&
msg.catId === catId &&
msg.origin === 'stream' &&
msg.isStreaming &&
!msg.extra?.stream?.invocationId
) {
return { id: msg.id };
// First pass: find actively-streaming invocationless bubble
for (let i = currentMessages.length - 1; i >= 0; i -= 1) {
const msg = currentMessages[i];
if (
msg?.type === 'assistant' &&
msg.catId === catId &&
msg.origin === 'stream' &&
msg.isStreaming &&
!msg.extra?.stream?.invocationId
) {
return { id: msg.id, matchKind: 'active_invocationless' };
}
}
}

// #586 follow-up: Check finalizedStreamRef — the done handler records the
// exact message ID of the just-finalized stream bubble. This avoids the
// greedy scan that could match arbitrary historical messages (P1 from review).
const finalizedId = finalizedStreamRef.current.get(catId);
if (finalizedId) {
const finalized = currentMessages.find(
(m) => m.id === finalizedId && m.type === 'assistant' && m.catId === catId && m.origin === 'stream',
);
if (finalized) {
return { id: finalized.id };
// #586 follow-up: Check finalizedStreamRef — the done handler records the
// exact message ID of the just-finalized stream bubble. This avoids the
// greedy scan that could match arbitrary historical messages (P1 from review).
const finalizedEntry = finalizedStreamRef.current.get(catId);
if (finalizedEntry) {
const finalized = currentMessages.find(
(m) =>
m.id === finalizedEntry.messageId && m.type === 'assistant' && m.catId === catId && m.origin === 'stream',
);
if (finalized) {
// #586 P1 guard: if invocation_created has fired since finalization
// (fencedAt is set), apply time + content fence to prevent a callback-only
// inv-2 from silently overwriting inv-1's finalized bubble.
if (finalizedEntry.fencedAt) {
const LATE_CALLBACK_GRACE_MS = 5_000;
if (Date.now() - finalizedEntry.fencedAt > LATE_CALLBACK_GRACE_MS) return null;
if (callbackContent !== undefined && finalized.content !== callbackContent) return null;
}
if (!isReplyTargetCompatible(finalized.replyTo, callbackReplyTo)) return null;
return {
id: finalized.id,
matchKind: 'finalized_fallback',
replacementInvocationId: finalizedEntry.invocationId ?? finalized.extra?.stream?.invocationId,
fencedAt: finalizedEntry.fencedAt,
};
}
}
}

return null;
}, []);
return null;
},
[],
);

const getOrRecoverActiveAssistantMessageId = useCallback(
(catId: string, metadata?: AgentMsg['metadata'], options?: { ensureStreaming?: boolean }): string | null => {
Expand Down Expand Up @@ -426,12 +459,15 @@ export function useAgentMessages() {
}

if (msg.origin === 'callback') {
const invocationId = msg.invocationId ?? getCurrentInvocationIdForCat(msg.catId);
const hasExplicitInvocationId = !!msg.invocationId;
const replacementTarget = invocationId
? (findCallbackReplacementTarget(msg.catId, invocationId) ??
(hasExplicitInvocationId ? null : findInvocationlessStreamPlaceholder(msg.catId)))
: findInvocationlessStreamPlaceholder(msg.catId);
const explicitInvocationId = msg.invocationId;
const inferredInvocationId = getCurrentInvocationIdForCat(msg.catId);
const invocationId = explicitInvocationId ?? inferredInvocationId;
const hasExplicitInvocationId = !!explicitInvocationId;
const exactReplacementTarget = invocationId ? findCallbackReplacementTarget(msg.catId, invocationId) : null;
const fallbackReplacementTarget = hasExplicitInvocationId
? null
: findInvocationlessStreamPlaceholder(msg.catId, msg.content, msg.replyTo);
const replacementTarget = exactReplacementTarget ?? fallbackReplacementTarget;

if (replacementTarget) {
const finalId = msg.messageId ?? replacementTarget.id;
Expand All @@ -451,15 +487,32 @@ export function useAgentMessages() {
activeRefs.current.delete(msg.catId);
// Consume the finalized ref — callback successfully replaced the bubble
finalizedStreamRef.current.delete(msg.catId);
if (invocationId) {
replacedInvocationsRef.current.set(msg.catId, invocationId);
if (explicitInvocationId) {
replacedInvocationsRef.current.set(msg.catId, explicitInvocationId);
} else if (replacementTarget?.matchKind === 'finalized_fallback') {
const suppressionInvocationId =
replacementTarget.replacementInvocationId ??
(replacementTarget.fencedAt ? undefined : inferredInvocationId);
if (suppressionInvocationId) {
// finalized_fallback prefers the replaced bubble's own invocationId.
// Only unfenced late-bind fallback may borrow the current
// invocationId; once invocation_created fenced the ref, the
// current invocation belongs to the next run and must not be
// suppressed.
replacedInvocationsRef.current.set(msg.catId, suppressionInvocationId);
}
} else if (inferredInvocationId) {
// Exact or active invocationless placeholder matches still belong to
// the current invocation, so later stream chunks should be suppressed.
replacedInvocationsRef.current.set(msg.catId, inferredInvocationId);
}
} else {
// Use backend messageId when available for rich_block correlation (#83 P2)
const id = msg.messageId ?? `msg-${Date.now()}-${msg.catId}-cb-${++cbSeq}`;
const extraForAdd = {
...(msg.extra?.crossPost ? { crossPost: msg.extra.crossPost } : {}),
...(hasExplicitInvocationId && msg.invocationId ? { stream: { invocationId: msg.invocationId } } : {}),
...(explicitInvocationId ? { stream: { invocationId: explicitInvocationId } } : {}),
...(!explicitInvocationId ? { callbackBridge: { skipDedup: true } } : {}),
};
addMessage({
id,
Expand All @@ -477,12 +530,19 @@ export function useAgentMessages() {
// #586 Bug 1 (TD112): Callback created a new bubble because no stream
// placeholder existed yet. Mark the invocation as replaced so that
// late-arriving stream chunks for the same invocation are suppressed
// instead of spawning a second bubble.
const shouldLockReplacement =
invocationId &&
(!hasExplicitInvocationId || getCurrentInvocationIdForCat(msg.catId) === msg.invocationId);
if (shouldLockReplacement) {
replacedInvocationsRef.current.set(msg.catId, invocationId);
// instead of spawning a second bubble. Explicit callback invocationIds
// are only safe when the current invocation is already bound to the
// same ID; otherwise a stale callback can suppress live stream chunks
// before invocation_created/catInvocations catches up.
if (explicitInvocationId) {
if (inferredInvocationId && inferredInvocationId === explicitInvocationId) {
replacedInvocationsRef.current.set(msg.catId, explicitInvocationId);
}
} else if (inferredInvocationId) {
// Invocationless callback-only bubbles still need the inferred lock
// even when an older finalized ref was not matchable; otherwise the
// current invocation can still spawn a duplicate late stream bubble.
replacedInvocationsRef.current.set(msg.catId, inferredInvocationId);
}
}
} else {
Expand Down Expand Up @@ -588,7 +648,11 @@ export function useAgentMessages() {
// #586 follow-up: Record the finalized bubble so callback can find it
// even after isStreaming=false + activeRefs cleared. Unlike a greedy
// scan, this is scoped to the exact just-finalized message only.
finalizedStreamRef.current.set(msg.catId, messageId);
const finalizedMsg = useChatStore.getState().messages.find((m) => m.id === messageId);
finalizedStreamRef.current.set(msg.catId, {
messageId,
invocationId: finalizedMsg?.extra?.stream?.invocationId,
});
activeRefs.current.delete(msg.catId);
}
// Bugfix: clear stale invocationId so findRecoverableAssistantMessage
Expand Down Expand Up @@ -685,11 +749,19 @@ export function useAgentMessages() {
sysContent = mentions.map((m) => `${m.mentionedBy} @了 ${m.catId}`).join('、');
sysVariant = 'a2a_followup';
} else if (parsed?.type === 'invocation_created') {
// New invocation boundary: clear stale task snapshot + finalized ref for this cat.
// #586: Without clearing finalizedStreamRef here, a stale ref from the
// previous invocation could cause the next callback to overwrite the old message.
// #586 P3+P1: First invocation boundary fences finalizedStreamRef so
// immediate late callbacks from the previous invocation can still merge.
// If a second boundary arrives before the ref was consumed, the ref is
// stale across multiple invocations and must be invalidated.
const targetCatId = parsed.catId ?? msg.catId;
finalizedStreamRef.current.delete(targetCatId);
const existingEntry = finalizedStreamRef.current.get(targetCatId);
if (existingEntry) {
if (existingEntry.fencedAt) {
finalizedStreamRef.current.delete(targetCatId);
} else {
existingEntry.fencedAt = Date.now();
}
}
const invocationId = typeof parsed.invocationId === 'string' ? parsed.invocationId : undefined;
if (targetCatId && invocationId) {
setCatInvocation(targetCatId, {
Expand Down
14 changes: 11 additions & 3 deletions packages/web/src/hooks/useSocket-background-system-info.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,18 @@ export function consumeBackgroundSystemInfo(
if (parsed?.type === 'invocation_created') {
const targetCatId = parsed.catId ?? msg.catId;
const invocationId = typeof parsed.invocationId === 'string' ? parsed.invocationId : undefined;
// #586: Clear stale finalizedBgRef so previous invocation's finalized bubble
// can't be overwritten by the next invocation's callback.
// #586 P1: First boundary fences finalizedBgRefs so immediate late callbacks
// can still merge; a second boundary means the ref is stale and must be
// invalidated before another callback-only invocation reuses it.
const bgStreamKey = `${msg.threadId}::${targetCatId}`;
options.finalizedBgRefs.delete(bgStreamKey);
const existingBgEntry = options.finalizedBgRefs.get(bgStreamKey);
if (existingBgEntry) {
if (existingBgEntry.fencedAt) {
options.finalizedBgRefs.delete(bgStreamKey);
} else {
existingBgEntry.fencedAt = Date.now();
}
}
if (targetCatId && invocationId) {
options.store.setThreadCatInvocation(msg.threadId, targetCatId, {
invocationId,
Expand Down
Loading
Loading