Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions frontend/next.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,18 @@ const config = {
devIndicators: false,
async rewrites() {
const rewrites = [];
const fallback = [];
const gatewayURL = getInternalServiceURL(
"DEER_FLOW_INTERNAL_GATEWAY_BASE_URL",
"http://127.0.0.1:8001",
);

if (!process.env.NEXT_PUBLIC_LANGGRAPH_BASE_URL) {
rewrites.push({
fallback.push({
Comment thread
LittleChenLiya marked this conversation as resolved.
Outdated
source: "/api/langgraph",
destination: `${gatewayURL}/api`,
});
rewrites.push({
fallback.push({
source: "/api/langgraph/:path*",
destination: `${gatewayURL}/api/:path*`,
});
Expand All @@ -57,20 +58,16 @@ const config = {
destination: `${gatewayURL}/api/skills/:path*`,
});

// Catch-all for remaining gateway API routes (models, threads, memory,
// mcp, artifacts, uploads, suggestions, runs, etc.) that don't have
// their own NEXT_PUBLIC_* env var toggle.
//
// NOTE: this must come AFTER the /api/langgraph rewrite above so that
// LangGraph-compatible routes keep their public prefix while Gateway
// receives its native /api/* paths.
rewrites.push({
fallback.push({
source: "/api/:path*",
destination: `${gatewayURL}/api/:path*`,
});
}

return rewrites;
return {
beforeFiles: rewrites,
fallback,
};
},
};

Expand Down
92 changes: 92 additions & 0 deletions frontend/src/app/api/langgraph/_lib/stream-proxy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { type NextRequest } from "next/server";

const DEFAULT_GATEWAY_BASE_URL = "http://127.0.0.1:8001";

const HOP_BY_HOP_HEADERS = [
"connection",
"keep-alive",
"proxy-authenticate",
"proxy-authorization",
"te",
"trailer",
"transfer-encoding",
"upgrade",
] as const;

function getGatewayBaseUrl() {
const configured = process.env.DEER_FLOW_INTERNAL_GATEWAY_BASE_URL?.trim();
return (
configured && configured.length > 0 ? configured : DEFAULT_GATEWAY_BASE_URL
).replace(/\/+$/, "");
}
Comment thread
LittleChenLiya marked this conversation as resolved.
Outdated

function getConnectionHeaderNames(headers: Headers) {
return (
headers
.get("connection")
?.split(",")
.map((name) => name.trim().toLowerCase())
.filter(Boolean) ?? []
);
}

function deleteHopByHopHeaders(
headers: Headers,
additionalNames: string[] = [],
) {
for (const name of [...HOP_BY_HOP_HEADERS, ...additionalNames]) {
headers.delete(name);
}
}

function buildGatewayUrl(path: string[], search: string) {
const pathname = ["api", ...path.map(encodeURIComponent)].join("/");
return `${getGatewayBaseUrl()}/${pathname}${search}`;
}

function buildHeaders(request: NextRequest) {
const headers = new Headers(request.headers);
deleteHopByHopHeaders(headers, getConnectionHeaderNames(headers));
for (const name of ["host", "content-length"]) {
headers.delete(name);
}
headers.set("accept-encoding", "identity");
return headers;
}

export async function proxyLangGraphStream(
request: NextRequest,
path: string[],
) {
if (process.env.NEXT_PUBLIC_LANGGRAPH_BASE_URL) {
Comment thread
LittleChenLiya marked this conversation as resolved.
Outdated
return new Response(null, { status: 404 });
}

const target = buildGatewayUrl(path, request.nextUrl.search);
const method = request.method.toUpperCase();
const hasBody = !["GET", "HEAD", "OPTIONS"].includes(method);
const init: RequestInit & { duplex?: "half" } = {
method,
headers: buildHeaders(request),
redirect: "manual",
cache: "no-store",
signal: request.signal,
};

if (hasBody) {
init.body = request.body;
init.duplex = "half";
}

const upstream = await fetch(target, init);
const headers = new Headers(upstream.headers);
Comment thread
LittleChenLiya marked this conversation as resolved.
deleteHopByHopHeaders(headers, getConnectionHeaderNames(headers));
headers.delete("content-length");
headers.set("X-Accel-Buffering", "no");

return new Response(upstream.body, {
status: upstream.status,
statusText: upstream.statusText,
headers,
});
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { type NextRequest } from "next/server";

import { proxyLangGraphStream } from "@/app/api/langgraph/_lib/stream-proxy";

export const runtime = "nodejs";
export const dynamic = "force-dynamic";

type RouteContext = {
params: Promise<{ threadId: string; runId: string }>;
};

async function proxyRunJoinStream(request: NextRequest, context: RouteContext) {
// Keep resumable/join streams on the same direct proxy path as new run streams.
const { threadId, runId } = await context.params;
return proxyLangGraphStream(request, [
"threads",
threadId,
"runs",
runId,
"stream",
]);
}

export const GET = proxyRunJoinStream;
export const HEAD = proxyRunJoinStream;
export const POST = proxyRunJoinStream;
export const OPTIONS = proxyRunJoinStream;
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { type NextRequest } from "next/server";

import { proxyLangGraphStream } from "@/app/api/langgraph/_lib/stream-proxy";

export const runtime = "nodejs";
export const dynamic = "force-dynamic";

type RouteContext = {
params: Promise<{ threadId: string }>;
};

async function proxyRunStream(request: NextRequest, context: RouteContext) {
// Keep run streams out of Next rewrites because the dev proxy can buffer SSE
// events on Windows. Returning the upstream body directly preserves streaming.
const { threadId } = await context.params;
return proxyLangGraphStream(request, ["threads", threadId, "runs", "stream"]);
}

export const GET = proxyRunStream;
export const HEAD = proxyRunStream;
export const POST = proxyRunStream;
export const OPTIONS = proxyRunStream;
Loading
Loading