diff --git a/src/actors/errors.ts b/src/actors/errors.ts new file mode 100644 index 0000000..b50ca7e --- /dev/null +++ b/src/actors/errors.ts @@ -0,0 +1,9 @@ +export type ErrorCode = + | "NOT_FOUND" + | "METHOD_NOT_FOUND" + | "METHOD_NOT_INVOCABLE"; +export class ActorError extends Error { + constructor(msg: string, public code: ErrorCode, options?: ErrorOptions) { + super(msg, options); + } +} diff --git a/src/actors/proxy.ts b/src/actors/proxy.ts index b729e1b..8703fb3 100644 --- a/src/actors/proxy.ts +++ b/src/actors/proxy.ts @@ -1,127 +1,11 @@ +import { create, createHttpInvoker, type Promisify } from "./proxyutil.ts"; import type { Actor, ActorConstructor } from "./runtime.ts"; -import { EVENT_STREAM_RESPONSE_HEADER, readFromStream } from "./stream.ts"; -import { - type ChannelUpgrader, - type DuplexChannel, - makeWebSocket, -} from "./util/channels/channel.ts"; - -export const ACTOR_ID_HEADER_NAME = "x-deno-isolate-instance-id"; -export const ACTOR_ID_QS_NAME = "x-deno-isolate-instance-id"; -/** - * Promise.prototype.then onfufilled callback type. - */ -export type Fulfilled = ((result: R) => T | PromiseLike) | null; - -/** - * Promise.then onrejected callback type. - */ -// deno-lint-ignore no-explicit-any -export type Rejected = ((reason: any) => E | PromiseLike) | null; - -export class ActorAwaiter< - TResponse, - TChannel extends DuplexChannel, -> implements - PromiseLike< - TResponse - >, - DuplexChannel { - ch: Promise | null = null; - constructor( - protected fetcher: () => Promise< - TResponse - >, - protected ws: () => Promise, - ) { - } - async close() { - const ch = await this.channel; - await ch.close(); - } - - async *recv(signal?: AbortSignal): AsyncIterableIterator { - const ch = await this.channel; - yield* ch.recv(signal); - } - - private get channel(): Promise { - return this.ch ??= this.ws(); - } - - async send(value: unknown): Promise { - const ch = await this.channel; - await ch.send(value); - } - - catch(onrejected: Rejected): Promise { - return this.fetcher().catch(onrejected); - } - - then( - onfufilled?: Fulfilled< - TResponse, - TResult1 - >, - onrejected?: Rejected, - ): Promise { - return this.fetcher().then(onfufilled).catch( - onrejected, - ); - } -} - -/** - * options to create a new actor proxy. - */ -export interface ProxyOptions { - actor: ActorConstructor | string; - server: string; -} - -type PromisifyKey = Actor[key] extends - (...args: infer Args) => Awaited - ? Return extends ChannelUpgrader - ? (...args: Args) => DuplexChannel - : (...args: Args) => Promise - : Actor[key]; - -type Promisify = { - [key in keyof Actor]: PromisifyKey; -}; export interface ActorsServer { url: string; deploymentId?: string; } -const IS_BROWSER = typeof document !== "undefined"; - -let _server: ActorsServer | null = null; -const isLayeredUrl = (url: string): boolean => url.includes("layers"); -const initServer = (): ActorsServer => { - if (IS_BROWSER) { - return { - url: "", // relative - }; - } - - const deploymentId = Deno.env.get("DENO_DEPLOYMENT_ID"); - const fallbackUrl = typeof deploymentId === "string" - ? undefined - : `http://localhost:${Deno.env.get("PORT") ?? 8000}`; - - return { - url: Deno.env.get( - "DECO_ACTORS_SERVER_URL", - ) ?? - fallbackUrl ?? "", - deploymentId: deploymentId && isLayeredUrl(deploymentId) - ? deploymentId - : undefined, - }; -}; - /** * utilities to create and manage actors. */ @@ -130,69 +14,7 @@ export const actors = { actor: ActorConstructor | string, server?: ActorsServer | undefined, ): { id: (id: string) => Promisify } => { - if (!server) { - _server ??= initServer(); - } - const actorsServer = server ?? _server!; - return { - id: (id: string): Promisify => { - return new Proxy>({} as Promisify, { - get: (_, prop) => { - const endpoint = `${actorsServer.url}/actors/${ - typeof actor === "string" ? actor : actor.name - }/invoke/${String(prop)}`; - const fetcher = async (...args: unknown[]) => { - const abortCtrl = new AbortController(); - const resp = await fetch( - endpoint, - { - method: "POST", - signal: abortCtrl.signal, - headers: { - "Content-Type": "application/json", - [ACTOR_ID_HEADER_NAME]: id, - ...actorsServer.deploymentId - ? { ["x-deno-deployment-id"]: actorsServer.deploymentId } - : {}, - }, - body: JSON.stringify({ - args: args ?? [], - }), - }, - ); - if ( - resp.headers.get("content-type") === - EVENT_STREAM_RESPONSE_HEADER - ) { - const iterator = readFromStream(resp); - const retn = iterator.return; - iterator.return = function (val) { - abortCtrl.abort(); - return retn?.call(iterator, val) ?? val; - }; - return iterator; - } - if (resp.status === 204) { - return; - } - return resp.json(); - }; - return (...args: unknown[]) => { - const awaiter = new ActorAwaiter(() => fetcher(...args), () => { - const ws = new WebSocket( - `${endpoint}?args=${ - encodeURIComponent( - btoa(JSON.stringify({ args: args ?? [] })), - ) - }&${ACTOR_ID_QS_NAME}=${id}`, - ); - return makeWebSocket(ws); - }); - return awaiter; - }; - }, - }); - }, - }; + const factory = (id: string) => createHttpInvoker(id, server); + return create(actor, factory); }, }; diff --git a/src/actors/proxyutil.ts b/src/actors/proxyutil.ts new file mode 100644 index 0000000..4ff7194 --- /dev/null +++ b/src/actors/proxyutil.ts @@ -0,0 +1,251 @@ +import type { ActorsServer } from "./proxy.ts"; +import type { Actor, ActorConstructor } from "./runtime.ts"; +import { EVENT_STREAM_RESPONSE_HEADER, readFromStream } from "./stream.ts"; +import { + type ChannelUpgrader, + type DuplexChannel, + makeWebSocket, +} from "./util/channels/channel.ts"; + +export const ACTOR_ID_HEADER_NAME = "x-deno-isolate-instance-id"; +export const ACTOR_ID_QS_NAME = "x-deno-isolate-instance-id"; +/** + * Promise.prototype.then onfufilled callback type. + */ +export type Fulfilled = ((result: R) => T | PromiseLike) | null; + +/** + * Promise.then onrejected callback type. + */ +// deno-lint-ignore no-explicit-any +export type Rejected = ((reason: any) => E | PromiseLike) | null; + +export interface ActorInvoker< + // deno-lint-ignore no-explicit-any + TResponse = any, + TChannel extends DuplexChannel = DuplexChannel< + unknown, + unknown + >, +> { + invoke( + name: string, + method: string, + methodArgs: unknown[], + ): Promise; + + invoke( + name: string, + method: string, + methodArgs: unknown[], + connect: true, + ): Promise; +} +export class ActorAwaiter< + TResponse, + TChannel extends DuplexChannel, +> implements + PromiseLike< + TResponse + >, + DuplexChannel { + ch: Promise | null = null; + constructor( + protected actorName: string, + protected actorMethod: string, + protected methodArgs: unknown[], + protected invoker: ActorInvoker, + ) { + } + async close() { + const ch = await this.channel; + await ch.close(); + } + + async *recv(signal?: AbortSignal): AsyncIterableIterator { + const ch = await this.channel; + yield* ch.recv(signal); + } + + private get channel(): Promise { + return this.ch ??= this.invoker.invoke( + this.actorName, + this.actorMethod, + this.methodArgs, + true, + ); + } + + async send(value: unknown): Promise { + const ch = await this.channel; + await ch.send(value); + } + + catch(onrejected: Rejected): Promise { + return this.invoker.invoke( + this.actorName, + this.actorMethod, + this.methodArgs, + ) + .catch(onrejected); + } + + then( + onfufilled?: Fulfilled< + TResponse, + TResult1 + >, + onrejected?: Rejected, + ): Promise { + return this.invoker.invoke( + this.actorName, + this.actorMethod, + this.methodArgs, + ).then(onfufilled).catch( + onrejected, + ); + } +} + +/** + * options to create a new actor proxy. + */ +export interface ProxyOptions { + actor: ActorConstructor | string; + server: string; +} + +export type PromisifyKey = Actor[key] extends + (...args: infer Args) => Awaited + ? Return extends ChannelUpgrader + ? (...args: Args) => DuplexChannel + : (...args: Args) => Promise + : Actor[key]; + +export type Promisify = { + [key in keyof Actor]: PromisifyKey; +}; + +const urlFor = ( + serverUrl: string, + actor: ActorConstructor | string, + actorMethod: string, +): string => { + return `${serverUrl}/actors/${ + typeof actor === "string" ? actor : actor.name + }/invoke/${actorMethod}`; +}; + +const IS_BROWSER = typeof document !== "undefined"; + +let _server: ActorsServer | null = null; +const isLayeredUrl = (url: string): boolean => url.includes("layers"); +const initServer = (): ActorsServer => { + if (IS_BROWSER) { + return { + url: "", // relative + }; + } + + const deploymentId = Deno.env.get("DENO_DEPLOYMENT_ID"); + const fallbackUrl = typeof deploymentId === "string" + ? undefined + : `http://localhost:${Deno.env.get("PORT") ?? 8000}`; + + return { + url: Deno.env.get( + "DECO_ACTORS_SERVER_URL", + ) ?? + fallbackUrl ?? "", + deploymentId: deploymentId && isLayeredUrl(deploymentId) + ? deploymentId + : undefined, + }; +}; + +export const createHttpInvoker = < + TResponse, + TChannel extends DuplexChannel, +>( + actorId: string, + server?: ActorsServer, +): ActorInvoker => { + if (!server) { + _server ??= initServer(); + } + const actorsServer = server ?? _server!; + return { + invoke: async (name, method, methodArgs, connect?: true) => { + const endpoint = urlFor(actorsServer.url, name, method); + if (connect) { + const ws = new WebSocket( + `${endpoint}?args=${ + encodeURIComponent( + btoa(JSON.stringify({ args: methodArgs ?? [] })), + ) + }&${ACTOR_ID_QS_NAME}=${actorId}`, + ); + return makeWebSocket(ws) as Promise; + } + const abortCtrl = new AbortController(); + const resp = await fetch( + endpoint, + { + method: "POST", + signal: abortCtrl.signal, + headers: { + "Content-Type": "application/json", + [ACTOR_ID_HEADER_NAME]: actorId, + ...actorsServer.deploymentId + ? { ["x-deno-deployment-id"]: actorsServer.deploymentId } + : {}, + }, + body: JSON.stringify({ + args: methodArgs ?? [], + }), + }, + ); + if ( + resp.headers.get("content-type") === + EVENT_STREAM_RESPONSE_HEADER + ) { + const iterator = readFromStream(resp); + const retn = iterator.return; + iterator.return = function (val) { + abortCtrl.abort(); + return retn?.call(iterator, val) ?? val; + }; + return iterator; + } + if (resp.status === 204) { + return; + } + return resp.json(); + }, + }; +}; + +export const create = ( + actor: ActorConstructor | string, + invokerFactory: (id: string) => ActorInvoker, +): { id: (id: string) => Promisify } => { + const name = typeof actor === "string" ? actor : actor.name; + return { + id: (id: string): Promisify => { + return new Proxy>({} as Promisify, { + get: (_, method) => { + const invoker = invokerFactory(id); + return (...args: unknown[]) => { + const awaiter = new ActorAwaiter( + name, + String(method), + args, + invoker, + ); + return awaiter; + }; + }, + }); + }, + }; +}; diff --git a/src/actors/runtime.test.ts b/src/actors/runtime.test.ts index 300bee7..1ad2a3c 100644 --- a/src/actors/runtime.test.ts +++ b/src/actors/runtime.test.ts @@ -5,6 +5,11 @@ import type { ActorState } from "./state.ts"; import type { ChannelUpgrader } from "./util/channels/channel.ts"; import { WatchTarget } from "./util/watch.ts"; +class Hello { + sayHello(): string { + return "Hello, World!"; + } +} class Counter { private count: number; private watchTarget = new WatchTarget(); @@ -16,6 +21,11 @@ class Counter { }); } + callSayHello(): Promise { + const hello = this.state.proxy(Hello).id(this.state.id); + return hello.sayHello(); + } + async increment(): Promise { this.count++; await this.state.storage.put("counter", this.count); @@ -50,8 +60,14 @@ class Counter { } } -const runServer = (rt: ActorRuntime): AsyncDisposable => { - const server = Deno.serve(rt.fetch.bind(rt)); +const runServer = ( + rt: ActorRuntime, + onReq?: (req: Request) => void, +): AsyncDisposable => { + const server = Deno.serve((req) => { + onReq?.(req); + return rt.fetch(req); + }); return { async [Symbol.asyncDispose]() { await server.shutdown(); @@ -60,8 +76,11 @@ const runServer = (rt: ActorRuntime): AsyncDisposable => { }; Deno.test("counter tests", async () => { - const rt = new ActorRuntime([Counter]); - await using _server = runServer(rt); + const rt = new ActorRuntime([Counter, Hello]); + let reqCount = 0; + await using _server = runServer(rt, () => { + reqCount++; + }); const actorId = "1234"; const counterProxy = actors.proxy(Counter); @@ -110,4 +129,8 @@ Deno.test("counter tests", async () => { assertEquals(done, false); } watcher.return?.(); + + const prevReqCount = reqCount; + assertEquals(await actor.callSayHello(), "Hello, World!"); + assertEquals(reqCount, prevReqCount + 1); // does not need a new request for invoking another actor method }); diff --git a/src/actors/runtime.ts b/src/actors/runtime.ts index bf3f5e1..0bf4e95 100644 --- a/src/actors/runtime.ts +++ b/src/actors/runtime.ts @@ -1,5 +1,12 @@ import { type ServerSentEventMessage, ServerSentEventStream } from "@std/http"; -import { ACTOR_ID_HEADER_NAME, ACTOR_ID_QS_NAME } from "./proxy.ts"; +import { ActorError } from "./errors.ts"; +import { + ACTOR_ID_HEADER_NAME, + ACTOR_ID_QS_NAME, + type ActorInvoker, + create, + createHttpInvoker, +} from "./proxyutil.ts"; import { ActorState } from "./state.ts"; import type { ActorStorage } from "./storage.ts"; import { DenoKvActorStorage } from "./storage/denoKv.ts"; @@ -53,7 +60,7 @@ export type ActorConstructor = new ( /** * Represents an actor invoker. */ -export interface ActorInvoker { +export interface ActorInstance { /** * The actor instance. */ @@ -72,8 +79,9 @@ export interface ActorInvoker { * Represents the runtime for managing and invoking actors. */ export class ActorRuntime { - private actors: Map = new Map(); + private actors: Map = new Map(); private initilized = false; + private invoker: ActorInvoker; /** * Creates an instance of ActorRuntime. * @param actorsConstructors - An array of actor constructors. @@ -81,6 +89,33 @@ export class ActorRuntime { constructor( protected actorsConstructors: Array, ) { + const invoke: ActorInvoker["invoke"] = async (actorName, method, args) => { + const actorInvoker = actorName ? this.actors.get(actorName) : undefined; + if (!actorInvoker) { + throw new ActorError(`actor ${actorName} not found`, "NOT_FOUND"); + } + const { actor, initialization } = actorInvoker; + if (!(method in actor)) { + throw new ActorError( + `actor ${actorName} not found`, + "METHOD_NOT_FOUND", + ); + } + const methodImpl = actor[method as keyof typeof actor]; + if (!isInvocable(methodImpl)) { + throw new ActorError( + `actor ${actorName} not found`, + "METHOD_NOT_INVOCABLE", + ); + } + await initialization; + return await (methodImpl as Function).bind(actor)( + ...Array.isArray(args) ? args : [args], + ); + }; + this.invoker = { + invoke, + }; } getActorStorage(actorId: string, actorName: string): ActorStorage { @@ -110,7 +145,17 @@ export class ActorRuntime { this.actorsConstructors.forEach((Actor) => { const storage = this.getActorStorage(actorId, Actor.name); const state = new ActorState({ + id: actorId, storage, + proxy: (actor) => { + const invoker = (id: string) => { + if (id === actorId) { + return this.invoker; + } + return createHttpInvoker(id); + }; + return create(actor, invoker); + }, }); const actor = new Actor( state, @@ -147,16 +192,12 @@ export class ActorRuntime { } const groups = result?.pathname.groups ?? {}; const actorName = groups[ACTOR_NAME_PATH_PARAM]; - const actorInvoker = actorName ? this.actors.get(actorName) : undefined; - if (!actorInvoker) { - return new Response(`actor ${actorName} not found`, { - status: 404, - }); - } - const { actor, initialization } = actorInvoker; const method = groups[METHOD_NAME_PATH_PARAM]; - if (!method || !(method in actor)) { - return new Response(`method not found for the actor`, { status: 404 }); + if (!method || !actorName) { + return new Response( + `method ${method} not found for the actor ${actorName}`, + { status: 404 }, + ); } let args = []; if (req.headers.get("content-type")?.includes("application/json")) { @@ -169,55 +210,58 @@ export class ActorRuntime { : {}; args = parsedArgs.args; } - const methodImpl = actor[method as keyof typeof actor]; - if (!isInvocable(methodImpl)) { - return new Response( - `cannot invoke actor method for type ${typeof methodImpl}`, - { - status: 400, - }, - ); - } - await initialization; - const res = await (methodImpl as Function).bind(actor)( - ...Array.isArray(args) ? args : [args], - ); - if (req.headers.get("upgrade") === "websocket" && isUpgrade(res)) { - const { socket, response } = Deno.upgradeWebSocket(req); - makeWebSocket(socket).then((ch) => res(ch)).finally(() => socket.close()); - return response; - } - if (isEventStreamResponse(res)) { - req.signal.onabort = () => { - res?.return?.(); - }; + try { + const res = await this.invoker.invoke(actorName, method, args); + if (req.headers.get("upgrade") === "websocket" && isUpgrade(res)) { + const { socket, response } = Deno.upgradeWebSocket(req); + makeWebSocket(socket).then((ch) => res(ch)).finally(() => + socket.close() + ); + return response; + } + if (isEventStreamResponse(res)) { + req.signal.onabort = () => { + res?.return?.(); + }; - return new Response( - new ReadableStream({ - async pull(controller) { - for await (const content of res) { - controller.enqueue({ - data: encodeURIComponent(JSON.stringify(content)), - id: Date.now(), - event: "message", - }); - } - controller.close(); - }, - cancel() { - res?.return?.(); + return new Response( + new ReadableStream({ + async pull(controller) { + for await (const content of res) { + controller.enqueue({ + data: encodeURIComponent(JSON.stringify(content)), + id: Date.now(), + event: "message", + }); + } + controller.close(); + }, + cancel() { + res?.return?.(); + }, + }).pipeThrough(new ServerSentEventStream()), + { + headers: { + "Content-Type": EVENT_STREAM_RESPONSE_HEADER, + }, }, - }).pipeThrough(new ServerSentEventStream()), - { - headers: { - "Content-Type": EVENT_STREAM_RESPONSE_HEADER, - }, - }, - ); - } - if (typeof res === "undefined") { - return new Response(null, { status: 204 }); + ); + } + if (typeof res === "undefined") { + return new Response(null, { status: 204 }); + } + return Response.json(res); + } catch (err) { + if (err instanceof ActorError) { + return new Response(err.message, { + status: { + METHOD_NOT_FOUND: 404, + METHOD_NOT_INVOCABLE: 405, + NOT_FOUND: 404, + }[err.code] ?? 500, + }); + } + throw err; } - return Response.json(res); } } diff --git a/src/actors/state.ts b/src/actors/state.ts index e1446d8..d4c0e81 100644 --- a/src/actors/state.ts +++ b/src/actors/state.ts @@ -1,16 +1,28 @@ +import type { create } from "./proxyutil.ts"; +import type { Actor, ActorConstructor } from "./runtime.ts"; import type { ActorStorage } from "./storage.ts"; export interface ActorStateOptions { + id: string; storage: ActorStorage; + proxy: ( + actor: ActorConstructor, + ) => ReturnType>; } /** * Represents the state of an actor. */ export class ActorState { + public id: string; public storage: ActorStorage; + public proxy: ( + actor: ActorConstructor, + ) => ReturnType>; public initialization: Promise = Promise.resolve(); - constructor(private options: ActorStateOptions) { + constructor(options: ActorStateOptions) { this.storage = options.storage; + this.proxy = options.proxy; + this.id = options.id; } blockConcurrencyWhile(callback: () => Promise): Promise {