Skip to content

Commit

Permalink
fix(Worker pool): prevent sending completion notification to workers …
Browse files Browse the repository at this point in the history
…in the pool context to keep the workers alive
  • Loading branch information
zak-cloudnc committed Mar 26, 2024
1 parent 69df17d commit 7a88a65
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 23 deletions.
11 changes: 11 additions & 0 deletions projects/observable-webworker/src/lib/from-worker-pool.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ describe('fromWorkerPool', () => {
sub.unsubscribe();
});

it('does not send input close notification to ensure the workers are kept alive', () => {
const subscriptionSpy = jasmine.createSpy('subscriptionSpy');
const sub = stubbedWorkerStream.subscribe(subscriptionSpy);

input$.next(1);

expect(stubbedWorkers[0].postMessage).not.toHaveBeenCalledWith(jasmine.objectContaining({ kind: 'C' }));

sub.unsubscribe();
});

it('shuts down workers when subscriber unsubscribes', () => {
const subscriptionSpy = jasmine.createSpy('subscriptionSpy');
const sub = stubbedWorkerStream.subscribe(subscriptionSpy);
Expand Down
8 changes: 6 additions & 2 deletions projects/observable-webworker/src/lib/from-worker-pool.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Observable, ObservableInput, of, Subject, zip } from 'rxjs';
import { concat, NEVER, Observable, ObservableInput, of, Subject, zip } from 'rxjs';
import { finalize, map, mergeAll, tap } from 'rxjs/operators';
import { fromWorker } from './from-worker';

Expand Down Expand Up @@ -69,7 +69,11 @@ export function fromWorkerPool<I, O>(
}),
map(
([worker, unitWork]): Observable<O> => {
return fromWorker<I, O>(() => worker.factory(), of(unitWork), selectTransferables, {
// input should not complete to ensure the worker doesn't send back completion notifications when work unit is
// processed, otherwise these would cause the fromWorker to unsubscribe from the result.
const input$ = concat(of(unitWork), NEVER);
// const input$ = of(unitWork);
return fromWorker<I, O>(() => worker.factory(), input$, selectTransferables, {
terminateOnComplete: false,
}).pipe(
finalize(() => {
Expand Down
18 changes: 0 additions & 18 deletions projects/observable-webworker/src/lib/run-worker.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,6 @@ describe('runWorker', () => {
}),
);

expect(postMessageSpy).toHaveBeenCalledWith(
jasmine.objectContaining({
kind: 'C',
}),
);

sub.unsubscribe();
});

Expand Down Expand Up @@ -129,12 +123,6 @@ describe('runWorker', () => {
[expected.buffer] as any,
);

expect(postMessageSpy).toHaveBeenCalledWith(
jasmine.objectContaining({
kind: 'C',
}),
);

sub.unsubscribe();
});

Expand Down Expand Up @@ -242,12 +230,6 @@ describe('runWorker', () => {
}),
);

expect(postMessageSpy).toHaveBeenCalledWith(
jasmine.objectContaining({
kind: 'C',
}),
);

sub.unsubscribe();
}));
});
4 changes: 2 additions & 2 deletions projects/observable-webworker/src/lib/run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ export function getWorkerResult<I, O>(
incomingMessages$: Observable<WorkerMessageNotification<I>>,
): Observable<Notification<O>> {
const input$ = incomingMessages$.pipe(
map((e: WorkerMessageNotification<I>): Notification<I> => e.data),
map((n: Notification<I>) => new Notification(n.kind, n.value, n.error)),
map((e: WorkerMessageNotification<I>): Notification<I> => new Notification(e.data.kind, e.data.value, e.data.error)),
dematerialize(),
);

return workerIsUnitType(worker)
// note we intentionally materialize the inner observable so the main thread can reassemble the multiple stream values per input observable
? input$.pipe(concatMap(input => from(worker.workUnit(input)).pipe(materialize())))
: worker.work(input$).pipe(materialize());
}
Expand Down
3 changes: 2 additions & 1 deletion projects/observable-webworker/tsconfig.lib.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
"inlineSources": true,
"types": [],
"lib": ["dom", "es2018"],
"strict": true
"strict": true,
"removeComments": true
},
"angularCompilerOptions": {
"skipTemplateCodegen": true,
Expand Down

0 comments on commit 7a88a65

Please sign in to comment.