Skip to content

Commit 41bdef7

Browse files
authored
Add LangGraph streaming adapter (#381)
* feat: add LangGraph streaming adapter and message format converter
1 parent 7cb2a43 commit 41bdef7

File tree

6 files changed

+818
-1
lines changed

6 files changed

+818
-1
lines changed

packages/react-headless/src/index.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
export { ArtifactContext, useArtifactStore } from "./store/ArtifactContext";
export { ChatProvider } from "./store/ChatProvider";
// Streaming adapters: each turns a provider-specific HTTP response into the
// shared stream-event protocol consumed by processStreamedMessage.
export {
  agUIAdapter,
  langGraphAdapter,
  openAIAdapter,
  openAIReadableStreamAdapter,
  openAIResponsesAdapter,
} from "./stream/adapters";
// Message formats: converters between stored chat messages and each
// provider's wire shape (kept alphabetical, matching the adapter list above).
export {
  langGraphMessageFormat,
  openAIConversationMessageFormat,
  openAIMessageFormat,
} from "./stream/formats";
export { processStreamedMessage } from "./stream/processStreamedMessage";

1823
export type { ArtifactActions, ArtifactState } from "./store/artifactTypes";
@@ -44,6 +49,8 @@ export type {
4449
UserMessage,
4550
} from "./types/message";
4651

52+
// Public types for the LangGraph integration added alongside the adapter above.
export type { LangGraphAdapterOptions } from "./stream/adapters/langgraph";
export type { LangGraphMessageFormat } from "./stream/formats/langgraph-message-format";
export { identityMessageFormat } from "./types/messageFormat";
export type { MessageFormat } from "./types/messageFormat";
export { EventType } from "./types/stream";
Lines changed: 348 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,348 @@
1+
import { describe, expect, it, vi } from "vitest";
2+
import { EventType } from "../../../types";
3+
import { langGraphAdapter } from "../langgraph";
4+
5+
// ── Helpers ──
6+
7+
/**
8+
* Create a Response with an SSE body from a raw string.
9+
*/
10+
function makeSSEResponse(body: string): Response {
11+
const stream = new ReadableStream({
12+
start(controller) {
13+
controller.enqueue(new TextEncoder().encode(body));
14+
controller.close();
15+
},
16+
});
17+
return new Response(stream);
18+
}
19+
20+
/**
21+
* Build a named SSE block: `event: <name>\ndata: <json>\n\n`
22+
*/
23+
function sse(event: string, data: unknown): string {
24+
return `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
25+
}
26+
27+
/**
28+
* Collect all events from an async iterable.
29+
*/
30+
async function collect(iter: AsyncIterable<unknown>): Promise<unknown[]> {
31+
const events: unknown[] = [];
32+
for await (const event of iter) {
33+
events.push(event);
34+
}
35+
return events;
36+
}
37+
38+
// ── Tests ──
39+
40+
describe("langGraphAdapter", () => {
41+
it("throws when response has no body", async () => {
42+
const adapter = langGraphAdapter();
43+
const response = new Response(null);
44+
45+
await expect(async () => {
46+
for await (const _ of adapter.parse(response)) {
47+
/* drain */
48+
}
49+
}).rejects.toThrow("No response body");
50+
});
51+
52+
describe("text streaming", () => {
53+
it("emits TEXT_MESSAGE_START, CONTENT, and END for AI message chunks", async () => {
54+
const body =
55+
sse("messages", [
56+
{ type: "AIMessageChunk", content: "Hello", id: "msg-1" },
57+
{ langgraph_node: "agent" },
58+
]) +
59+
sse("messages", [
60+
{ type: "AIMessageChunk", content: " world", id: "msg-1" },
61+
{ langgraph_node: "agent" },
62+
]) +
63+
sse("end", null);
64+
65+
const adapter = langGraphAdapter();
66+
const events = await collect(adapter.parse(makeSSEResponse(body)));
67+
68+
expect(events[0]).toMatchObject({
69+
type: EventType.TEXT_MESSAGE_START,
70+
role: "assistant",
71+
});
72+
expect(events[1]).toMatchObject({
73+
type: EventType.TEXT_MESSAGE_CONTENT,
74+
delta: "Hello",
75+
});
76+
expect(events[2]).toMatchObject({
77+
type: EventType.TEXT_MESSAGE_CONTENT,
78+
delta: " world",
79+
});
80+
expect(events[3]).toMatchObject({
81+
type: EventType.TEXT_MESSAGE_END,
82+
});
83+
});
84+
85+
it("handles content as array of typed blocks", async () => {
86+
const body =
87+
sse("messages", [
88+
{
89+
type: "AIMessageChunk",
90+
content: [
91+
{ type: "text", text: "block one" },
92+
{ type: "text", text: " block two" },
93+
],
94+
id: "msg-1",
95+
},
96+
{ langgraph_node: "agent" },
97+
]) + sse("end", null);
98+
99+
const adapter = langGraphAdapter();
100+
const events = await collect(adapter.parse(makeSSEResponse(body)));
101+
102+
expect(events[1]).toMatchObject({
103+
type: EventType.TEXT_MESSAGE_CONTENT,
104+
delta: "block one block two",
105+
});
106+
});
107+
108+
it("handles non-tuple message format (plain object)", async () => {
109+
const body =
110+
sse("messages", { type: "ai", content: "plain", id: "msg-1" }) + sse("end", null);
111+
112+
const adapter = langGraphAdapter();
113+
const events = await collect(adapter.parse(makeSSEResponse(body)));
114+
115+
expect(events[0]).toMatchObject({ type: EventType.TEXT_MESSAGE_START });
116+
expect(events[1]).toMatchObject({
117+
type: EventType.TEXT_MESSAGE_CONTENT,
118+
delta: "plain",
119+
});
120+
});
121+
122+
it("ignores non-AI message types", async () => {
123+
const body =
124+
sse("messages", [
125+
{ type: "human", content: "user input", id: "hmsg-1" },
126+
{ langgraph_node: "agent" },
127+
]) + sse("end", null);
128+
129+
const adapter = langGraphAdapter();
130+
const events = await collect(adapter.parse(makeSSEResponse(body)));
131+
132+
// Should only have the end event (no message start/content)
133+
expect(events).toHaveLength(0);
134+
});
135+
});
136+
137+
describe("tool calls", () => {
138+
it("emits tool call events for tool_call_chunks", async () => {
139+
const body =
140+
sse("messages", [
141+
{
142+
type: "AIMessageChunk",
143+
content: "",
144+
tool_call_chunks: [{ id: "tc-1", name: "get_weather", args: '{"loc', index: 0 }],
145+
},
146+
{ langgraph_node: "agent" },
147+
]) +
148+
sse("messages", [
149+
{
150+
type: "AIMessageChunk",
151+
content: "",
152+
tool_call_chunks: [{ id: undefined, name: undefined, args: 'ation":"NYC"}', index: 0 }],
153+
},
154+
{ langgraph_node: "agent" },
155+
]) +
156+
sse("end", null);
157+
158+
const adapter = langGraphAdapter();
159+
const events = await collect(adapter.parse(makeSSEResponse(body)));
160+
161+
// TEXT_MESSAGE_START, TOOL_CALL_START, TOOL_CALL_ARGS, TOOL_CALL_ARGS, TOOL_CALL_END, TEXT_MESSAGE_END
162+
const toolStart = events.find((e: any) => e.type === EventType.TOOL_CALL_START);
163+
expect(toolStart).toMatchObject({
164+
type: EventType.TOOL_CALL_START,
165+
toolCallId: "tc-1",
166+
toolCallName: "get_weather",
167+
});
168+
169+
const toolArgs = events.filter((e: any) => e.type === EventType.TOOL_CALL_ARGS);
170+
expect(toolArgs).toHaveLength(2);
171+
expect((toolArgs[0] as any).delta).toBe('{"loc');
172+
expect((toolArgs[1] as any).delta).toBe('ation":"NYC"}');
173+
});
174+
175+
it("emits tool call events for complete tool_calls (non-streaming)", async () => {
176+
const body =
177+
sse("messages", [
178+
{
179+
type: "AIMessageChunk",
180+
content: "",
181+
tool_calls: [{ id: "tc-1", name: "search", args: { query: "test" } }],
182+
},
183+
{ langgraph_node: "agent" },
184+
]) + sse("end", null);
185+
186+
const adapter = langGraphAdapter();
187+
const events = await collect(adapter.parse(makeSSEResponse(body)));
188+
189+
const toolStart = events.find((e: any) => e.type === EventType.TOOL_CALL_START);
190+
expect(toolStart).toMatchObject({
191+
toolCallId: "tc-1",
192+
toolCallName: "search",
193+
});
194+
195+
const toolArgs = events.find((e: any) => e.type === EventType.TOOL_CALL_ARGS);
196+
expect((toolArgs as any).delta).toBe('{"query":"test"}');
197+
198+
const toolEnd = events.filter((e: any) => e.type === EventType.TOOL_CALL_END);
199+
// One from the complete tool_calls handling + one from the "end" event cleanup
200+
expect(toolEnd.length).toBeGreaterThanOrEqual(1);
201+
});
202+
});
203+
204+
describe("error handling", () => {
205+
it("emits RUN_ERROR for error events", async () => {
206+
const body = sse("error", {
207+
error: "InternalError",
208+
message: "Something went wrong",
209+
});
210+
211+
const adapter = langGraphAdapter();
212+
const events = await collect(adapter.parse(makeSSEResponse(body)));
213+
214+
expect(events[0]).toMatchObject({
215+
type: EventType.RUN_ERROR,
216+
message: "Something went wrong",
217+
code: "InternalError",
218+
});
219+
});
220+
221+
it("handles malformed JSON gracefully", async () => {
222+
const body = "event: messages\ndata: {invalid json}\n\n" + sse("end", null);
223+
224+
const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {});
225+
const adapter = langGraphAdapter();
226+
const events = await collect(adapter.parse(makeSSEResponse(body)));
227+
228+
expect(consoleSpy).toHaveBeenCalledWith(
229+
"Failed to parse LangGraph SSE data",
230+
expect.any(SyntaxError),
231+
);
232+
consoleSpy.mockRestore();
233+
234+
// Should still have processed remaining events (no events since "end"
235+
// with no message started yields nothing)
236+
expect(events).toHaveLength(0);
237+
});
238+
});
239+
240+
describe("interrupts", () => {
241+
it("calls onInterrupt when updates contain __interrupt__", async () => {
242+
const onInterrupt = vi.fn();
243+
const body =
244+
sse("messages", [
245+
{ type: "AIMessageChunk", content: "thinking...", id: "msg-1" },
246+
{ langgraph_node: "agent" },
247+
]) +
248+
sse("updates", {
249+
agent: { messages: [] },
250+
__interrupt__: { value: "need input", resumable: true },
251+
}) +
252+
sse("end", null);
253+
254+
const adapter = langGraphAdapter({ onInterrupt });
255+
await collect(adapter.parse(makeSSEResponse(body)));
256+
257+
expect(onInterrupt).toHaveBeenCalledWith({
258+
value: "need input",
259+
resumable: true,
260+
});
261+
});
262+
263+
it("does not throw when onInterrupt is not provided", async () => {
264+
const body =
265+
sse("updates", {
266+
__interrupt__: { value: "need input" },
267+
}) + sse("end", null);
268+
269+
const adapter = langGraphAdapter();
270+
// Should not throw
271+
await collect(adapter.parse(makeSSEResponse(body)));
272+
});
273+
});
274+
275+
describe("metadata event", () => {
276+
it("does not emit events for metadata", async () => {
277+
const body =
278+
sse("metadata", { run_id: "run-123", thread_id: "thread-456" }) + sse("end", null);
279+
280+
const adapter = langGraphAdapter();
281+
const events = await collect(adapter.parse(makeSSEResponse(body)));
282+
283+
// No events should be emitted for metadata alone
284+
expect(events).toHaveLength(0);
285+
});
286+
});
287+
288+
describe("stream end", () => {
289+
it("closes message on explicit end event", async () => {
290+
const body =
291+
sse("messages", [
292+
{ type: "AIMessageChunk", content: "done", id: "msg-1" },
293+
{ langgraph_node: "agent" },
294+
]) + sse("end", null);
295+
296+
const adapter = langGraphAdapter();
297+
const events = await collect(adapter.parse(makeSSEResponse(body)));
298+
299+
const lastEvent = events[events.length - 1] as any;
300+
expect(lastEvent.type).toBe(EventType.TEXT_MESSAGE_END);
301+
});
302+
303+
it("closes message when stream ends without end event", async () => {
304+
const body = sse("messages", [
305+
{ type: "AIMessageChunk", content: "abrupt", id: "msg-1" },
306+
{ langgraph_node: "agent" },
307+
]);
308+
309+
const adapter = langGraphAdapter();
310+
const events = await collect(adapter.parse(makeSSEResponse(body)));
311+
312+
const lastEvent = events[events.length - 1] as any;
313+
expect(lastEvent.type).toBe(EventType.TEXT_MESSAGE_END);
314+
});
315+
});
316+
317+
describe("multi-chunk delivery", () => {
318+
it("handles SSE blocks split across multiple chunks", async () => {
319+
// Simulate a response where SSE data arrives in two chunks,
320+
// with the split happening mid-block.
321+
const part1 = "event: messages\ndata: ";
322+
const part2 =
323+
JSON.stringify([
324+
{ type: "AIMessageChunk", content: "split", id: "msg-1" },
325+
{ langgraph_node: "agent" },
326+
]) +
327+
"\n\n" +
328+
sse("end", null);
329+
330+
const stream = new ReadableStream({
331+
start(controller) {
332+
controller.enqueue(new TextEncoder().encode(part1));
333+
controller.enqueue(new TextEncoder().encode(part2));
334+
controller.close();
335+
},
336+
});
337+
338+
const adapter = langGraphAdapter();
339+
const events = await collect(adapter.parse(new Response(stream)));
340+
341+
expect(events[0]).toMatchObject({ type: EventType.TEXT_MESSAGE_START });
342+
expect(events[1]).toMatchObject({
343+
type: EventType.TEXT_MESSAGE_CONTENT,
344+
delta: "split",
345+
});
346+
});
347+
});
348+
});
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
// Barrel file: re-export every streaming adapter implementation
// (kept in alphabetical order by module path).
export * from "./ag-ui";
export * from "./langgraph";
export * from "./openai-completions";
export * from "./openai-readable-stream";
export * from "./openai-responses";

0 commit comments

Comments
 (0)