Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support websocket-like connection with Channels #4

Merged
merged 2 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 94 additions & 8 deletions src/actors/proxy.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,75 @@
import type { Actor, ActorConstructor } from "./runtime.ts";
import { EVENT_STREAM_RESPONSE_HEADER, readFromStream } from "./stream.ts";
import {
type ChannelUpgrader,
type DuplexChannel,
makeWebSocket,
} from "./util/channels/channel.ts";

export const ACTOR_ID_HEADER_NAME = "x-deno-isolate-instance-id";
export const ACTOR_ID_QS_NAME = "x-deno-isolate-instance-id";
/**
* Promise.prototype.then onfufilled callback type.
*/
export type Fulfilled<R, T> = ((result: R) => T | PromiseLike<T>) | null;

/**
* Promise.then onrejected callback type.
*/
// deno-lint-ignore no-explicit-any
export type Rejected<E> = ((reason: any) => E | PromiseLike<E>) | null;

export class ActorAwaiter<
TResponse,
TChannel extends DuplexChannel<unknown, unknown>,
> implements
PromiseLike<
TResponse
>,
DuplexChannel<unknown, unknown> {
ch: Promise<TChannel> | null = null;
constructor(
protected fetcher: () => Promise<
TResponse
>,
protected ws: () => Promise<TChannel>,
) {
}
async close() {
const ch = await this.channel;
await ch.close();
}

async *recv(signal?: AbortSignal): AsyncIterableIterator<unknown> {
const ch = await this.channel;
yield* ch.recv(signal);
}

private get channel(): Promise<TChannel> {
return this.ch ??= this.ws();
}

async send(value: unknown): Promise<void> {
const ch = await this.channel;
await ch.send(value);
}

catch<TResult>(onrejected: Rejected<TResult>): Promise<TResponse | TResult> {
return this.fetcher().catch(onrejected);
}

then<TResult1, TResult2 = TResult1>(
onfufilled?: Fulfilled<
TResponse,
TResult1
>,
onrejected?: Rejected<TResult2>,
): Promise<TResult1 | TResult2> {
return this.fetcher().then(onfufilled).catch(
onrejected,
);
}
}

/**
* options to create a new actor proxy.
Expand All @@ -11,11 +79,15 @@ export interface ProxyOptions<TInstance extends Actor> {
server: string;
}

type PromisifyKey<key extends keyof Actor, Actor> = Actor[key] extends
(...args: infer Args) => Awaited<infer Return>
? Return extends ChannelUpgrader<infer TSend, infer TReceive>
? (...args: Args) => DuplexChannel<TSend, TReceive>
: (...args: Args) => Promise<Return>
: Actor[key];

type Promisify<Actor> = {
[key in keyof Actor]: Actor[key] extends (...args: infer Args) => infer Return
? Return extends Promise<unknown> ? Actor[key]
: (...args: Args) => Promise<Return>
: Actor[key];
[key in keyof Actor]: PromisifyKey<key, Actor>;
};

export interface ActorsServer {
Expand Down Expand Up @@ -66,12 +138,13 @@ export const actors = {
id: (id: string): Promisify<TInstance> => {
return new Proxy<Promisify<TInstance>>({} as Promisify<TInstance>, {
get: (_, prop) => {
return async (...args: unknown[]) => {
const endpoint = `${actorsServer.url}/actors/${
typeof actor === "string" ? actor : actor.name
}/invoke/${String(prop)}`;
const fetcher = async (...args: unknown[]) => {
const abortCtrl = new AbortController();
const resp = await fetch(
`${actorsServer.url}/actors/${
typeof actor === "string" ? actor : actor.name
}/invoke/${String(prop)}`,
endpoint,
{
method: "POST",
signal: abortCtrl.signal,
Expand Down Expand Up @@ -104,6 +177,19 @@ export const actors = {
}
return resp.json();
};
return (...args: unknown[]) => {
const awaiter = new ActorAwaiter(() => fetcher(...args), () => {
const ws = new WebSocket(
`${endpoint}?args=${
encodeURIComponent(
btoa(JSON.stringify({ args: args ?? [] })),
)
}&${ACTOR_ID_QS_NAME}=${id}`,
);
return makeWebSocket(ws);
});
return awaiter;
};
},
});
},
Expand Down
33 changes: 31 additions & 2 deletions src/actors/runtime.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { assertEquals } from "@std/assert";
import { assertEquals, assertFalse } from "@std/assert";
import { actors } from "./proxy.ts";
import { ActorRuntime } from "./runtime.ts";
import type { ActorState } from "./state.ts";
import type { ChannelUpgrader } from "./util/channels/channel.ts";
import { WatchTarget } from "./util/watch.ts";

class Counter {
Expand Down Expand Up @@ -36,6 +37,17 @@ class Counter {
watch(): AsyncIterableIterator<number> {
return this.watchTarget.subscribe();
}

chan(name: string): ChannelUpgrader<string, string> {
return (async ({ send, recv }) => {
await send(`Hello ${name}`);
for await (const str of recv()) {
if (str === "PING") {
await send("PONG");
}
}
});
}
}

const runServer = (rt: ActorRuntime): AsyncDisposable => {
Expand All @@ -47,13 +59,30 @@ const runServer = (rt: ActorRuntime): AsyncDisposable => {
};
};

Deno.test("counter increment and getCount", async () => {
Deno.test("counter tests", async () => {
const rt = new ActorRuntime([Counter]);
await using _server = runServer(rt);
const actorId = "1234";
const counterProxy = actors.proxy(Counter);

const actor = counterProxy.id(actorId);
const name = `Counter Actor`;
const ch = actor.chan(name);
const it = ch.recv();
const { value, done } = await it.next();

assertFalse(done);
assertEquals(value, `Hello ${name}`);

await ch.send("PING");

const { value: pong, done: pongDone } = await it.next();

assertFalse(pongDone);
assertEquals(pong, "PONG");

await ch.close();

const watcher = await actor.watch();
// Test increment
const number = await actor.increment();
Expand Down
17 changes: 15 additions & 2 deletions src/actors/runtime.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { type ServerSentEventMessage, ServerSentEventStream } from "@std/http";
import { ACTOR_ID_HEADER_NAME } from "./proxy.ts";
import { isUpgrade, makeWebSocket } from "./util/channels/channel.ts";
import { ACTOR_ID_HEADER_NAME, ACTOR_ID_QS_NAME } from "./proxy.ts";
import { ActorState } from "./state.ts";
import { DenoKvActorStorage } from "./storage/denoKv.ts";
import { EVENT_STREAM_RESPONSE_HEADER } from "./stream.ts";
Expand Down Expand Up @@ -117,7 +118,8 @@ export class ActorRuntime {
*/
async fetch(req: Request): Promise<Response> {
const url = new URL(req.url);
const actorId = req.headers.get(ACTOR_ID_HEADER_NAME);
const actorId = req.headers.get(ACTOR_ID_HEADER_NAME) ??
url.searchParams.get(ACTOR_ID_QS_NAME);
if (!actorId) {
return new Response(`missing ${ACTOR_ID_HEADER_NAME} header`, {
status: 400,
Expand Down Expand Up @@ -147,6 +149,12 @@ export class ActorRuntime {
if (req.headers.get("content-type")?.includes("application/json")) {
const { args: margs } = await req.json();
args = margs;
} else if (url.searchParams.get("args")) {
const qargs = url.searchParams.get("args");
const parsedArgs = qargs
? JSON.parse(atob(decodeURIComponent(qargs)))
: {};
args = parsedArgs.args;
}
const methodImpl = actor[method as keyof typeof actor];
if (!isInvocable(methodImpl)) {
Expand All @@ -161,6 +169,11 @@ export class ActorRuntime {
const res = await (methodImpl as Function).bind(actor)(
...Array.isArray(args) ? args : [args],
);
if (req.headers.get("upgrade") === "websocket" && isUpgrade(res)) {
const { socket, response } = Deno.upgradeWebSocket(req);
makeWebSocket(socket).then((ch) => res(ch)).finally(() => socket.close());
return response;
}
if (isEventStreamResponse(res)) {
req.signal.onabort = () => {
res?.return?.();
Expand Down
Loading
Loading