Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: feat: handle server side events #50

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions apps/demo-nextjs-app-router/app/sse/page.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
'use client';

import * as fal from '@fal-ai/serverless-client';
import { useState } from 'react';

// @snippet:start(client.config)
fal.config({
proxyUrl: '/api/fal/proxy', // the built-int nextjs proxy
// proxyUrl: 'http://localhost:3333/api/fal/proxy', // or your own external proxy
});
// @snippet:end

type ErrorProps = {
error: any;
};

function Error(props: ErrorProps) {
if (!props.error) {
return null;
}
return (
<div
className="p-4 mb-4 text-sm text-red-800 rounded bg-red-50 dark:bg-gray-800 dark:text-red-400"
role="alert"
>
<span className="font-medium">Error</span> {props.error.message}
</div>
);
}

const DEFAULT_PROMPT = 'What is a cat?';

export default function Home() {
// @snippet:start("client.ui.state")
// Input state
const [prompt, setPrompt] = useState<string>(DEFAULT_PROMPT);
// Result state
const [loading, setLoading] = useState(false);
const [error, setError] = useState<Error | null>(null);
const [result, setResult] = useState<any>(null);
const [logs, setLogs] = useState<string[]>([]);
const [elapsedTime, setElapsedTime] = useState<number>(0);
// @snippet:end

const reset = () => {
setLoading(false);
setError(null);
setResult(null);
setLogs([]);
setElapsedTime(0);
};

const generateImage = async () => {
reset();
// @snippet:start("client.queue.subscribe")
setLoading(true);
const start = Date.now();
try {
const result: any = await fal.listenForEvents<any, any>(
'38204337/mederllm',
{
prompt: 'hello there',
}
);
// Handle the received SSE data
console.log(result);
setResult(result);
} catch (error: any) {
setError(error);
} finally {
setLoading(false);
setElapsedTime(Date.now() - start);
}
// @snippet:end
};
return (
<div className="min-h-screen dark:bg-gray-900 bg-gray-100">
<main className="container dark:text-gray-50 text-gray-900 flex flex-col items-center justify-center w-full flex-1 py-10 space-y-8">
<h1 className="text-4xl font-bold mb-8">
Hello <code className="font-light text-pink-600">fal</code>
</h1>
<div className="text-lg w-full"></div>
<div className="text-lg w-full">
<label htmlFor="prompt" className="block mb-2 text-current">
Prompt
</label>
<input
className="w-full text-lg p-2 rounded bg-black/10 dark:bg-white/5 border border-black/20 dark:border-white/10"
id="prompt"
name="prompt"
placeholder="Imagine..."
value={prompt}
autoComplete="off"
onChange={(e) => setPrompt(e.target.value)}
onBlur={(e) => setPrompt(e.target.value.trim())}
/>
</div>

<button
onClick={(e) => {
e.preventDefault();
generateImage();
}}
className="bg-indigo-600 hover:bg-indigo-700 text-white font-bold text-lg py-3 px-6 mx-auto rounded focus:outline-none focus:shadow-outline"
disabled={loading}
>
{loading ? 'Generating...' : 'Send'}
</button>

<Error error={error} />

<div className="w-full flex flex-col space-y-4">
<div className="space-y-2">
<h3 className="text-xl font-light">JSON Result</h3>
<p className="text-sm text-current/80">
{`Elapsed Time (seconds): ${(elapsedTime / 1000).toFixed(2)}`}
</p>
<pre className="text-sm bg-black/70 text-white/80 font-mono h-60 rounded whitespace-pre overflow-auto w-full">
{result
? JSON.stringify(result, null, 2)
: '// result pending...'}
</pre>
</div>

<div className="space-y-2">
<h3 className="text-xl font-light">Logs</h3>
<pre className="text-sm bg-black/70 text-white/80 font-mono h-60 rounded whitespace-pre overflow-auto w-full">
{logs.filter(Boolean).join('\n')}
</pre>
</div>
</div>
</main>
</div>
);
}
19 changes: 18 additions & 1 deletion libs/client/src/function.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { dispatchRequest } from './request';
import { dispatchRequest, dispatchSSE } from './request';
import { storageImpl } from './storage';
import { EnqueueResult, QueueStatus } from './types';
import { ensureAppIdFormat, isUUIDv4, isValidUrl } from './utils';
Expand Down Expand Up @@ -112,6 +112,23 @@ export async function run<Input, Output>(
return send(id, options);
}

/**
* Listen for events from a fal function identified by its `id`.
*
* @param id the registered function revision id or alias.
* @returns the remote function output
*/
export async function listenForEvents<Input, Output>(
id: string,
options: RunOptions<Input> & ExtraOptions = {}
): Promise<Output> {
const input =
options.input && options.autoUpload !== false
? await storageImpl.transformInput(options.input)
: options.input;
return dispatchSSE<Input, Output>(buildUrl(id, options), input as Input);
}

/**
* Subscribes to updates for a specific request in the queue.
*
Expand Down
2 changes: 1 addition & 1 deletion libs/client/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export { config, getConfig } from './config';
export { queue, run, subscribe } from './function';
export { listenForEvents, queue, run, subscribe } from './function';
export { withMiddleware, withProxy } from './middleware';
export type { RequestMiddleware } from './middleware';
export { realtimeImpl as realtime } from './realtime';
Expand Down
40 changes: 40 additions & 0 deletions libs/client/src/request.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import EventSource from 'eventsource';
import { getConfig } from './config';
import { getUserAgent, isBrowser } from './runtime';

Expand Down Expand Up @@ -49,3 +50,42 @@
});
return await responseHandler(response);
}

export async function dispatchSSE<Input, Output>(
targetUrl: string,
input: Input

Check warning on line 56 in libs/client/src/request.ts

View workflow job for this annotation

GitHub Actions / build

'input' is defined but never used
): Promise<Output> {
const {
credentials: credentialsValue,
requestMiddleware,
responseHandler,

Check warning on line 61 in libs/client/src/request.ts

View workflow job for this annotation

GitHub Actions / build

'responseHandler' is assigned a value but never used
} = getConfig();
const credentials =
typeof credentialsValue === 'function'
? credentialsValue()
: credentialsValue;

const { url, headers } = await requestMiddleware({
url: targetUrl,
});
const authHeader = credentials ? { Authorization: `Key ${credentials}` } : {};
const requestHeaders = {
...authHeader,
Accept: 'text/event-stream',
'Content-Type': 'application/json',
...(headers ?? {}),
} as HeadersInit;

const eventSource = new EventSource(url, { headers: requestHeaders });

return new Promise<Output>((resolve, reject) => {
eventSource.onmessage = (event) => {
const eventData = JSON.parse(event.data);
resolve(eventData);
};

eventSource.onerror = (error) => {
reject(error);
};
});
}
16 changes: 16 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
"cross-fetch": "^3.1.5",
"dotenv": "^16.3.1",
"encoding": "^0.1.13",
"eventsource": "^2.0.2",
"execa": "^8.0.1",
"express": "^4.18.2",
"fast-glob": "^3.2.12",
Expand Down Expand Up @@ -83,6 +84,7 @@
"@testing-library/react": "14.0.0",
"@theunderscorer/nx-semantic-release": "^2.2.1",
"@types/cors": "^2.8.14",
"@types/eventsource": "^1.1.15",
"@types/express": "4.17.13",
"@types/jest": "29.4.4",
"@types/node": "18.14.2",
Expand Down
Loading