Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/content/docs/api-reference/react-headless.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ function openAIReadableStreamAdapter(): StreamProtocolAdapter; // OpenAI Readabl
function agUIAdapter(): StreamProtocolAdapter; // AG-UI protocol stream
```

`agUIAdapter()` expects standard **SSE framing** (blank-line delimited events, one or more `data:` lines per event). It is resilient to arbitrary network chunk boundaries (partial lines / partial JSON), CRLF line endings, and comment keepalive lines.

If your server emits a single JSON object per newline (NDJSON) rather than SSE, use `openAIReadableStreamAdapter()` (or implement a custom adapter).


Related type:

```ts
Expand Down
111 changes: 101 additions & 10 deletions packages/react-headless/src/store/__tests__/createChatStore.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ describe("createChatStore", () => {

expect(result).toEqual(newThread);
expect(store.getState().threads).toHaveLength(2);
expect(store.getState().threads.map((t) => t.id)).toContain("t-new");
expect(store.getState().threads.map((t: Thread) => t.id)).toContain("t-new");
});
});

Expand Down Expand Up @@ -362,18 +362,20 @@ describe("createChatStore", () => {
describe("cancelMessage", () => {
it("aborts in-flight request", async () => {
let capturedAbort: AbortController;
const processMessage = vi.fn().mockImplementation(({ abortController }) => {
capturedAbort = abortController;
return new Promise(() => {}); // never resolves
});
const processMessage = vi.fn().mockImplementation(
({ abortController }: { abortController: AbortController }) => {
capturedAbort = abortController;
return new Promise(() => {}); // never resolves
},
);

const store = createChatStore({
processMessage,
streamProtocol: { parse: async function* () {} },
});
store.setState({ selectedThreadId: "t1" });

const _promise = store.getState().processMessage({ role: "user", content: "hello" });
store.getState().processMessage({ role: "user", content: "hello" });

await flushPromises();
expect(store.getState().isRunning).toBe(true);
Expand Down Expand Up @@ -458,6 +460,93 @@ describe("createChatStore", () => {
expect((store.getState().messages[1] as any).content).toBe("response text");
});

it("handles SSE events split across arbitrary chunks", async () => {
const encoder = new TextEncoder();

const part1 = "data: {\"type\": \"TEXT_MESSAGE_CONTENT\", \"delta\": \"split";
const part2 = " over chunks\"}\n\n";
const done = "data: [DONE]\n\n";

const stream = new ReadableStream({
start(c) {
c.enqueue(encoder.encode(part1));
c.enqueue(encoder.encode(part2));
c.enqueue(encoder.encode(done));
c.close();
},
});

fetchSpy.mockResolvedValue(new Response(stream));

const store = createChatStore({ apiUrl: "/api/chat" });
store.setState({ selectedThreadId: "t1" });

await store.getState().processMessage({ role: "user", content: "hello" });

expect(store.getState().messages).toHaveLength(2);
expect((store.getState().messages[1] as any).content).toBe("split over chunks");
});

it("supports CRLF + multi-line data fields", async () => {
const encoder = new TextEncoder();

// SSE spec: multiple `data:` lines are joined with `\n`.
// Newlines between JSON tokens are valid whitespace.
const sse =
"data: {\"type\":\"TEXT_MESSAGE_CONTENT\",\r\n" +
"data: \"delta\":\"multiline\"}\r\n" +
"\r\n" +
"data: [DONE]\r\n\r\n";

const stream = new ReadableStream({
start(c) {
c.enqueue(encoder.encode(sse));
c.close();
},
});

fetchSpy.mockResolvedValue(new Response(stream));

const store = createChatStore({ apiUrl: "/api/chat" });
store.setState({ selectedThreadId: "t1" });

await store.getState().processMessage({ role: "user", content: "hello" });

expect(store.getState().messages).toHaveLength(2);
expect((store.getState().messages[1] as any).content).toBe("multiline");
});

it("skips malformed events and continues streaming", async () => {
const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});
const encoder = new TextEncoder();

const sse =
"data: {\"type\":\"TEXT_MESSAGE_CONTENT\",\"delta\":\"ok\"}\n\n" +
// malformed JSON
"data: {\"type\":\"TEXT_MESSAGE_CONTENT\",\"delta\":\"bad\"\n\n" +
"data: {\"type\":\"TEXT_MESSAGE_CONTENT\",\"delta\":\"still ok\"}\n\n" +
"data: [DONE]\n\n";

const stream = new ReadableStream({
start(c) {
c.enqueue(encoder.encode(sse));
c.close();
},
});

fetchSpy.mockResolvedValue(new Response(stream));

const store = createChatStore({ apiUrl: "/api/chat" });
store.setState({ selectedThreadId: "t1" });

await store.getState().processMessage({ role: "user", content: "hello" });

expect(store.getState().messages).toHaveLength(2);
expect((store.getState().messages[1] as any).content).toBe("okstill ok");

warnSpy.mockRestore();
});

it("throws when neither apiUrl nor processMessage provided", async () => {
const store = createChatStore({});
store.setState({ selectedThreadId: "t1" });
Expand Down Expand Up @@ -801,10 +890,12 @@ describe("createChatStore", () => {
describe("selectThread while streaming", () => {
it("cancels current stream and loads new thread", async () => {
let capturedAbort: AbortController;
const processMessage = vi.fn().mockImplementation(({ abortController }) => {
capturedAbort = abortController;
return new Promise(() => {}); // never resolves
});
const processMessage = vi.fn().mockImplementation(
({ abortController }: { abortController: AbortController }) => {
capturedAbort = abortController;
return new Promise(() => {}); // never resolves
},
);
const newMessages = [makeMessage("new-m1")];
const loadThread = vi.fn().mockResolvedValue(newMessages);

Expand Down
61 changes: 52 additions & 9 deletions packages/react-headless/src/stream/adapters/ag-ui.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,72 @@
import { AGUIEvent, StreamProtocolAdapter } from "../../types";

const normalizeEols = (chunk: string) => chunk.replace(/\r\n/g, "\n");

const stripOptionalLeadingSpace = (value: string) => (value.startsWith(" ") ? value.slice(1) : value);

type SSEBlock = {
data: string;
};

const parseSSEBlock = (block: string): SSEBlock => {
const dataLines: string[] = [];

for (const rawLine of block.split("\n")) {
if (!rawLine) continue;
if (rawLine.startsWith(":")) continue; // comment / keepalive

if (rawLine.startsWith("data:")) {
dataLines.push(stripOptionalLeadingSpace(rawLine.slice(5)));
}
}

return { data: dataLines.join("\n") };
};

export const agUIAdapter = (): StreamProtocolAdapter => ({
async *parse(response: Response): AsyncIterable<AGUIEvent> {
const reader = response.body?.getReader();
if (!reader) throw new Error("No response body");

const decoder = new TextDecoder();
let buffer = "";

while (true) {
const { done, value } = await reader.read();
if (done) break;

const chunk = decoder.decode(value, { stream: true });
const lines = chunk.split("\n");
buffer += normalizeEols(decoder.decode(value, { stream: true }));

// SSE events are separated by a blank line
const blocks = buffer.split("\n\n");
buffer = blocks.pop() ?? "";

for (const line of lines) {
if (!line.startsWith("data: ")) continue;
const data = line.slice(6).trim();
if (!data || data === "[DONE]") continue;
for (const block of blocks) {
const { data } = parseSSEBlock(block);
const payload = data.trim();
if (!payload) continue;
if (payload === "[DONE]") return;

try {
const event = JSON.parse(data);
yield event as AGUIEvent;
yield JSON.parse(payload) as AGUIEvent;
} catch (e) {
console.error("Failed to parse SSE event", e);
// Best-effort: malformed events should not kill streaming.
// (Servers can occasionally emit partial JSON due to upstream bugs.)
console.warn("[OpenUI] Failed to parse AG-UI SSE event", e);
}
}
}

// Flush any final complete block if the stream ended without an extra delimiter.
const final = buffer.trim();
if (final) {
const { data } = parseSSEBlock(final);
const payload = data.trim();
if (payload && payload !== "[DONE]") {
try {
yield JSON.parse(payload) as AGUIEvent;
} catch {
// ignore
}
}
}
Expand Down