Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/odd-signs-travel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@cloudflare/ai-chat": patch
---

Allow returning a non-streaming reponse from onChatMessage()
71 changes: 53 additions & 18 deletions packages/ai-chat/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,33 @@ export class AIChatAgent<
this.broadcast(JSON.stringify(message), exclude);
}

/**
* Broadcasts a text event for non-SSE responses.
* This ensures plain text responses follow the AI SDK v5 stream protocol.
*
* @param streamId - The stream identifier for chunk storage
* @param event - The text event payload (text-start, text-delta with delta, or text-end)
* @param continuation - Whether this is a continuation of a previous stream
*/
private _broadcastTextEvent(
streamId: string,
event:
| { type: "text-start"; id: string }
| { type: "text-delta"; id: string; delta: string }
| { type: "text-end"; id: string },
continuation: boolean
) {
const body = JSON.stringify(event);
this._storeStreamChunk(streamId, body);
this._broadcastChatMessage({
body,
done: false,
id: event.id,
type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
...(continuation && { continuation: true })
});
}

private _loadMessagesFromDb(): ChatMessage[] {
const rows =
this.sql`select * from cf_ai_chat_agent_messages order by created_at` ||
Expand Down Expand Up @@ -1458,11 +1485,32 @@ export class AIChatAgent<
}
}

// Determine response format based on content-type
const contentType = response.headers.get("content-type") || "";
const isSSE = contentType.includes("text/event-stream"); // AI SDK v5 SSE format

// if not AI SDK SSE format, we need to inject text-start and text-end events ourselves
if (!isSSE) {
this._broadcastTextEvent(
streamId,
{ type: "text-start", id },
continuation
);
}

let streamCompleted = false;
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
if (!isSSE) {
this._broadcastTextEvent(
streamId,
{ type: "text-end", id },
continuation
);
}

// Mark the stream as completed
this._completeStream(streamId);
streamCompleted = true;
Expand All @@ -1479,10 +1527,6 @@ export class AIChatAgent<

const chunk = decoder.decode(value);

// Determine response format based on content-type
const contentType = response.headers.get("content-type") || "";
const isSSE = contentType.includes("text/event-stream");

// After streaming is complete, persist the complete assistant's response
if (isSSE) {
// Parse AI SDK v5 SSE format and extract text deltas
Expand Down Expand Up @@ -1892,20 +1936,11 @@ export class AIChatAgent<
// Treat the entire chunk as a text delta to preserve exact formatting
if (chunk.length > 0) {
message.parts.push({ type: "text", text: chunk });
// Synthesize a text-delta event so clients can stream-render
const chunkBody = JSON.stringify({
type: "text-delta",
delta: chunk
});
// Store chunk for replay on reconnection
this._storeStreamChunk(streamId, chunkBody);
this._broadcastChatMessage({
body: chunkBody,
done: false,
id,
type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
...(continuation && { continuation: true })
});
this._broadcastTextEvent(
streamId,
{ type: "text-delta", id, delta: chunk },
continuation
);
}
}
}
Expand Down
21 changes: 2 additions & 19 deletions packages/ai-chat/src/tests/chat-context.test.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,8 @@
import { createExecutionContext, env } from "cloudflare:test";
import { env } from "cloudflare:test";
import { describe, it, expect } from "vitest";
import worker, { type Env } from "./worker";
import { MessageType } from "../types";
import type { UIMessage as ChatMessage } from "ai";

declare module "cloudflare:test" {
interface ProvidedEnv extends Env {}
}

async function connectChatWS(path: string) {
const ctx = createExecutionContext();
const req = new Request(`http://example.com${path}`, {
headers: { Upgrade: "websocket" }
});
const res = await worker.fetch(req, env, ctx);
expect(res.status).toBe(101);
const ws = res.webSocket as WebSocket;
expect(ws).toBeDefined();
ws.accept();
return { ws, ctx };
}
import { connectChatWS } from "./test-utils";

describe("AIChatAgent Connection Context - Issue #711", () => {
it("getCurrentAgent() should return connection in onChatMessage and nested async functions (tool execute)", async () => {
Expand Down
40 changes: 11 additions & 29 deletions packages/ai-chat/src/tests/chat-persistence.test.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,15 @@
import { createExecutionContext, env } from "cloudflare:test";
import { describe, it, expect } from "vitest";
import worker, { type Env } from "./worker";
import worker from "./worker";
import { MessageType } from "../types";
import type { UIMessage as ChatMessage } from "ai";
import { connectChatWS } from "./test-utils";

interface ToolCallPart {
type: string;
toolCallId: string;
state: "input-available" | "output-available";
input: Record<string, unknown>;
output?: unknown;
}

declare module "cloudflare:test" {
interface ProvidedEnv extends Env {}
}

async function connectChatWS(path: string) {
const ctx = createExecutionContext();
const req = new Request(`http://example.com${path}`, {
headers: { Upgrade: "websocket" }
});
const res = await worker.fetch(req, env, ctx);
expect(res.status).toBe(101);
const ws = res.webSocket as WebSocket;
expect(ws).toBeDefined();
ws.accept();
return { ws, ctx };
}
// Type helper for tool call parts - extracts ToolUIPart from ChatMessage parts
type TestToolCallPart = Extract<
ChatMessage["parts"][number],
{ type: `tool-${string}` }
>;

describe("Chat Agent Persistence", () => {
it("persists new messages incrementally without deleting existing ones", async () => {
Expand Down Expand Up @@ -282,7 +264,7 @@ describe("Chat Agent Persistence", () => {
parts: [{ type: "text", text: "What time is it in London?" }]
};

const toolCallPart: ToolCallPart = {
const toolCallPart: TestToolCallPart = {
type: "tool-getLocalTime",
toolCallId: "call_456",
state: "input-available",
Expand All @@ -305,7 +287,7 @@ describe("Chat Agent Persistence", () => {
messagesAfterToolCall.find((m) => m.id === "assistant-1")
).toBeDefined();

const toolResultPart: ToolCallPart = {
const toolResultPart: TestToolCallPart = {
type: "tool-getLocalTime",
toolCallId: "call_456",
state: "output-available",
Expand Down Expand Up @@ -388,7 +370,7 @@ describe("Chat Agent Persistence", () => {
parts: [{ type: "text", text: "What time is it?" }]
};

const toolCallPart: ToolCallPart = {
const toolCallPart: TestToolCallPart = {
type: "tool-getLocalTime",
toolCallId: "call_123",
state: "input-available",
Expand All @@ -413,7 +395,7 @@ describe("Chat Agent Persistence", () => {
assistantResponse
]);

const toolResultPart: ToolCallPart = {
const toolResultPart: TestToolCallPart = {
type: "tool-getLocalTime",
toolCallId: "call_123",
state: "output-available",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
import { createExecutionContext, env } from "cloudflare:test";
import { describe, it, expect } from "vitest";
import worker, { type Env } from "./worker";
import worker from "./worker";
import type { UIMessage as ChatMessage } from "ai";

declare module "cloudflare:test" {
interface ProvidedEnv extends Env {}
}

describe("Client-side tool duplicate message prevention", () => {
it("merges tool output into existing message by toolCallId", async () => {
const room = crypto.randomUUID();
Expand Down
20 changes: 1 addition & 19 deletions packages/ai-chat/src/tests/client-tools-broadcast.test.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,7 @@
import { createExecutionContext, env } from "cloudflare:test";
import { describe, it, expect } from "vitest";
import worker, { type Env } from "./worker";
import { MessageType } from "../types";
import type { UIMessage as ChatMessage } from "ai";

declare module "cloudflare:test" {
interface ProvidedEnv extends Env {}
}

async function connectChatWS(path: string) {
const ctx = createExecutionContext();
const req = new Request(`http://example.com${path}`, {
headers: { Upgrade: "websocket" }
});
const res = await worker.fetch(req, env, ctx);
expect(res.status).toBe(101);
const ws = res.webSocket as WebSocket;
expect(ws).toBeDefined();
ws.accept();
return { ws, ctx };
}
import { connectChatWS } from "./test-utils";

describe("Client Tools Broadcast", () => {
it("should not broadcast CF_AGENT_CHAT_MESSAGES back to the originating connection after chat request", async () => {
Expand Down
5 changes: 5 additions & 0 deletions packages/ai-chat/src/tests/cloudflare-test.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import type { Env } from "./worker";

declare module "cloudflare:test" {
interface ProvidedEnv extends Env {}
}
Loading
Loading