- RxJS
- 1.0 A Brief Recap of Programming Paradigms
- 1.1: Core Reactive Concepts
- 1.2: Comparing the Autocomplete function using JS vs RxJS
- 3: The Core of Reactive Extensions
- 3.1: Obervables, Operators and Subscriptions
- 3.2 Creating Observables
- 3.3: Built in Observables
- 3.4: Using RxJS with Node, jQuery and Promises
- 3.5: Subjects
- 3.6: RxJS Resources and Documentation
- 3.7: Sharing Observable Sequences
- 4.0: Operators that everyone should know
- 4.1: Do / Finally / StartWith / Filter
- 4.2: Merge / Concat
- 4.3: Map / MergeMap / SwitchMap
- 4.4: Reduce / Scan
- 4.5: Buffer / ToArray
- 4.6: First / Last / Single / Skip / Take
- 4.7: Zip / WithLatestFrom / CombineLatest
- 4.8: Error Handling Catch and Retry
Procedural Program
- The ideas that programs are a sense of functions
- Goes from top to bottom
- Relies heavily on global state, but any line can change the global state
- C being a procedural language
- "Imperative" execution
- Easy to write, difficult to maintain
- Prone to difficult bugs
Object Oriented
- Based around a Primitive: object
- Objects have well defined interfaces
- Localised behaviour
- Objects control state
- Composition
- Code is still imperative - pro AND con! - Still telling the computer EXACTLY what to do.
- Can be more verbose
Declarative
- Describing what you want to happen, but not telling the computer how to do it
- Eg. SQL, Regex, HTML
- Data is self-describing
- As powerful as the interpreter allows
- As limiting as the interpreter allows - You want build a game in SQL etc.
Functional
- What we want to happen but not how
- Little state
- Few side effects
- Easy to reason about
- Composition
- Expressive
- Works great with OO
- Basis in higher math
- Cons to think differently
- Not always the best choice
- No loops, no control logic - Just telling it what we want to happen
Reactive
- Primitive: Observable
- Instead of describing data in terms of other data, we describe it in terms of streams of events - From this, we create a pipeline such that we certain data changes, a lot is processed and changed - Example: spreadsheets!
- Composition
- Expressive
- Data flows unidirectionally
- Tough to think differently
- Subscriptions help change the data
const cellC2$ = cellA2$
.combineLatest(cellB2$)
.map((cells) => cells[0] + cells[1]);
cellC2$.subscribe((value) => {
console.log(value);
});
Core Concept 1: Pull model vs Push model
- any data sitting there that you "ask" for at some point - example refresh button - manual button trigger - time interval
- observable (stream) which is a reactive data source - produces items over a period of time that will either error, complete, or never complete until a page closes - not telling the stream when to get data, it has inbuilt logic on how to get data - we may transform this data - the display of the data is actually part of the description
An an example for a pull
based code, we can think of a window.setInterval() that fires every 5000 seconds.
An example of a push would be to have a function fire and then the return continutes to filter, flatMap, map and subscribe.
Core Concept 2: Everything is a database
- mouse movements
- current user
- web requests
- input boxes
In the comparison where the $title.on('keyup', () => {})
runs with a promise returned. The query can run into race conditions.
Also note that every single result also fires.
The issues:
// Fix up and down arrow
// Stop always querying
// Getting race condition
Bad ways
- generally
if last query == currentTitle return
- using setTimeout to reduce number of queries
- Race condition still happening, but bad attempts may be increasing the timeout - Could also use a current id compared to next query id and then returning before the callback occurs
- A lot of state across the module being changed
The Rx way
// npm install rxjs-es for es6
import $ from 'jquery';
import Rx from 'rxjs/Rx';
const $title = $('#title');
const $results = $('#results');
const keyUps$ = Rx.Observable.fromEvent($title, "keyup");
const queries$ = keyUps$
.map(e => e.target.value)
.distinctUntilChanged()
.debounceTime(250)
.switchMap(getItems); // similar to merge, but if new query comes in, discard the old data
//.mergeMap(getItems); // alias for flatMap
queries$.subscribe(query => {
// get rid of the promise will stop race condition
$results.empty();
$results.append(items.map( r => $(`<li />`).text(r)));
})
<!-- queries$.subscribe(query => {
console.log(e); // prints out event
getItems(query)
.then(items => {
$results.empty();
$results.append(items.map( r => $(`<li />`).text(r)));
});
}) -->
An even better way.
import $ from 'jquery';
import Rx from 'rxjs/Rx';
const $title = $('#title');
const $results = $('#results');
Rx.Observable.fromEvent($title, 'keyup')
.map((e) => e.target.value)
.distinctUntilChanged()
.debounceTime(500)
.switchMap(getItems)
.subscribe((items) => {
$results.empty();
$results.append(items.map((r) => $(`<li />`).text(r)));
});
- All the Rx has no external state, whereas the other code does.
- Rx doesn't have to wait for us to tell it when to do it.
- Observable: Something that can be observed which produces values
- Operator: It's an operation that modifies the data being pushed in from the observable - They don't produce values in and of themselves, but move them through the pipeline.
- Subscriptions: Piece of code that will do something with the values returned by the operators
Note, you can model anything in a reactive context by thinking a little bit differently.
Web API Request Example
- Reactive can still complete, or it can error out and retry.
import Rx from 'rxjs/Rx';
# promise will always execute - not lazy
const promise = new Promise((resolve, reject) => {
console.log("In promise");
resolve("hey");
});
promise.then(item => console.log(item));
# this doesn't give any output!
# observables are lazy!
# won't run without a subscription
const simple$ = new Rx.Observable(observer => {
console.log("Generating observable");
setTimeout(() => {
observer.next("An items!");
setTimeout(() => {
observer.next("Another item!");
observer.complete();
}, 1000);
}, 1000);
});
# creating a subscription
# first arg is the next function
# second arg is error
# third arg is complete
simple$.subscribe(
item => console.log(`one.next ${item}`),
error => console.log(`one.error ${item}`),
() => console.log("one.complete")
);
# Generating observable
# one.next An item!
# one.next Another item!
# one.complete
setTimeout(() => {
simple$.subscribe({
next: item => console.log(`two.next ${item}`),
error: error => console.log(`two.error ${item}`),
complete: () => console.log("two.complete")
});
}, 3000)
- Re-subscribing to an observable allows you to run that generator again
function createInterval(time) {
return new Rx.Observable(observer => {
let index = 0;
let interval = setInterval(() => {
observer.next(index++);
}, time);
return () => {
// will run when we unsubscribe
clearnInterval(interval);
};
});
}
function createSubscriber(tag) {
return {
next(item) { console.log(`${tag}.next ${item}`); },
error(error) { console.log(`${tag}.error ${error.stack || error }`); },
complete() { console.log(`${tag}.complete`); }
};
}
function take(observable, amount) {
return new Rx.Observable(observer => {
});
}
// this is the core of subscriptions
function take(sourceObservable, amount) {
return new Rx.Observable(observer => {
let count = 0;
const subscription = sourceObservable.subscribe({
next(item) {
observer.next(item);
if (++count >= amount) {
observer.complete();
}
},
error(error) { observer.error(error); },
complete() { observer.complete(); }
});
return () => subscription.unsubscribe();
});
}
const everySecond_ = createInterval(1000);
const firstFiveSeconds = take(everySecond_, 5);
const subscription = everySecond_.subscribe(createSubscriber("one"));
setTimeout(() => {
subscription.unsubscribe();
}, 3500);
This subscription will console.log out forever and ever and ever... - unless, we dispose of a description
How do operators come into play?
We could run something like const subscription = everySecond_.take(3)subscribe(createSubscriber("one"));
The steps for it are that it listens for a source and emits a transformation!
import Rx from 'rxjs/Rx';
Rx.Observable.interval(500)
.take(5)
.subscribe(createSubscriber("interval"));
Rx.Observable.timer(1000, 500)
.take(3)
.subscribe(createSubscriber("timer");
// note, array doesn't work - use from
Rx.Observable.of("Hello world!", 42, "whoa")
.subscribe(createSubscriber("of"));
Rx.Observable.from(["Hello world!", 42, "whoa"])
.subscribe(createSubscriber("of"));
Rx.Observable.from(generate())
.subscribe(createSubscriber("of"));
Rx.Observable.from("hello world!")
.subscribe(createSubscriber("of"));
// it can also take in a generator function!
function* generate() {
yield 1;
yield 5;
yield "HEY";
}
Rx.Observable.throw(new Error("Hey"))
.subscribe(createSubscriber("error"));
// empty
Rx.Observable.empty()
.subscribe(createSubscriber("empty"));
// defer
let sideEffect = 0;
const defer = Rx.Observable.defer(() => {
sideEffect++;
return Rx.Obserable.of(sideEffect);
});
defer.subscribe(createSubscriber("defer.one"));
defer.subscribe(createSubscriber("defer.two"));
defer.subscribe(createSubscriber("defer.three"));
Rx.Observable.never()
.subscribe(createSubscriber("never"));
Rx.Observable.range(10, 30)
.subscribe(createSubscriber("range"));
Benefits of the iterable from
?
- For every iterable, we could map every element.
Rx.Observable.fromEvent($title, 'keyup')
.map((e) => e.target.value)
.distinctUntilChanged()
.debounceTime(500)
.switchMap(getItems)
.subscribe((items) => {
$results.empty();
$results.append(items.map((i) => $('<li />').text(i)));
});
NOTE: Without the subscribe, it will never be subscribed to the dom!
If we have the .take(10)
- it would complete after taking 10 and then furthermore unsubscribe and be great for performance!
fromEvent
calls from addEventListener
, so it can do powerful things like keyup
for those that don't initially support it.
import fs from 'fs';
fs.readdir('./src/server', (err, items) => {
if (err) console.log(err);
else {
console.log(items);
}
});
// alternative
const readdir = Rx.Observable.bindNodeCallBack(fs.readdir);
readdir('./src/server')
// mergeMap creates iterable converted from array
.mergeMap((files) => Rx.Observable.from(files))
.map((file) => `MANIPULATED ${file}`)
.subscribe(createSubscriber('readdir'));
// promises
function getItem() {
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve('Hello');
}, 1000);
});
}
Rx.Observable.fromPromise(getItem()).subscribe(createSubscriber('promise'));
Subjects are another Rx primitive. They are both an observable and a observer! Used to bridge non-reactive code with reactive code.
Behaviour, replay subjects etc.
Warning: you should only really consider them as a last resort when bridging non-reactive and reactive code.
const simple = new Rx.Subject();
simple.subscribe(createSubscriber('simple'));
simple.next('Hello');
simple.next('World');
simple.complete();
const interval = Rx.Observable.interval(1000).take(5);
const intervalSubject = new Rx.Subject();
intervalSubject.subscribe(interval);
intervalSubject.subscribe(createSubscriber('sub1'));
intervalSubject.subscribe(createSubscriber('sub2'));
intervalSubject.subscribe(createSubscriber('sub3'));
// subscribes after three seconds
setTimeout(() => {
intervalSubject.subscribe(createSubscriber('LOOK AT ME'));
}, 3000);
Before, we had to invoke a function that call next
and complete
.
In the above example, intervalSubject is acting as a proxy to another observable.
// needs init state parameter
const currentUser = new Rx.BehaviorSubject({ isLoggedIn: false });
const isLoggedIn = currentUser.map((u) => u.isLoggedIn);
currentUser.next({ isLoggedIn: false });
isLoggedIn.subscribe(createSubscriber('isLoggedIn'));
setTimeout(() => {
currentUser.next({ isLoggedIn: true, name: 'nelson' });
}, 3000);
setTimeout(() => {
isLogged.subscribe(createSubscription('delayed'));
}, 1500);
How do you remember multiple states?
const replay = new Rx.ReplaySubject(3);
replay.next(1);
replay.next(2);
replay.subscribe(createSubscriber("one"));
replay.next(3);
replay.next(4);
replay.next(5);
// this subscription only gets the previous three items
replay.subscribe(createSubscriber("two"));
replay.next(6);
// what you see
one.next 1
one.next 2
one.next 3
one.next 4
one.next 5
two.next 3
two.next 4
two.next 5
one.next 6
two.next 6
Async Subjects
const apiCall = new Rx.AsyncSubject();
apiCall.next(1);
apiCall.subscribe(createSubscriber("one"));
apiCall.next(2);
// only will emit the final item before it is complete
apiCall.complete();
// if you subscribe to it again, that final value will be emitted
setTimeout(() => {
apiCall.subscribe(createSubscriber("two"));
}, 2000);
// output
one.next 2
one.complete
two.next 2
two.complete
Subject Summary
- if you can get around it, don't use subjects unless you have to
- you should use an observable workflow where possible
Sources:
- Hot Observable: It will produce events regardless of if you're listening - eg.
fromEvent($title, 'keyup')
- Cold Obserable: Starts once you subscribe - Interval Observables are actually cold observables
// this example shows when both start from the beginning eg cold
import Rx from 'rxjs/Rx';
const interval = Rx.Observable.interval(1000).take(10);
setTimeout(() => {
interval.subscribe(createSubscriber('one'));
}, 1200);
setTimeout(() => {
interval.subscribe(createSubscriber('two'));
}, 3200);
// HOT
// connectable observable
import Rx from 'rxjs/Rx';
const interval = Rx.Observable.interval(1000)
.take(10)
.publish();
interval.connect();
setTimeout(() => {
interval.subscribe(createSubscriber('one'));
}, 1200);
setTimeout(() => {
interval.subscribe(createSubscriber('two'));
}, 3200);
// if you connect after a set interval, then it begins executing and sharing the underlying observable
Why would you want a hot variable?
// here subscribe console.log runs twice
const socket = { on: () => {} };
const chatMessage = new Rx.Observable((observable) => {
console.log('subscribed');
socket.on('chat:message', (message) => observer.next(message));
});
chatMessage.subscribe(createSubscriber('one'));
chatMessage.subscribe(createSubscriber('two'));
// without it
const socket = { on: () => {} };
const chatMessage = new Rx.Observable((observable) => {
console.log('subscribed');
socket.on('chat:message', (message) => observer.next(message));
}).publish();
chatMessage.connect();
chatMessage.subscribe(createSubscriber('one'));
chatMessage.subscribe(createSubscriber('two'));
// using publishLast()
const simple = new Rx.Observable((observer) => {
observer.next('one');
observer.next('two');
observer.complete();
});
// always returns the last value
const published = simple.publishLast();
// even if we subscribe before connect, both will get the last value
published.subscribe(createSubscriber('one'));
published.connect();
published.subscribe(creaSubscriber('two'));
// using publishReplay()
const simple = new Rx.Observable((observer) => {
observer.next('one');
observer.next('two');
observer.next('three');
return () => console.log('Disposed');
});
// always returns the last value
const published = simple.publishReplay(2);
// even if we subscribe before connect, both will get the last value
// to dispose without running complete, we need to disconnect by unsubscribing
const sub1 = published.subscribe(createSubscriber('one'));
const connection = published.connect();
const sub2 = published.subscribe(creaSubscriber('two'));
sub1.unsubscribe();
sub2.unsubscribe();
connection.unsubscribe();
Refcount is a way to automatically handle the connection and the unsubscription of a connection observable.
It will connect to the first subscription and then disconnected on the last unsubscribe.
// using refCount()
const simple = new Rx.Observable(observer => {
observer.next("one");
observer.next("two");
observer.next("three");
return () => console.log("Disposed");
});
// always returns the last value
const published = simple.publishReplay(2).refCount();
// even if we subscribe before connect, both will get the last value
// to dispose without running complete, we need to disconnect by unsubscribing
const sub1 = published.subscribe(createSubscriber("one"));
const sub2 = published.subscribe(creaSubscriber("two"));
sub1.unsubscribe();
sub2.unsubscribe();
The publish().refCount()
is done so often, that is has been turned in share()
.
Taxing processes that you don't want to repeat but you want multiple things to hook into the result, then turn it into a hot subscription.
Now we will just talk about the different primary operators that you will work with.
// do => get the next value and pass it back unchanged
// finally => only completes after the range has completed, runs right at the end of the final value
// filter => filters out given statement
// interval => call timeout
// startWith => set initial value
Rx.Observable.range(1, 10)
.do((a) => console.log(`From do ${a}`))
.map((a) => a * a)
.subscribe(createSubscriber('simple'));
Rx.Observable.range(1, 10)
.finally(() => console.log(`From finally`))
.map((a) => a * 2)
.subscribe(createSubscriber('finally'));
Rx.Observable.range(1, 10)
.filter((a) => a < 5)
.map((a) => a * 2)
.subscribe(createSubscriber('filter'));
Rx.Observable.interval(1000)
.startWith(-1)
.subscribe(createSubscriber('interval'));
// merge - merge many observables togethers
// concat - this concatenates observables to the end of another, can also take a list of Observables
Rx.Observable.interval(1000)
.merge(Rx.Observable.interval(500))
.take(5)
.subscribe(createSubscriber("merge1"));
Rx.Observable.merge(
Rx.Observable.interval(1000).map(i => `${i} seconds),
Rx.Observable.interval(500).map(i => `${i} half seconds))
.take(5)
.subscribe(createSubscriber('merge2'));
// different events for merged observables
Rx.Observable.merge(
socket.on$("login").map(user => processUser(user),
socket.on$("logout").map(() => null));
Rx.Observable.range(1, 5)
.concat(Rx.Observable.range(10,3))
.subscribe(createSubscriber("concat1"));
// map - a projection on every item that comes in
// mergeMap - select many, does projection and then has another thing that we will work on
// switchMap - similar to mergeMap but replaces with the latest value if another emission comes in
function arrayMap(arr, proj) {
let returnArray = [];
for (let i of arr) {
returnArray.push(proj(item));
}
return returnArray;
}
arrayMap([1, 2, 3], (a) => a * a);
// imagine array of dicts
const albums = [{}, {}];
function arrayMergeMap(arr, proj) {
let returnArray = [];
for (let i of arr) {
let projArray = proj(item);
for (let j of projArray) {
returnArray.push(proj(item));
}
}
return returnArray;
}
const tracks = arrayMergeMap(albums, (album) => album.tracks);
Rx.Observable.range(1, 3)
.mergeMap((i) =>
Rx.Observable.timer(i * 1000).map(() => `After ${i} seconds`)
)
.subscribe(createSubscriber('mergeMap'));
Rx.Observable.fromPromise(getTracks())
.mergeMap((tracks) => Rx.Observable.from(tracks))
.subscribe(createSubscriber('tracks'));
function getTracks() {
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve(['track 1', 'track 2', 'track 3']);
}, 1000);
});
}
// synchronous example
Rx.Observable.of('my query')
.do(() => console.log('Querying'))
.mergeMap((a) => query(a))
.do(() => console.log('After querying'))
.subscribe(createSubscriber('query'));
function query(value) {
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve('This is the resolved value');
}, 1000);
});
}
// switch map
// reducer (acc, value) and works on value - doesn't emit until the completion
// scan - processes and emits as it comes in
Rx.Observable.range(1, 10)
.reduce((acc, value) => acc + value)
.subscribe(createSubscriber('reduce'));
Rx.Observable.range(1, 10)
.scan((acc, value) => acc + value)
.subscribe(createSubscriber('scan'));
There have been some big changes to how buffer
has been used.
Buffer takes in an observable.
toArray will convert results into an array. - still has a clean exit if the never() is implemented!
Rx.Observable.range(1, 100)
.bufferCount(25)
.subscribe(createSubscriber("items");
// will take 25 items and pushing them into an array
Rx.Observable.interval(500)
.bufferTime(2000)
.subscribe(createSubscriber("bufferTime");
// same behaviour!
// emitting event causes buffer to flush
Rx.Observable.interval(500)
.buffer(Rx.Observable.interval(2000))
.subscribe(createSubscriber("buffer");
//
// toArray
//
Rx.Observable.range(1, 10)
.toArray()
.subscribe(createSubscriber("range"));
const simple = new Rx.Observable((observer) => {
console.log('Generating sequence');
observer.next(1);
observer.next(2);
observer.next(3);
observer.next(4);
observer.complete();
});
simple.first().subscribe(createSubscriber('first'));
simple.last().subscribe(createSubscriber('last'));
// displays 1 & 4
// if nothing is in there, there are EmptyError(s) thrown
// single.error thrown is more than one error thrown
simple.single().subscribe(createSubscriber('single'));
// take and skip won't throw errors
// take does the first however emissions
// skip will take the emissions after a number
simple.take(2).subscribe(createSubscriber('take'));
simple.skip(2).subscribe(createSubscriber('skip'));
// 3, 4
simple
.skip(2)
.take(2)
.subscribe(createSubscriber('skip'));
// skipWhile / takeWhile
Rx.Observable.interval(500)
.skipWhile((i) => i < 4)
.takeWhile((i) => i < 10)
.subscribe(createSubscriber('skipWhile/takeWhile'));
// what's until and take emissions until
Rx.Observable.interval(500)
.skipUntil(Rx.Observable.timer(1000))
.takeUntil(Rx.Observable.timer(4000))
.subscribe(createSubscriber('skipUntil'));
How can we combine observables in different ways?
function arrayZip(arr1, arr2, selectorFunc) {
const count = Math.min(arr1.length, arr2.length);
const results = [];
for (let i = 0; i < count; i++) {
const combined = selector(arr1[i], arr2[i]);
results.push(combined);
}
return results;
}
const arr1 = [32, 2, 52, 43, 54];
const arr2 = [1, 0, 10, 4, 1, 4, 6, 2];
const results = arrayZip(arr1, arr2, (left, right) => left * right);
console.log(results);
// in RxJS
Rx.Observable.range(1.1)
.zip(
Rx.Observable.interval(500),
(left, right) => `item: ${left}, at ${right * 500}`
)
.subscribe(createSubscriber('zip'));
// emits value when source emits
// can also pass (left, right) function like zip as second parameter
Rx.Observable.interval(1000)
.withLatestFrom(Rx.Observable.interval(500))
.subscribe(createSubscriber('withLatestFrom'));
// emit value if either do
Rx.Observable.interval(1000)
.combineLatest(Rx.Observable.interval(500))
.subscribe(createSubscriber('withLatestFrom'));
If an error happens, an observer stops emitting and can prevent values from emitting at all. Error handling is very important!
.catch(error => Rx.Observable.of(error))
can pass this down as an Observable.
.retry()
we can pass in with a numeral to ensure that we either keep retrying or retry a certain number of times.