Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions agents-api/__snapshots__/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -41355,6 +41355,109 @@
}
}
},
"/run/v1/conversations/{conversationId}/stream": {
"get": {
"description": "Reconnects to an active in-progress stream for the conversation. Returns 204 if no active stream exists.",
"operationId": "resume-conversation-stream",
"parameters": [
{
"in": "path",
"name": "conversationId",
"required": true,
"schema": {
"type": "string"
}
}
],
"responses": {
"200": {
"content": {
"text/event-stream": {
"schema": {
"type": "string"
}
}
},
"description": "Active stream — replays from beginning"
},
"204": {
"description": "No active stream"
},
"400": {
"content": {
"application/problem+json": {
"schema": {
"$ref": "#/components/schemas/BadRequest"
}
}
},
"description": "Bad Request"
},
"401": {
"content": {
"application/problem+json": {
"schema": {
"$ref": "#/components/schemas/Unauthorized"
}
}
},
"description": "Unauthorized"
},
"403": {
"content": {
"application/problem+json": {
"schema": {
"$ref": "#/components/schemas/Forbidden"
}
}
},
"description": "Forbidden"
},
"404": {
"content": {
"application/problem+json": {
"schema": {
"$ref": "#/components/schemas/NotFound"
}
}
},
"description": "Not Found"
},
"422": {
"content": {
"application/problem+json": {
"schema": {
"$ref": "#/components/schemas/UnprocessableEntity"
}
}
},
"description": "Unprocessable Entity"
},
"500": {
"content": {
"application/problem+json": {
"schema": {
"$ref": "#/components/schemas/InternalServerError"
}
}
},
"description": "Internal Server Error"
}
},
"security": [
{
"bearerAuth": []
}
],
"summary": "Resume Conversation Stream",
"tags": [
"Conversations"
],
"x-authz": {
"description": "Requires a valid API key (Bearer token). Auth is enforced by runApiKeyAuth middleware in createApp.ts."
}
}
},
"/run/v1/mcp": {
"post": {
"description": "Handles Model Context Protocol (MCP) JSON-RPC requests",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,9 +370,10 @@ describe('App Credential Authentication', () => {
verifyPoWMock.mockResolvedValueOnce({ ok: false, error: 'pow_required' });

app.use('*', apiKeyAuth());
app.get('/', (c) => c.text('OK'));
app.post('/', (c) => c.text('OK'));

const res = await app.request('/', {
method: 'POST',
headers: {
Authorization: `Bearer ${VALID_ANON_JWT}`,
'x-inkeep-app-id': 'app-id-1',
Expand All @@ -392,9 +393,10 @@ describe('App Credential Authentication', () => {
verifyPoWMock.mockResolvedValueOnce({ ok: false, error: 'pow_invalid' });

app.use('*', apiKeyAuth());
app.get('/', (c) => c.text('OK'));
app.post('/', (c) => c.text('OK'));

const res = await app.request('/', {
method: 'POST',
headers: {
Authorization: `Bearer ${VALID_ANON_JWT}`,
'x-inkeep-app-id': 'app-id-1',
Expand All @@ -414,9 +416,10 @@ describe('App Credential Authentication', () => {
verifyPoWMock.mockResolvedValueOnce({ ok: false, error: 'pow_expired' });

app.use('*', apiKeyAuth());
app.get('/', (c) => c.text('OK'));
app.post('/', (c) => c.text('OK'));

const res = await app.request('/', {
method: 'POST',
headers: {
Authorization: `Bearer ${VALID_ANON_JWT}`,
'x-inkeep-app-id': 'app-id-1',
Expand All @@ -431,7 +434,7 @@ describe('App Credential Authentication', () => {
);
});

it('should succeed when PoW passes', async () => {
it('should succeed when PoW passes on POST', async () => {
const appRecord = makeWebClientApp();
getAppByIdMock.mockReturnValue(vi.fn().mockResolvedValue(appRecord));
validateOriginMock.mockReturnValue(true);
Expand All @@ -447,12 +450,13 @@ describe('App Credential Authentication', () => {
});

app.use('*', apiKeyAuth());
app.get('/', (c) => {
app.post('/', (c) => {
const ctx = (c as any).get('executionContext');
return c.json(ctx);
});

const res = await app.request('/', {
method: 'POST',
headers: {
Authorization: `Bearer ${VALID_ANON_JWT}`,
'x-inkeep-app-id': 'app-id-1',
Expand All @@ -465,6 +469,36 @@ describe('App Credential Authentication', () => {
expect(verifyPoWMock).toHaveBeenCalled();
});

it('should skip PoW for GET requests (e.g. stream resume)', async () => {
const appRecord = makeWebClientApp();
getAppByIdMock.mockReturnValue(vi.fn().mockResolvedValue(appRecord));
validateOriginMock.mockReturnValue(true);
jwtVerifyMock.mockResolvedValueOnce({
payload: {
sub: 'anon_test-uuid',
app: 'app-id-1',
tid: 'tenant_1',
pid: 'project_1',
type: 'anonymous',
},
});

app.use('*', apiKeyAuth());
app.get('/', (c) => c.text('OK'));

const res = await app.request('/', {
headers: {
Authorization: `Bearer ${VALID_ANON_JWT}`,
'x-inkeep-app-id': 'app-id-1',
'x-inkeep-agent-id': 'agent-1',
Origin: 'https://help.customer.com',
},
});

expect(res.status).toBe(200);
expect(verifyPoWMock).not.toHaveBeenCalled();
});

it('should not call verifyPoW for non-web_client app types', async () => {
getAppByIdMock.mockReturnValue(
vi.fn().mockResolvedValue({
Expand Down
37 changes: 30 additions & 7 deletions agents-api/src/domains/run/routes/chatDataStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { ExecutionHandler } from '../handlers/executionHandler';
import { buildPersistedMessageContent } from '../services/blob-storage/file-upload-helpers';
import { pendingToolApprovalManager } from '../session/PendingToolApprovalManager';
import { toolApprovalUiBus } from '../session/ToolApprovalUiBus';
import { streamBufferRegistry } from '../stream/stream-buffer-registry';
import { createBufferingStreamHelper, createVercelStreamHelper } from '../stream/stream-helpers';
import { VercelMessageSchema } from '../types/chat';
import { errorOp } from '../utils/agent-operations';
Expand Down Expand Up @@ -427,12 +428,16 @@ app.openapi(chatDataStreamRoute, async (c) => {
c.header('connection', 'keep-alive');
c.header('x-vercel-ai-data-stream', 'v2');
c.header('x-accel-buffering', 'no');
streamBufferRegistry.register(conversationId);
return stream(c, async (s) => {
try {
const encoder = new TextEncoder();
const reader = run.readable.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const encoded = typeof value === 'string' ? encoder.encode(value) : value;
streamBufferRegistry.push(conversationId, encoded);
await s.write(value);
}
} catch (error) {
Expand All @@ -441,6 +446,8 @@ app.openapi(chatDataStreamRoute, async (c) => {
'Error streaming durable execution via /chat'
);
await s.write(`event: error\ndata: ${JSON.stringify({ error: 'Stream error' })}\n\n`);
} finally {
streamBufferRegistry.complete(conversationId);
}
});
}
Expand Down Expand Up @@ -608,13 +615,29 @@ app.openapi(chatDataStreamRoute, async (c) => {
c.header('x-vercel-ai-data-stream', 'v2');
c.header('x-accel-buffering', 'no'); // disable nginx buffering

return stream(c, (stream) =>
stream.pipe(
dataStream
.pipeThrough(new JsonToSseTransformStream())
.pipeThrough(new TextEncoderStream())
)
);
const encodedStream = dataStream
.pipeThrough(new JsonToSseTransformStream())
.pipeThrough(new TextEncoderStream());

const [clientStream, bufferStream] = encodedStream.tee();

streamBufferRegistry.register(conversationId);
(async () => {
const reader = bufferStream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
streamBufferRegistry.push(conversationId, value);
}
} catch (error) {
logger.error({ error, conversationId }, 'Error buffering stream for resumption');
} finally {
streamBufferRegistry.complete(conversationId);
}
})();

return stream(c, (s) => s.pipe(clientStream));
});
} catch (error) {
if (error instanceof HTTPException) {
Expand Down
66 changes: 66 additions & 0 deletions agents-api/src/domains/run/routes/conversations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import {
toISODateString,
} from '@inkeep/agents-core';
import { createProtectedRoute, inheritedRunApiKeyAuth } from '@inkeep/agents-core/middleware';
import { stream } from 'hono/streaming';
import runDbClient from '../../../data/db/runDbClient';
import { getLogger } from '../../../logger';
import { resolveMessagesListBlobUris } from '../services/blob-storage/resolve-blob-uris';
import { streamBufferRegistry } from '../stream/stream-buffer-registry';

const logger = getLogger('run-conversations');

Expand Down Expand Up @@ -396,4 +398,68 @@ app.openapi(
}
);

const resumeConversationStreamRoute = createProtectedRoute({
method: 'get',
path: '/{conversationId}/stream',
summary: 'Resume Conversation Stream',
description:
'Reconnects to an active in-progress stream for the conversation. Returns 204 if no active stream exists.',
operationId: 'resume-conversation-stream',
tags: ['Conversations'],
security: [{ bearerAuth: [] }],
permission: inheritedRunApiKeyAuth(),
request: {
params: z.object({ conversationId: z.string() }),
},
responses: {
200: {
description: 'Active stream — replays from beginning',
content: { 'text/event-stream': { schema: z.string() } },
},
204: { description: 'No active stream' },
...commonGetErrorResponses,
},
});

app.openapi(resumeConversationStreamRoute, async (c) => {
const executionContext = c.get('executionContext');
const { tenantId, projectId } = executionContext;
const endUserId = requireEndUserId(executionContext);
const { conversationId } = c.req.valid('param');

const readable = streamBufferRegistry.createReadable(conversationId);

if (readable) {
const conversation = await getConversation(runDbClient)({
scopes: { tenantId, projectId },
conversationId,
});

if (conversation && conversation.userId !== endUserId) {
return new Response(null, { status: 204 });
}

logger.debug({ conversationId }, 'Resuming conversation stream');

c.header('content-type', 'text/event-stream');
c.header('cache-control', 'no-cache');
c.header('connection', 'keep-alive');
c.header('x-vercel-ai-data-stream', 'v2');
c.header('x-accel-buffering', 'no');

return stream(c, (s) => s.pipe(readable));
}

const conversation = await getConversation(runDbClient)({
scopes: { tenantId, projectId },
conversationId,
});

if (!conversation || conversation.userId !== endUserId) {
return new Response(null, { status: 204 });
}

return new Response(null, { status: 204 });
});

export default app;
Loading
Loading