Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions agents-api/__snapshots__/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -41355,6 +41355,120 @@
}
}
},
"/run/v1/conversations/{conversationId}/stream": {
"get": {
"description": "Reconnects to an active in-progress stream for the conversation. Returns 204 if no active stream exists.",
"operationId": "resume-conversation-stream",
"parameters": [
{
"in": "path",
"name": "conversationId",
"required": true,
"schema": {
"type": "string"
}
},
{
"description": "Resume from after this chunk index (omit for full replay)",
"in": "query",
"name": "afterIdx",
"required": false,
"schema": {
"description": "Resume from after this chunk index (omit for full replay)",
"nullable": true,
"type": "integer"
}
}
],
"responses": {
"200": {
"content": {
"text/event-stream": {
"schema": {
"type": "string"
}
}
},
"description": "Active stream — replays from given index or beginning"
},
"204": {
"description": "No active stream"
},
"400": {
"content": {
"application/problem+json": {
"schema": {
"$ref": "#/components/schemas/BadRequest"
}
}
},
"description": "Bad Request"
},
"401": {
"content": {
"application/problem+json": {
"schema": {
"$ref": "#/components/schemas/Unauthorized"
}
}
},
"description": "Unauthorized"
},
"403": {
"content": {
"application/problem+json": {
"schema": {
"$ref": "#/components/schemas/Forbidden"
}
}
},
"description": "Forbidden"
},
"404": {
"content": {
"application/problem+json": {
"schema": {
"$ref": "#/components/schemas/NotFound"
}
}
},
"description": "Not Found"
},
"422": {
"content": {
"application/problem+json": {
"schema": {
"$ref": "#/components/schemas/UnprocessableEntity"
}
}
},
"description": "Unprocessable Entity"
},
"500": {
"content": {
"application/problem+json": {
"schema": {
"$ref": "#/components/schemas/InternalServerError"
}
}
},
"description": "Internal Server Error"
}
},
"security": [
{
"bearerAuth": []
}
],
"summary": "Resume Conversation Stream",
"tags": [
"Conversations"
],
"x-authz": {
"description": "Requires a valid API key (Bearer token). Auth is enforced by runApiKeyAuth middleware in createApp.ts."
}
}
},
"/run/v1/mcp": {
"post": {
"description": "Handles Model Context Protocol (MCP) JSON-RPC requests",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,9 +370,10 @@ describe('App Credential Authentication', () => {
verifyPoWMock.mockResolvedValueOnce({ ok: false, error: 'pow_required' });

app.use('*', apiKeyAuth());
app.get('/', (c) => c.text('OK'));
app.post('/', (c) => c.text('OK'));

const res = await app.request('/', {
method: 'POST',
headers: {
Authorization: `Bearer ${VALID_ANON_JWT}`,
'x-inkeep-app-id': 'app-id-1',
Expand All @@ -392,9 +393,10 @@ describe('App Credential Authentication', () => {
verifyPoWMock.mockResolvedValueOnce({ ok: false, error: 'pow_invalid' });

app.use('*', apiKeyAuth());
app.get('/', (c) => c.text('OK'));
app.post('/', (c) => c.text('OK'));

const res = await app.request('/', {
method: 'POST',
headers: {
Authorization: `Bearer ${VALID_ANON_JWT}`,
'x-inkeep-app-id': 'app-id-1',
Expand All @@ -414,9 +416,10 @@ describe('App Credential Authentication', () => {
verifyPoWMock.mockResolvedValueOnce({ ok: false, error: 'pow_expired' });

app.use('*', apiKeyAuth());
app.get('/', (c) => c.text('OK'));
app.post('/', (c) => c.text('OK'));

const res = await app.request('/', {
method: 'POST',
headers: {
Authorization: `Bearer ${VALID_ANON_JWT}`,
'x-inkeep-app-id': 'app-id-1',
Expand All @@ -431,7 +434,7 @@ describe('App Credential Authentication', () => {
);
});

it('should succeed when PoW passes', async () => {
it('should succeed when PoW passes on POST', async () => {
const appRecord = makeWebClientApp();
getAppByIdMock.mockReturnValue(vi.fn().mockResolvedValue(appRecord));
validateOriginMock.mockReturnValue(true);
Expand All @@ -447,12 +450,13 @@ describe('App Credential Authentication', () => {
});

app.use('*', apiKeyAuth());
app.get('/', (c) => {
app.post('/', (c) => {
const ctx = (c as any).get('executionContext');
return c.json(ctx);
});

const res = await app.request('/', {
method: 'POST',
headers: {
Authorization: `Bearer ${VALID_ANON_JWT}`,
'x-inkeep-app-id': 'app-id-1',
Expand All @@ -465,6 +469,36 @@ describe('App Credential Authentication', () => {
expect(verifyPoWMock).toHaveBeenCalled();
});

it('should skip PoW for GET requests (e.g. stream resume)', async () => {
const appRecord = makeWebClientApp();
getAppByIdMock.mockReturnValue(vi.fn().mockResolvedValue(appRecord));
validateOriginMock.mockReturnValue(true);
jwtVerifyMock.mockResolvedValueOnce({
payload: {
sub: 'anon_test-uuid',
app: 'app-id-1',
tid: 'tenant_1',
pid: 'project_1',
type: 'anonymous',
},
});

app.use('*', apiKeyAuth());
app.get('/', (c) => c.text('OK'));

const res = await app.request('/', {
headers: {
Authorization: `Bearer ${VALID_ANON_JWT}`,
'x-inkeep-app-id': 'app-id-1',
'x-inkeep-agent-id': 'agent-1',
Origin: 'https://help.customer.com',
},
});

expect(res.status).toBe(200);
expect(verifyPoWMock).not.toHaveBeenCalled();
});

it('should not call verifyPoW for non-web_client app types', async () => {
getAppByIdMock.mockReturnValue(
vi.fn().mockResolvedValue({
Expand Down
37 changes: 30 additions & 7 deletions agents-api/src/domains/run/routes/chatDataStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { ExecutionHandler } from '../handlers/executionHandler';
import { buildPersistedMessageContent } from '../services/blob-storage/file-upload-helpers';
import { pendingToolApprovalManager } from '../session/PendingToolApprovalManager';
import { toolApprovalUiBus } from '../session/ToolApprovalUiBus';
import { streamBufferRegistry } from '../stream/stream-buffer-registry';
import { createBufferingStreamHelper, createVercelStreamHelper } from '../stream/stream-helpers';
import { VercelMessageSchema } from '../types/chat';
import { errorOp } from '../utils/agent-operations';
Expand Down Expand Up @@ -427,12 +428,16 @@ app.openapi(chatDataStreamRoute, async (c) => {
c.header('connection', 'keep-alive');
c.header('x-vercel-ai-data-stream', 'v2');
c.header('x-accel-buffering', 'no');
streamBufferRegistry.register({ tenantId, projectId, conversationId });
return stream(c, async (s) => {
try {
const encoder = new TextEncoder();
const reader = run.readable.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const encoded = typeof value === 'string' ? encoder.encode(value) : value;
streamBufferRegistry.push(conversationId, encoded);
await s.write(value);
}
} catch (error) {
Expand All @@ -441,6 +446,8 @@ app.openapi(chatDataStreamRoute, async (c) => {
'Error streaming durable execution via /chat'
);
await s.write(`event: error\ndata: ${JSON.stringify({ error: 'Stream error' })}\n\n`);
} finally {
await streamBufferRegistry.complete(conversationId);
}
});
}
Expand Down Expand Up @@ -608,13 +615,29 @@ app.openapi(chatDataStreamRoute, async (c) => {
c.header('x-vercel-ai-data-stream', 'v2');
c.header('x-accel-buffering', 'no'); // disable nginx buffering

return stream(c, (stream) =>
stream.pipe(
dataStream
.pipeThrough(new JsonToSseTransformStream())
.pipeThrough(new TextEncoderStream())
)
);
const encodedStream = dataStream
.pipeThrough(new JsonToSseTransformStream())
.pipeThrough(new TextEncoderStream());

const [clientStream, bufferStream] = encodedStream.tee();

streamBufferRegistry.register({ tenantId, projectId, conversationId });
(async () => {
const reader = bufferStream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
streamBufferRegistry.push(conversationId, value);
}
} catch (error) {
logger.error({ error, conversationId }, 'Error buffering stream for resumption');
} finally {
await streamBufferRegistry.complete(conversationId);
}
})();

return stream(c, (s) => s.pipe(clientStream));
});
} catch (error) {
if (error instanceof HTTPException) {
Expand Down
Loading