diff --git a/docs/content/docs/api-reference/react-headless.mdx b/docs/content/docs/api-reference/react-headless.mdx index 22b2906bf..229290e31 100644 --- a/docs/content/docs/api-reference/react-headless.mdx +++ b/docs/content/docs/api-reference/react-headless.mdx @@ -108,6 +108,11 @@ function openAIReadableStreamAdapter(): StreamProtocolAdapter; // OpenAI Readabl function agUIAdapter(): StreamProtocolAdapter; // AG-UI protocol stream ``` +`agUIAdapter()` expects standard **SSE framing** (blank-line delimited events, one or more `data:` lines per event). It is resilient to arbitrary network chunk boundaries (partial lines / partial JSON), CRLF line endings, and comment keepalive lines. + +If your server emits a single JSON object per newline (NDJSON) rather than SSE, use `openAIReadableStreamAdapter()` (or implement a custom adapter). + + Related type: ```ts diff --git a/packages/react-headless/src/store/__tests__/createChatStore.test.ts b/packages/react-headless/src/store/__tests__/createChatStore.test.ts index c50d0dd1c..37e60e152 100644 --- a/packages/react-headless/src/store/__tests__/createChatStore.test.ts +++ b/packages/react-headless/src/store/__tests__/createChatStore.test.ts @@ -174,7 +174,7 @@ describe("createChatStore", () => { expect(result).toEqual(newThread); expect(store.getState().threads).toHaveLength(2); - expect(store.getState().threads.map((t) => t.id)).toContain("t-new"); + expect(store.getState().threads.map((t: Thread) => t.id)).toContain("t-new"); }); }); @@ -362,10 +362,12 @@ describe("createChatStore", () => { describe("cancelMessage", () => { it("aborts in-flight request", async () => { let capturedAbort: AbortController; - const processMessage = vi.fn().mockImplementation(({ abortController }) => { - capturedAbort = abortController; - return new Promise(() => {}); // never resolves - }); + const processMessage = vi.fn().mockImplementation( + ({ abortController }: { abortController: AbortController }) => { + capturedAbort = abortController; + return new Promise(() => {}); // never resolves + }, + ); const store = createChatStore({ processMessage, @@ -373,7 +375,7 @@ describe("createChatStore", () => { }); store.setState({ selectedThreadId: "t1" }); - const _promise = store.getState().processMessage({ role: "user", content: "hello" }); + store.getState().processMessage({ role: "user", content: "hello" }); await flushPromises(); expect(store.getState().isRunning).toBe(true); @@ -458,6 +460,93 @@ describe("createChatStore", () => { expect((store.getState().messages[1] as any).content).toBe("response text"); }); + it("handles SSE events split across arbitrary chunks", async () => { + const encoder = new TextEncoder(); + + const part1 = "data: {\"type\": \"TEXT_MESSAGE_CONTENT\", \"delta\": \"split"; + const part2 = " over chunks\"}\n\n"; + const done = "data: [DONE]\n\n"; + + const stream = new ReadableStream({ + start(c) { + c.enqueue(encoder.encode(part1)); + c.enqueue(encoder.encode(part2)); + c.enqueue(encoder.encode(done)); + c.close(); + }, + }); + + fetchSpy.mockResolvedValue(new Response(stream)); + + const store = createChatStore({ apiUrl: "/api/chat" }); + store.setState({ selectedThreadId: "t1" }); + + await store.getState().processMessage({ role: "user", content: "hello" }); + + expect(store.getState().messages).toHaveLength(2); + expect((store.getState().messages[1] as any).content).toBe("split over chunks"); + }); + + it("supports CRLF + multi-line data fields", async () => { + const encoder = new TextEncoder(); + + // SSE spec: multiple `data:` lines are joined with `\n`. + // Newlines between JSON tokens are valid whitespace. + const sse = + "data: {\"type\":\"TEXT_MESSAGE_CONTENT\",\r\n" + + "data: \"delta\":\"multiline\"}\r\n" + + "\r\n" + + "data: [DONE]\r\n\r\n"; + + const stream = new ReadableStream({ + start(c) { + c.enqueue(encoder.encode(sse)); + c.close(); + }, + }); + + fetchSpy.mockResolvedValue(new Response(stream)); + + const store = createChatStore({ apiUrl: "/api/chat" }); + store.setState({ selectedThreadId: "t1" }); + + await store.getState().processMessage({ role: "user", content: "hello" }); + + expect(store.getState().messages).toHaveLength(2); + expect((store.getState().messages[1] as any).content).toBe("multiline"); + }); + + it("skips malformed events and continues streaming", async () => { + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + const encoder = new TextEncoder(); + + const sse = + "data: {\"type\":\"TEXT_MESSAGE_CONTENT\",\"delta\":\"ok\"}\n\n" + + // malformed JSON + "data: {\"type\":\"TEXT_MESSAGE_CONTENT\",\"delta\":\"bad\"\n\n" + + "data: {\"type\":\"TEXT_MESSAGE_CONTENT\",\"delta\":\"still ok\"}\n\n" + + "data: [DONE]\n\n"; + + const stream = new ReadableStream({ + start(c) { + c.enqueue(encoder.encode(sse)); + c.close(); + }, + }); + + fetchSpy.mockResolvedValue(new Response(stream)); + + const store = createChatStore({ apiUrl: "/api/chat" }); + store.setState({ selectedThreadId: "t1" }); + + await store.getState().processMessage({ role: "user", content: "hello" }); + + expect(store.getState().messages).toHaveLength(2); + expect((store.getState().messages[1] as any).content).toBe("okstill ok"); + + warnSpy.mockRestore(); + }); + it("throws when neither apiUrl nor processMessage provided", async () => { const store = createChatStore({}); store.setState({ selectedThreadId: "t1" }); @@ -801,10 +890,12 @@ describe("createChatStore", () => { describe("selectThread while streaming", () => { it("cancels current stream and loads new thread", async () => { let capturedAbort: AbortController; - const processMessage = vi.fn().mockImplementation(({ abortController }) => { - capturedAbort = abortController; - return new Promise(() => {}); // never resolves - }); + const processMessage = vi.fn().mockImplementation( + ({ abortController }: { abortController: AbortController }) => { + capturedAbort = abortController; + return new Promise(() => {}); // never resolves + }, + ); const newMessages = [makeMessage("new-m1")]; const loadThread = vi.fn().mockResolvedValue(newMessages); diff --git a/packages/react-headless/src/stream/adapters/ag-ui.ts b/packages/react-headless/src/stream/adapters/ag-ui.ts index e934b0726..783fb1e17 100644 --- a/packages/react-headless/src/stream/adapters/ag-ui.ts +++ b/packages/react-headless/src/stream/adapters/ag-ui.ts @@ -1,29 +1,72 @@ import { AGUIEvent, StreamProtocolAdapter } from "../../types"; +const normalizeEols = (chunk: string) => chunk.replace(/\r\n/g, "\n"); + +const stripOptionalLeadingSpace = (value: string) => (value.startsWith(" ") ? value.slice(1) : value); + +type SSEBlock = { + data: string; +}; + +const parseSSEBlock = (block: string): SSEBlock => { + const dataLines: string[] = []; + + for (const rawLine of block.split("\n")) { + if (!rawLine) continue; + if (rawLine.startsWith(":")) continue; // comment / keepalive + + if (rawLine.startsWith("data:")) { + dataLines.push(stripOptionalLeadingSpace(rawLine.slice(5))); + } + } + + return { data: dataLines.join("\n") }; +}; + export const agUIAdapter = (): StreamProtocolAdapter => ({ async *parse(response: Response): AsyncIterable { const reader = response.body?.getReader(); if (!reader) throw new Error("No response body"); const decoder = new TextDecoder(); + let buffer = ""; while (true) { const { done, value } = await reader.read(); if (done) break; - const chunk = decoder.decode(value, { stream: true }); - const lines = chunk.split("\n"); + buffer += normalizeEols(decoder.decode(value, { stream: true })); + + // SSE events are separated by a blank line + const blocks = buffer.split("\n\n"); + buffer = blocks.pop() ?? ""; - for (const line of lines) { - if (!line.startsWith("data: ")) continue; - const data = line.slice(6).trim(); - if (!data || data === "[DONE]") continue; + for (const block of blocks) { + const { data } = parseSSEBlock(block); + const payload = data.trim(); + if (!payload) continue; + if (payload === "[DONE]") return; try { - const event = JSON.parse(data); - yield event as AGUIEvent; + yield JSON.parse(payload) as AGUIEvent; } catch (e) { - console.error("Failed to parse SSE event", e); + // Best-effort: malformed events should not kill streaming. + // (Servers can occasionally emit partial JSON due to upstream bugs.) + console.warn("[OpenUI] Failed to parse AG-UI SSE event", e); + } + } + } + + // Flush any final complete block if the stream ended without an extra delimiter. + const final = buffer.trim(); + if (final) { + const { data } = parseSSEBlock(final); + const payload = data.trim(); + if (payload && payload !== "[DONE]") { + try { + yield JSON.parse(payload) as AGUIEvent; + } catch { + // ignore } } }