From 94a9259b83cb40195286d7a0468498485e157e9e Mon Sep 17 00:00:00 2001 From: Marcos Candeia Date: Tue, 15 Oct 2024 21:32:52 -0300 Subject: [PATCH] Allow for incontext invocation Signed-off-by: Marcos Candeia --- src/actors/proxy.ts | 244 +---------------------------------- src/actors/proxyutil.ts | 251 +++++++++++++++++++++++++++++++++++++ src/actors/runtime.test.ts | 31 ++++- src/actors/runtime.ts | 15 ++- src/actors/state.ts | 14 ++- 5 files changed, 307 insertions(+), 248 deletions(-) create mode 100644 src/actors/proxyutil.ts diff --git a/src/actors/proxy.ts b/src/actors/proxy.ts index 8626caf..8703fb3 100644 --- a/src/actors/proxy.ts +++ b/src/actors/proxy.ts @@ -1,228 +1,11 @@ +import { create, createHttpInvoker, type Promisify } from "./proxyutil.ts"; import type { Actor, ActorConstructor } from "./runtime.ts"; -import { EVENT_STREAM_RESPONSE_HEADER, readFromStream } from "./stream.ts"; -import { - type ChannelUpgrader, - type DuplexChannel, - makeWebSocket, -} from "./util/channels/channel.ts"; - -export const ACTOR_ID_HEADER_NAME = "x-deno-isolate-instance-id"; -export const ACTOR_ID_QS_NAME = "x-deno-isolate-instance-id"; -/** - * Promise.prototype.then onfufilled callback type. - */ -export type Fulfilled = ((result: R) => T | PromiseLike) | null; - -/** - * Promise.then onrejected callback type. - */ -// deno-lint-ignore no-explicit-any -export type Rejected = ((reason: any) => E | PromiseLike) | null; - -export interface ActorInvoker< - // deno-lint-ignore no-explicit-any - TResponse = any, - TChannel extends DuplexChannel = DuplexChannel< - unknown, - unknown - >, -> { - invoke( - name: string, - method: string, - methodArgs: unknown[], - ): Promise; - - connect( - name: string, - method: string, - methodArgs: unknown[], - ): Promise; -} -export class ActorAwaiter< - TResponse, - TChannel extends DuplexChannel, -> implements - PromiseLike< - TResponse - >, - DuplexChannel { - ch: Promise | null = null; - constructor( - protected actorName: string, - protected actorMethod: string, - protected methodArgs: unknown[], - protected invoker: ActorInvoker, - ) { - } - async close() { - const ch = await this.channel; - await ch.close(); - } - - async *recv(signal?: AbortSignal): AsyncIterableIterator { - const ch = await this.channel; - yield* ch.recv(signal); - } - - private get channel(): Promise { - return this.ch ??= this.invoker.connect( - this.actorName, - this.actorMethod, - this.methodArgs, - ); - } - - async send(value: unknown): Promise { - const ch = await this.channel; - await ch.send(value); - } - - catch(onrejected: Rejected): Promise { - return this.invoker.invoke( - this.actorName, - this.actorMethod, - this.methodArgs, - ) - .catch(onrejected); - } - - then( - onfufilled?: Fulfilled< - TResponse, - TResult1 - >, - onrejected?: Rejected, - ): Promise { - return this.invoker.invoke( - this.actorName, - this.actorMethod, - this.methodArgs, - ).then(onfufilled).catch( - onrejected, - ); - } -} - -/** - * options to create a new actor proxy. - */ -export interface ProxyOptions { - actor: ActorConstructor | string; - server: string; -} - -type PromisifyKey = Actor[key] extends - (...args: infer Args) => Awaited - ? Return extends ChannelUpgrader - ? (...args: Args) => DuplexChannel - : (...args: Args) => Promise - : Actor[key]; - -type Promisify = { - [key in keyof Actor]: PromisifyKey; -}; export interface ActorsServer { url: string; deploymentId?: string; } -const IS_BROWSER = typeof document !== "undefined"; - -let _server: ActorsServer | null = null; -const isLayeredUrl = (url: string): boolean => url.includes("layers"); -const initServer = (): ActorsServer => { - if (IS_BROWSER) { - return { - url: "", // relative - }; - } - - const deploymentId = Deno.env.get("DENO_DEPLOYMENT_ID"); - const fallbackUrl = typeof deploymentId === "string" - ? undefined - : `http://localhost:${Deno.env.get("PORT") ?? 8000}`; - - return { - url: Deno.env.get( - "DECO_ACTORS_SERVER_URL", - ) ?? - fallbackUrl ?? "", - deploymentId: deploymentId && isLayeredUrl(deploymentId) - ? deploymentId - : undefined, - }; -}; - -const urlFor = ( - serverUrl: string, - actor: ActorConstructor | string, - actorMethod: string, -): string => { - return `${serverUrl}/actors/${ - typeof actor === "string" ? actor : actor.name - }/invoke/${actorMethod}`; -}; - -const createHttpInvoker = < - TResponse, - TChannel extends DuplexChannel, ->( - actorId: string, - server: ActorsServer, -): ActorInvoker => { - return { - connect: (name, method, methodArgs) => { - const endpoint = urlFor(server.url, name, method); - const ws = new WebSocket( - `${endpoint}?args=${ - encodeURIComponent( - btoa(JSON.stringify({ args: methodArgs ?? [] })), - ) - }&${ACTOR_ID_QS_NAME}=${actorId}`, - ); - return makeWebSocket(ws) as Promise; - }, - invoke: async (name, method, methodArgs) => { - const endpoint = urlFor(server.url, name, method); - const abortCtrl = new AbortController(); - const resp = await fetch( - endpoint, - { - method: "POST", - signal: abortCtrl.signal, - headers: { - "Content-Type": "application/json", - [ACTOR_ID_HEADER_NAME]: actorId, - ...server.deploymentId - ? { ["x-deno-deployment-id"]: server.deploymentId } - : {}, - }, - body: JSON.stringify({ - args: methodArgs ?? [], - }), - }, - ); - if ( - resp.headers.get("content-type") === - EVENT_STREAM_RESPONSE_HEADER - ) { - const iterator = readFromStream(resp); - const retn = iterator.return; - iterator.return = function (val) { - abortCtrl.abort(); - return retn?.call(iterator, val) ?? val; - }; - return iterator; - } - if (resp.status === 204) { - return; - } - return resp.json(); - }, - }; -}; /** * utilities to create and manage actors. */ @@ -231,28 +14,7 @@ export const actors = { actor: ActorConstructor | string, server?: ActorsServer | undefined, ): { id: (id: string) => Promisify } => { - if (!server) { - _server ??= initServer(); - } - const actorsServer = server ?? _server!; - const name = typeof actor === "string" ? actor : actor.name; - return { - id: (id: string): Promisify => { - return new Proxy>({} as Promisify, { - get: (_, method) => { - const invoker = createHttpInvoker(id, actorsServer); - return (...args: unknown[]) => { - const awaiter = new ActorAwaiter( - name, - String(method), - args, - invoker, - ); - return awaiter; - }; - }, - }); - }, - }; + const factory = (id: string) => createHttpInvoker(id, server); + return create(actor, factory); }, }; diff --git a/src/actors/proxyutil.ts b/src/actors/proxyutil.ts new file mode 100644 index 0000000..4ff7194 --- /dev/null +++ b/src/actors/proxyutil.ts @@ -0,0 +1,251 @@ +import type { ActorsServer } from "./proxy.ts"; +import type { Actor, ActorConstructor } from "./runtime.ts"; +import { EVENT_STREAM_RESPONSE_HEADER, readFromStream } from "./stream.ts"; +import { + type ChannelUpgrader, + type DuplexChannel, + makeWebSocket, +} from "./util/channels/channel.ts"; + +export const ACTOR_ID_HEADER_NAME = "x-deno-isolate-instance-id"; +export const ACTOR_ID_QS_NAME = "x-deno-isolate-instance-id"; +/** + * Promise.prototype.then onfufilled callback type. + */ +export type Fulfilled = ((result: R) => T | PromiseLike) | null; + +/** + * Promise.then onrejected callback type. + */ +// deno-lint-ignore no-explicit-any +export type Rejected = ((reason: any) => E | PromiseLike) | null; + +export interface ActorInvoker< + // deno-lint-ignore no-explicit-any + TResponse = any, + TChannel extends DuplexChannel = DuplexChannel< + unknown, + unknown + >, +> { + invoke( + name: string, + method: string, + methodArgs: unknown[], + ): Promise; + + invoke( + name: string, + method: string, + methodArgs: unknown[], + connect: true, + ): Promise; +} +export class ActorAwaiter< + TResponse, + TChannel extends DuplexChannel, +> implements + PromiseLike< + TResponse + >, + DuplexChannel { + ch: Promise | null = null; + constructor( + protected actorName: string, + protected actorMethod: string, + protected methodArgs: unknown[], + protected invoker: ActorInvoker, + ) { + } + async close() { + const ch = await this.channel; + await ch.close(); + } + + async *recv(signal?: AbortSignal): AsyncIterableIterator { + const ch = await this.channel; + yield* ch.recv(signal); + } + + private get channel(): Promise { + return this.ch ??= this.invoker.invoke( + this.actorName, + this.actorMethod, + this.methodArgs, + true, + ); + } + + async send(value: unknown): Promise { + const ch = await this.channel; + await ch.send(value); + } + + catch(onrejected: Rejected): Promise { + return this.invoker.invoke( + this.actorName, + this.actorMethod, + this.methodArgs, + ) + .catch(onrejected); + } + + then( + onfufilled?: Fulfilled< + TResponse, + TResult1 + >, + onrejected?: Rejected, + ): Promise { + return this.invoker.invoke( + this.actorName, + this.actorMethod, + this.methodArgs, + ).then(onfufilled).catch( + onrejected, + ); + } +} + +/** + * options to create a new actor proxy. + */ +export interface ProxyOptions { + actor: ActorConstructor | string; + server: string; +} + +export type PromisifyKey = Actor[key] extends + (...args: infer Args) => Awaited + ? Return extends ChannelUpgrader + ? (...args: Args) => DuplexChannel + : (...args: Args) => Promise + : Actor[key]; + +export type Promisify = { + [key in keyof Actor]: PromisifyKey; +}; + +const urlFor = ( + serverUrl: string, + actor: ActorConstructor | string, + actorMethod: string, +): string => { + return `${serverUrl}/actors/${ + typeof actor === "string" ? actor : actor.name + }/invoke/${actorMethod}`; +}; + +const IS_BROWSER = typeof document !== "undefined"; + +let _server: ActorsServer | null = null; +const isLayeredUrl = (url: string): boolean => url.includes("layers"); +const initServer = (): ActorsServer => { + if (IS_BROWSER) { + return { + url: "", // relative + }; + } + + const deploymentId = Deno.env.get("DENO_DEPLOYMENT_ID"); + const fallbackUrl = typeof deploymentId === "string" + ? undefined + : `http://localhost:${Deno.env.get("PORT") ?? 8000}`; + + return { + url: Deno.env.get( + "DECO_ACTORS_SERVER_URL", + ) ?? + fallbackUrl ?? "", + deploymentId: deploymentId && isLayeredUrl(deploymentId) + ? deploymentId + : undefined, + }; +}; + +export const createHttpInvoker = < + TResponse, + TChannel extends DuplexChannel, +>( + actorId: string, + server?: ActorsServer, +): ActorInvoker => { + if (!server) { + _server ??= initServer(); + } + const actorsServer = server ?? _server!; + return { + invoke: async (name, method, methodArgs, connect?: true) => { + const endpoint = urlFor(actorsServer.url, name, method); + if (connect) { + const ws = new WebSocket( + `${endpoint}?args=${ + encodeURIComponent( + btoa(JSON.stringify({ args: methodArgs ?? [] })), + ) + }&${ACTOR_ID_QS_NAME}=${actorId}`, + ); + return makeWebSocket(ws) as Promise; + } + const abortCtrl = new AbortController(); + const resp = await fetch( + endpoint, + { + method: "POST", + signal: abortCtrl.signal, + headers: { + "Content-Type": "application/json", + [ACTOR_ID_HEADER_NAME]: actorId, + ...actorsServer.deploymentId + ? { ["x-deno-deployment-id"]: actorsServer.deploymentId } + : {}, + }, + body: JSON.stringify({ + args: methodArgs ?? [], + }), + }, + ); + if ( + resp.headers.get("content-type") === + EVENT_STREAM_RESPONSE_HEADER + ) { + const iterator = readFromStream(resp); + const retn = iterator.return; + iterator.return = function (val) { + abortCtrl.abort(); + return retn?.call(iterator, val) ?? val; + }; + return iterator; + } + if (resp.status === 204) { + return; + } + return resp.json(); + }, + }; +}; + +export const create = ( + actor: ActorConstructor | string, + invokerFactory: (id: string) => ActorInvoker, +): { id: (id: string) => Promisify } => { + const name = typeof actor === "string" ? actor : actor.name; + return { + id: (id: string): Promisify => { + return new Proxy>({} as Promisify, { + get: (_, method) => { + const invoker = invokerFactory(id); + return (...args: unknown[]) => { + const awaiter = new ActorAwaiter( + name, + String(method), + args, + invoker, + ); + return awaiter; + }; + }, + }); + }, + }; +}; diff --git a/src/actors/runtime.test.ts b/src/actors/runtime.test.ts index 300bee7..1ad2a3c 100644 --- a/src/actors/runtime.test.ts +++ b/src/actors/runtime.test.ts @@ -5,6 +5,11 @@ import type { ActorState } from "./state.ts"; import type { ChannelUpgrader } from "./util/channels/channel.ts"; import { WatchTarget } from "./util/watch.ts"; +class Hello { + sayHello(): string { + return "Hello, World!"; + } +} class Counter { private count: number; private watchTarget = new WatchTarget(); @@ -16,6 +21,11 @@ class Counter { }); } + callSayHello(): Promise { + const hello = this.state.proxy(Hello).id(this.state.id); + return hello.sayHello(); + } + async increment(): Promise { this.count++; await this.state.storage.put("counter", this.count); @@ -50,8 +60,14 @@ class Counter { } } -const runServer = (rt: ActorRuntime): AsyncDisposable => { - const server = Deno.serve(rt.fetch.bind(rt)); +const runServer = ( + rt: ActorRuntime, + onReq?: (req: Request) => void, +): AsyncDisposable => { + const server = Deno.serve((req) => { + onReq?.(req); + return rt.fetch(req); + }); return { async [Symbol.asyncDispose]() { await server.shutdown(); @@ -60,8 +76,11 @@ const runServer = (rt: ActorRuntime): AsyncDisposable => { }; Deno.test("counter tests", async () => { - const rt = new ActorRuntime([Counter]); - await using _server = runServer(rt); + const rt = new ActorRuntime([Counter, Hello]); + let reqCount = 0; + await using _server = runServer(rt, () => { + reqCount++; + }); const actorId = "1234"; const counterProxy = actors.proxy(Counter); @@ -110,4 +129,8 @@ Deno.test("counter tests", async () => { assertEquals(done, false); } watcher.return?.(); + + const prevReqCount = reqCount; + assertEquals(await actor.callSayHello(), "Hello, World!"); + assertEquals(reqCount, prevReqCount + 1); // does not need a new request for invoking another actor method }); diff --git a/src/actors/runtime.ts b/src/actors/runtime.ts index 4d21f11..0bf4e95 100644 --- a/src/actors/runtime.ts +++ b/src/actors/runtime.ts @@ -4,7 +4,9 @@ import { ACTOR_ID_HEADER_NAME, ACTOR_ID_QS_NAME, type ActorInvoker, -} from "./proxy.ts"; + create, + createHttpInvoker, +} from "./proxyutil.ts"; import { ActorState } from "./state.ts"; import type { ActorStorage } from "./storage.ts"; import { DenoKvActorStorage } from "./storage/denoKv.ts"; @@ -113,7 +115,6 @@ export class ActorRuntime { }; this.invoker = { invoke, - connect: invoke, }; } @@ -144,7 +145,17 @@ export class ActorRuntime { this.actorsConstructors.forEach((Actor) => { const storage = this.getActorStorage(actorId, Actor.name); const state = new ActorState({ + id: actorId, storage, + proxy: (actor) => { + const invoker = (id: string) => { + if (id === actorId) { + return this.invoker; + } + return createHttpInvoker(id); + }; + return create(actor, invoker); + }, }); const actor = new Actor( state, diff --git a/src/actors/state.ts b/src/actors/state.ts index e1446d8..d4c0e81 100644 --- a/src/actors/state.ts +++ b/src/actors/state.ts @@ -1,16 +1,28 @@ +import type { create } from "./proxyutil.ts"; +import type { Actor, ActorConstructor } from "./runtime.ts"; import type { ActorStorage } from "./storage.ts"; export interface ActorStateOptions { + id: string; storage: ActorStorage; + proxy: ( + actor: ActorConstructor, + ) => ReturnType>; } /** * Represents the state of an actor. */ export class ActorState { + public id: string; public storage: ActorStorage; + public proxy: ( + actor: ActorConstructor, + ) => ReturnType>; public initialization: Promise = Promise.resolve(); - constructor(private options: ActorStateOptions) { + constructor(options: ActorStateOptions) { this.storage = options.storage; + this.proxy = options.proxy; + this.id = options.id; } blockConcurrencyWhile(callback: () => Promise): Promise {