Skip to content

Commit

Permalink
Support websocket-like connection with Channels (#4)
Browse files Browse the repository at this point in the history
* Add support for strongly typed websockets

Signed-off-by: Marcos Candeia <[email protected]>

* Refactoring channel folder

Signed-off-by: Marcos Candeia <[email protected]>

---------

Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia authored Oct 1, 2024
1 parent 0d95bd7 commit afb654f
Show file tree
Hide file tree
Showing 5 changed files with 365 additions and 12 deletions.
102 changes: 94 additions & 8 deletions src/actors/proxy.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,75 @@
import type { Actor, ActorConstructor } from "./runtime.ts";
import { EVENT_STREAM_RESPONSE_HEADER, readFromStream } from "./stream.ts";
import {
type ChannelUpgrader,
type DuplexChannel,
makeWebSocket,
} from "./util/channels/channel.ts";

export const ACTOR_ID_HEADER_NAME = "x-deno-isolate-instance-id";
export const ACTOR_ID_QS_NAME = "x-deno-isolate-instance-id";
/**
* Promise.prototype.then onfufilled callback type.
*/
export type Fulfilled<R, T> = ((result: R) => T | PromiseLike<T>) | null;

/**
* Promise.then onrejected callback type.
*/
// deno-lint-ignore no-explicit-any
export type Rejected<E> = ((reason: any) => E | PromiseLike<E>) | null;

export class ActorAwaiter<
TResponse,
TChannel extends DuplexChannel<unknown, unknown>,
> implements
PromiseLike<
TResponse
>,
DuplexChannel<unknown, unknown> {
ch: Promise<TChannel> | null = null;
constructor(
protected fetcher: () => Promise<
TResponse
>,
protected ws: () => Promise<TChannel>,
) {
}
async close() {
const ch = await this.channel;
await ch.close();
}

async *recv(signal?: AbortSignal): AsyncIterableIterator<unknown> {
const ch = await this.channel;
yield* ch.recv(signal);
}

private get channel(): Promise<TChannel> {
return this.ch ??= this.ws();
}

async send(value: unknown): Promise<void> {
const ch = await this.channel;
await ch.send(value);
}

catch<TResult>(onrejected: Rejected<TResult>): Promise<TResponse | TResult> {
return this.fetcher().catch(onrejected);
}

then<TResult1, TResult2 = TResult1>(
onfufilled?: Fulfilled<
TResponse,
TResult1
>,
onrejected?: Rejected<TResult2>,
): Promise<TResult1 | TResult2> {
return this.fetcher().then(onfufilled).catch(
onrejected,
);
}
}

/**
* options to create a new actor proxy.
Expand All @@ -11,11 +79,15 @@ export interface ProxyOptions<TInstance extends Actor> {
server: string;
}

type PromisifyKey<key extends keyof Actor, Actor> = Actor[key] extends
(...args: infer Args) => Awaited<infer Return>
? Return extends ChannelUpgrader<infer TSend, infer TReceive>
? (...args: Args) => DuplexChannel<TSend, TReceive>
: (...args: Args) => Promise<Return>
: Actor[key];

type Promisify<Actor> = {
[key in keyof Actor]: Actor[key] extends (...args: infer Args) => infer Return
? Return extends Promise<unknown> ? Actor[key]
: (...args: Args) => Promise<Return>
: Actor[key];
[key in keyof Actor]: PromisifyKey<key, Actor>;
};

export interface ActorsServer {
Expand Down Expand Up @@ -66,12 +138,13 @@ export const actors = {
id: (id: string): Promisify<TInstance> => {
return new Proxy<Promisify<TInstance>>({} as Promisify<TInstance>, {
get: (_, prop) => {
return async (...args: unknown[]) => {
const endpoint = `${actorsServer.url}/actors/${
typeof actor === "string" ? actor : actor.name
}/invoke/${String(prop)}`;
const fetcher = async (...args: unknown[]) => {
const abortCtrl = new AbortController();
const resp = await fetch(
`${actorsServer.url}/actors/${
typeof actor === "string" ? actor : actor.name
}/invoke/${String(prop)}`,
endpoint,
{
method: "POST",
signal: abortCtrl.signal,
Expand Down Expand Up @@ -104,6 +177,19 @@ export const actors = {
}
return resp.json();
};
return (...args: unknown[]) => {
const awaiter = new ActorAwaiter(() => fetcher(...args), () => {
const ws = new WebSocket(
`${endpoint}?args=${
encodeURIComponent(
btoa(JSON.stringify({ args: args ?? [] })),
)
}&${ACTOR_ID_QS_NAME}=${id}`,
);
return makeWebSocket(ws);
});
return awaiter;
};
},
});
},
Expand Down
33 changes: 31 additions & 2 deletions src/actors/runtime.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { assertEquals } from "@std/assert";
import { assertEquals, assertFalse } from "@std/assert";
import { actors } from "./proxy.ts";
import { ActorRuntime } from "./runtime.ts";
import type { ActorState } from "./state.ts";
import type { ChannelUpgrader } from "./util/channels/channel.ts";
import { WatchTarget } from "./util/watch.ts";

class Counter {
Expand Down Expand Up @@ -36,6 +37,17 @@ class Counter {
watch(): AsyncIterableIterator<number> {
return this.watchTarget.subscribe();
}

chan(name: string): ChannelUpgrader<string, string> {
return (async ({ send, recv }) => {
await send(`Hello ${name}`);
for await (const str of recv()) {
if (str === "PING") {
await send("PONG");
}
}
});
}
}

const runServer = (rt: ActorRuntime): AsyncDisposable => {
Expand All @@ -47,13 +59,30 @@ const runServer = (rt: ActorRuntime): AsyncDisposable => {
};
};

Deno.test("counter increment and getCount", async () => {
Deno.test("counter tests", async () => {
const rt = new ActorRuntime([Counter]);
await using _server = runServer(rt);
const actorId = "1234";
const counterProxy = actors.proxy(Counter);

const actor = counterProxy.id(actorId);
const name = `Counter Actor`;
const ch = actor.chan(name);
const it = ch.recv();
const { value, done } = await it.next();

assertFalse(done);
assertEquals(value, `Hello ${name}`);

await ch.send("PING");

const { value: pong, done: pongDone } = await it.next();

assertFalse(pongDone);
assertEquals(pong, "PONG");

await ch.close();

const watcher = await actor.watch();
// Test increment
const number = await actor.increment();
Expand Down
17 changes: 15 additions & 2 deletions src/actors/runtime.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { type ServerSentEventMessage, ServerSentEventStream } from "@std/http";
import { ACTOR_ID_HEADER_NAME } from "./proxy.ts";
import { isUpgrade, makeWebSocket } from "./util/channels/channel.ts";
import { ACTOR_ID_HEADER_NAME, ACTOR_ID_QS_NAME } from "./proxy.ts";
import { ActorState } from "./state.ts";
import { DenoKvActorStorage } from "./storage/denoKv.ts";
import { EVENT_STREAM_RESPONSE_HEADER } from "./stream.ts";
Expand Down Expand Up @@ -117,7 +118,8 @@ export class ActorRuntime {
*/
async fetch(req: Request): Promise<Response> {
const url = new URL(req.url);
const actorId = req.headers.get(ACTOR_ID_HEADER_NAME);
const actorId = req.headers.get(ACTOR_ID_HEADER_NAME) ??
url.searchParams.get(ACTOR_ID_QS_NAME);
if (!actorId) {
return new Response(`missing ${ACTOR_ID_HEADER_NAME} header`, {
status: 400,
Expand Down Expand Up @@ -147,6 +149,12 @@ export class ActorRuntime {
if (req.headers.get("content-type")?.includes("application/json")) {
const { args: margs } = await req.json();
args = margs;
} else if (url.searchParams.get("args")) {
const qargs = url.searchParams.get("args");
const parsedArgs = qargs
? JSON.parse(atob(decodeURIComponent(qargs)))
: {};
args = parsedArgs.args;
}
const methodImpl = actor[method as keyof typeof actor];
if (!isInvocable(methodImpl)) {
Expand All @@ -161,6 +169,11 @@ export class ActorRuntime {
const res = await (methodImpl as Function).bind(actor)(
...Array.isArray(args) ? args : [args],
);
if (req.headers.get("upgrade") === "websocket" && isUpgrade(res)) {
const { socket, response } = Deno.upgradeWebSocket(req);
makeWebSocket(socket).then((ch) => res(ch)).finally(() => socket.close());
return response;
}
if (isEventStreamResponse(res)) {
req.signal.onabort = () => {
res?.return?.();
Expand Down
Loading

0 comments on commit afb654f

Please sign in to comment.