Skip to content

Commit

Permalink
Introduces a new final state (#178)
Browse files Browse the repository at this point in the history
* introduces new final state and reactor mechanism

still missing unit tests

* introduces new take method on Derivables

* small optimization

* adds tests and fixes some discovered bugs

* add stopOnError option to take operator

* addresses review comments

* added additional tests to confirm to-/fromObservable behaviour

* changes rxjs adapter to align with derivable philosophy
  • Loading branch information
pavadeli authored Jul 2, 2019
1 parent a00f51a commit 7ec537e
Show file tree
Hide file tree
Showing 58 changed files with 2,327 additions and 1,586 deletions.
4 changes: 3 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
"files.trimTrailingWhitespace": true,
"editor.formatOnSave": true,
"files.insertFinalNewline": true,
"tslint.autoFixOnSave": true,
"editor.codeActionsOnSave": {
"source.fixAll": true
},
"typescript.tsdk": "node_modules/typescript/lib"
}
127 changes: 116 additions & 11 deletions extensions/sherlock-rxjs/rxjs.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,34 @@
import { _internal, atom, SettableDerivable } from '@politie/sherlock';
import { Subject } from 'rxjs';
import { _internal, atom, DerivableAtom } from '@politie/sherlock';
import { defer, Subject } from 'rxjs';
import { fromObservable, toObservable } from './rxjs';

describe('rxjs/rxjs', () => {
describe('toObservable', () => {
let a$: SettableDerivable<string>;
let a$: DerivableAtom<string>;

beforeEach(() => { a$ = atom('a'); });

it('should complete the Observable immediately when the derivable is already final', () => {
a$.setFinal('final value');
let value = '';
let complete = false;
toObservable(a$).subscribe(v => value = v, undefined, () => complete = true);
expect(value).toBe('final value');
expect(complete).toBeTrue();
});

it('should complete the Observable when the derivable becomes final', () => {
let value = '';
let complete = false;
toObservable(a$).subscribe(v => value = v, undefined, () => complete = true);
expect(value).toBe('a');
expect(complete).toBeFalse();

a$.setFinal('b');
expect(value).toBe('b');
expect(complete).toBeTrue();
});

it('should complete the Observable when until becomes true', () => {
let complete = false;
let value = '';
Expand Down Expand Up @@ -91,23 +112,30 @@ describe('rxjs/rxjs', () => {
});

describe('fromObservable', () => {
it('should be unresolved when not connected or when no value has been emitted yet', () => {
it('should be unresolved until connected and the first value has been emitted', () => {
const subj = new Subject<string>();
const d$ = fromObservable(subj);

expect(d$.resolved).toBe(false);

d$.react(() => 0, { skipFirst: true, once: true });
let value = '';
d$.react(v => value = v, { skipFirst: true, once: true });

expect(d$.resolved).toBe(false);

subj.next('first value');

expect(d$.resolved).toBe(true);
expect(value).toBe('');

subj.next('this stops the reactor');

expect(d$.resolved).toBe(false);
expect(d$.value).toBe('this stops the reactor');
expect(value).toBe('this stops the reactor');

subj.next('this is ignored');

expect(d$.value).toBe('this stops the reactor');
expect(value).toBe('this stops the reactor');
});

it('should subscribe to observable when used to power a reactor', () => {
Expand All @@ -118,7 +146,7 @@ describe('rxjs/rxjs', () => {

let value: string | undefined;
let reactions = 0;
const done = d$.react(v => (++reactions, value = v));
let done = d$.react(v => (++reactions, value = v));

expect(subj.observers).toHaveLength(1);
expect(reactions).toBe(0);
Expand All @@ -133,7 +161,82 @@ describe('rxjs/rxjs', () => {

expect(subj.observers).toHaveLength(0);
expect(reactions).toBe(1);
expect(d$.resolved).toBe(false);
expect(d$.get()).toBe('value');

subj.next('another value');

expect(subj.observers).toHaveLength(0);
expect(reactions).toBe(1);
expect(d$.get()).toBe('value');

done = d$.react(v => (++reactions, value = v));

expect(subj.observers).toHaveLength(1);
expect(reactions).toBe(2);
expect(d$.get()).toBe('value');

subj.next('yet another value');

expect(subj.observers).toHaveLength(1);
expect(reactions).toBe(3);
expect(d$.get()).toBe('yet another value');

done();

expect(subj.observers).toHaveLength(0);
expect(reactions).toBe(3);
expect(d$.get()).toBe('yet another value');
});

it('should disconnect and finalize when the observable completes', () => {
const subj = new Subject<string>();
let connections = 0;
const d$ = fromObservable(defer(() => (++connections, subj)));

expect(connections).toBe(0);

let value = '';
d$.react(v => value = v);
expect(connections).toBe(1);

subj.next('value');
expect(value).toBe('value');
expect(d$.connected).toBeTrue();
expect(d$.final).toBeFalse();

subj.complete();
expect(value).toBe('value');
expect(d$.value).toBe('value');
expect(d$.connected).toBeFalse();
expect(d$.final).toBeTrue();

// Should never connect again.
d$.react(() => 0);
expect(connections).toBe(1);
});

it('should disconnect and finalize when the observable errors', () => {
const subj = new Subject<string>();
const d$ = fromObservable(subj);

let error = '';
d$.react(() => 0, { onError: e => error = e });

expect(subj.observers.length).toBe(1);

subj.next('value');
expect(d$.connected).toBeTrue();
expect(d$.final).toBeFalse();

subj.error('oh no!');
expect(error).toBe('oh no!');
expect(d$.error).toBe('oh no!');
expect(d$.connected).toBeFalse();
expect(d$.final).toBeTrue();

// Should never connect again.
d$.react(() => 0, { onError: () => 0 });
expect(subj.observers.length).toBe(0);
});

it('should subscribe to the observable only once with multiple reactors', () => {
Expand Down Expand Up @@ -213,7 +316,7 @@ describe('rxjs/rxjs', () => {

expect(subj.observers).toHaveLength(0);
expect(reactions).toBe(2);
expect(d$.get()).toBe('fallback');
expect(d$.get()).toBe('value');
});

it('should propagate errors', () => {
Expand Down Expand Up @@ -245,8 +348,10 @@ describe('rxjs/rxjs', () => {

setTimeout(() => subj.error(new Error('my error')), 0);

// Reusing the same will return the last known value.
expect(await d$.toPromise()).toBe('value');
try {
await d$.toPromise();
await fromObservable(subj).toPromise();
throw new Error('should have thrown an error');
} catch (e) {
expect(e.message).toBe('my error');
Expand Down
6 changes: 3 additions & 3 deletions extensions/sherlock-rxjs/rxjs.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { _internal, atom, Derivable, ReactorOptions } from '@politie/sherlock';
import { _internal, atom, Derivable, ErrorWrapper, ReactorOptions } from '@politie/sherlock';
import { Observable, Subscribable, Subscriber, Unsubscribable } from 'rxjs';

/**
Expand All @@ -25,12 +25,12 @@ export function fromObservable<V>(observable: Subscribable<V>): Derivable<V> {
if (connected) {
subscription = observable.subscribe(
value => atom$.set(value),
err => atom$.setError(err),
err => atom$.setFinal(new ErrorWrapper(err)),
() => atom$.setFinal(atom$.getState()),
);
} else {
subscription!.unsubscribe();
subscription = undefined;
atom$.unset();
}
}, { skipFirst: true });

Expand Down
1 change: 0 additions & 1 deletion extensions/sherlock-utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
export * from './src/control-flow';
export * from './src/derivable-cache';
export * from './src/from-promise';
export * from './src/lift';
Expand Down
Loading

0 comments on commit 7ec537e

Please sign in to comment.