Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow in context channel #8

Merged
merged 3 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/actors/proxyutil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ export class ActorAwaiter<
protected invoker: ActorInvoker<TResponse, TChannel>,
) {
}
[Symbol.dispose](): void {
this.close();
}
async close() {
const ch = await this.channel;
await ch.close();
Expand Down
25 changes: 25 additions & 0 deletions src/actors/runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ class Hello {
sayHello(): string {
return "Hello, World!";
}

chan(name: string): ChannelUpgrader<string, string> {
return (async ({ send }) => {
for (let i = 0; i < 10; i++) {
await send(`Hello ${name} ${i}`);
}
});
}
}
class Counter {
private count: number;
Expand Down Expand Up @@ -48,6 +56,14 @@ class Counter {
return this.watchTarget.subscribe();
}

async *readHelloChan(name: string): AsyncIterableIterator<string> {
const hello = this.state.proxy(Hello).id(this.state.id);
using helloChan = hello.chan(name);
for await (const event of helloChan.recv()) {
yield event;
}
}

chan(name: string): ChannelUpgrader<string, string> {
return (async ({ send, recv }) => {
await send(`Hello ${name}`);
Expand Down Expand Up @@ -133,4 +149,13 @@ Deno.test("counter tests", async () => {
const prevReqCount = reqCount;
assertEquals(await actor.callSayHello(), "Hello, World!");
assertEquals(reqCount, prevReqCount + 1); // does not need a new request for invoking another actor method

const prevReqCountHello = reqCount;
const helloChan = await actor.readHelloChan(name);
for (let i = 0; i < 10; i++) {
const { value } = await helloChan.next();
assertEquals(`Hello ${name} ${i}`, value);
}
helloChan.return?.();
assertEquals(reqCount, prevReqCountHello + 1); // does not need a new request for invoking another actor method
});
21 changes: 19 additions & 2 deletions src/actors/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ import type { ActorStorage } from "./storage.ts";
import { DenoKvActorStorage } from "./storage/denoKv.ts";
import { S3ActorStorage } from "./storage/s3.ts";
import { EVENT_STREAM_RESPONSE_HEADER } from "./stream.ts";
import { isUpgrade, makeWebSocket } from "./util/channels/channel.ts";
import {
isUpgrade,
makeDuplexChannel,
makeWebSocket,
} from "./util/channels/channel.ts";

/**
* Represents an actor.
Expand Down Expand Up @@ -150,7 +154,20 @@ export class ActorRuntime {
proxy: (actor) => {
const invoker = (id: string) => {
if (id === actorId) {
return this.invoker;
return {
invoke: async (
name: string,
method: string,
args: unknown[],
connect?: true,
) => {
const resp = await this.invoker.invoke(name, method, args);
if (connect && isUpgrade(resp)) {
return makeDuplexChannel(resp);
}
return resp;
},
};
}
return createHttpInvoker(id);
};
Expand Down
41 changes: 40 additions & 1 deletion src/actors/util/channels/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ export const makeChan = <T>(capacity = 0): Channel<T> => {
};
};

export interface DuplexChannel<TSend, TReceive> {
export interface DuplexChannel<TSend, TReceive> extends Disposable {
send: Channel<TSend>["send"];
recv: Channel<TReceive>["recv"];
close: () => void | Promise<void>;
Expand Down Expand Up @@ -195,6 +195,7 @@ export const makeWebSocket = <
recv: recvChan.recv.bind(recvChan),
send: sendChan.send.bind(recvChan),
close: () => socket.close(),
[Symbol.dispose]: () => socket.close(),
});
for await (const message of sendChan.recv()) {
try {
Expand All @@ -209,3 +210,41 @@ export const makeWebSocket = <
};
return ch.promise;
};

/**
* Creates a new duplex channel.
* @param upgrader the channel upgrader
* @returns a created duplex channel
*/
export const makeDuplexChannel = <TSend, TReceive>(
upgrader?: ChannelUpgrader<TSend, TReceive>,
): DuplexChannel<TSend, TReceive> => {
// Create internal send and receive channels
const sendChan = makeChan<TSend>();
const recvChan = makeChan<TReceive>();

const duplexChannel: DuplexChannel<TSend, TReceive> = {
send: sendChan.send.bind(sendChan),
recv: recvChan.recv.bind(recvChan),
close: () => {
sendChan.close();
recvChan.close();
},
[Symbol.dispose]: () => {
sendChan.close();
recvChan.close();
},
};

// If there's an upgrader, we upgrade the duplex channel
if (upgrader && isUpgrade(upgrader)) {
upgrader({
send: recvChan.send.bind(recvChan),
recv: sendChan.recv.bind(sendChan),
close: duplexChannel.close,
[Symbol.dispose]: duplexChannel[Symbol.dispose],
});
}

return duplexChannel;
};