Skip to content

Commit b65c80b

Browse files
authored
feat(sdk): useStream with custom transport (#1675)
1 parent a87ae1d commit b65c80b

File tree

6 files changed

+993
-634
lines changed

6 files changed

+993
-634
lines changed

.changeset/heavy-wings-sing.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@langchain/langgraph-sdk": patch
3+
---
4+
5+
Add `transport` option to useStream, allowing custom endpoints, that emit compatible Server-Sent Events to be used with `useStream`.

libs/sdk/src/react/index.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,10 @@
11
export { useStream } from "./stream.js";
2-
export type { MessageMetadata, UseStream, UseStreamOptions } from "./types.js";
2+
export { FetchStreamTransport } from "./stream.custom.js";
3+
export type {
4+
MessageMetadata,
5+
UseStream,
6+
UseStreamOptions,
7+
UseStreamCustom,
8+
UseStreamCustomOptions,
9+
UseStreamTransport,
10+
} from "./types.js";
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
/* __LC_ALLOW_ENTRYPOINT_SIDE_EFFECTS__ */
2+
3+
"use client";
4+
5+
import { useState, useSyncExternalStore } from "react";
6+
import { EventStreamEvent, StreamManager } from "./manager.js";
7+
import type {
8+
BagTemplate,
9+
GetUpdateType,
10+
GetCustomEventType,
11+
GetInterruptType,
12+
RunCallbackMeta,
13+
GetConfigurableType,
14+
UseStreamCustomOptions,
15+
UseStreamCustom,
16+
UseStreamTransport,
17+
CustomSubmitOptions,
18+
} from "./types.js";
19+
import type { Message } from "../types.messages.js";
20+
import { MessageTupleManager } from "./messages.js";
21+
import { Interrupt } from "../schema.js";
22+
import { BytesLineDecoder, SSEDecoder } from "../utils/sse.js";
23+
import { IterableReadableStream } from "../utils/stream.js";
24+
import { Command } from "../types.js";
25+
26+
interface FetchStreamTransportOptions {
27+
/**
28+
* The URL of the API to use.
29+
*/
30+
apiUrl: string;
31+
32+
/**
33+
* Default headers to send with requests.
34+
*/
35+
defaultHeaders?: HeadersInit;
36+
37+
/**
38+
* Specify a custom fetch implementation.
39+
*/
40+
fetch?: typeof fetch | ((...args: any[]) => any); // eslint-disable-line @typescript-eslint/no-explicit-any
41+
42+
/**
43+
* Callback that is called before the request is made.
44+
*/
45+
onRequest?: (
46+
url: string,
47+
init: RequestInit
48+
) => Promise<RequestInit> | RequestInit;
49+
}
50+
51+
export class FetchStreamTransport<
52+
StateType extends Record<string, unknown> = Record<string, unknown>,
53+
Bag extends BagTemplate = BagTemplate
54+
> implements UseStreamTransport<StateType, Bag>
55+
{
56+
constructor(private readonly options: FetchStreamTransportOptions) {}
57+
58+
async stream(payload: {
59+
input: GetUpdateType<Bag, StateType> | null | undefined;
60+
context: GetConfigurableType<Bag> | undefined;
61+
command: Command | undefined;
62+
signal: AbortSignal;
63+
}): Promise<AsyncGenerator<{ id?: string; event: string; data: unknown }>> {
64+
const { signal, ...body } = payload;
65+
66+
let requestInit: RequestInit = {
67+
method: "POST",
68+
headers: {
69+
"Content-Type": "application/json",
70+
...this.options.defaultHeaders,
71+
},
72+
body: JSON.stringify(body),
73+
signal,
74+
};
75+
76+
if (this.options.onRequest) {
77+
requestInit = await this.options.onRequest(
78+
this.options.apiUrl,
79+
requestInit
80+
);
81+
}
82+
const fetchFn = this.options.fetch ?? fetch;
83+
84+
const response = await fetchFn(this.options.apiUrl, requestInit);
85+
if (!response.ok) {
86+
throw new Error(`Failed to stream: ${response.statusText}`);
87+
}
88+
89+
const stream = (
90+
response.body || new ReadableStream({ start: (ctrl) => ctrl.close() })
91+
)
92+
.pipeThrough(BytesLineDecoder())
93+
.pipeThrough(SSEDecoder());
94+
95+
return IterableReadableStream.fromReadableStream(stream);
96+
}
97+
}
98+
99+
export function useStreamCustom<
100+
StateType extends Record<string, unknown> = Record<string, unknown>,
101+
Bag extends {
102+
ConfigurableType?: Record<string, unknown>;
103+
InterruptType?: unknown;
104+
CustomEventType?: unknown;
105+
UpdateType?: unknown;
106+
} = BagTemplate
107+
>(
108+
options: UseStreamCustomOptions<StateType, Bag>
109+
): UseStreamCustom<StateType, Bag> {
110+
type UpdateType = GetUpdateType<Bag, StateType>;
111+
type CustomType = GetCustomEventType<Bag>;
112+
type InterruptType = GetInterruptType<Bag>;
113+
type ConfigurableType = GetConfigurableType<Bag>;
114+
115+
const [messageManager] = useState(() => new MessageTupleManager());
116+
const [stream] = useState(
117+
() => new StreamManager<StateType, Bag>(messageManager)
118+
);
119+
120+
useSyncExternalStore(
121+
stream.subscribe,
122+
stream.getSnapshot,
123+
stream.getSnapshot
124+
);
125+
126+
const getMessages = (value: StateType): Message[] => {
127+
const messagesKey = options.messagesKey ?? "messages";
128+
return Array.isArray(value[messagesKey]) ? value[messagesKey] : [];
129+
};
130+
131+
const setMessages = (current: StateType, messages: Message[]): StateType => {
132+
const messagesKey = options.messagesKey ?? "messages";
133+
return { ...current, [messagesKey]: messages };
134+
};
135+
136+
const historyValues = options.initialValues ?? ({} as StateType);
137+
138+
const stop = () => stream.stop(historyValues, { onStop: options.onStop });
139+
140+
const submit = async (
141+
values: UpdateType | null | undefined,
142+
submitOptions?: CustomSubmitOptions<StateType, ConfigurableType>
143+
) => {
144+
let callbackMeta: RunCallbackMeta | undefined;
145+
146+
stream.setStreamValues(() => {
147+
if (submitOptions?.optimisticValues != null) {
148+
return {
149+
...historyValues,
150+
...(typeof submitOptions.optimisticValues === "function"
151+
? submitOptions.optimisticValues(historyValues)
152+
: submitOptions.optimisticValues),
153+
};
154+
}
155+
156+
return { ...historyValues };
157+
});
158+
159+
await stream.start(
160+
async (signal: AbortSignal) =>
161+
options.transport.stream({
162+
input: values,
163+
context: submitOptions?.context,
164+
command: submitOptions?.command,
165+
signal,
166+
}) as Promise<
167+
AsyncGenerator<EventStreamEvent<StateType, UpdateType, CustomType>>
168+
>,
169+
{
170+
getMessages,
171+
setMessages,
172+
173+
initialValues: {} as StateType,
174+
callbacks: options,
175+
176+
onSuccess: () => undefined,
177+
onError(error) {
178+
options.onError?.(error, callbackMeta);
179+
},
180+
}
181+
);
182+
};
183+
184+
return {
185+
get values() {
186+
return stream.values ?? ({} as StateType);
187+
},
188+
189+
error: stream.error,
190+
isLoading: stream.isLoading,
191+
192+
stop,
193+
submit,
194+
195+
get interrupt(): Interrupt<InterruptType> | undefined {
196+
if (
197+
stream.values != null &&
198+
"__interrupt__" in stream.values &&
199+
Array.isArray(stream.values.__interrupt__)
200+
) {
201+
const valueInterrupts = stream.values.__interrupt__;
202+
if (valueInterrupts.length === 0) return { when: "breakpoint" };
203+
if (valueInterrupts.length === 1) return valueInterrupts[0];
204+
205+
// TODO: fix the typing of interrupts if multiple interrupts are returned
206+
return valueInterrupts as Interrupt<InterruptType>;
207+
}
208+
209+
return undefined;
210+
},
211+
212+
get messages() {
213+
if (!stream.values) return [];
214+
return getMessages(stream.values);
215+
},
216+
};
217+
}

0 commit comments

Comments
 (0)