Skip to content

Commit

Permalink
Make channel upgrader work
Browse files Browse the repository at this point in the history
Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia committed Oct 16, 2024
1 parent 986d0de commit 8285421
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 9 deletions.
3 changes: 3 additions & 0 deletions src/actors/proxyutil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ export class ActorAwaiter<
protected invoker: ActorInvoker<TResponse, TChannel>,
) {
}
[Symbol.dispose](): void {
this.close();
}
async close() {
const ch = await this.channel;
await ch.close();
Expand Down
10 changes: 5 additions & 5 deletions src/actors/runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ class Hello {
chan(name: string): ChannelUpgrader<string, string> {
return (async ({ send }) => {
for (let i = 0; i < 10; i++) {
console.log("sending event", i);
await send(`Hello ${name} ${i}`);
console.log("event sent");
}
});
}
Expand Down Expand Up @@ -57,11 +55,11 @@ class Counter {
watch(): AsyncIterableIterator<number> {
return this.watchTarget.subscribe();
}

async *readHelloChan(name: string): AsyncIterableIterator<string> {
const hello = this.state.proxy(Hello).id(this.state.id);
const helloChan = hello.chan(name);
using helloChan = hello.chan(name);
for await (const event of helloChan.recv()) {
console.log("event received", event);
yield event;
}
}
Expand Down Expand Up @@ -152,10 +150,12 @@ Deno.test("counter tests", async () => {
assertEquals(await actor.callSayHello(), "Hello, World!");
assertEquals(reqCount, prevReqCount + 1); // does not need a new request for invoking another actor method

const prevReqCountHello = reqCount;
const helloChan = await actor.readHelloChan(name);
console.log(helloChan);
for (let i = 0; i < 10; i++) {
const { value } = await helloChan.next();
assertEquals(`Hello ${name} ${i}`, value);
}
helloChan.return?.();
assertEquals(reqCount, prevReqCountHello + 1); // does not need a new request for invoking another actor method
});
18 changes: 14 additions & 4 deletions src/actors/util/channels/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ export const makeChan = <T>(capacity = 0): Channel<T> => {
};
};

export interface DuplexChannel<TSend, TReceive> {
export interface DuplexChannel<TSend, TReceive> extends Disposable {
send: Channel<TSend>["send"];
recv: Channel<TReceive>["recv"];
close: () => void | Promise<void>;
Expand Down Expand Up @@ -195,6 +195,7 @@ export const makeWebSocket = <
recv: recvChan.recv.bind(recvChan),
send: sendChan.send.bind(recvChan),
close: () => socket.close(),
[Symbol.dispose]: () => socket.close(),
});
for await (const message of sendChan.recv()) {
try {
Expand All @@ -210,9 +211,9 @@ export const makeWebSocket = <
return ch.promise;
};

export const makeDuplexChannel = async <TSend, TReceive>(
export const makeDuplexChannel = <TSend, TReceive>(
upgrader?: ChannelUpgrader<TSend, TReceive>,
): Promise<DuplexChannel<TSend, TReceive>> => {
): DuplexChannel<TSend, TReceive> => {
// Create internal send and receive channels
const sendChan = makeChan<TSend>();
const recvChan = makeChan<TReceive>();
Expand All @@ -224,11 +225,20 @@ export const makeDuplexChannel = async <TSend, TReceive>(
sendChan.close();
recvChan.close();
},
[Symbol.dispose]: () => {
sendChan.close();
recvChan.close();
},
};

// If there's an upgrader, we upgrade the duplex channel
if (upgrader && isUpgrade(upgrader)) {
await upgrader(duplexChannel);
upgrader({
send: recvChan.send.bind(recvChan),
recv: sendChan.recv.bind(sendChan),
close: duplexChannel.close,
[Symbol.dispose]: duplexChannel[Symbol.dispose],
});
}

return duplexChannel;
Expand Down

0 comments on commit 8285421

Please sign in to comment.