Skip to content

Commit

Permalink
Merge pull request #119 from cloudnc/fix/worker-kept-open-when-it-sho…
Browse files Browse the repository at this point in the history
…uld-not

fix: worker kept open when it shouldn't
  • Loading branch information
maxime1992 authored Mar 27, 2024
2 parents 2c9fe41 + 8b68204 commit 01e4017
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 22 deletions.
11 changes: 11 additions & 0 deletions projects/observable-webworker/src/lib/from-worker-pool.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ describe('fromWorkerPool', () => {
sub.unsubscribe();
});

it('does not send input close notification to ensure the workers are kept alive', () => {
const subscriptionSpy = jasmine.createSpy('subscriptionSpy');
const sub = stubbedWorkerStream.subscribe(subscriptionSpy);

input$.next(1);

expect(stubbedWorkers[0].postMessage).not.toHaveBeenCalledWith(jasmine.objectContaining({ kind: 'C' }));

sub.unsubscribe();
});

it('shuts down workers when subscriber unsubscribes', () => {
const subscriptionSpy = jasmine.createSpy('subscriptionSpy');
const sub = stubbedWorkerStream.subscribe(subscriptionSpy);
Expand Down
8 changes: 6 additions & 2 deletions projects/observable-webworker/src/lib/from-worker-pool.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Observable, ObservableInput, of, Subject, zip } from 'rxjs';
import { concat, NEVER, Observable, ObservableInput, of, Subject, zip } from 'rxjs';
import { finalize, map, mergeAll, tap } from 'rxjs/operators';
import { fromWorker } from './from-worker';

Expand Down Expand Up @@ -69,7 +69,11 @@ export function fromWorkerPool<I, O>(
}),
map(
([worker, unitWork]): Observable<O> => {
return fromWorker<I, O>(() => worker.factory(), of(unitWork), selectTransferables, {
// input should not complete to ensure the worker doesn't send back completion notifications when work unit is
// processed, otherwise these would cause the fromWorker to unsubscribe from the result.
const input$ = concat(of(unitWork), NEVER);
// const input$ = of(unitWork);
return fromWorker<I, O>(() => worker.factory(), input$, selectTransferables, {
terminateOnComplete: false,
}).pipe(
finalize(() => {
Expand Down
53 changes: 40 additions & 13 deletions projects/observable-webworker/src/lib/run-worker.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { fakeAsync, tick } from '@angular/core/testing';
import { BehaviorSubject, Notification, Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';
import {
DoTransferableWork,
DoTransferableWorkUnit,
Expand Down Expand Up @@ -76,12 +77,6 @@ describe('runWorker', () => {
}),
);

expect(postMessageSpy).toHaveBeenCalledWith(
jasmine.objectContaining({
kind: 'C',
}),
);

sub.unsubscribe();
});

Expand Down Expand Up @@ -128,13 +123,51 @@ describe('runWorker', () => {
[expected.buffer] as any,
);

sub.unsubscribe();
});

// https://github.com/cloudnc/observable-webworker/issues/116
it('should complete the notification stream when the worker completes', () => {
const postMessageSpy = spyOn(window, 'postMessage');
postMessageSpy.calls.reset();

class TestWorker implements DoWork<number, number> {
public work(input$: Observable<number>): Observable<number> {
// here nothing should keep the subscription alive when input$ completes
return input$.pipe(map(input => input * 2));
}
}

const sub = runWorker(TestWorker);

const notificationEvent: WorkerMessageNotification<number> = new MessageEvent('message', {
data: new Notification('N', 10),
});

self.dispatchEvent(notificationEvent);

expect(postMessageSpy).toHaveBeenCalledWith(
jasmine.objectContaining({
kind: 'N',
value: 20,
}),
);

const completeEvent: WorkerMessageNotification<number> = new MessageEvent('message', {
data: new Notification('C'),
});

self.dispatchEvent(completeEvent);

expect(postMessageSpy).toHaveBeenCalledWith(
jasmine.objectContaining({
kind: 'C',
}),
);

sub.unsubscribe();
// do note here that instead of manually closing the subscription
// we check it's already closed as expected
expect(sub.closed).toBeTrue();
});

it('should not complete the notification stream if the worker does not complete', () => {
Expand Down Expand Up @@ -197,12 +230,6 @@ describe('runWorker', () => {
}),
);

expect(postMessageSpy).toHaveBeenCalledWith(
jasmine.objectContaining({
kind: 'C',
}),
);

sub.unsubscribe();
}));
});
12 changes: 6 additions & 6 deletions projects/observable-webworker/src/lib/run-worker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { from, fromEvent, Notification, Observable, Subscription } from 'rxjs';
import { concatMap, dematerialize, filter, map, materialize } from 'rxjs/operators';
import { concatMap, dematerialize, map, materialize } from 'rxjs/operators';
import { DoTransferableWork, DoWork, DoWorkUnit, WorkerMessageNotification } from './observable-worker.types';

export type ObservableWorkerConstructor<I = any, O = any> = new (...args: any[]) => DoWork<I, O> | DoWorkUnit<I, O>;
Expand All @@ -25,15 +25,15 @@ export function getWorkerResult<I, O>(
incomingMessages$: Observable<WorkerMessageNotification<I>>,
): Observable<Notification<O>> {
const input$ = incomingMessages$.pipe(
map((e: WorkerMessageNotification<I>): Notification<I> => e.data),
map((n: Notification<I>) => new Notification(n.kind, n.value, n.error)),
// ignore complete, the calling thread will manage termination of the stream
filter(n => n.kind !== 'C'),
map(
(e: WorkerMessageNotification<I>): Notification<I> => new Notification(e.data.kind, e.data.value, e.data.error),
),
dematerialize(),
);

return workerIsUnitType(worker)
? input$.pipe(concatMap(input => from(worker.workUnit(input)).pipe(materialize())))
? // note we intentionally materialize the inner observable so the main thread can reassemble the multiple stream values per input observable
input$.pipe(concatMap(input => from(worker.workUnit(input)).pipe(materialize())))
: worker.work(input$).pipe(materialize());
}

Expand Down
3 changes: 2 additions & 1 deletion projects/observable-webworker/tsconfig.lib.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
"inlineSources": true,
"types": [],
"lib": ["dom", "es2018"],
"strict": true
"strict": true,
"removeComments": true
},
"angularCompilerOptions": {
"skipTemplateCodegen": true,
Expand Down

0 comments on commit 01e4017

Please sign in to comment.