Skip to content

Commit

Permalink
Merge pull request #3 from skunkteam/add-from-event-pattern
Browse files Browse the repository at this point in the history
add fromEventPattern
  • Loading branch information
pavadeli authored Jan 25, 2021
2 parents f4aa5ff + 777ddf7 commit 2e89e76
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 44 deletions.
96 changes: 73 additions & 23 deletions libs/sherlock-rxjs/src/lib/rxjs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ describe('rxjs/rxjs', () => {
next: v => (value = v),
complete: () => (complete = true),
});
expect(complete).toBe(false);
expect(complete).toBeFalse();
expect(value).toBe('a');

a$.set('aa');
expect(complete).toBe(false);
expect(complete).toBeFalse();
expect(value).toBe('aa');

a$.set('aaa');
expect(complete).toBe(true);
expect(complete).toBeTrue();
expect(value).toBe('aa');
});

Expand All @@ -68,7 +68,7 @@ describe('rxjs/rxjs', () => {
next: v => values.push(v),
complete: () => (complete = true),
});
expect(complete).toBe(true);
expect(complete).toBeTrue();
expect(values).toEqual(['a']);

a$.set('b');
Expand All @@ -82,15 +82,15 @@ describe('rxjs/rxjs', () => {
next: v => values.push(v),
complete: () => (complete = true),
});
expect(complete).toBe(false);
expect(complete).toBeFalse();
expect(Object.keys(values)).toHaveLength(0);

a$.set('b');
expect(complete).toBe(true);
expect(complete).toBeTrue();
expect(values).toEqual(['b']);

a$.set('c');
expect(complete).toBe(true);
expect(complete).toBeTrue();
expect(values).toEqual(['b']);
});

Expand Down Expand Up @@ -131,7 +131,7 @@ describe('rxjs/rxjs', () => {
toObservable(a$)
.subscribe({ complete: () => (complete = true) })
.unsubscribe();
expect(complete).toBe(false);
expect(complete).toBeFalse();
});
});

Expand All @@ -140,25 +140,27 @@ describe('rxjs/rxjs', () => {
const subj = new Subject<string>();
const d$ = fromObservable(subj);

expect(d$.resolved).toBe(false);
let value = '';
expect(d$.resolved).toBeFalse();
let value: string | undefined;
d$.react(v => (value = v), { skipFirst: true, once: true });

expect(d$.resolved).toBe(false);
expect(d$.resolved).toBeFalse();

subj.next('first value');

expect(d$.resolved).toBe(true);
expect(value).toBe('');
expect(d$.resolved).toBeTrue();
expect(value).toBeUndefined();

subj.next('this stops the reactor');

expect(d$.value).toBe('this stops the reactor');
expect(d$.resolved).toBeFalse();
expect(d$.value).toBeUndefined();
expect(value).toBe('this stops the reactor');

subj.next('this is ignored');

expect(d$.value).toBe('this stops the reactor');
expect(d$.resolved).toBeFalse();
expect(d$.value).toBeUndefined();
expect(value).toBe('this stops the reactor');
});

Expand All @@ -177,6 +179,56 @@ describe('rxjs/rxjs', () => {

subj.next('value');

expect(reactions).toBe(1);
expect(value).toBe('value');
expect(d$.value).toBe('value');

done();

expect(subj.observers).toHaveLength(0);
expect(reactions).toBe(1);
expect(d$.value).toBeUndefined();

subj.next('another value');

expect(subj.observers).toHaveLength(0);
expect(reactions).toBe(1);
expect(d$.value).toBeUndefined();

done = d$.react(v => (++reactions, (value = v)));

expect(subj.observers).toHaveLength(1);
expect(reactions).toBe(1);
expect(d$.value).toBeUndefined();

subj.next('yet another value');

expect(subj.observers).toHaveLength(1);
expect(reactions).toBe(2);
expect(d$.value).toBe('yet another value');

done();

expect(subj.observers).toHaveLength(0);
expect(reactions).toBe(2);
expect(d$.value).toBeUndefined();
});

it('should allow opting into old behavior of caching values while not subscribed', () => {
const subj = new Subject<string>();
const d$ = fromObservable(subj).take({ when: d => d.resolved });

expect(subj.observers).toHaveLength(0);

let value: string | undefined;
let reactions = 0;
let done = d$.react(v => (++reactions, (value = v)));

expect(subj.observers).toHaveLength(1);
expect(reactions).toBe(0);

subj.next('value');

expect(reactions).toBe(1);
expect(value).toBe('value');
expect(d$.get()).toBe('value');
Expand Down Expand Up @@ -295,13 +347,13 @@ describe('rxjs/rxjs', () => {

expect(subj.observers).toHaveLength(0);
expect(reactions).toBe(1);
expect(value).toBe(false);
expect(value).toBeFalse();

useIt$.set(true);

expect(subj.observers).toHaveLength(1);
expect(reactions).toBe(1);
expect(value).toBe(false);
expect(value).toBeFalse();

subj.next('value');

Expand All @@ -312,7 +364,7 @@ describe('rxjs/rxjs', () => {

expect(subj.observers).toHaveLength(0);
expect(reactions).toBe(3);
expect(value).toBe(false);
expect(value).toBeFalse();
});

it('should work with a fallback when given and not connected', () => {
Expand Down Expand Up @@ -340,7 +392,7 @@ describe('rxjs/rxjs', () => {

expect(subj.observers).toHaveLength(0);
expect(reactions).toBe(2);
expect(d$.get()).toBe('value');
expect(d$.get()).toBe('fallback');
});

it('should propagate errors', () => {
Expand All @@ -350,7 +402,7 @@ describe('rxjs/rxjs', () => {
d$.autoCache();

expect(subj.observers).toHaveLength(0);
expect(d$.resolved).toBe(false);
expect(d$.resolved).toBeFalse();
expect(subj.observers).toHaveLength(1);

subj.next('a value');
Expand All @@ -372,10 +424,8 @@ describe('rxjs/rxjs', () => {

setTimeout(() => subj.error(new Error('my error')), 0);

// Reusing the same will return the last known value.
expect(await d$.toPromise()).toBe('value');
try {
await fromObservable(subj).toPromise();
await d$.toPromise();
throw new Error('should have thrown an error');
} catch (e) {
expect(e.message).toBe('my error');
Expand Down
31 changes: 10 additions & 21 deletions libs/sherlock-rxjs/src/lib/rxjs.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { atom, Derivable, ErrorWrapper, ReactorOptions } from '@skunkteam/sherlock';
import { Observable, Subscribable, Subscriber, Unsubscribable } from 'rxjs';
import { Derivable, ErrorWrapper, ReactorOptions } from '@skunkteam/sherlock';
import { fromEventPattern } from '@skunkteam/sherlock-utils';
import { Observable, Subscribable, Subscriber } from 'rxjs';

/**
* Creates an RxJS Observable from a Derivable. Optionally accepts a `ReactorOptions` that governs RxJS emissions
Expand All @@ -20,24 +21,12 @@ export function toObservable<V>(derivable: Derivable<V>, options?: Partial<React
}

export function fromObservable<V>(observable: Subscribable<V>): Derivable<V> {
const atom$ = atom.unresolved<V>();

let subscription: Unsubscribable | undefined;
atom$.connected$.react(() => {
if (atom$.connected && !subscription) {
subscription = observable.subscribe({
next: value => atom$.set(value),
error: err => atom$.setFinal(new ErrorWrapper(err)),
complete: () => atom$.setFinal(atom$.getState()),
});
}
// This is not chained with the previous as an `else` branch, because this can be true immediately after
// the subscription occurs. Observables can complete synchronously on subscription.
if (!atom$.connected && subscription) {
subscription.unsubscribe();
subscription = undefined;
}
return fromEventPattern(value$ => {
const subscription = observable.subscribe({
next: value => value$.set(value),
error: err => value$.setFinal(new ErrorWrapper(err)),
complete: () => value$.setFinal(value$.getState()),
});
return () => subscription.unsubscribe();
});

return atom$;
}
1 change: 1 addition & 0 deletions libs/sherlock-utils/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from './lib/derivable-cache';
export * from './lib/from-event-pattern';
export * from './lib/from-promise';
export * from './lib/lift';
export * from './lib/pairwise';
Expand Down
125 changes: 125 additions & 0 deletions libs/sherlock-utils/src/lib/from-event-pattern.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import { atom, Derivable, DerivableAtom, isDerivableAtom } from '@skunkteam/sherlock';
import { fromEventPattern } from './from-event-pattern';

describe('sherlock-utils/fromEventPattern', () => {
const until = atom(false);
beforeEach(() => until.set(false));
afterEach(() => until.set(true));

const reactor = jest.fn<void, [value: string, stop: () => void]>();
const onError = jest.fn<void, [error: unknown, stop: () => void]>();
const afterShutdown = jest.fn<void, []>();
const subscribeHandler = jest.fn<() => void, [value$: DerivableAtom<string>]>();
const unsubscribeHandler = jest.fn<void, []>();

beforeEach(() => {
reactor.mockReset();
subscribeHandler.mockReset();
subscribeHandler.mockReturnValue(unsubscribeHandler);
unsubscribeHandler.mockReset();
onError.mockReset();
afterShutdown.mockReset();
});

function getAtom(): DerivableAtom<any> {
return subscribeHandler.mock.calls[subscribeHandler.mock.calls.length - 1][0];
}

it('should start as `unresolved`', () => {
const d$ = fromEventPattern(subscribeHandler);
expect(d$.resolved).toBeFalse();
});

it('should handle synchronous finalization', () => {
subscribeHandler.mockImplementation(v$ => {
v$.setFinal('value');
return unsubscribeHandler;
});
const d$ = fromEventPattern(subscribeHandler);
d$.react(reactor, { onError, afterShutdown });
expect(unsubscribeHandler).toHaveBeenCalledAfter(subscribeHandler);
expect(reactor).toHaveBeenCalledAfter(subscribeHandler);
expect(reactor).toHaveBeenCalledWith('value', expect.toBeFunction());
expect(afterShutdown).toHaveBeenCalledAfter(unsubscribeHandler);
expect(onError).not.toHaveBeenCalled();
});

describe('when the first connection to the `Derivable` starts', () => {
let d$: Derivable<string>;
beforeEach(() => (d$ = fromEventPattern(subscribeHandler)));
beforeEach(() => d$.react(reactor, { until, onError }));

it('should subscribe using the `addHandler`', () => {
expect(subscribeHandler).toHaveBeenCalled();
});

it('should not call `addHandler` more than once', () => {
expect(subscribeHandler).toHaveBeenCalledTimes(1);
d$.react(reactor, { until });
expect(d$.value).toBeUndefined();
d$.react(reactor, { until });
expect(subscribeHandler).toHaveBeenCalledTimes(1);
});

it('should call the `addHandler` with the DerivableAtom', () => {
expect(isDerivableAtom(getAtom())).toBeTrue();
});

it('should not resolve before getting a value', () => {
expect(reactor).not.toHaveBeenCalled();
});

describe('after the first value is emitted', () => {
const value = 'first value';
beforeEach(() => getAtom().set(value));

it('should resolve the `Derivable`', () => {
expect(d$.resolved).toBe(true);
});

it('should output the value', () => {
expect(reactor).toHaveBeenCalledTimes(1);
expect(reactor).toHaveBeenLastCalledWith('first value', expect.toBeFunction());
});

it('should output any errors', () => {
const error = new Error('My Error');
getAtom().setError(error);
expect(onError).toHaveBeenCalledTimes(1);
expect(onError).toHaveBeenCalledWith(error, expect.toBeFunction());
expect(d$.error).toBe(error);
});

it('should be able to `get()` the latest value', () => {
expect(d$.get()).toBe(value);
});

describe('when the last connection to the `Derivable` stops', () => {
beforeEach(() => until.set(true));

it('should unsubscribe using the `removeHandler`', () => {
expect(unsubscribeHandler).toHaveBeenCalledTimes(1);
});
it('should return to being `unresolved`', () => {
expect(d$.resolved).toBe(false);
});
it('should be able to start a connection again', () => {
reactor.mockReset();
const stop = d$.react(reactor);

expect(subscribeHandler).toHaveBeenCalledTimes(2);

expect(d$.resolved).toBe(false);

const val = 'second value';
getAtom().set(val);
expect(d$.resolved).toBe(true);
expect(reactor).toHaveBeenCalledTimes(1);
expect(reactor).toHaveBeenLastCalledWith(val, expect.toBeFunction());

stop();
});
});
});
});
});
Loading

0 comments on commit 2e89e76

Please sign in to comment.