Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/odd-signs-travel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@cloudflare/ai-chat": patch
---

Allow returning a non-streaming reponse from onChatMessage()
71 changes: 53 additions & 18 deletions packages/ai-chat/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,33 @@ export class AIChatAgent<
this.broadcast(JSON.stringify(message), exclude);
}

/**
* Broadcasts a text event for non-SSE responses.
* This ensures plain text responses follow the AI SDK v5 stream protocol.
*
* @param streamId - The stream identifier for chunk storage
* @param event - The text event payload (text-start, text-delta with delta, or text-end)
* @param continuation - Whether this is a continuation of a previous stream
*/
private _broadcastTextEvent(
streamId: string,
event:
| { type: "text-start"; id: string }
| { type: "text-delta"; id: string; delta: string }
| { type: "text-end"; id: string },
continuation: boolean
) {
const body = JSON.stringify(event);
this._storeStreamChunk(streamId, body);
this._broadcastChatMessage({
body,
done: false,
id: event.id,
type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
...(continuation && { continuation: true })
});
}

private _loadMessagesFromDb(): ChatMessage[] {
const rows =
this.sql`select * from cf_ai_chat_agent_messages order by created_at` ||
Expand Down Expand Up @@ -1458,11 +1485,32 @@ export class AIChatAgent<
}
}

// Determine response format based on content-type
const contentType = response.headers.get("content-type") || "";
const isSSE = contentType.includes("text/event-stream"); // AI SDK v5 SSE format

// if not AI SDK SSE format, we need to inject text-start and text-end events ourselves
if (!isSSE) {
this._broadcastTextEvent(
streamId,
{ type: "text-start", id },
continuation
);
}

let streamCompleted = false;
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
if (!isSSE) {
this._broadcastTextEvent(
streamId,
{ type: "text-end", id },
continuation
);
}

// Mark the stream as completed
this._completeStream(streamId);
streamCompleted = true;
Expand All @@ -1479,10 +1527,6 @@ export class AIChatAgent<

const chunk = decoder.decode(value);

// Determine response format based on content-type
const contentType = response.headers.get("content-type") || "";
const isSSE = contentType.includes("text/event-stream");

// After streaming is complete, persist the complete assistant's response
if (isSSE) {
// Parse AI SDK v5 SSE format and extract text deltas
Expand Down Expand Up @@ -1892,20 +1936,11 @@ export class AIChatAgent<
// Treat the entire chunk as a text delta to preserve exact formatting
if (chunk.length > 0) {
message.parts.push({ type: "text", text: chunk });
// Synthesize a text-delta event so clients can stream-render
const chunkBody = JSON.stringify({
type: "text-delta",
delta: chunk
});
// Store chunk for replay on reconnection
this._storeStreamChunk(streamId, chunkBody);
this._broadcastChatMessage({
body: chunkBody,
done: false,
id,
type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
...(continuation && { continuation: true })
});
this._broadcastTextEvent(
streamId,
{ type: "text-delta", id, delta: chunk },
continuation
);
}
}
}
Expand Down
21 changes: 2 additions & 19 deletions packages/ai-chat/src/tests/chat-context.test.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,8 @@
import { createExecutionContext, env } from "cloudflare:test";
import { env } from "cloudflare:test";
import { describe, it, expect } from "vitest";
import worker, { type Env } from "./worker";
import { MessageType } from "../types";
import type { UIMessage as ChatMessage } from "ai";

declare module "cloudflare:test" {
interface ProvidedEnv extends Env {}
}

async function connectChatWS(path: string) {
const ctx = createExecutionContext();
const req = new Request(`http://example.com${path}`, {
headers: { Upgrade: "websocket" }
});
const res = await worker.fetch(req, env, ctx);
expect(res.status).toBe(101);
const ws = res.webSocket as WebSocket;
expect(ws).toBeDefined();
ws.accept();
return { ws, ctx };
}
import { connectChatWS } from "./test-utils";

describe("AIChatAgent Connection Context - Issue #711", () => {
it("getCurrentAgent() should return connection in onChatMessage and nested async functions (tool execute)", async () => {
Expand Down
20 changes: 2 additions & 18 deletions packages/ai-chat/src/tests/chat-persistence.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { createExecutionContext, env } from "cloudflare:test";
import { describe, it, expect } from "vitest";
import worker, { type Env } from "./worker";
import worker from "./worker";
import { MessageType } from "../types";
import type { UIMessage as ChatMessage } from "ai";
import { connectChatWS } from "./test-utils";

interface ToolCallPart {
type: string;
Expand All @@ -12,23 +13,6 @@ interface ToolCallPart {
output?: unknown;
}

declare module "cloudflare:test" {
interface ProvidedEnv extends Env {}
}

async function connectChatWS(path: string) {
const ctx = createExecutionContext();
const req = new Request(`http://example.com${path}`, {
headers: { Upgrade: "websocket" }
});
const res = await worker.fetch(req, env, ctx);
expect(res.status).toBe(101);
const ws = res.webSocket as WebSocket;
expect(ws).toBeDefined();
ws.accept();
return { ws, ctx };
}

describe("Chat Agent Persistence", () => {
it("persists new messages incrementally without deleting existing ones", async () => {
const room = crypto.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
import { createExecutionContext, env } from "cloudflare:test";
import { describe, it, expect } from "vitest";
import worker, { type Env } from "./worker";
import worker from "./worker";
import type { UIMessage as ChatMessage } from "ai";

declare module "cloudflare:test" {
interface ProvidedEnv extends Env {}
}

describe("Client-side tool duplicate message prevention", () => {
it("merges tool output into existing message by toolCallId", async () => {
const room = crypto.randomUUID();
Expand Down
20 changes: 1 addition & 19 deletions packages/ai-chat/src/tests/client-tools-broadcast.test.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,7 @@
import { createExecutionContext, env } from "cloudflare:test";
import { describe, it, expect } from "vitest";
import worker, { type Env } from "./worker";
import { MessageType } from "../types";
import type { UIMessage as ChatMessage } from "ai";

declare module "cloudflare:test" {
interface ProvidedEnv extends Env {}
}

async function connectChatWS(path: string) {
const ctx = createExecutionContext();
const req = new Request(`http://example.com${path}`, {
headers: { Upgrade: "websocket" }
});
const res = await worker.fetch(req, env, ctx);
expect(res.status).toBe(101);
const ws = res.webSocket as WebSocket;
expect(ws).toBeDefined();
ws.accept();
return { ws, ctx };
}
import { connectChatWS } from "./test-utils";

describe("Client Tools Broadcast", () => {
it("should not broadcast CF_AGENT_CHAT_MESSAGES back to the originating connection after chat request", async () => {
Expand Down
5 changes: 5 additions & 0 deletions packages/ai-chat/src/tests/cloudflare-test.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import type { Env } from "./worker";

declare module "cloudflare:test" {
interface ProvidedEnv extends Env {}
}
Loading