diff --git a/examples/stream-transport-vite/.gitignore b/examples/stream-transport-vite/.gitignore
new file mode 100644
index 000000000..a547bf36d
--- /dev/null
+++ b/examples/stream-transport-vite/.gitignore
@@ -0,0 +1,24 @@
+# Logs
+logs
+*.log
+npm-debug.log*
+yarn-debug.log*
+yarn-error.log*
+pnpm-debug.log*
+lerna-debug.log*
+
+node_modules
+dist
+dist-ssr
+*.local
+
+# Editor directories and files
+.vscode/*
+!.vscode/extensions.json
+.idea
+.DS_Store
+*.suo
+*.ntvs*
+*.njsproj
+*.sln
+*.sw?
diff --git a/examples/stream-transport-vite/index.html b/examples/stream-transport-vite/index.html
new file mode 100644
index 000000000..45bd05603
--- /dev/null
+++ b/examples/stream-transport-vite/index.html
@@ -0,0 +1,14 @@
+<!doctype html>
+<html lang="en">
+  <head>
+    <meta charset="UTF-8" />
+    <link rel="icon" type="image/svg+xml" href="/vite.svg" />
+    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
+
+    <title>Vite + React + TS</title>
+  </head>
+  <body>
+    <div id="root"></div>
+    <script type="module" src="/src/client.tsx"></script>
+  </body>
+</html>
diff --git a/examples/stream-transport-vite/package.json b/examples/stream-transport-vite/package.json
new file mode 100644
index 000000000..049486c8d
--- /dev/null
+++ b/examples/stream-transport-vite/package.json
@@ -0,0 +1,34 @@
+{
+ "name": "stream-transport-vite",
+ "private": true,
+ "version": "0.0.0",
+ "type": "module",
+ "scripts": {
+ "dev:client": "vite",
+ "dev:server": "tsx watch --env-file=.env --clear-screen=false src/server.mts",
+ "build:internal": "tsc -b && vite build",
+ "format": "prettier --write src",
+ "lint": "prettier --check src",
+ "preview": "vite preview"
+ },
+ "dependencies": {
+ "@hono/node-server": "^1.12.0",
+ "@langchain/core": "^1.0.0-alpha",
+ "@langchain/langgraph": "workspace:*",
+ "@langchain/langgraph-sdk": "workspace:*",
+ "@langchain/openai": "^1.0.0-alpha",
+ "hono": "^4.8.2",
+ "react": "^19.0.0",
+ "react-dom": "^19.0.0",
+ "zod": "^3.23.8"
+ },
+ "devDependencies": {
+ "@types/react": "^19.0.8",
+ "@types/react-dom": "^19.0.3",
+ "@vitejs/plugin-react": "^4.4.1",
+ "prettier": "^2.8.3",
+ "tsx": "^4.19.3",
+ "typescript": "~5.8.3",
+ "vite": "^6.0.0"
+ }
+}
diff --git a/examples/stream-transport-vite/src/client.css b/examples/stream-transport-vite/src/client.css
new file mode 100644
index 000000000..f1d8c73cd
--- /dev/null
+++ b/examples/stream-transport-vite/src/client.css
@@ -0,0 +1 @@
+@import "tailwindcss";
diff --git a/examples/stream-transport-vite/src/client.tsx b/examples/stream-transport-vite/src/client.tsx
new file mode 100644
index 000000000..f613b8171
--- /dev/null
+++ b/examples/stream-transport-vite/src/client.tsx
@@ -0,0 +1,61 @@
+import "./client.css";
+import { StrictMode } from "react";
+import { createRoot } from "react-dom/client";
+
+import {
+ useStream,
+ FetchStreamTransport,
+} from "@langchain/langgraph-sdk/react";
+
+export function App() {
+ const stream = useStream({
+ transport: new FetchStreamTransport({
+ apiUrl: "/api/stream",
+ }),
+ });
+
+ return (
+
+
+ {stream.messages.map((message) => (
+
+ {message.content as string}
+
+ ))}
+
+
+
+ );
+}
+
+createRoot(document.getElementById("root")!).render(
+  <StrictMode>
+    <App />
+  </StrictMode>
+);
diff --git a/examples/stream-transport-vite/src/server.mts b/examples/stream-transport-vite/src/server.mts
new file mode 100644
index 000000000..42fef684c
--- /dev/null
+++ b/examples/stream-transport-vite/src/server.mts
@@ -0,0 +1,42 @@
+import type { BaseMessage } from "@langchain/core/messages";
+import { StateGraph, MessagesZodMeta, START } from "@langchain/langgraph";
+import { toLangGraphEventStreamResponse } from "@langchain/langgraph/ui";
+import { registry } from "@langchain/langgraph/zod";
+import { ChatOpenAI } from "@langchain/openai";
+import { z } from "zod/v4";
+
+import { serve } from "@hono/node-server";
+import { Hono } from "hono";
+
+const llm = new ChatOpenAI({ model: "gpt-4o-mini" });
+
+const graph = new StateGraph(
+ z.object({
+    messages: z.custom<BaseMessage[]>().register(registry, MessagesZodMeta),
+ })
+)
+ .addNode("agent", async ({ messages }) => ({
+ messages: await llm.invoke(messages),
+ }))
+ .addEdge(START, "agent")
+ .compile();
+
+export type GraphType = typeof graph;
+
+const app = new Hono();
+
+app.post("/api/stream", async (c) => {
+ type InputType = GraphType["~InputType"];
+ const { input } = await c.req.json<{ input: InputType }>();
+
+ return toLangGraphEventStreamResponse({
+ stream: graph.streamEvents(input, {
+ version: "v2",
+ streamMode: ["values", "messages"],
+ }),
+ });
+});
+
+serve({ fetch: app.fetch, port: 9123 }, (c) => {
+ console.log(`Server running at ${c.address}:${c.port}`);
+});
diff --git a/examples/stream-transport-vite/src/vite-env.d.ts b/examples/stream-transport-vite/src/vite-env.d.ts
new file mode 100644
index 000000000..11f02fe2a
--- /dev/null
+++ b/examples/stream-transport-vite/src/vite-env.d.ts
@@ -0,0 +1 @@
+/// <reference types="vite/client" />
diff --git a/examples/stream-transport-vite/tsconfig.app.json b/examples/stream-transport-vite/tsconfig.app.json
new file mode 100644
index 000000000..227a6c672
--- /dev/null
+++ b/examples/stream-transport-vite/tsconfig.app.json
@@ -0,0 +1,27 @@
+{
+ "compilerOptions": {
+ "tsBuildInfoFile": "./node_modules/.tmp/tsconfig.app.tsbuildinfo",
+ "target": "ES2022",
+ "useDefineForClassFields": true,
+ "lib": ["ES2022", "DOM", "DOM.Iterable"],
+ "module": "ESNext",
+ "skipLibCheck": true,
+
+ /* Bundler mode */
+ "moduleResolution": "bundler",
+ "allowImportingTsExtensions": true,
+ "verbatimModuleSyntax": true,
+ "moduleDetection": "force",
+ "noEmit": true,
+ "jsx": "react-jsx",
+
+ /* Linting */
+ "strict": true,
+ "noUnusedLocals": true,
+ "noUnusedParameters": true,
+ "erasableSyntaxOnly": true,
+ "noFallthroughCasesInSwitch": true,
+ "noUncheckedSideEffectImports": true
+ },
+ "include": ["src"]
+}
diff --git a/examples/stream-transport-vite/tsconfig.json b/examples/stream-transport-vite/tsconfig.json
new file mode 100644
index 000000000..1ffef600d
--- /dev/null
+++ b/examples/stream-transport-vite/tsconfig.json
@@ -0,0 +1,7 @@
+{
+ "files": [],
+ "references": [
+ { "path": "./tsconfig.app.json" },
+ { "path": "./tsconfig.node.json" }
+ ]
+}
diff --git a/examples/stream-transport-vite/tsconfig.node.json b/examples/stream-transport-vite/tsconfig.node.json
new file mode 100644
index 000000000..f85a39906
--- /dev/null
+++ b/examples/stream-transport-vite/tsconfig.node.json
@@ -0,0 +1,25 @@
+{
+ "compilerOptions": {
+ "tsBuildInfoFile": "./node_modules/.tmp/tsconfig.node.tsbuildinfo",
+ "target": "ES2023",
+ "lib": ["ES2023"],
+ "module": "ESNext",
+ "skipLibCheck": true,
+
+ /* Bundler mode */
+ "moduleResolution": "bundler",
+ "allowImportingTsExtensions": true,
+ "verbatimModuleSyntax": true,
+ "moduleDetection": "force",
+ "noEmit": true,
+
+ /* Linting */
+ "strict": true,
+ "noUnusedLocals": true,
+ "noUnusedParameters": true,
+ "erasableSyntaxOnly": true,
+ "noFallthroughCasesInSwitch": true,
+ "noUncheckedSideEffectImports": true
+ },
+ "include": ["vite.config.ts"]
+}
diff --git a/examples/stream-transport-vite/turbo.json b/examples/stream-transport-vite/turbo.json
new file mode 100644
index 000000000..27d8fc09c
--- /dev/null
+++ b/examples/stream-transport-vite/turbo.json
@@ -0,0 +1,28 @@
+{
+ "extends": [
+ "//"
+ ],
+ "tasks": {
+ "build": {
+ "outputs": [
+ "**/dist/**"
+ ]
+ },
+ "build:internal": {
+ "dependsOn": [
+ "^build:internal"
+ ],
+ "outputs": [
+ "**/dist/**"
+ ]
+ },
+ "dev:client": {
+ "cache": false,
+ "persistent": true
+ },
+ "dev:server": {
+ "cache": false,
+ "persistent": true
+ }
+ }
+}
\ No newline at end of file
diff --git a/examples/stream-transport-vite/vite.config.ts b/examples/stream-transport-vite/vite.config.ts
new file mode 100644
index 000000000..d9f6b0c1e
--- /dev/null
+++ b/examples/stream-transport-vite/vite.config.ts
@@ -0,0 +1,9 @@
+import { defineConfig } from "vite";
+import react from "@vitejs/plugin-react";
+
+// https://vite.dev/config/
+export default defineConfig({
+ plugins: [react()],
+ clearScreen: false,
+ server: { proxy: { "/api": "http://localhost:9123" } },
+});
diff --git a/libs/sdk/src/react/index.ts b/libs/sdk/src/react/index.ts
index b2bccb713..2c94bd609 100644
--- a/libs/sdk/src/react/index.ts
+++ b/libs/sdk/src/react/index.ts
@@ -1,2 +1,10 @@
export { useStream } from "./stream.js";
-export type { MessageMetadata, UseStream, UseStreamOptions } from "./types.js";
+export { FetchStreamTransport } from "./stream.custom.js";
+export type {
+ MessageMetadata,
+ UseStream,
+ UseStreamOptions,
+ UseStreamCustom,
+ UseStreamCustomOptions,
+ UseStreamTransport,
+} from "./types.js";
diff --git a/libs/sdk/src/react/stream.custom.tsx b/libs/sdk/src/react/stream.custom.tsx
new file mode 100644
index 000000000..dc7f35a17
--- /dev/null
+++ b/libs/sdk/src/react/stream.custom.tsx
@@ -0,0 +1,217 @@
+/* __LC_ALLOW_ENTRYPOINT_SIDE_EFFECTS__ */
+
+"use client";
+
+import { useState, useSyncExternalStore } from "react";
+import { EventStreamEvent, StreamManager } from "./manager.js";
+import type {
+ BagTemplate,
+ GetUpdateType,
+ GetCustomEventType,
+ GetInterruptType,
+ RunCallbackMeta,
+ GetConfigurableType,
+ UseStreamCustomOptions,
+ UseStreamCustom,
+ UseStreamTransport,
+ CustomSubmitOptions,
+} from "./types.js";
+import type { Message } from "../types.messages.js";
+import { MessageTupleManager } from "./messages.js";
+import { Interrupt } from "../schema.js";
+import { BytesLineDecoder, SSEDecoder } from "../utils/sse.js";
+import { IterableReadableStream } from "../utils/stream.js";
+import { Command } from "../types.js";
+
+interface FetchStreamTransportOptions {
+ /**
+ * The URL of the API to use.
+ */
+ apiUrl: string;
+
+ /**
+ * Default headers to send with requests.
+ */
+ defaultHeaders?: HeadersInit;
+
+ /**
+ * Specify a custom fetch implementation.
+ */
+ fetch?: typeof fetch | ((...args: any[]) => any); // eslint-disable-line @typescript-eslint/no-explicit-any
+
+ /**
+ * Callback that is called before the request is made.
+ */
+ onRequest?: (
+ url: string,
+ init: RequestInit
+ ) => Promise | RequestInit;
+}
+
+export class FetchStreamTransport<
+  StateType extends Record<string, unknown> = Record<string, unknown>,
+  Bag extends BagTemplate = BagTemplate
+> implements UseStreamTransport<StateType, Bag>
+{
+ constructor(private readonly options: FetchStreamTransportOptions) {}
+
+ async stream(payload: {
+ input: GetUpdateType | null | undefined;
+ context: GetConfigurableType | undefined;
+ command: Command | undefined;
+ signal: AbortSignal;
+ }): Promise> {
+ const { signal, ...body } = payload;
+
+ let requestInit: RequestInit = {
+ method: "POST",
+ headers: {
+ "Content-Type": "application/json",
+ ...this.options.defaultHeaders,
+ },
+ body: JSON.stringify(body),
+ signal,
+ };
+
+ if (this.options.onRequest) {
+ requestInit = await this.options.onRequest(
+ this.options.apiUrl,
+ requestInit
+ );
+ }
+ const fetchFn = this.options.fetch ?? fetch;
+
+ const response = await fetchFn(this.options.apiUrl, requestInit);
+ if (!response.ok) {
+ throw new Error(`Failed to stream: ${response.statusText}`);
+ }
+
+ const stream = (
+ response.body || new ReadableStream({ start: (ctrl) => ctrl.close() })
+ )
+ .pipeThrough(BytesLineDecoder())
+ .pipeThrough(SSEDecoder());
+
+ return IterableReadableStream.fromReadableStream(stream);
+ }
+}
+
+export function useStreamCustom<
+  StateType extends Record<string, unknown> = Record<string, unknown>,
+  Bag extends {
+    ConfigurableType?: Record<string, unknown>;
+    InterruptType?: unknown;
+    CustomEventType?: unknown;
+    UpdateType?: unknown;
+  } = BagTemplate
+>(
+  options: UseStreamCustomOptions<StateType, Bag>
+): UseStreamCustom<StateType, Bag> {
+ type UpdateType = GetUpdateType;
+ type CustomType = GetCustomEventType;
+ type InterruptType = GetInterruptType;
+ type ConfigurableType = GetConfigurableType;
+
+ const [messageManager] = useState(() => new MessageTupleManager());
+ const [stream] = useState(
+ () => new StreamManager(messageManager)
+ );
+
+ useSyncExternalStore(
+ stream.subscribe,
+ stream.getSnapshot,
+ stream.getSnapshot
+ );
+
+ const getMessages = (value: StateType): Message[] => {
+ const messagesKey = options.messagesKey ?? "messages";
+ return Array.isArray(value[messagesKey]) ? value[messagesKey] : [];
+ };
+
+ const setMessages = (current: StateType, messages: Message[]): StateType => {
+ const messagesKey = options.messagesKey ?? "messages";
+ return { ...current, [messagesKey]: messages };
+ };
+
+ const historyValues = options.initialValues ?? ({} as StateType);
+
+ const stop = () => stream.stop(historyValues, { onStop: options.onStop });
+
+ const submit = async (
+ values: UpdateType | null | undefined,
+ submitOptions?: CustomSubmitOptions
+ ) => {
+ let callbackMeta: RunCallbackMeta | undefined;
+
+ stream.setStreamValues(() => {
+ if (submitOptions?.optimisticValues != null) {
+ return {
+ ...historyValues,
+ ...(typeof submitOptions.optimisticValues === "function"
+ ? submitOptions.optimisticValues(historyValues)
+ : submitOptions.optimisticValues),
+ };
+ }
+
+ return { ...historyValues };
+ });
+
+ await stream.start(
+ async (signal: AbortSignal) =>
+ options.transport.stream({
+ input: values,
+ context: submitOptions?.context,
+ command: submitOptions?.command,
+ signal,
+ }) as Promise<
+ AsyncGenerator>
+ >,
+ {
+ getMessages,
+ setMessages,
+
+ initialValues: {} as StateType,
+ callbacks: options,
+
+ onSuccess: () => undefined,
+ onError(error) {
+ options.onError?.(error, callbackMeta);
+ },
+ }
+ );
+ };
+
+ return {
+ get values() {
+ return stream.values ?? ({} as StateType);
+ },
+
+ error: stream.error,
+ isLoading: stream.isLoading,
+
+ stop,
+ submit,
+
+ get interrupt(): Interrupt | undefined {
+ if (
+ stream.values != null &&
+ "__interrupt__" in stream.values &&
+ Array.isArray(stream.values.__interrupt__)
+ ) {
+ const valueInterrupts = stream.values.__interrupt__;
+ if (valueInterrupts.length === 0) return { when: "breakpoint" };
+ if (valueInterrupts.length === 1) return valueInterrupts[0];
+
+ // TODO: fix the typing of interrupts if multiple interrupts are returned
+ return valueInterrupts as Interrupt;
+ }
+
+ return undefined;
+ },
+
+ get messages() {
+ if (!stream.values) return [];
+ return getMessages(stream.values);
+ },
+ };
+}
diff --git a/libs/sdk/src/react/stream.lgp.tsx b/libs/sdk/src/react/stream.lgp.tsx
new file mode 100644
index 000000000..0e8d97633
--- /dev/null
+++ b/libs/sdk/src/react/stream.lgp.tsx
@@ -0,0 +1,652 @@
+/* __LC_ALLOW_ENTRYPOINT_SIDE_EFFECTS__ */
+
+"use client";
+
+import {
+ type RefObject,
+ useCallback,
+ useEffect,
+ useMemo,
+ useRef,
+ useState,
+ useSyncExternalStore,
+} from "react";
+import { findLast, unique } from "./utils.js";
+import { StreamError } from "./errors.js";
+import { getBranchContext } from "./branching.js";
+import { EventStreamEvent, StreamManager } from "./manager.js";
+import type {
+ BagTemplate,
+ UseStreamOptions,
+ UseStream,
+ GetUpdateType,
+ GetCustomEventType,
+ GetInterruptType,
+ GetConfigurableType,
+ RunCallbackMeta,
+ SubmitOptions,
+ MessageMetadata,
+} from "./types.js";
+import { Client, getClientConfigHash } from "../client.js";
+import type { Message } from "../types.messages.js";
+import type { Interrupt, ThreadState } from "../schema.js";
+import type { StreamMode } from "../types.stream.js";
+import { MessageTupleManager } from "./messages.js";
+
+function fetchHistory>(
+ client: Client,
+ threadId: string,
+ options?: { limit?: boolean | number }
+) {
+ if (options?.limit === false) {
+ return client.threads.getState(threadId).then((state) => {
+ if (state.checkpoint == null) return [];
+ return [state];
+ });
+ }
+
+ const limit = typeof options?.limit === "number" ? options.limit : 10;
+ return client.threads.getHistory(threadId, { limit });
+}
+
+function useThreadHistory>(
+ threadId: string | undefined | null,
+ client: Client,
+ limit: boolean | number,
+ clearCallbackRef: RefObject<(() => void) | undefined>,
+ submittingRef: RefObject,
+ onErrorRef: RefObject<((error: unknown) => void) | undefined>
+) {
+ const [history, setHistory] = useState[] | undefined>(
+ undefined
+ );
+ const [isLoading, setIsLoading] = useState(() => {
+ if (threadId == null) return false;
+ return true;
+ });
+ const [error, setError] = useState(undefined);
+
+ const clientHash = getClientConfigHash(client);
+ const clientRef = useRef(client);
+ clientRef.current = client;
+
+ const fetcher = useCallback(
+ (
+ threadId: string | undefined | null
+ ): Promise[]> => {
+ if (threadId != null) {
+ const client = clientRef.current;
+
+ setIsLoading(true);
+ return fetchHistory(client, threadId, {
+ limit,
+ })
+ .then(
+ (history) => {
+ setHistory(history);
+ return history;
+ },
+ (error) => {
+ setError(error);
+ onErrorRef.current?.(error);
+ return Promise.reject(error);
+ }
+ )
+ .finally(() => {
+ setIsLoading(false);
+ });
+ }
+
+ setHistory(undefined);
+ setError(undefined);
+ setIsLoading(false);
+
+ clearCallbackRef.current?.();
+ return Promise.resolve([]);
+ },
+ [clearCallbackRef, onErrorRef, limit]
+ );
+
+ useEffect(() => {
+ if (submittingRef.current) return;
+ void fetcher(threadId);
+ }, [fetcher, submittingRef, clientHash, limit, threadId]);
+
+ return {
+ data: history,
+ isLoading,
+ error,
+ mutate: (mutateId?: string) => fetcher(mutateId ?? threadId),
+ };
+}
+
+const useControllableThreadId = (options?: {
+ threadId?: string | null;
+ onThreadId?: (threadId: string) => void;
+}): [string | null, (threadId: string) => void] => {
+ const [localThreadId, _setLocalThreadId] = useState(
+ options?.threadId ?? null
+ );
+
+ const onThreadIdRef = useRef(options?.onThreadId);
+ onThreadIdRef.current = options?.onThreadId;
+
+ const onThreadId = useCallback((threadId: string) => {
+ _setLocalThreadId(threadId);
+ onThreadIdRef.current?.(threadId);
+ }, []);
+
+ if (!options || !("threadId" in options)) {
+ return [localThreadId, onThreadId];
+ }
+
+ return [options.threadId ?? null, onThreadId];
+};
+
+export function useStreamLGP<
+  StateType extends Record<string, unknown> = Record<string, unknown>,
+  Bag extends {
+    ConfigurableType?: Record<string, unknown>;
+    InterruptType?: unknown;
+    CustomEventType?: unknown;
+    UpdateType?: unknown;
+  } = BagTemplate
+>(options: UseStreamOptions<StateType, Bag>): UseStream<StateType, Bag> {
+ type UpdateType = GetUpdateType;
+ type CustomType = GetCustomEventType;
+ type InterruptType = GetInterruptType;
+ type ConfigurableType = GetConfigurableType;
+
+ const reconnectOnMountRef = useRef(options.reconnectOnMount);
+ const runMetadataStorage = useMemo(() => {
+ if (typeof window === "undefined") return null;
+ const storage = reconnectOnMountRef.current;
+ if (storage === true) return window.sessionStorage;
+ if (typeof storage === "function") return storage();
+ return null;
+ }, []);
+
+ const client = useMemo(
+ () =>
+ options.client ??
+ new Client({
+ apiUrl: options.apiUrl,
+ apiKey: options.apiKey,
+ callerOptions: options.callerOptions,
+ defaultHeaders: options.defaultHeaders,
+ }),
+ [
+ options.client,
+ options.apiKey,
+ options.apiUrl,
+ options.callerOptions,
+ options.defaultHeaders,
+ ]
+ );
+
+ const [messageManager] = useState(() => new MessageTupleManager());
+ const [stream] = useState(
+ () => new StreamManager(messageManager)
+ );
+
+ useSyncExternalStore(
+ stream.subscribe,
+ stream.getSnapshot,
+ stream.getSnapshot
+ );
+
+ const [threadId, onThreadId] = useControllableThreadId(options);
+ const trackStreamModeRef = useRef[]>([]);
+
+ const trackStreamMode = useCallback(
+ (...mode: Exclude[]) => {
+ const ref = trackStreamModeRef.current;
+ for (const m of mode) {
+ if (!ref.includes(m)) ref.push(m);
+ }
+ },
+ []
+ );
+
+ const hasUpdateListener = options.onUpdateEvent != null;
+ const hasCustomListener = options.onCustomEvent != null;
+ const hasLangChainListener = options.onLangChainEvent != null;
+ const hasDebugListener = options.onDebugEvent != null;
+ const hasCheckpointListener = options.onCheckpointEvent != null;
+ const hasTaskListener = options.onTaskEvent != null;
+
+ const callbackStreamMode = useMemo(() => {
+ const modes: Exclude[] = [];
+ if (hasUpdateListener) modes.push("updates");
+ if (hasCustomListener) modes.push("custom");
+ if (hasLangChainListener) modes.push("events");
+ if (hasDebugListener) modes.push("debug");
+ if (hasCheckpointListener) modes.push("checkpoints");
+ if (hasTaskListener) modes.push("tasks");
+ return modes;
+ }, [
+ hasUpdateListener,
+ hasCustomListener,
+ hasLangChainListener,
+ hasDebugListener,
+ hasCheckpointListener,
+ hasTaskListener,
+ ]);
+
+ const clearCallbackRef = useRef<() => void>(null!);
+ clearCallbackRef.current = stream.clear;
+
+ const submittingRef = useRef(false);
+ submittingRef.current = stream.isLoading;
+
+ const onErrorRef = useRef<
+ ((error: unknown, run?: RunCallbackMeta) => void) | undefined
+ >(undefined);
+ onErrorRef.current = options.onError;
+
+ const historyLimit =
+ typeof options.fetchStateHistory === "object" &&
+ options.fetchStateHistory != null
+ ? options.fetchStateHistory.limit ?? false
+ : options.fetchStateHistory ?? false;
+
+ const history = useThreadHistory(
+ threadId,
+ client,
+ historyLimit,
+ clearCallbackRef,
+ submittingRef,
+ onErrorRef
+ );
+
+ const getMessages = (value: StateType): Message[] => {
+ const messagesKey = options.messagesKey ?? "messages";
+ return Array.isArray(value[messagesKey]) ? value[messagesKey] : [];
+ };
+
+ const setMessages = (current: StateType, messages: Message[]): StateType => {
+ const messagesKey = options.messagesKey ?? "messages";
+ return { ...current, [messagesKey]: messages };
+ };
+
+ const [branch, setBranch] = useState("");
+ const branchContext = getBranchContext(branch, history.data);
+
+ const historyValues =
+ branchContext.threadHead?.values ??
+ options.initialValues ??
+ ({} as StateType);
+
+ const historyError = (() => {
+ const error = branchContext.threadHead?.tasks?.at(-1)?.error;
+ if (error == null) return undefined;
+ try {
+ const parsed = JSON.parse(error) as unknown;
+ if (StreamError.isStructuredError(parsed)) return new StreamError(parsed);
+ return parsed;
+ } catch {
+ // do nothing
+ }
+ return error;
+ })();
+
+ const messageMetadata = (() => {
+ const alreadyShown = new Set();
+ return getMessages(historyValues).map(
+ (message, idx): Omit, "streamMetadata"> => {
+ const messageId = message.id ?? idx;
+
+ // Find the first checkpoint where the message was seen
+ const firstSeenState = findLast(history.data ?? [], (state) =>
+ getMessages(state.values)
+ .map((m, idx) => m.id ?? idx)
+ .includes(messageId)
+ );
+
+ const checkpointId = firstSeenState?.checkpoint?.checkpoint_id;
+ let branch =
+ checkpointId != null
+ ? branchContext.branchByCheckpoint[checkpointId]
+ : undefined;
+ if (!branch?.branch?.length) branch = undefined;
+
+ // serialize branches
+ const optionsShown = branch?.branchOptions?.flat(2).join(",");
+ if (optionsShown) {
+ if (alreadyShown.has(optionsShown)) branch = undefined;
+ alreadyShown.add(optionsShown);
+ }
+
+ return {
+ messageId: messageId.toString(),
+ firstSeenState,
+
+ branch: branch?.branch,
+ branchOptions: branch?.branchOptions,
+ };
+ }
+ );
+ })();
+
+ const stop = () => stream.stop(historyValues, { onStop: options.onStop });
+
+ // --- TRANSPORT ---
+ const submit = async (
+ values: UpdateType | null | undefined,
+ submitOptions?: SubmitOptions
+ ) => {
+ // Unbranch things
+ const checkpointId = submitOptions?.checkpoint?.checkpoint_id;
+ setBranch(
+ checkpointId != null
+ ? branchContext.branchByCheckpoint[checkpointId]?.branch ?? ""
+ : ""
+ );
+
+ stream.setStreamValues(() => {
+ if (submitOptions?.optimisticValues != null) {
+ return {
+ ...historyValues,
+ ...(typeof submitOptions.optimisticValues === "function"
+ ? submitOptions.optimisticValues(historyValues)
+ : submitOptions.optimisticValues),
+ };
+ }
+
+ return { ...historyValues };
+ });
+
+ // When `fetchStateHistory` is requested, thus we assume that branching
+ // is enabled. We then need to include the implicit branch.
+ const includeImplicitBranch =
+ historyLimit === true || typeof historyLimit === "number";
+
+ let callbackMeta: RunCallbackMeta | undefined;
+ let rejoinKey: `lg:stream:${string}` | undefined;
+ let usableThreadId = threadId;
+
+ await stream.start(
+ async (signal: AbortSignal) => {
+ if (!usableThreadId) {
+ const thread = await client.threads.create({
+ threadId: submitOptions?.threadId,
+ metadata: submitOptions?.metadata,
+ });
+ onThreadId(thread.thread_id);
+ usableThreadId = thread.thread_id;
+ }
+
+ if (!usableThreadId) {
+ throw new Error("Failed to obtain valid thread ID.");
+ }
+
+ const streamMode = unique([
+ ...(submitOptions?.streamMode ?? []),
+ ...trackStreamModeRef.current,
+ ...callbackStreamMode,
+ ]);
+
+ let checkpoint =
+ submitOptions?.checkpoint ??
+ (includeImplicitBranch
+ ? branchContext.threadHead?.checkpoint
+ : undefined) ??
+ undefined;
+
+ // Avoid specifying a checkpoint if user explicitly set it to null
+ if (submitOptions?.checkpoint === null) checkpoint = undefined;
+
+ // eslint-disable-next-line @typescript-eslint/ban-ts-comment
+ // @ts-expect-error
+ if (checkpoint != null) delete checkpoint.thread_id;
+ const streamResumable =
+ submitOptions?.streamResumable ?? !!runMetadataStorage;
+
+ return client.runs.stream(usableThreadId, options.assistantId, {
+ input: values as Record,
+ config: submitOptions?.config,
+ context: submitOptions?.context,
+ command: submitOptions?.command,
+
+ interruptBefore: submitOptions?.interruptBefore,
+ interruptAfter: submitOptions?.interruptAfter,
+ metadata: submitOptions?.metadata,
+ multitaskStrategy: submitOptions?.multitaskStrategy,
+ onCompletion: submitOptions?.onCompletion,
+ onDisconnect:
+ submitOptions?.onDisconnect ??
+ (streamResumable ? "continue" : "cancel"),
+
+ signal,
+
+ checkpoint,
+ streamMode,
+ streamSubgraphs: submitOptions?.streamSubgraphs,
+ streamResumable,
+ durability: submitOptions?.durability,
+ onRunCreated(params) {
+ callbackMeta = {
+ run_id: params.run_id,
+ thread_id: params.thread_id ?? usableThreadId!,
+ };
+
+ if (runMetadataStorage) {
+ rejoinKey = `lg:stream:${usableThreadId}`;
+ runMetadataStorage.setItem(rejoinKey, callbackMeta.run_id);
+ }
+
+ options.onCreated?.(callbackMeta);
+ },
+ }) as AsyncGenerator<
+ EventStreamEvent
+ >;
+ },
+ {
+ getMessages,
+ setMessages,
+
+ initialValues: historyValues,
+ callbacks: options,
+
+ async onSuccess() {
+ if (rejoinKey) runMetadataStorage?.removeItem(rejoinKey);
+ const shouldRefetch =
+ // We're expecting the whole thread state in onFinish
+ options.onFinish != null ||
+ // We're fetching history, thus we need the latest checkpoint
+ // to ensure we're not accidentally submitting to a wrong branch
+ includeImplicitBranch;
+
+ if (shouldRefetch) {
+ const newHistory = await history.mutate(usableThreadId!);
+ const lastHead = newHistory.at(0);
+ if (lastHead) {
+ // We now have the latest update from /history
+ // Thus we can clear the local stream state
+ options.onFinish?.(lastHead, callbackMeta);
+ return null;
+ }
+ }
+
+ return undefined;
+ },
+ onError(error) {
+ options.onError?.(error, callbackMeta);
+ },
+ }
+ );
+ };
+
+ const joinStream = async (
+ runId: string,
+ lastEventId?: string,
+ joinOptions?: { streamMode?: StreamMode | StreamMode[] }
+ ) => {
+ // eslint-disable-next-line no-param-reassign
+ lastEventId ??= "-1";
+ if (!threadId) return;
+
+ const callbackMeta: RunCallbackMeta = {
+ thread_id: threadId,
+ run_id: runId,
+ };
+
+ await stream.start(
+ async (signal: AbortSignal) => {
+ return client.runs.joinStream(threadId, runId, {
+ signal,
+ lastEventId,
+ streamMode: joinOptions?.streamMode,
+ }) as AsyncGenerator<
+ EventStreamEvent
+ >;
+ },
+ {
+ getMessages,
+ setMessages,
+
+ initialValues: historyValues,
+ callbacks: options,
+ async onSuccess() {
+ runMetadataStorage?.removeItem(`lg:stream:${threadId}`);
+ const newHistory = await history.mutate(threadId);
+ const lastHead = newHistory.at(0);
+ if (lastHead) options.onFinish?.(lastHead, callbackMeta);
+ },
+ onError(error) {
+ options.onError?.(error, callbackMeta);
+ },
+ }
+ );
+ };
+
+ const reconnectKey = useMemo(() => {
+ if (!runMetadataStorage || stream.isLoading) return undefined;
+ if (typeof window === "undefined") return undefined;
+ const runId = runMetadataStorage?.getItem(`lg:stream:${threadId}`);
+ if (!runId) return undefined;
+ return { runId, threadId };
+ }, [runMetadataStorage, stream.isLoading, threadId]);
+
+ const shouldReconnect = !!runMetadataStorage;
+ const reconnectRef = useRef({ threadId, shouldReconnect });
+
+ const joinStreamRef = useRef(joinStream);
+ joinStreamRef.current = joinStream;
+
+ useEffect(() => {
+ // reset shouldReconnect when switching threads
+ if (reconnectRef.current.threadId !== threadId) {
+ reconnectRef.current = { threadId, shouldReconnect };
+ }
+ }, [threadId, shouldReconnect]);
+
+ useEffect(() => {
+ if (reconnectKey && reconnectRef.current.shouldReconnect) {
+ reconnectRef.current.shouldReconnect = false;
+ void joinStreamRef.current?.(reconnectKey.runId);
+ }
+ }, [reconnectKey]);
+ // --- END TRANSPORT ---
+
+ const error = stream.error ?? historyError ?? history.error;
+ const values = stream.values ?? historyValues;
+
+ return {
+ get values() {
+ trackStreamMode("values");
+ return values;
+ },
+
+ client,
+ assistantId: options.assistantId,
+
+ error,
+ isLoading: stream.isLoading,
+
+ stop,
+ submit,
+
+ joinStream,
+
+ branch,
+ setBranch,
+
+ get history() {
+ if (historyLimit === false) {
+ throw new Error(
+ "`fetchStateHistory` must be set to `true` to use `history`"
+ );
+ }
+
+ return branchContext.flatHistory;
+ },
+
+ isThreadLoading: history.isLoading && history.data == null,
+
+ get experimental_branchTree() {
+ if (historyLimit === false) {
+ throw new Error(
+ "`fetchStateHistory` must be set to `true` to use `experimental_branchTree`"
+ );
+ }
+
+ return branchContext.branchTree;
+ },
+
+ get interrupt() {
+ if (
+ values != null &&
+ "__interrupt__" in values &&
+ Array.isArray(values.__interrupt__)
+ ) {
+ const valueInterrupts = values.__interrupt__;
+ if (valueInterrupts.length === 0) return { when: "breakpoint" };
+ if (valueInterrupts.length === 1) return valueInterrupts[0];
+
+ // TODO: fix the typing of interrupts if multiple interrupts are returned
+ return valueInterrupts;
+ }
+
+ // If we're deferring to old interrupt detection logic, don't show the interrupt if the stream is loading
+ if (stream.isLoading) return undefined;
+
+ const interrupts = branchContext.threadHead?.tasks?.at(-1)?.interrupts;
+ if (interrupts == null || interrupts.length === 0) {
+ // check if there's a next task present
+ const next = branchContext.threadHead?.next ?? [];
+ if (!next.length || error != null) return undefined;
+ return { when: "breakpoint" };
+ }
+
+ // Return only the current interrupt
+ return interrupts.at(-1) as Interrupt | undefined;
+ },
+
+ get messages() {
+ trackStreamMode("messages-tuple", "values");
+ return getMessages(values);
+ },
+
+ getMessagesMetadata(
+ message: Message,
+ index?: number
+ ): MessageMetadata | undefined {
+ trackStreamMode("values");
+
+ const streamMetadata = messageManager.get(message.id)?.metadata;
+ const historyMetadata = messageMetadata?.find(
+ (m) => m.messageId === (message.id ?? index)
+ );
+
+ if (streamMetadata != null || historyMetadata != null) {
+ return {
+ ...historyMetadata,
+ streamMetadata,
+ } as MessageMetadata;
+ }
+
+ return undefined;
+ },
+ };
+}
diff --git a/libs/sdk/src/react/stream.tsx b/libs/sdk/src/react/stream.tsx
index 3be4184a7..64bce0871 100644
--- a/libs/sdk/src/react/stream.tsx
+++ b/libs/sdk/src/react/stream.tsx
@@ -1,148 +1,30 @@
-/* __LC_ALLOW_ENTRYPOINT_SIDE_EFFECTS__ */
-
-"use client";
-
+import { useState } from "react";
+import { useStreamLGP } from "./stream.lgp.js";
+import { useStreamCustom } from "./stream.custom.js";
import {
- type RefObject,
- useCallback,
- useEffect,
- useMemo,
- useRef,
- useState,
- useSyncExternalStore,
-} from "react";
-import { findLast, unique } from "./utils.js";
-import { StreamError } from "./errors.js";
-import { getBranchContext } from "./branching.js";
-import { EventStreamEvent, StreamManager } from "./manager.js";
-import type {
BagTemplate,
- UseStreamOptions,
UseStream,
- GetUpdateType,
- GetCustomEventType,
- GetInterruptType,
- GetConfigurableType,
- RunCallbackMeta,
- SubmitOptions,
- MessageMetadata,
+ UseStreamCustom,
+ UseStreamCustomOptions,
+ UseStreamOptions,
} from "./types.js";
-import { Client, getClientConfigHash } from "../client.js";
-import type { Message } from "../types.messages.js";
-import type { Interrupt, ThreadState } from "../schema.js";
-import type { StreamMode } from "../types.stream.js";
-import { MessageTupleManager } from "./messages.js";
-
-function fetchHistory>(
- client: Client,
- threadId: string,
- options?: { limit?: boolean | number }
-) {
- if (options?.limit === false) {
- return client.threads.getState(threadId).then((state) => {
- if (state.checkpoint == null) return [];
- return [state];
- });
- }
-
- const limit = typeof options?.limit === "number" ? options.limit : 10;
- return client.threads.getHistory(threadId, { limit });
-}
-
-function useThreadHistory>(
- threadId: string | undefined | null,
- client: Client,
- limit: boolean | number,
- clearCallbackRef: RefObject<(() => void) | undefined>,
- submittingRef: RefObject,
- onErrorRef: RefObject<((error: unknown) => void) | undefined>
-) {
- const [history, setHistory] = useState[] | undefined>(
- undefined
- );
- const [isLoading, setIsLoading] = useState(() => {
- if (threadId == null) return false;
- return true;
- });
- const [error, setError] = useState(undefined);
-
- const clientHash = getClientConfigHash(client);
- const clientRef = useRef(client);
- clientRef.current = client;
-
- const fetcher = useCallback(
- (
- threadId: string | undefined | null
- ): Promise[]> => {
- if (threadId != null) {
- const client = clientRef.current;
-
- setIsLoading(true);
- return fetchHistory(client, threadId, {
- limit,
- })
- .then(
- (history) => {
- setHistory(history);
- return history;
- },
- (error) => {
- setError(error);
- onErrorRef.current?.(error);
- return Promise.reject(error);
- }
- )
- .finally(() => {
- setIsLoading(false);
- });
- }
-
- setHistory(undefined);
- setError(undefined);
- setIsLoading(false);
-
- clearCallbackRef.current?.();
- return Promise.resolve([]);
- },
- [clearCallbackRef, onErrorRef, limit]
- );
-
- useEffect(() => {
- if (submittingRef.current) return;
- void fetcher(threadId);
- }, [fetcher, submittingRef, clientHash, limit, threadId]);
- return {
- data: history,
- isLoading,
- error,
- mutate: (mutateId?: string) => fetcher(mutateId ?? threadId),
- };
+function isCustomOptions<
+  StateType extends Record<string, unknown> = Record<string, unknown>,
+  Bag extends {
+    ConfigurableType?: Record<string, unknown>;
+    InterruptType?: unknown;
+    CustomEventType?: unknown;
+    UpdateType?: unknown;
+  } = BagTemplate
+>(
+  options:
+    | UseStreamOptions<StateType, Bag>
+    | UseStreamCustomOptions<StateType, Bag>
+): options is UseStreamCustomOptions<StateType, Bag> {
+  return "transport" in options;
}
-const useControllableThreadId = (options?: {
- threadId?: string | null;
- onThreadId?: (threadId: string) => void;
-}): [string | null, (threadId: string) => void] => {
- const [localThreadId, _setLocalThreadId] = useState(
- options?.threadId ?? null
- );
-
- const onThreadIdRef = useRef(options?.onThreadId);
- onThreadIdRef.current = options?.onThreadId;
-
- const onThreadId = useCallback((threadId: string) => {
- _setLocalThreadId(threadId);
- onThreadIdRef.current?.(threadId);
- }, []);
-
- if (!options || !("threadId" in options)) {
- return [localThreadId, onThreadId];
- }
-
- return [options.threadId ?? null, onThreadId];
-};
-
export function useStream<
StateType extends Record = Record,
Bag extends {
@@ -151,502 +33,41 @@ export function useStream<
CustomEventType?: unknown;
UpdateType?: unknown;
} = BagTemplate
->(options: UseStreamOptions): UseStream {
- type UpdateType = GetUpdateType;
- type CustomType = GetCustomEventType;
- type InterruptType = GetInterruptType;
- type ConfigurableType = GetConfigurableType;
-
- const reconnectOnMountRef = useRef(options.reconnectOnMount);
- const runMetadataStorage = useMemo(() => {
- if (typeof window === "undefined") return null;
- const storage = reconnectOnMountRef.current;
- if (storage === true) return window.sessionStorage;
- if (typeof storage === "function") return storage();
- return null;
- }, []);
-
- const client = useMemo(
- () =>
- options.client ??
- new Client({
- apiUrl: options.apiUrl,
- apiKey: options.apiKey,
- callerOptions: options.callerOptions,
- defaultHeaders: options.defaultHeaders,
- }),
- [
- options.client,
- options.apiKey,
- options.apiUrl,
- options.callerOptions,
- options.defaultHeaders,
- ]
- );
-
- const [messageManager] = useState(() => new MessageTupleManager());
- const [stream] = useState(
- () => new StreamManager(messageManager)
- );
-
- useSyncExternalStore(
- stream.subscribe,
- stream.getSnapshot,
- stream.getSnapshot
- );
-
- const [threadId, onThreadId] = useControllableThreadId(options);
- const trackStreamModeRef = useRef[]>([]);
-
- const trackStreamMode = useCallback(
- (...mode: Exclude[]) => {
- const ref = trackStreamModeRef.current;
- for (const m of mode) {
- if (!ref.includes(m)) ref.push(m);
- }
- },
- []
- );
-
- const hasUpdateListener = options.onUpdateEvent != null;
- const hasCustomListener = options.onCustomEvent != null;
- const hasLangChainListener = options.onLangChainEvent != null;
- const hasDebugListener = options.onDebugEvent != null;
- const hasCheckpointListener = options.onCheckpointEvent != null;
- const hasTaskListener = options.onTaskEvent != null;
-
- const callbackStreamMode = useMemo(() => {
- const modes: Exclude[] = [];
- if (hasUpdateListener) modes.push("updates");
- if (hasCustomListener) modes.push("custom");
- if (hasLangChainListener) modes.push("events");
- if (hasDebugListener) modes.push("debug");
- if (hasCheckpointListener) modes.push("checkpoints");
- if (hasTaskListener) modes.push("tasks");
- return modes;
- }, [
- hasUpdateListener,
- hasCustomListener,
- hasLangChainListener,
- hasDebugListener,
- hasCheckpointListener,
- hasTaskListener,
- ]);
-
- const clearCallbackRef = useRef<() => void>(null!);
- clearCallbackRef.current = stream.clear;
-
- const submittingRef = useRef(false);
- submittingRef.current = stream.isLoading;
-
- const onErrorRef = useRef<
- ((error: unknown, run?: RunCallbackMeta) => void) | undefined
- >(undefined);
- onErrorRef.current = options.onError;
-
- const historyLimit =
- typeof options.fetchStateHistory === "object" &&
- options.fetchStateHistory != null
- ? options.fetchStateHistory.limit ?? false
- : options.fetchStateHistory ?? false;
-
- const history = useThreadHistory(
- threadId,
- client,
- historyLimit,
- clearCallbackRef,
- submittingRef,
- onErrorRef
- );
-
- const getMessages = (value: StateType): Message[] => {
- const messagesKey = options.messagesKey ?? "messages";
- return Array.isArray(value[messagesKey]) ? value[messagesKey] : [];
- };
-
- const setMessages = (current: StateType, messages: Message[]): StateType => {
- const messagesKey = options.messagesKey ?? "messages";
- return { ...current, [messagesKey]: messages };
- };
-
- const [branch, setBranch] = useState("");
- const branchContext = getBranchContext(branch, history.data);
-
- const historyValues =
- branchContext.threadHead?.values ??
- options.initialValues ??
- ({} as StateType);
-
- const historyError = (() => {
- const error = branchContext.threadHead?.tasks?.at(-1)?.error;
- if (error == null) return undefined;
- try {
- const parsed = JSON.parse(error) as unknown;
- if (StreamError.isStructuredError(parsed)) return new StreamError(parsed);
- return parsed;
- } catch {
- // do nothing
- }
- return error;
- })();
-
- const messageMetadata = (() => {
- const alreadyShown = new Set();
- return getMessages(historyValues).map(
- (message, idx): Omit, "streamMetadata"> => {
- const messageId = message.id ?? idx;
-
- // Find the first checkpoint where the message was seen
- const firstSeenState = findLast(history.data ?? [], (state) =>
- getMessages(state.values)
- .map((m, idx) => m.id ?? idx)
- .includes(messageId)
- );
-
- const checkpointId = firstSeenState?.checkpoint?.checkpoint_id;
- let branch =
- checkpointId != null
- ? branchContext.branchByCheckpoint[checkpointId]
- : undefined;
- if (!branch?.branch?.length) branch = undefined;
-
- // serialize branches
- const optionsShown = branch?.branchOptions?.flat(2).join(",");
- if (optionsShown) {
- if (alreadyShown.has(optionsShown)) branch = undefined;
- alreadyShown.add(optionsShown);
- }
-
- return {
- messageId: messageId.toString(),
- firstSeenState,
-
- branch: branch?.branch,
- branchOptions: branch?.branchOptions,
- };
- }
- );
- })();
-
- const stop = () => stream.stop(historyValues, { onStop: options.onStop });
-
- // --- TRANSPORT ---
- const submit = async (
- values: UpdateType | null | undefined,
- submitOptions?: SubmitOptions
- ) => {
- // Unbranch things
- const checkpointId = submitOptions?.checkpoint?.checkpoint_id;
- setBranch(
- checkpointId != null
- ? branchContext.branchByCheckpoint[checkpointId]?.branch ?? ""
- : ""
- );
-
- stream.setStreamValues(() => {
- if (submitOptions?.optimisticValues != null) {
- return {
- ...historyValues,
- ...(typeof submitOptions.optimisticValues === "function"
- ? submitOptions.optimisticValues(historyValues)
- : submitOptions.optimisticValues),
- };
- }
-
- return { ...historyValues };
- });
-
- // When `fetchStateHistory` is requested, thus we assume that branching
- // is enabled. We then need to include the implicit branch.
- const includeImplicitBranch =
- historyLimit === true || typeof historyLimit === "number";
-
- let callbackMeta: RunCallbackMeta | undefined;
- let rejoinKey: `lg:stream:${string}` | undefined;
- let usableThreadId = threadId;
-
- await stream.start(
- async (signal: AbortSignal) => {
- if (!usableThreadId) {
- const thread = await client.threads.create({
- threadId: submitOptions?.threadId,
- metadata: submitOptions?.metadata,
- });
- onThreadId(thread.thread_id);
- usableThreadId = thread.thread_id;
- }
-
- if (!usableThreadId) {
- throw new Error("Failed to obtain valid thread ID.");
- }
+>(options: UseStreamOptions<StateType, Bag>): UseStream<StateType, Bag>;
- const streamMode = unique([
- ...(submitOptions?.streamMode ?? []),
- ...trackStreamModeRef.current,
- ...callbackStreamMode,
- ]);
-
- let checkpoint =
- submitOptions?.checkpoint ??
- (includeImplicitBranch
- ? branchContext.threadHead?.checkpoint
- : undefined) ??
- undefined;
-
- // Avoid specifying a checkpoint if user explicitly set it to null
- if (submitOptions?.checkpoint === null) checkpoint = undefined;
-
- // eslint-disable-next-line @typescript-eslint/ban-ts-comment
- // @ts-expect-error
- if (checkpoint != null) delete checkpoint.thread_id;
- const streamResumable =
- submitOptions?.streamResumable ?? !!runMetadataStorage;
-
- return client.runs.stream(usableThreadId, options.assistantId, {
- input: values as Record,
- config: submitOptions?.config,
- context: submitOptions?.context,
- command: submitOptions?.command,
-
- interruptBefore: submitOptions?.interruptBefore,
- interruptAfter: submitOptions?.interruptAfter,
- metadata: submitOptions?.metadata,
- multitaskStrategy: submitOptions?.multitaskStrategy,
- onCompletion: submitOptions?.onCompletion,
- onDisconnect:
- submitOptions?.onDisconnect ??
- (streamResumable ? "continue" : "cancel"),
-
- signal,
-
- checkpoint,
- streamMode,
- streamSubgraphs: submitOptions?.streamSubgraphs,
- streamResumable,
- durability: submitOptions?.durability,
- onRunCreated(params) {
- callbackMeta = {
- run_id: params.run_id,
- thread_id: params.thread_id ?? usableThreadId!,
- };
-
- if (runMetadataStorage) {
- rejoinKey = `lg:stream:${usableThreadId}`;
- runMetadataStorage.setItem(rejoinKey, callbackMeta.run_id);
- }
-
- options.onCreated?.(callbackMeta);
- },
- }) as AsyncGenerator<
- EventStreamEvent
- >;
- },
- {
- getMessages,
- setMessages,
-
- initialValues: historyValues,
- callbacks: options,
-
- async onSuccess() {
- if (rejoinKey) runMetadataStorage?.removeItem(rejoinKey);
- const shouldRefetch =
- // We're expecting the whole thread state in onFinish
- options.onFinish != null ||
- // We're fetching history, thus we need the latest checkpoint
- // to ensure we're not accidentally submitting to a wrong branch
- includeImplicitBranch;
-
- if (shouldRefetch) {
- const newHistory = await history.mutate(usableThreadId!);
- const lastHead = newHistory.at(0);
- if (lastHead) {
- // We now have the latest update from /history
- // Thus we can clear the local stream state
- options.onFinish?.(lastHead, callbackMeta);
- return null;
- }
- }
-
- return undefined;
- },
- onError(error) {
- options.onError?.(error, callbackMeta);
- },
- }
- );
- };
-
- const joinStream = async (
- runId: string,
- lastEventId?: string,
- joinOptions?: { streamMode?: StreamMode | StreamMode[] }
- ) => {
- // eslint-disable-next-line no-param-reassign
- lastEventId ??= "-1";
- if (!threadId) return;
-
- const callbackMeta: RunCallbackMeta = {
- thread_id: threadId,
- run_id: runId,
- };
-
- await stream.start(
- async (signal: AbortSignal) => {
- return client.runs.joinStream(threadId, runId, {
- signal,
- lastEventId,
- streamMode: joinOptions?.streamMode,
- }) as AsyncGenerator<
- EventStreamEvent
- >;
- },
- {
- getMessages,
- setMessages,
-
- initialValues: historyValues,
- callbacks: options,
- async onSuccess() {
- runMetadataStorage?.removeItem(`lg:stream:${threadId}`);
- const newHistory = await history.mutate(threadId);
- const lastHead = newHistory.at(0);
- if (lastHead) options.onFinish?.(lastHead, callbackMeta);
- },
- onError(error) {
- options.onError?.(error, callbackMeta);
- },
- }
- );
- };
-
- const reconnectKey = useMemo(() => {
- if (!runMetadataStorage || stream.isLoading) return undefined;
- if (typeof window === "undefined") return undefined;
- const runId = runMetadataStorage?.getItem(`lg:stream:${threadId}`);
- if (!runId) return undefined;
- return { runId, threadId };
- }, [runMetadataStorage, stream.isLoading, threadId]);
-
- const shouldReconnect = !!runMetadataStorage;
- const reconnectRef = useRef({ threadId, shouldReconnect });
-
- const joinStreamRef = useRef(joinStream);
- joinStreamRef.current = joinStream;
-
- useEffect(() => {
- // reset shouldReconnect when switching threads
- if (reconnectRef.current.threadId !== threadId) {
- reconnectRef.current = { threadId, shouldReconnect };
- }
- }, [threadId, shouldReconnect]);
-
- useEffect(() => {
- if (reconnectKey && reconnectRef.current.shouldReconnect) {
- reconnectRef.current.shouldReconnect = false;
- void joinStreamRef.current?.(reconnectKey.runId);
- }
- }, [reconnectKey]);
- // --- END TRANSPORT ---
-
- const error = stream.error ?? historyError ?? history.error;
- const values = stream.values ?? historyValues;
-
- return {
- get values() {
- trackStreamMode("values");
- return values;
- },
-
- client,
- assistantId: options.assistantId,
-
- error,
- isLoading: stream.isLoading,
-
- stop,
- submit,
-
- joinStream,
-
- branch,
- setBranch,
-
- get history() {
- if (historyLimit === false) {
- throw new Error(
- "`fetchStateHistory` must be set to `true` to use `history`"
- );
- }
-
- return branchContext.flatHistory;
- },
-
- isThreadLoading: history.isLoading && history.data == null,
-
- get experimental_branchTree() {
- if (historyLimit === false) {
- throw new Error(
- "`fetchStateHistory` must be set to `true` to use `experimental_branchTree`"
- );
- }
-
- return branchContext.branchTree;
- },
-
- get interrupt() {
- if (
- values != null &&
- "__interrupt__" in values &&
- Array.isArray(values.__interrupt__)
- ) {
- const valueInterrupts = values.__interrupt__;
- if (valueInterrupts.length === 0) return { when: "breakpoint" };
- if (valueInterrupts.length === 1) return valueInterrupts[0];
-
- // TODO: fix the typing of interrupts if multiple interrupts are returned
- return valueInterrupts;
- }
-
- // If we're deferring to old interrupt detection logic, don't show the interrupt if the stream is loading
- if (stream.isLoading) return undefined;
-
- const interrupts = branchContext.threadHead?.tasks?.at(-1)?.interrupts;
- if (interrupts == null || interrupts.length === 0) {
- // check if there's a next task present
- const next = branchContext.threadHead?.next ?? [];
- if (!next.length || error != null) return undefined;
- return { when: "breakpoint" };
- }
-
- // Return only the current interrupt
- return interrupts.at(-1) as Interrupt | undefined;
- },
-
- get messages() {
- trackStreamMode("messages-tuple", "values");
- return getMessages(values);
- },
-
- getMessagesMetadata(
- message: Message,
- index?: number
- ): MessageMetadata | undefined {
- trackStreamMode("values");
-
- const streamMetadata = messageManager.get(message.id)?.metadata;
- const historyMetadata = messageMetadata?.find(
- (m) => m.messageId === (message.id ?? index)
- );
+export function useStream<
+  StateType extends Record<string, unknown> = Record<string, unknown>,
+  Bag extends {
+    ConfigurableType?: Record<string, unknown>;
+    InterruptType?: unknown;
+    CustomEventType?: unknown;
+    UpdateType?: unknown;
+  } = BagTemplate
+>(
+  options: UseStreamCustomOptions<StateType, Bag>
+): UseStreamCustom<StateType, Bag>;
- if (streamMetadata != null || historyMetadata != null) {
- return {
- ...historyMetadata,
- streamMetadata,
- } as MessageMetadata;
- }
+export function useStream<
+  StateType extends Record<string, unknown> = Record<string, unknown>,
+  Bag extends {
+    ConfigurableType?: Record<string, unknown>;
+    InterruptType?: unknown;
+    CustomEventType?: unknown;
+    UpdateType?: unknown;
+  } = BagTemplate
+>(
+  options:
+    | UseStreamOptions<StateType, Bag>
+    | UseStreamCustomOptions<StateType, Bag>
+): UseStream<StateType, Bag> | UseStreamCustom<StateType, Bag> {
+  // Store this in useState to make sure we're not changing the implementation in re-renders
+  const [isCustom] = useState(isCustomOptions(options));
+
+  if (isCustom) {
+    // eslint-disable-next-line react-hooks/rules-of-hooks
+    return useStreamCustom(options as UseStreamCustomOptions<StateType, Bag>);
+  }
- return undefined;
- },
- };
+  // eslint-disable-next-line react-hooks/rules-of-hooks
+  return useStreamLGP(options as UseStreamOptions<StateType, Bag>);
}
diff --git a/libs/sdk/src/react/types.tsx b/libs/sdk/src/react/types.tsx
index 3a9890490..73b326530 100644
--- a/libs/sdk/src/react/types.tsx
+++ b/libs/sdk/src/react/types.tsx
@@ -436,3 +436,59 @@ export interface SubmitOptions<
*/
threadId?: string;
}
+
+/**
+ * Transport used to stream the thread.
+ * Only applicable for custom endpoints using `toLangGraphEventStream` or `toLangGraphEventStreamResponse`.
+ */
+export interface UseStreamTransport<
+  StateType extends Record<string, unknown> = Record<string, unknown>,
+  Bag extends BagTemplate = BagTemplate
+> {
+  stream: (payload: {
+    input: GetUpdateType<Bag> | null | undefined;
+    context: GetConfigurableType<Bag> | undefined;
+    command: Command | undefined;
+    signal: AbortSignal;
+  }) => Promise<ReadableStream<{ id?: string; event: string; data: unknown }>>;
+}
+
+export type UseStreamCustomOptions<
+  StateType extends Record<string, unknown> = Record<string, unknown>,
+  Bag extends BagTemplate = BagTemplate
+> = Pick<
+  UseStreamOptions<StateType, Bag>,
+  | "messagesKey"
+  | "onError"
+  | "onCreated"
+  | "onUpdateEvent"
+  | "onCustomEvent"
+  | "onMetadataEvent"
+  | "onLangChainEvent"
+  | "onDebugEvent"
+  | "onCheckpointEvent"
+  | "onTaskEvent"
+  | "onStop"
+  | "initialValues"
+> & { transport: UseStreamTransport<StateType, Bag> };
+
+export type UseStreamCustom<
+  StateType extends Record<string, unknown> = Record<string, unknown>,
+  Bag extends BagTemplate = BagTemplate
+> = Pick<
+  UseStream<StateType, Bag>,
+  "values" | "error" | "isLoading" | "stop" | "interrupt" | "messages"
+> & {
+  submit: (
+    values: GetUpdateType<Bag> | null | undefined,
+    options?: CustomSubmitOptions<StateType, GetConfigurableType<Bag>>
+  ) => Promise<void>;
+};
+
+export type CustomSubmitOptions<
+  StateType extends Record<string, unknown> = Record<string, unknown>,
+  ConfigurableType extends Record<string, unknown> = Record<string, unknown>
+> = Pick<
+  SubmitOptions<StateType, ConfigurableType>,
+  "optimisticValues" | "context" | "command"
+>;
diff --git a/package.json b/package.json
index 2f48dfb6d..addad0fdb 100644
--- a/package.json
+++ b/package.json
@@ -16,6 +16,7 @@
"workspaces": [
"docs",
"examples",
+ "examples/*",
"libs/*",
"internal/*"
],
diff --git a/yarn.lock b/yarn.lock
index e99a0a8f6..5649918e1 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -2276,6 +2276,19 @@ __metadata:
languageName: node
linkType: hard
+"@langchain/openai@npm:^1.0.0-alpha":
+ version: 1.0.0-alpha.1
+ resolution: "@langchain/openai@npm:1.0.0-alpha.1"
+ dependencies:
+ js-tiktoken: "npm:^1.0.12"
+ openai: "npm:5.12.2"
+ zod: "npm:^3.25.32"
+ peerDependencies:
+ "@langchain/core": ">=1.0.0-alpha <2.0.0"
+ checksum: 10/1ed24517585a3d516c8f5d652e05eaf5b25080f681c360b4db70812093423abc5a2a41c3e7ec6264693a686724cb461fb79a0564bb238bda659d4267b23977de
+ languageName: node
+ linkType: hard
+
"@langchain/openai@npm:~0.3.0":
version: 0.3.0
resolution: "@langchain/openai@npm:0.3.0"
@@ -10957,6 +10970,23 @@ __metadata:
languageName: node
linkType: hard
+"openai@npm:5.12.2":
+ version: 5.12.2
+ resolution: "openai@npm:5.12.2"
+ peerDependencies:
+ ws: ^8.18.0
+ zod: ^3.23.8
+ peerDependenciesMeta:
+ ws:
+ optional: true
+ zod:
+ optional: true
+ bin:
+ openai: bin/cli
+ checksum: 10/9e2eea1a888ff99ebf61a7715d6130ae21c3b3e5d56508fd5080adbe30bb1891ae44e668abf2d1a714b7a148485be2bbaa0019fe66fc91bdf44e7b5ef7aedcad
+ languageName: node
+ linkType: hard
+
"openai@npm:^4.57.3, openai@npm:^4.77.0, openai@npm:^4.87.3":
version: 4.104.0
resolution: "openai@npm:4.104.0"
@@ -13147,6 +13177,29 @@ __metadata:
languageName: node
linkType: hard
+"stream-transport-vite@workspace:examples/stream-transport-vite":
+ version: 0.0.0-use.local
+ resolution: "stream-transport-vite@workspace:examples/stream-transport-vite"
+ dependencies:
+ "@hono/node-server": "npm:^1.12.0"
+ "@langchain/core": "npm:^1.0.0-alpha"
+ "@langchain/langgraph": "workspace:*"
+ "@langchain/langgraph-sdk": "workspace:*"
+ "@langchain/openai": "npm:^1.0.0-alpha"
+ "@types/react": "npm:^19.0.8"
+ "@types/react-dom": "npm:^19.0.3"
+ "@vitejs/plugin-react": "npm:^4.4.1"
+ hono: "npm:^4.8.2"
+ prettier: "npm:^2.8.3"
+ react: "npm:^19.0.0"
+ react-dom: "npm:^19.0.0"
+ tsx: "npm:^4.19.3"
+ typescript: "npm:~5.8.3"
+ vite: "npm:^6.0.0"
+ zod: "npm:^3.23.8"
+ languageName: unknown
+ linkType: soft
+
"streamx@npm:^2.15.0":
version: 2.18.0
resolution: "streamx@npm:2.18.0"
@@ -14125,7 +14178,7 @@ __metadata:
languageName: node
linkType: hard
-"typescript@npm:^4.9.5 || ^5.4.5, typescript@npm:^5.2.2, typescript@npm:^5.4.5":
+"typescript@npm:^4.9.5 || ^5.4.5, typescript@npm:^5.2.2, typescript@npm:^5.4.5, typescript@npm:~5.8.3":
version: 5.8.3
resolution: "typescript@npm:5.8.3"
bin:
@@ -14145,7 +14198,7 @@ __metadata:
languageName: node
linkType: hard
-"typescript@patch:typescript@npm%3A^4.9.5 || ^5.4.5#optional!builtin, typescript@patch:typescript@npm%3A^5.2.2#optional!builtin, typescript@patch:typescript@npm%3A^5.4.5#optional!builtin":
+"typescript@patch:typescript@npm%3A^4.9.5 || ^5.4.5#optional!builtin, typescript@patch:typescript@npm%3A^5.2.2#optional!builtin, typescript@patch:typescript@npm%3A^5.4.5#optional!builtin, typescript@patch:typescript@npm%3A~5.8.3#optional!builtin":
version: 5.8.3
resolution: "typescript@patch:typescript@npm%3A5.8.3#optional!builtin::version=5.8.3&hash=5786d5"
bin:
@@ -14417,7 +14470,7 @@ __metadata:
languageName: node
linkType: hard
-"vite@npm:^5.0.0 || ^6.0.0":
+"vite@npm:^5.0.0 || ^6.0.0, vite@npm:^6.0.0":
version: 6.3.6
resolution: "vite@npm:6.3.6"
dependencies: