Difference between revisions of "Reactive functional programming"

From Wiki2
Line 1: Line 1:
==rxjs==
===core ideas===
===core ideas===
from http://reactivex.io/rxjs/manual/overview.html#subscribing-to-observables
from http://reactivex.io/rxjs/manual/overview.html#subscribing-to-observables
Line 29: Line 28:
;In an Observable Execution, zero to infinite Next notifications may be delivered. If either an Error or Complete notification is delivered, then nothing else can be delivered afterwards.
;In an Observable Execution, zero to infinite Next notifications may be delivered. If either an Error or Complete notification is delivered, then nothing else can be delivered afterwards.


===internally===
;When you subscribe, you get back a Subscription, which represents the ongoing execution. Just call unsubscribe() to cancel the execution.


    function map(fn) {
;Observers are just objects with three callbacks, one for each type of notification that an Observable may deliver. :Here, all three types of callbacks are provided as 3 arguments, the first callback argument as the next handler
      var source=this
      result = Rx.Observable.create(function subscribe(observer){
        source.subscribe(
          function(x) {observer.next(fn(x));},
          function(err){ oberver.error(err));},
          function() {observer.complete();}
        )
      })
    }
    Rx.Observable.prototype.map = map


  observable.subscribe(
    x => console.log('Observer got a next value: ' + x),
    err => console.error('Observer got an error: ' + err),
    () => console.log('Observer got a complete notification')
  );


;A Subscription essentially just has an unsubscribe() function to release resources or cancel Observable executions. :(be carefull not to think you can observable.unsubscribe())


===operators===
  var observable = Rx.Observable.interval(1000);
Rx.Observable.of(4,5,6,7,8)  
  var subscription = observable.subscribe(x => console.log(x));
(45678|)
  // Later:
  // This cancels the ongoing Observable execution which
  // was started by calling subscribe with an Observer.
  subscription.unsubscribe();


Rx.Observable.interval(500).take(4),  a kind of filter, also: first, takeLast, last, skip, skipLast
;A Subject is like an Observable, but can multicast to many Observers. Subjects are like EventEmitters: they maintain a registry of many listeners.
---0---1---2---3|


[4,5,6,7].do(x=>console.log('dog is '+x)) //doesn't touch the Observable, passes it through
:Since a Subject is an Observer, this also means you may provide a Subject as the argument to the subscribe of any Observable, like the example below shows:


   Rx.Observable.interval(500).take(4)
   var subject = new Rx.Subject();
---0---1---2---3|  foo     
(8,9,10,11|)        more
foo.concat(more)
---0---1---2---3(8,9,10,11|)


(a|)   foo
   subject.subscribe({
----0----1----2  bar
    next: (v) => console.log('observerA: ' + v)
bar.startWith(foo)
   });
 
   subject.subscribe({
other combination: 'or' style
     next: (v) => console.log('observerB: ' + v)
---0---1---2---4
  });
-a-b-c-d-e-f-
  merge
-a-(0b)-c-(1d)-e-(2f)---4
 
'and' style
---0---1---2---4
-5-6-7---8---
foo.combineLatest(bar, (x,y)=>x+y)
---5,6,7-8-9-10--12
---H---e---l---l
--0-1---0-1-0--
foo.withLatestFrom(bar, (c,n)=>n==1?..toUpperCase():c.toLowerCase())
---h---E---L--l
 
zip
first+first, second+second
  ----0----2---4----5|
  -5-6-7-8
  foo.zip(bar, (x+y)=>x+y)
  ----5----8---1----13
 
  o mo better...
  (hekk|)
  ---0---1---2---3
  foo.zip(bar, (x+y)=>x)
  ---h---e---k---k
 
transformation
  accumulates horizantally
    scan
      ----h---e---d---f--
      scan(acc,x,'d')
      d--(dh)-(dhe)-(dhef)
 
      to add up clicks
      ---x----x--x
      clicks.map((x)=>1)
      ---1----1--1-
      .scan((accc,x)=>acc+x,0)
      0--1----2--3
 
    buffer
      ----h----e----l---l----o    (foo)
      foo.bufferCount(2)    
      ---------he-------ll---[o] 
      or
      foo.bufferTime(900ms)
      -------h-------el-------[o]
 
      -----------1---1---------1 (bar)
      foo.buffer(bar)
      -----------he--l---------lo
 
    delay
      0---1---2---3---4 
      foo.delay(tommorrow)
      -------------0---1---2---3---4
 
      --0--1--2--3--4|
      var result = foo.delayWhen(x =>
        Rx.Observable.interval(x*x*100).take(1)
      );
   rate limiting (filtering)operators
    debounce //drops things if they happen too fast
      0---1---2---3---4 
      debounceTime(1000)-------//waits for silence
      -----------------------4
      throttleTime(1000) //emits then silence
      0------2------3------4
 
    distinct
      --a--b--a--c--b|
      foo.distinct()
      --a--b-----c--- 
     distinctUntilChanged
      --a--b--a--a--b|
      --a--b--a-----b|
  error handling
    catch - replace error with observable
      --a--b--c--d--2|    (foo)
      map(toUpperCase)
      --A--B--C--D--#      (bar)
      catch(# => -Z|)
      --A--B--C--D--Z|
      var result = bar.catch(error => Rx.Observable.of('Z'));  


      repeat
   var observable = Rx.Observable.from([1, 2, 3]);
        --a--b--c--d|    (foo)
        map(toUpperCase)
        --A--B--C--D|      (bar)
        repeat(3)
        --A--B--C--D--A--B--C--D--A--B--C--D|    
    retry on error resubscribe
      --a--b--c--d--2|    (foo)
      map(toUpperCase)
      --A--B--C--D--#      (bar)
      retry(2))
      --A--B--C--D------A--B--C--D------A--B--C--D--#
    retryWhen
      var result = bar.retryWhen(errorObs => errorObs.delay(3000))
      --A--B--C--D-----------A--B--C--D-----------A--B--C--D-----------
 


  observable.subscribe(subject); // You can subscribe providing a Subject


:Which executes as:
[https://www.youtube.com/watch?v=XRYN2xt11Ek netflix reactive extensions]


*[https://github.com/Reactive-Extensions/RxJS github library reactive extensions ]
  observerA: 1
*[http://jhusain.github.io/learnrx netflix staff training exercises]
  observerB: 1
*https://medium.com/javascript-scene/the-two-pillars-of-javascript-pt-2-functional-programming-a63aa53a41a4#.7v5td6gdw
  observerA: 2
  observerB: 2
  observerA: 3
  observerB: 3
:With the approach above, we essentially just converted a unicast Observable execution to multicast, through the Subject. This demonstrates how Subjects are the only way of making any Observable execution be shared to multiple Observers

Revision as of 13:32, 14 September 2016

core ideas

from http://reactivex.io/rxjs/manual/overview.html#subscribing-to-observables

Observables are like functions with zero arguments, but generalize those to allow multiple values.
observer.next(42) is like return 42
 var foo = Rx.Observable.create(function (observer) {
   console.log('Hello');
   observer.next(42);
 });
 foo.subscribe(function (x) {
   console.log(x);
 });
Subscribing to an Observable is analogous to calling a Function.
 console.log('before');
 foo.subscribe(function (x) {
   console.log(x);
 });
 console.log('after');
Observables are able to deliver values either synchronously or asynchronously
each oberver.next is like a return statement
Observables can be created with create, but usually we use the so-called creation operators, like of, from, interval, etc.
Subscribing to an Observable is like calling a function, providing callbacks where the data will be delivered to.
The Observable does not even maintain a list of attached Observers.
In an Observable Execution, zero to infinite Next notifications may be delivered. If either an Error or Complete notification is delivered, then nothing else can be delivered afterwards.
When you subscribe, you get back a Subscription, which represents the ongoing execution. Just call unsubscribe() to cancel the execution.
Observers are just objects with three callbacks, one for each type of notification that an Observable may deliver.
Here, all three types of callbacks are provided as 3 arguments, the first callback argument as the next handler
 observable.subscribe(
   x => console.log('Observer got a next value: ' + x),
   err => console.error('Observer got an error: ' + err),
   () => console.log('Observer got a complete notification')
 );
A Subscription essentially just has an unsubscribe() function to release resources or cancel Observable executions.
(be carefull not to think you can observable.unsubscribe())
 var observable = Rx.Observable.interval(1000);
 var subscription = observable.subscribe(x => console.log(x));
 // Later:
 // This cancels the ongoing Observable execution which
 // was started by calling subscribe with an Observer.
 subscription.unsubscribe();
A Subject is like an Observable, but can multicast to many Observers. Subjects are like EventEmitters
they maintain a registry of many listeners.
Since a Subject is an Observer, this also means you may provide a Subject as the argument to the subscribe of any Observable, like the example below shows:
 var subject = new Rx.Subject();
 subject.subscribe({
   next: (v) => console.log('observerA: ' + v)
 });
 subject.subscribe({
   next: (v) => console.log('observerB: ' + v)
 });
 var observable = Rx.Observable.from([1, 2, 3]);
 observable.subscribe(subject); // You can subscribe providing a Subject
Which executes as:
 observerA: 1
 observerB: 1
 observerA: 2
 observerB: 2
 observerA: 3
 observerB: 3
With the approach above, we essentially just converted a unicast Observable execution to multicast, through the Subject. This demonstrates how Subjects are the only way of making any Observable execution be shared to multiple Observers