Skip to content

Commit

Permalink
Fixes abort usage
Browse files Browse the repository at this point in the history
Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia committed Oct 18, 2024
1 parent d8a2435 commit de3c495
Showing 1 changed file with 29 additions and 7 deletions.
36 changes: 29 additions & 7 deletions src/actors/util/watch.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export type { ChannelUpgrader } from "./channels/channel.ts";

/**
* Watches events and returns async iterators for the events.
*/
Expand All @@ -9,25 +10,38 @@ export class WatchTarget<T> {
subscribe(signal?: AbortSignal): AsyncIterableIterator<T> {
const subscriptionId = crypto.randomUUID();
const queue: Array<(value: IteratorResult<T>) => void> = [];
let isCancelled = false;

const pushQueue = (value: IteratorResult<T>) => {
queue.forEach((resolve) => resolve(value));
};

const nextPromise = () =>
new Promise<IteratorResult<T>>((resolve) => {
queue.push((v) => {
if (signal?.aborted) {
return resolve({ value: undefined, done: true });
}
resolve(v);
});
// If already cancelled, resolve immediately
if (isCancelled || signal?.aborted) {
return resolve({ value: undefined, done: true });
}

// Push a new handler to the queue to resolve when a value is emitted
queue.push((v) => resolve(v));

// Listen for the abort signal and resolve immediately if it happens
signal?.addEventListener(
"abort",
() => {
isCancelled = true;
resolve({ value: undefined, done: true });
},
{ once: true }, // Only fire once on the first abort
);
});

const iterator: AsyncIterableIterator<T> = {
next: () => nextPromise(),
return: () => {
// Clean up the subscription when the consumer is done
isCancelled = true;
delete this.subscribers[subscriptionId];
return Promise.resolve({ value: undefined, done: true });
},
Expand All @@ -36,11 +50,19 @@ export class WatchTarget<T> {
},
};

// If the signal is already aborted at the time of subscription, return immediately
if (signal?.aborted) {
iterator?.return?.(); // Clean up immediately
return iterator;
}

this.subscribers[subscriptionId] = (value: T) => {
pushQueue({ value, done: false });
};

// Handle the signal being aborted after subscription
signal?.addEventListener("abort", () => {
iterator?.return?.();
iterator?.return?.(); // Immediately cancel the iterator
});

return iterator;
Expand Down

0 comments on commit de3c495

Please sign in to comment.