Skip to content

Commit

Permalink
Fix disconnection allow messages to ne cleanedup
Browse files Browse the repository at this point in the history
Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia committed Dec 3, 2024
1 parent 44ba71f commit 159fadb
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 6 deletions.
19 changes: 13 additions & 6 deletions src/actors/proxyutil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export class ActorAwaiter<
DuplexChannel<any, any> {
ch: Promise<TChannel> | null = null;
ctrl: AbortController;
_disconnected: PromiseWithResolvers<void> = Promise.withResolvers();
constructor(
protected actorName: string,
protected actorMethod: string,
Expand Down Expand Up @@ -138,6 +139,9 @@ export class ActorAwaiter<
};
await Promise.all([recvLoop(), sendLoop()]);
if (ch.signal.aborted && !reliableCh.signal.aborted) {
const prev = this._disconnected;
this._disconnected = Promise.withResolvers();
prev.resolve();
console.error("channel closed, retrying...");
await new Promise((resolve) => setTimeout(resolve, 2e3)); // retrying in 2 second
return nextConnection();
Expand All @@ -151,6 +155,10 @@ export class ActorAwaiter<
return this.ch;
}

get disconnected() {
return this._disconnected.promise;
}

async send(value: unknown): Promise<void> {
const ch = await this.channel;
await ch.send(value);
Expand Down Expand Up @@ -312,12 +320,11 @@ export const createRPCInvoker = <
const id = crypto.randomUUID();
const response = Promise.withResolvers<TResponse>();
pendingRequests.set(id, { response });
channel.closed.finally(() => {
const resolver = pendingRequests.get(id);
if (resolver) {
resolver.response.reject(new Error("Channel closed"));
}
});
const cleanup = () => {
response.reject(new Error("Channel closed"));
};
channel.closed.finally(cleanup);
channel.disconnected?.finally(cleanup);

try {
await channel.send({
Expand Down
1 change: 1 addition & 0 deletions src/actors/util/channels/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ export interface DuplexChannel<TSend, TReceive> extends Disposable {
close: () => void | Promise<void>;
closed: Promise<void>;
signal: AbortSignal;
disconnected?: Promise<void>; // used when the channel allows reconnections
}

/**
Expand Down

0 comments on commit 159fadb

Please sign in to comment.