Reactive functional programming
From Wiki2
core ideas
Observable
from http://reactivex.io/rxjs/manual/overview.html#subscribing-to-observables
- Observables are like functions with zero arguments, but generalize those to allow multiple values.
- observer.next(42) is like return 42
var foo = Rx.Observable.create(function (observer) { console.log('Hello'); observer.next(42); });
foo.subscribe(function (x) { console.log(x); });
- Subscribing to an Observable is analogous to calling a Function.
console.log('before'); foo.subscribe(function (x) { console.log(x); }); console.log('after');
- Observables are able to deliver values either synchronously or asynchronously
- each oberver.next is like a return statement
- Observables can be created with create, but usually we use the so-called creation operators, like of, from, interval, etc.
- Subscribing to an Observable is like calling a function, providing callbacks where the data will be delivered to.
- The Observable does not even maintain a list of attached Observers.
- In an Observable Execution, zero to infinite Next notifications may be delivered. If either an Error or Complete notification is delivered, then nothing else can be delivered afterwards.
- When you subscribe, you get back a Subscription, which represents the ongoing execution. Just call unsubscribe() to cancel the execution.
- Observers are just objects with three callbacks, one for each type of notification that an Observable may deliver.
- Here, all three types of callbacks are provided as 3 arguments, the first callback argument as the next handler
observable.subscribe( x => console.log('Observer got a next value: ' + x), err => console.error('Observer got an error: ' + err), () => console.log('Observer got a complete notification') );
Subscription
- A Subscription essentially just has an unsubscribe() function to release resources or cancel Observable executions.
- (be carefull not to think you can observable.unsubscribe())
var observable = Rx.Observable.interval(1000); var subscription = observable.subscribe(x => console.log(x)); // Later: // This cancels the ongoing Observable execution which // was started by calling subscribe with an Observer. subscription.unsubscribe();
Subject
- A Subject is like an Observable, but can multicast to many Observers. Subjects are like EventEmitters
- they maintain a registry of many listeners.
- Since a Subject is an Observer, this also means you may provide a Subject as the argument to the subscribe of any Observable, like the example below shows:
var subject = new Rx.Subject();
subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.subscribe({ next: (v) => console.log('observerB: ' + v) });
var observable = Rx.Observable.from([1, 2, 3]);
observable.subscribe(subject); // You can subscribe providing a Subject
- Which executes as:
observerA: 1 observerB: 1 observerA: 2 observerB: 2 observerA: 3 observerB: 3
- With the approach above, we essentially just converted a unicast Observable execution to multicast, through the Subject. This demonstrates how Subjects are the only way of making any Observable execution be shared to multiple Observers
Multicasted Observables
- A multicasted Observable uses a Subject under the hood to make multiple Observers see the same Observable execution.
var source = Rx.Observable.from([1, 2, 3]); var subject = new Rx.Subject(); var multicasted = source.multicast(subject);
// These are, under the hood, `subject.subscribe({...})`: multicasted.subscribe({ next: (v) => console.log('observerA: ' + v) }); multicasted.subscribe({ next: (v) => console.log('observerB: ' + v) });
// This is, under the hood, `source.subscribe(subject)`: multicasted.connect();
- Because connect() does source.subscribe(subject) under the hood, connect() returns a Subscription, which you can unsubscribe from in order to cancel the shared Observable execution.
- refCount makes the multicasted Observable automatically start executing when the first subscriber arrives, and stop executing when the last subscriber leaves.
var source = Rx.Observable.interval(500); var subject = new Rx.Subject(); var refCounted = source.multicast(subject).refCount();
- One of the variants of Subjects is the BehaviorSubject, which has a notion of "the current value". It stores the latest value emitted to its consumers, and whenever a new Observer subscribes, it will immediately receive the "current value" from the BehaviorSubject.
- A ReplaySubject is similar to a BehaviorSubject in that it can send old values to new subscribers, but it can also record a part of the Observable execution.
- The AsyncSubject is a variant where only the last value of the Observable execution is sent to its observers, and only when the execution completes.
Operators
- An Operator is a function take creates a new Observable based on the current Observable. This is a pure operation
- the previous Observable stays unmodified. :An Operator is essentially a pure function which takes one Observable as input and generates another Observable as output. Subscribing to the output Observable will also subscribe to the input Observable.