diff --git a/src/actors/proxyutil.ts b/src/actors/proxyutil.ts index 4ff7194..148c07c 100644 --- a/src/actors/proxyutil.ts +++ b/src/actors/proxyutil.ts @@ -57,6 +57,9 @@ export class ActorAwaiter< protected invoker: ActorInvoker, ) { } + [Symbol.dispose](): void { + this.close(); + } async close() { const ch = await this.channel; await ch.close(); diff --git a/src/actors/runtime.test.ts b/src/actors/runtime.test.ts index 1ad2a3c..571df95 100644 --- a/src/actors/runtime.test.ts +++ b/src/actors/runtime.test.ts @@ -9,6 +9,14 @@ class Hello { sayHello(): string { return "Hello, World!"; } + + chan(name: string): ChannelUpgrader { + return (async ({ send }) => { + for (let i = 0; i < 10; i++) { + await send(`Hello ${name} ${i}`); + } + }); + } } class Counter { private count: number; @@ -48,6 +56,14 @@ class Counter { return this.watchTarget.subscribe(); } + async *readHelloChan(name: string): AsyncIterableIterator { + const hello = this.state.proxy(Hello).id(this.state.id); + using helloChan = hello.chan(name); + for await (const event of helloChan.recv()) { + yield event; + } + } + chan(name: string): ChannelUpgrader { return (async ({ send, recv }) => { await send(`Hello ${name}`); @@ -133,4 +149,13 @@ Deno.test("counter tests", async () => { const prevReqCount = reqCount; assertEquals(await actor.callSayHello(), "Hello, World!"); assertEquals(reqCount, prevReqCount + 1); // does not need a new request for invoking another actor method + + const prevReqCountHello = reqCount; + const helloChan = await actor.readHelloChan(name); + for (let i = 0; i < 10; i++) { + const { value } = await helloChan.next(); + assertEquals(`Hello ${name} ${i}`, value); + } + helloChan.return?.(); + assertEquals(reqCount, prevReqCountHello + 1); // does not need a new request for invoking another actor method }); diff --git a/src/actors/runtime.ts b/src/actors/runtime.ts index 0bf4e95..87a9970 100644 --- a/src/actors/runtime.ts +++ b/src/actors/runtime.ts @@ -12,7 +12,11 @@ import type { ActorStorage } from "./storage.ts"; import { DenoKvActorStorage } from "./storage/denoKv.ts"; import { S3ActorStorage } from "./storage/s3.ts"; import { EVENT_STREAM_RESPONSE_HEADER } from "./stream.ts"; -import { isUpgrade, makeWebSocket } from "./util/channels/channel.ts"; +import { + isUpgrade, + makeDuplexChannel, + makeWebSocket, +} from "./util/channels/channel.ts"; /** * Represents an actor. @@ -150,7 +154,20 @@ export class ActorRuntime { proxy: (actor) => { const invoker = (id: string) => { if (id === actorId) { - return this.invoker; + return { + invoke: async ( + name: string, + method: string, + args: unknown[], + connect?: true, + ) => { + const resp = await this.invoker.invoke(name, method, args); + if (connect && isUpgrade(resp)) { + return makeDuplexChannel(resp); + } + return resp; + }, + }; } return createHttpInvoker(id); }; diff --git a/src/actors/util/channels/channel.ts b/src/actors/util/channels/channel.ts index 8263084..e6c023e 100644 --- a/src/actors/util/channels/channel.ts +++ b/src/actors/util/channels/channel.ts @@ -129,7 +129,7 @@ export const makeChan = (capacity = 0): Channel => { }; }; -export interface DuplexChannel { +export interface DuplexChannel extends Disposable { send: Channel["send"]; recv: Channel["recv"]; close: () => void | Promise; @@ -195,6 +195,7 @@ export const makeWebSocket = < recv: recvChan.recv.bind(recvChan), send: sendChan.send.bind(recvChan), close: () => socket.close(), + [Symbol.dispose]: () => socket.close(), }); for await (const message of sendChan.recv()) { try { @@ -209,3 +210,41 @@ export const makeWebSocket = < }; return ch.promise; }; + +/** + * Creates a new duplex channel. + * @param upgrader the channel upgrader + * @returns a created duplex channel + */ +export const makeDuplexChannel = ( + upgrader?: ChannelUpgrader, +): DuplexChannel => { + // Create internal send and receive channels + const sendChan = makeChan(); + const recvChan = makeChan(); + + const duplexChannel: DuplexChannel = { + send: sendChan.send.bind(sendChan), + recv: recvChan.recv.bind(recvChan), + close: () => { + sendChan.close(); + recvChan.close(); + }, + [Symbol.dispose]: () => { + sendChan.close(); + recvChan.close(); + }, + }; + + // If there's an upgrader, we upgrade the duplex channel + if (upgrader && isUpgrade(upgrader)) { + upgrader({ + send: recvChan.send.bind(recvChan), + recv: sendChan.recv.bind(sendChan), + close: duplexChannel.close, + [Symbol.dispose]: duplexChannel[Symbol.dispose], + }); + } + + return duplexChannel; +};