Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/api/src/routes/callback-document-routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,14 @@ export function registerCallbackDocumentRoutes(

const isNew = getRichBlockBuffer().add(record.threadId, record.catId as string, fileBlock, invocationId);

// #454: include invocationId so frontend can exact-match callback to stream bubble
if (isNew) {
deps.socketManager.broadcastAgentMessage(
{
type: 'system_info' as const,
catId: record.catId,
content: JSON.stringify({ type: 'rich_block', block: fileBlock }),
invocationId,
timestamp: Date.now(),
},
record.threadId,
Expand Down
6 changes: 5 additions & 1 deletion packages/api/src/routes/callbacks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ export const callbacksRoutes: FastifyPluginAsync<CallbackRoutesOptions> = async
content: storedContent,
origin: 'callback',
messageId: storedMsg.id,
...(invocationId ? { invocationId } : {}),
invocationId, // #454: always propagate — required by callback auth
// F52+F098-C1: Include crossPost + targetCats in real-time broadcast
...(isCrossThread || validExplicitTargets.length
? {
Expand All @@ -536,12 +536,14 @@ export const callbacksRoutes: FastifyPluginAsync<CallbackRoutesOptions> = async

// #83: Broadcast each extracted rich block as SSE event for live rendering
// P2 cloud-review: include messageId for frontend correlation
// #454: include invocationId so frontend can exact-match callback to stream bubble
for (const block of richBlocks) {
socketManager.broadcastAgentMessage(
{
type: 'system_info' as const,
catId: record.catId,
content: JSON.stringify({ type: 'rich_block', block, messageId: storedMsg.id }),
invocationId,
timestamp: Date.now(),
},
effectiveThreadId,
Expand Down Expand Up @@ -1126,12 +1128,14 @@ export const callbacksRoutes: FastifyPluginAsync<CallbackRoutesOptions> = async
const isNew = getRichBlockBuffer().add(record.threadId, record.catId as string, resolvedBlock, invocationId);

// Only broadcast new blocks (dedup retries at server to prevent frontend duplicates)
// #454: include invocationId so frontend can exact-match callback to stream bubble
if (isNew) {
socketManager.broadcastAgentMessage(
{
type: 'system_info' as const,
catId: record.catId,
content: JSON.stringify({ type: 'rich_block', block: resolvedBlock }),
invocationId,
timestamp: Date.now(),
},
record.threadId,
Expand Down
94 changes: 94 additions & 0 deletions packages/api/test/callback-routes.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1692,6 +1692,100 @@ describe('Callback Routes', () => {
assert.equal(typeof parsed.messageId, 'string');
});

// ---- #454: All callback broadcasts must include invocationId ----

test('#454: text broadcast always includes invocationId', async () => {
const app = await createApp();
const { invocationId, callbackToken } = registry.create('user-1', 'opus', 'thread-454-text');

await app.inject({
method: 'POST',
url: '/api/callbacks/post-message',
payload: { invocationId, callbackToken, content: 'Hello' },
});

const msgs = socketManager.getMessages();
const textMsg = msgs.find((m) => m.type === 'text');
assert.ok(textMsg, 'text broadcast should exist');
assert.equal(textMsg.invocationId, invocationId, 'text broadcast must include invocationId');
});

test('#454: rich_block system_info broadcast includes invocationId', async () => {
const app = await createApp();
const { invocationId, callbackToken } = registry.create('user-1', 'opus', 'thread-454-rich');

const richPayload = JSON.stringify({
v: 1,
blocks: [{ id: 'diff-454', kind: 'diff', v: 1, filePath: 'src/bar.ts', diff: '- a\n+ b' }],
});
const content = `Fix:\n\`\`\`cc_rich\n${richPayload}\n\`\`\``;

await app.inject({
method: 'POST',
url: '/api/callbacks/post-message',
payload: { invocationId, callbackToken, content },
});

const msgs = socketManager.getMessages();
const richMsg = msgs.find((m) => m.type === 'system_info');
assert.ok(richMsg, 'rich_block system_info broadcast should exist');
assert.equal(richMsg.invocationId, invocationId, 'rich_block system_info broadcast must include invocationId');
});

test('#454: create-rich-block broadcast includes invocationId', async () => {
const app = await createApp();
const { invocationId, callbackToken } = registry.create('user-1', 'opus', 'thread-454-crb');

await app.inject({
method: 'POST',
url: '/api/callbacks/create-rich-block',
payload: {
invocationId,
callbackToken,
block: { id: 'card-454', kind: 'card', v: 1, title: 'Test', bodyMarkdown: 'hi' },
},
});

const msgs = socketManager.getMessages();
assert.ok(msgs.length >= 1, 'should have at least 1 broadcast');
assert.equal(msgs[0].invocationId, invocationId, 'create-rich-block broadcast must include invocationId');
});

test('#454: generate-document broadcast includes invocationId', async () => {
const { tmpdir } = await import('node:os');
const { rm } = await import('node:fs/promises');
const uploadDir = `${tmpdir()}/cat-cafe-test-uploads-454`;
process.env.UPLOAD_DIR = uploadDir;
try {
const app = await createApp();
const { invocationId, callbackToken } = registry.create('user-1', 'opus', 'thread-454-doc');

const res = await app.inject({
method: 'POST',
url: '/api/callbacks/generate-document',
payload: {
invocationId,
callbackToken,
markdown: '# Test Doc\nHello from #454',
format: 'md',
baseName: 'test-454',
},
});

assert.equal(res.statusCode, 200);
const body = JSON.parse(res.body);
assert.equal(body.status, 'ok');

const msgs = socketManager.getMessages();
const docMsg = msgs.find((m) => m.type === 'system_info' && JSON.parse(m.content).type === 'rich_block');
assert.ok(docMsg, 'generate-document should broadcast system_info with rich_block');
assert.equal(docMsg.invocationId, invocationId, 'generate-document broadcast must include invocationId');
} finally {
delete process.env.UPLOAD_DIR;
await rm(uploadDir, { recursive: true, force: true }).catch(() => {});
}
});

test('POST post-message without cc_rich blocks stores content as-is (no extra.rich)', async () => {
const app = await createApp();
const { invocationId, callbackToken } = registry.create('user-1', 'opus', 'thread-rb3');
Expand Down
Loading