Reactive functional programming

From Wiki2
Revision as of 11:46, 25 May 2016 by Tim (talk | contribs)

rxjs

internally:

   /**
    * Simplified sketch of how the `map` operator works internally.
    *
    * Must be invoked with `this` bound to the source Observable (it is meant
    * to be installed on `Rx.Observable.prototype`).
    *
    * @param {Function} fn - Projection applied to every value the source emits.
    * @returns {Object} A new Observable (built with `Rx.Observable.create`)
    *   that emits `fn(x)` for each source value `x` and forwards the source's
    *   error and completion notifications.
    */
   function map(fn) {
     const source = this;
     // Return the new Observable; otherwise `source.map(fn)` evaluates to undefined.
     return Rx.Observable.create(function subscribe(observer) {
       // Hand back the inner subscription so that tearing down the result
       // also tears down the subscription to the source.
       return source.subscribe(
         function (x) {
           let mapped;
           try {
             mapped = fn(x);
           } catch (err) {
             // A throwing projection becomes an error notification
             // instead of an exception escaping the source's emitter.
             observer.error(err);
             return;
           }
           observer.next(mapped);
         },
         function (err) { observer.error(err); },
         function () { observer.complete(); }
       );
     });
   }
   // Install map on the Observable prototype so it can be called as a method on Observable instances.
   Rx.Observable.prototype.map = map


operators

Rx.Observable.of(4,5,6,7,8) 
(45678|)
Rx.Observable.interval(500).take(4),  a kind of filter, also: first, takeLast, last, skip, skipLast
---0---1---2---3|
Rx.Observable.of(4,5,6,7).do(x=>console.log('dog is '+x)) //side effect only: doesn't touch the Observable's values, passes them through
 Rx.Observable.interval(500).take(4)
---0---1---2---3|   foo       
(8,9,10,11|)        more
foo.concat(more) 
---0---1---2---3(8,9,10,11|)
(a|)   foo
----0----1----2  bar
bar.startWith('a')  //startWith takes values to emit first, not an Observable
a----0----1----2

other combination: 'or' style

---0---1---2---4
-a-b-c-d-e-f-
  merge
-a-(0b)-c-(1d)-e-(2f)---4

'and' style

---0---1---2---4
-5-6-7---8--- 
foo.combineLatest(bar, (x,y)=>x+y)
---5,6,7-8-9-10--12

---H---e---l---l
--0-1---0-1-0-- 
foo.withLatestFrom(bar, (c,n)=>n===1?c.toUpperCase():c.toLowerCase())
---h---E---L---l

zip first+first, second+second

 ----0----2---4----5|
 -5-6-7-8
 foo.zip(bar, (x,y)=>x+y)
 ----5----8---11---13
 or, better yet...
  (hekk|)
  ---0---1---2---3
  foo.zip(bar, (x,y)=>x)
  ---h---e---k---k

transformation

 accumulates horizontally
   scan
     ----h---e---d---f--
     scan(acc,x,'d')
     d--(dh)-(dhe)-(dhef)
     to add up clicks
     ---x----x--x
     clicks.map((x)=>1)
     ---1----1--1-
     .scan((acc,x)=>acc+x,0)
     0--1----2--3
   buffer
     ----h----e----l---l----o    (foo)
     foo.bufferCount(2)   
     ---------he-------ll---[o]  
     or
     foo.bufferTime(900ms)
     -------h-------el-------[o]
     -----------1---1---------1 (bar)
     foo.buffer(bar)
     -----------he--l---------lo
   delay
     0---1---2---3---4  
     foo.delay(tomorrow)
     -------------0---1---2---3---4
     --0--1--2--3--4|
     var result = foo.delayWhen(x =>
       Rx.Observable.interval(x*x*100).take(1)
     );
 rate limiting (filtering) operators
   debounce //drops things if they happen too fast
     0---1---2---3---4  
     debounceTime(1000)-------//waits for silence
     -----------------------4
     throttleTime(1000) //emits then silence
     0------2------3------4
   distinct
     --a--b--a--c--b|
     foo.distinct()
     --a--b-----c---  
   distinctUntilChanged
     --a--b--a--a--b|
     --a--b--a-----b|
 error handling
   catch - replace error with observable
     --a--b--c--d--2|     (foo)
     map(toUpperCase)
     --A--B--C--D--#      (bar)
     catch(# => -Z|)
     --A--B--C--D--Z|
     var result = bar.catch(error => Rx.Observable.of('Z')); 
     repeat
       --a--b--c--d|     (foo)
       map(toUpperCase)
       --A--B--C--D|      (bar)
        repeat(3)
       --A--B--C--D--A--B--C--D--A--B--C--D|   
   retry - on error, resubscribe
     --a--b--c--d--2|     (foo)
     map(toUpperCase)
     --A--B--C--D--#      (bar)
     retry(2)
     --A--B--C--D------A--B--C--D------A--B--C--D--#
   retryWhen
     var result = bar.retryWhen(errorObs => errorObs.delay(3000))
     --A--B--C--D-----------A--B--C--D-----------A--B--C--D-----------
 


netflix reactive extensions