Skip to content

Commit

Permalink
feat(proxy): enable response streaming when supported
Browse files Browse the repository at this point in the history
  • Loading branch information
drochetti committed Jul 26, 2024
1 parent d9561c9 commit 13ca3a1
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 85 deletions.
2 changes: 1 addition & 1 deletion libs/proxy/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@fal-ai/serverless-proxy",
"version": "0.7.5",
"version": "0.8.0-alpha.1",
"license": "MIT",
"repository": {
"type": "git",
Expand Down
30 changes: 28 additions & 2 deletions libs/proxy/src/express.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,37 @@ export const handler: RequestHandler = async (request, response, next) => {
await handleRequest({
id: 'express',
method: request.method,
respondWith: (status, data) => response.status(status).json(data),
getRequestBody: async () => JSON.stringify(request.body),
getHeaders: () => request.headers,
getHeader: (name) => request.headers[name],
sendHeader: (name, value) => response.setHeader(name, value),
getBody: async () => JSON.stringify(request.body),
respondWith: (status, data) => response.status(status).json(data),
sendResponse: async (res) => {
if (res.body instanceof ReadableStream) {
const reader = res.body.getReader();
const stream = async () => {
const { done, value } = await reader.read();
if (done) {
response.end();
return response;
}
response.write(value);
return await stream();
};

return await stream().catch((error) => {
if (!response.headersSent) {
response.status(500).send(error.message);
} else {
response.end();
}
});
}
if (res.headers.get('content-type')?.includes('application/json')) {
return response.status(res.status).json(await res.json());
}
return response.status(res.status).send(await res.text());
},
});
next();
};
14 changes: 6 additions & 8 deletions libs/proxy/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ export interface ProxyBehavior<ResponseType> {
method: string;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
respondWith(status: number, data: string | any): ResponseType;
sendResponse(response: Response): Promise<ResponseType>;
getHeaders(): Record<string, HeaderValue>;
getHeader(name: string): HeaderValue;
sendHeader(name: string, value: string): void;
getBody(): Promise<string | undefined>;
getRequestBody(): Promise<string | undefined>;
resolveApiKey?: () => Promise<string | undefined>;
}

Expand Down Expand Up @@ -109,7 +110,7 @@ export async function handleRequest<ResponseType>(
body:
behavior.method?.toUpperCase() === 'GET'
? undefined
: await behavior.getBody(),
: await behavior.getRequestBody(),
});

// copy headers from fal to the proxied response
Expand All @@ -119,12 +120,7 @@ export async function handleRequest<ResponseType>(
}
});

if (res.headers.get('content-type')?.includes('application/json')) {
const data = await res.json();
return behavior.respondWith(res.status, data);
}
const data = await res.text();
return behavior.respondWith(res.status, data);
return behavior.sendResponse(res);
}

export function fromHeaders(
Expand All @@ -138,3 +134,5 @@ export function fromHeaders(
});
return result;
}

export const responsePassthrough = (res: Response) => Promise.resolve(res);
31 changes: 24 additions & 7 deletions libs/proxy/src/nextjs.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { NextResponse, type NextRequest } from 'next/server';
import type { NextApiHandler } from 'next/types';
import { DEFAULT_PROXY_ROUTE, fromHeaders, handleRequest } from './index';
import {
DEFAULT_PROXY_ROUTE,
fromHeaders,
handleRequest,
responsePassthrough,
} from './index';

/**
* The default Next API route for the fal.ai client proxy.
Expand All @@ -11,6 +16,8 @@ export const PROXY_ROUTE = DEFAULT_PROXY_ROUTE;
* The Next API route handler for the fal.ai client proxy.
* Use it with the /pages router in Next.js.
*
* Note: the page routers proxy doesn't support streaming responses.
*
* @param request the Next API request object.
* @param response the Next API response object.
* @returns a promise that resolves when the request is handled.
Expand All @@ -19,11 +26,17 @@ export const handler: NextApiHandler = async (request, response) => {
return handleRequest({
id: 'nextjs-page-router',
method: request.method || 'POST',
respondWith: (status, data) => response.status(status).json(data),
getRequestBody: async () => JSON.stringify(request.body),
getHeaders: () => request.headers,
getHeader: (name) => request.headers[name],
sendHeader: (name, value) => response.setHeader(name, value),
getBody: async () => JSON.stringify(request.body),
respondWith: (status, data) => response.status(status).json(data),
sendResponse: async (res) => {
if (res.headers.get('content-type')?.includes('application/json')) {
return response.status(res.status).json(await res.json());
}
return response.status(res.status).send(await res.text());
},
});
};

Expand All @@ -36,18 +49,22 @@ export const handler: NextApiHandler = async (request, response) => {
async function routeHandler(request: NextRequest) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const responseHeaders: Record<string, any> = {};

// check if response if from a streaming request

return await handleRequest({
id: 'nextjs-app-router',
method: request.method,
getRequestBody: async () => request.text(),
getHeaders: () => fromHeaders(request.headers),
getHeader: (name) => request.headers.get(name),
sendHeader: (name, value) => (responseHeaders[name] = value),
respondWith: (status, data) =>
NextResponse.json(data, {
status,
headers: responseHeaders,
}),
getHeaders: () => fromHeaders(request.headers),
getHeader: (name) => request.headers.get(name),
sendHeader: (name, value) => (responseHeaders[name] = value),
getBody: async () => request.text(),
sendResponse: responsePassthrough,
});
}

Expand Down
13 changes: 7 additions & 6 deletions libs/proxy/src/svelte.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { type RequestHandler } from '@sveltejs/kit';
import { fromHeaders, handleRequest } from './index';
import { fromHeaders, handleRequest, responsePassthrough } from './index';

type RequestHandlerParams = {
/**
Expand Down Expand Up @@ -28,16 +28,17 @@ export const createRequestHandler = ({
return await handleRequest({
id: 'svelte-app-router',
method: request.method,
getRequestBody: async () => request.text(),
getHeaders: () => fromHeaders(request.headers),
getHeader: (name) => request.headers.get(name),
sendHeader: (name, value) => (responseHeaders[name] = value),
resolveApiKey: () => Promise.resolve(FAL_KEY),
respondWith: (status, data) =>
new Response(JSON.stringify(data), {
status,
headers: responseHeaders,
}),
getHeaders: () => fromHeaders(request.headers),
getHeader: (name) => request.headers.get(name),
sendHeader: (name, value) => (responseHeaders[name] = value),
getBody: async () => request.text(),
resolveApiKey: () => Promise.resolve(FAL_KEY),
sendResponse: responsePassthrough,
});
};
return {
Expand Down
Loading

0 comments on commit 13ca3a1

Please sign in to comment.