Skip to content

Commit

Permalink
fix(Worker pool): prevent sending completion notification to workers …
Browse files Browse the repository at this point in the history
…in the pool context to keep the workers alive
  • Loading branch information
zak-cloudnc committed Mar 26, 2024
1 parent 69df17d commit f57d27c
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 5 deletions.
11 changes: 11 additions & 0 deletions projects/observable-webworker/src/lib/from-worker-pool.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ describe('fromWorkerPool', () => {
sub.unsubscribe();
});

it('does not send input close notification to ensure the workers are kept alive', () => {
const subscriptionSpy = jasmine.createSpy('subscriptionSpy');
const sub = stubbedWorkerStream.subscribe(subscriptionSpy);

input$.next(1);

expect(stubbedWorkers[0].postMessage).not.toHaveBeenCalledWith(jasmine.objectContaining({ kind: 'C' }));

sub.unsubscribe();
});

it('shuts down workers when subscriber unsubscribes', () => {
const subscriptionSpy = jasmine.createSpy('subscriptionSpy');
const sub = stubbedWorkerStream.subscribe(subscriptionSpy);
Expand Down
4 changes: 2 additions & 2 deletions projects/observable-webworker/src/lib/from-worker-pool.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Observable, ObservableInput, of, Subject, zip } from 'rxjs';
import { concat, NEVER, Observable, ObservableInput, of, Subject, zip } from 'rxjs';
import { finalize, map, mergeAll, tap } from 'rxjs/operators';
import { fromWorker } from './from-worker';

Expand Down Expand Up @@ -69,7 +69,7 @@ export function fromWorkerPool<I, O>(
}),
map(
([worker, unitWork]): Observable<O> => {
return fromWorker<I, O>(() => worker.factory(), of(unitWork), selectTransferables, {
return fromWorker<I, O>(() => worker.factory(), concat(of(unitWork), NEVER), selectTransferables, {
terminateOnComplete: false,
}).pipe(
finalize(() => {
Expand Down
7 changes: 4 additions & 3 deletions projects/observable-webworker/src/lib/run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ export function getWorkerResult<I, O>(
dematerialize(),
);

return workerIsUnitType(worker)
? input$.pipe(concatMap(input => from(worker.workUnit(input)).pipe(materialize())))
: worker.work(input$).pipe(materialize());
return (workerIsUnitType(worker)
? input$.pipe(concatMap(input => from(worker.workUnit(input))))
: worker.work(input$)
).pipe(materialize());
}

export function runWorker<I, O>(workerConstructor: ObservableWorkerConstructor<I, O>): Subscription {
Expand Down

0 comments on commit f57d27c

Please sign in to comment.