Reactive functional programming
rxjs
core ideas
without Rxjs
function subscribe(observer) {
  observer.next(42);
  observer.next(100);
  observer.next(200);
  observer.complete();
}
var observer = {
  next: function (x) { console.log('next ' + x); },
  error: function (err) { console.log('error ' + err); },
  complete: function () { console.log('done'); },
};
subscribe(observer);
Observable
from http://reactivex.io/rxjs/manual/overview.html#subscribing-to-observables
- Observables are like functions with zero arguments, but generalize those to allow multiple values.
- observer.next(42) is like return 42
var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
});
foo.subscribe(function (x) { console.log(x); });
- Subscribing to an Observable is analogous to calling a Function.
console.log('before');
foo.subscribe(function (x) { console.log(x); });
console.log('after');
- Observables are able to deliver values either synchronously or asynchronously
- each observer.next is like a return statement
- Observables can be created with create, but usually we use the so-called creation operators, like of, from, interval, etc.
- Subscribing to an Observable is like calling a function, providing callbacks where the data will be delivered to.
- The Observable does not even maintain a list of attached Observers.
- In an Observable Execution, zero to infinite Next notifications may be delivered. If either an Error or Complete notification is delivered, then nothing else can be delivered afterwards.
- When you subscribe, you get back a Subscription, which represents the ongoing execution. Just call unsubscribe() to cancel the execution.
- Observers are just objects with three callbacks, one for each type of notification that an Observable may deliver.
- Here, the three callbacks are passed as three separate arguments instead of an Observer object: the first is the next handler, the second the error handler, the third the complete handler
observable.subscribe(
  x => console.log('Observer got a next value: ' + x),
  err => console.error('Observer got an error: ' + err),
  () => console.log('Observer got a complete notification')
);
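The notification contract above (any number of next notifications, then at most one error or complete, after which nothing else is delivered) can be sketched without RxJS. This is only an illustration under assumptions: `createObservable` and its internal `closed` flag are hypothetical names, not the RxJS implementation.

```javascript
// Minimal sketch of an Observable: a wrapper around a subscribe function.
// createObservable is a hypothetical helper, not part of RxJS.
function createObservable(producer) {
  return {
    subscribe(observer) {
      let closed = false;
      // Guarded observer: ignores everything after error/complete/unsubscribe.
      const safe = {
        next(x) { if (!closed && observer.next) observer.next(x); },
        error(e) {
          if (closed) return;
          closed = true;
          if (observer.error) observer.error(e);
        },
        complete() {
          if (closed) return;
          closed = true;
          if (observer.complete) observer.complete();
        }
      };
      producer(safe);
      // The returned object plays the role of the Subscription.
      return { unsubscribe() { closed = true; } };
    }
  };
}

const log = [];
const foo = createObservable(observer => {
  observer.next(42);
  observer.complete();
  observer.next(100); // ignored: arrives after complete
});

foo.subscribe({
  next: x => log.push('next ' + x),
  complete: () => log.push('done')
});
console.log(log); // [ 'next 42', 'done' ]
```

Nothing runs until subscribe is called, which is the "like calling a function" point from the list above.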
Subscription
- A Subscription essentially just has an unsubscribe() function to release resources or cancel Observable executions.
- (be careful: unsubscribe() lives on the Subscription, so there is no observable.unsubscribe())
var observable = Rx.Observable.interval(1000);
var subscription = observable.subscribe(x => console.log(x));
// Later:
// This cancels the ongoing Observable execution which
// was started by calling subscribe with an Observer.
subscription.unsubscribe();
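To make the Observable/Subscription split concrete, here is a small sketch without RxJS. `intervalSketch` is a hypothetical stand-in for an interval-style Observable, and the `closed` getter is an illustrative detail; the point is only that teardown belongs to the object returned by subscribe, not to the Observable.

```javascript
// Hypothetical stand-in for an interval Observable, written without RxJS.
function intervalSketch(periodMs) {
  return {
    subscribe(observer) {
      let count = 0;
      let closed = false;
      const timer = setInterval(() => observer.next(count++), periodMs);
      // The Subscription owns the teardown logic for this one execution.
      return {
        unsubscribe() {
          closed = true;
          clearInterval(timer);
        },
        get closed() { return closed; }
      };
    }
  };
}

const ticks = [];
const ticker = intervalSketch(1000);
const subscription = ticker.subscribe({ next: x => ticks.push(x) });

// Cancel right away: the timer is cleared before its first tick fires.
subscription.unsubscribe();
console.log(subscription.closed);       // true
console.log(typeof ticker.unsubscribe); // undefined: the Observable has no unsubscribe()
```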
Subject
- A Subject is like an Observable, but can multicast to many Observers. Subjects are like EventEmitters: they maintain a registry of many listeners.
- Since a Subject is an Observer, this also means you may provide a Subject as the argument to the subscribe of any Observable, like the example below shows:
var subject = new Rx.Subject();
subject.subscribe({ next: (v) => console.log('observerA: ' + v) });
subject.subscribe({ next: (v) => console.log('observerB: ' + v) });
var observable = Rx.Observable.from([1, 2, 3]);
observable.subscribe(subject); // You can subscribe providing a Subject
- Which executes as:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
- With the approach above, we essentially just converted a unicast Observable execution to multicast, through the Subject. This demonstrates how Subjects are the only way of sharing one Observable execution among multiple Observers.
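The "registry of listeners" idea can be sketched in plain JavaScript. The names `createSubject` and `fromArray` are hypothetical helpers for this illustration, and the sketch leaves out details a real Subject would need (for example, refusing notifications after completion).

```javascript
// Minimal Subject sketch: an Observer (next/error/complete) that is also
// subscribable and forwards each notification to every registered observer.
// createSubject is a hypothetical name, not the RxJS class.
function createSubject() {
  const observers = [];
  return {
    subscribe(observer) {
      observers.push(observer);
      return {
        unsubscribe() {
          const i = observers.indexOf(observer);
          if (i !== -1) observers.splice(i, 1);
        }
      };
    },
    next(v) { observers.slice().forEach(o => o.next && o.next(v)); },
    error(e) { observers.slice().forEach(o => o.error && o.error(e)); },
    complete() { observers.slice().forEach(o => o.complete && o.complete()); }
  };
}

// A unicast source: every subscribe call runs the producer again.
function fromArray(values) {
  return {
    subscribe(observer) {
      values.forEach(v => observer.next(v));
      if (observer.complete) observer.complete();
    }
  };
}

const output = [];
const subject = createSubject();
subject.subscribe({ next: v => output.push('observerA: ' + v) });
subject.subscribe({ next: v => output.push('observerB: ' + v) });

// One execution of the source, shared by both observers through the subject.
fromArray([1, 2, 3]).subscribe(subject);
console.log(output.join('\n'));
```

Because the source runs once and the subject fans each value out, the two observers interleave (A, B, A, B, ...) rather than each receiving its own full run.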
Multicasted Observables
- A multicasted Observable uses a Subject under the hood to make multiple Observers see the same Observable execution.
var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({ next: (v) => console.log('observerA: ' + v) });
multicasted.subscribe({ next: (v) => console.log('observerB: ' + v) });
// This is, under the hood, `source.subscribe(subject)`:
multicasted.connect();
- Because connect() does source.subscribe(subject) under the hood, connect() returns a Subscription, which you can unsubscribe from in order to cancel the shared Observable execution.
- refCount makes the multicasted Observable automatically start executing when the first subscriber arrives, and stop executing when the last subscriber leaves.
var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
- One of the variants of Subjects is the BehaviorSubject, which has a notion of "the current value". It stores the latest value emitted to its consumers, and whenever a new Observer subscribes, it will immediately receive the "current value" from the BehaviorSubject.
- A ReplaySubject is similar to a BehaviorSubject in that it can send old values to new subscribers, but it can also record a part of the Observable execution.
- The AsyncSubject is a variant where only the last value of the Observable execution is sent to its observers, and only when the execution completes.
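As a rough illustration of the "current value" behaviour described for BehaviorSubject, here is a plain-JavaScript sketch. `createBehaviorSubject` is a hypothetical helper, not the RxJS class, and it omits error, complete and unsubscribe handling.

```javascript
// Sketch of the "current value" idea behind BehaviorSubject.
// createBehaviorSubject is a hypothetical helper, not the RxJS class.
function createBehaviorSubject(initialValue) {
  let current = initialValue;
  const observers = [];
  return {
    subscribe(observer) {
      observers.push(observer);
      observer.next(current); // a new observer gets the latest value at once
    },
    next(v) {
      current = v;
      observers.forEach(o => o.next(v));
    }
  };
}

const seen = [];
const subject = createBehaviorSubject(0);
subject.subscribe({ next: v => seen.push('A: ' + v) }); // A gets 0 immediately
subject.next(1);
subject.next(2);
subject.subscribe({ next: v => seen.push('B: ' + v) }); // B starts from 2
subject.next(3);
console.log(seen.join(', ')); // A: 0, A: 1, A: 2, B: 2, A: 3, B: 3
```

A ReplaySubject-style variant would keep an array of recent values instead of a single `current`, and an AsyncSubject-style variant would hold the latest value back until completion.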
Operators
- An Operator is a function that creates a new Observable based on the current Observable. This is a pure operation: the previous Observable stays unmodified.
- An Operator is essentially a pure function which takes one Observable as input and generates another Observable as output. Subscribing to the output Observable will also subscribe to the input Observable.
Instance operators versus static operators
- Instance operators are functions that use the this keyword to infer what is the input Observable.
- they get their input from the Observable they are called on (whatever is in front of the dot)
var observable = Rx.Observable.from([1, 2, 3, 4]).multiplyByTen();
observable.subscribe(x => console.log(x));
- instance operators are methods on Observable instances, a la...
Rx.Observable.prototype.multiplyByTen = function multiplyByTen() {
  var input = this;
  return Rx.Observable.create(function subscribe(observer) {
    input.subscribe({
      next: (v) => observer.next(10 * v),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
}
- Static operators are pure functions attached to the Observable class, and usually are used to create Observables from scratch.
- The most common type of static operators are the so-called Creation Operators. Instead of transforming an input Observable to an output Observable, they simply take a non-Observable argument, like a number, and create a new Observable.
- Some Combination Operators may be static, such as merge, combineLatest, concat
internally
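function map(fn) {
  var source = this;
  // Return a new Observable; the source stays unmodified.
  return Rx.Observable.create(function subscribe(observer) {
    // Returning the inner Subscription lets unsubscribe reach the source too.
    return source.subscribe(
      function (x) { observer.next(fn(x)); },
      function (err) { observer.error(err); },
      function () { observer.complete(); }
    );
  });
}
Rx.Observable.prototype.map = map;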
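The same idea can be written as a free-standing pure function over plain objects, which makes the "one Observable in, a new Observable out" shape easy to see. This is a sketch without RxJS: `fromArray` and this two-argument `map` are hypothetical helpers, not the library's operators.

```javascript
// Hypothetical helpers; plain objects stand in for Rx Observables.
function fromArray(values) {
  return {
    subscribe(observer) {
      values.forEach(v => observer.next(v));
      observer.complete();
    }
  };
}

// map as a pure function: takes an input observable, returns a new one.
// The input is never modified.
function map(input, fn) {
  return {
    subscribe(observer) {
      // Subscribing to the output subscribes to the input.
      return input.subscribe({
        next: x => observer.next(fn(x)),
        error: err => observer.error(err),
        complete: () => observer.complete()
      });
    }
  };
}

const source = fromArray([1, 2, 3, 4]);
const timesTen = map(source, x => 10 * x);

const results = [];
timesTen.subscribe({
  next: x => results.push(x),
  error: err => results.push('error ' + err),
  complete: () => results.push('done')
});
console.log(results); // [ 10, 20, 30, 40, 'done' ]
```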
operators
from egghead https://egghead.io/courses/rxjs-beyond-the-basics-operators-in-depth
- also see http://rxmarbles.com/#scan
Rx.Observable.of(4,5,6,7,8)
(45678|)
Rx.Observable.interval(500).take(4), a kind of filter, also: first, takeLast, last, skip, skipLast
---0---1---2---3|
Rx.Observable.of(4,5,6,7).do(x=>console.log('dog is '+x)) //doesn't touch the Observable, passes it through
Rx.Observable.interval(500).take(4)
---0---1---2---3| foo
(8,9,10,11|) more
foo.concat(more)
---0---1---2---3(8,9,10,11|)
(a|) foo
----0----1----2 bar
bar.startWith('a') //startWith takes values, not an Observable; same result as foo.concat(bar)
a---0----1----2
other combination: 'or' style
---0---1---2---4
-a-b-c-d-e-f-
merge
-a-(0b)-c-(1d)-e-(2f)---4
'and' style
---0---1---2---4
-5-6-7---8---
foo.combineLatest(bar, (x,y)=>x+y)
---5,6,7-8-9-10--12
---H---e---l---l
--0-1---0-1-0--
foo.withLatestFrom(bar, (c,n)=>n===1?c.toUpperCase():c.toLowerCase())
---h---E---L---l
zip first+first, second+second
----0----2---4----5|
-5-6-7-8
foo.zip(bar, (x,y)=>x+y)
----5----8---11---13
or better, to spread synchronous values over time...
(hekk|)
---0---1---2---3
foo.zip(bar, (x,y)=>x)
---h---e---k---k
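The pairing rule of zip (first with first, second with second) can be checked with a small array-based sketch. It ignores timing entirely and only models which values get combined; `zipArrays` is a hypothetical helper, not an RxJS function.

```javascript
// Array-based sketch of zip: timing is ignored, only the pairing is modelled.
// zipArrays is a hypothetical helper, not part of RxJS.
function zipArrays(a, b, combine) {
  const length = Math.min(a.length, b.length); // stops when either side runs out
  const out = [];
  for (let i = 0; i < length; i++) {
    out.push(combine(a[i], b[i]));
  }
  return out;
}

// first + first, second + second, ...
const sums = zipArrays([0, 2, 4, 5], [5, 6, 7, 8], (x, y) => x + y);
console.log(sums); // [ 5, 8, 11, 13 ]

// Keep only the left value: the right side just sets the pace.
const letters = zipArrays(['h', 'e', 'k', 'k'], [0, 1, 2, 3], (x, y) => x);
console.log(letters.join('')); // hekk
```

In the timed version, each pair is emitted when the slower of its two members arrives, which is why zipping with an interval spreads a synchronous sequence out over time.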
transformation
accumulates horizontally
scan
----h---e---d---f--
scan((acc,x)=>acc+x,'d')
d--(dh)-(dhe)-(dhef)
to add up clicks
---x----x--x
clicks.map((x)=>1)
---1----1--1-
.scan((acc,x)=>acc+x,0)
0--1----2--3
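The running-total behaviour of scan can be sketched over a plain array. `scanArray` is a hypothetical helper that ignores timing; it differs from reduce only in that every intermediate accumulator value is kept.

```javascript
// Array-based sketch of scan: like reduce, but keeps every intermediate value.
// scanArray is a hypothetical helper, not part of RxJS.
function scanArray(values, accumulator, seed) {
  const out = [];
  let acc = seed;
  for (const v of values) {
    acc = accumulator(acc, v);
    out.push(acc); // emit each running result, not just the final one
  }
  return out;
}

// Counting clicks: map every click to 1, then keep a running sum.
const clicks = ['x', 'x', 'x'];
const counts = scanArray(clicks.map(() => 1), (acc, x) => acc + x, 0);
console.log(counts); // [ 1, 2, 3 ]
```

In this sketch the seed is only the starting accumulator and is not emitted on its own; to show the starting value first, as the diagrams do with the leading 0, it would have to be prepended (RxJS has startWith for that).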
buffer
----h----e----l---l----o (foo)
foo.bufferCount(2)
---------he-------ll---[o]
or foo.bufferTime(900) //milliseconds
-------h-------el-------[o]
-----------1---1---------1 (bar)
foo.buffer(bar)
-----------he--l---------lo
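The grouping done by bufferCount can be sketched over an array, ignoring timing. `bufferCountArray` is a hypothetical helper; the trailing partial group corresponds to the `[o]` in the diagram, flushed when the source completes.

```javascript
// Array-based sketch of bufferCount: collect values into groups of `size`.
// bufferCountArray is a hypothetical helper, not part of RxJS.
function bufferCountArray(values, size) {
  const out = [];
  for (let i = 0; i < values.length; i += size) {
    // The last slice may be shorter: the leftover buffer flushed at completion.
    out.push(values.slice(i, i + size));
  }
  return out;
}

const buffers = bufferCountArray(['h', 'e', 'l', 'l', 'o'], 2);
console.log(buffers.map(b => b.join(''))); // [ 'he', 'll', 'o' ]
```

bufferTime and buffer(bar) follow the same collect-then-flush pattern, except that the flush is triggered by a timer or by another Observable instead of a count.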
delay
0---1---2---3---4
foo.delay(tomorrow) //delay takes a number of milliseconds or a Date
-------------0---1---2---3---4
--0--1--2--3--4|
var result = foo.delayWhen(x => Rx.Observable.interval(x*x*100).take(1));
rate limiting (filtering) operators
debounce //drops things if they happen too fast
0---1---2---3---4
debounceTime(1000) //waits for silence
-----------------------4
throttleTime(1000) //emits then silence
0------2------3------4
distinct
--a--b--a--c--b|
foo.distinct()
--a--b-----c---|
distinctUntilChanged
--a--b--a--a--b|
--a--b--a-----b|
error handling
catch - replace error with observable
--a--b--c--d--2| (foo)
map(toUpperCase)
--A--B--C--D--# (bar)
catch(# => -Z|)
--A--B--C--D--Z|
var result = bar.catch(error => Rx.Observable.of('Z'));
repeat
--a--b--c--d| (foo)
map(toUpperCase)
--A--B--C--D| (bar)
repeat(3)
--A--B--C--D--A--B--C--D--A--B--C--D|
retry - on error, resubscribe
--a--b--c--d--2| (foo)
map(toUpperCase)
--A--B--C--D--# (bar)
retry(2)
--A--B--C--D------A--B--C--D------A--B--C--D--#
retryWhen
var result = bar.retryWhen(errorObs => errorObs.delay(3000))
--A--B--C--D-----------A--B--C--D-----------A--B--C--D-----------
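The "resubscribe on error" behaviour of retry can be sketched without RxJS. The `bar` object and this two-argument `retry` are hypothetical stand-ins that run synchronously, so the gaps in the diagram are not modelled; only the count is, with retry(2) meaning one initial attempt plus two resubscriptions before the error is passed through.

```javascript
// Hypothetical source: emits A, B, C, D and then always fails,
// like the (bar) stream in the diagrams.
const bar = {
  subscribe(observer) {
    for (const ch of ['a', 'b', 'c', 'd']) {
      observer.next(ch.toUpperCase());
    }
    observer.error(new Error('not a string'));
  }
};

// Sketch of retry: on error, subscribe to the input again, up to `count` times.
// This is an illustration, not the RxJS operator.
function retry(input, count) {
  return {
    subscribe(observer) {
      let remaining = count;
      const attempt = () => {
        input.subscribe({
          next: x => observer.next(x),
          error: err => {
            if (remaining > 0) {
              remaining--;
              attempt(); // resubscribe: the source starts over from the beginning
            } else {
              observer.error(err); // retries used up: pass the error through
            }
          },
          complete: () => observer.complete()
        });
      };
      attempt();
    }
  };
}

const events = [];
retry(bar, 2).subscribe({
  next: x => events.push(x),
  error: () => events.push('#'),
  complete: () => events.push('|')
});
console.log(events.join('')); // ABCDABCDABCD#
```

retryWhen generalises this: instead of a fixed count, an Observable built from the errors decides when (and whether) to resubscribe.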