RxJS

Subscribing is not really like addEventListener; it is more like invoking a function.

Event-driven and reactive? Are they the same?

Interactive programs work at their own pace and mostly deal with communication, while reactive programs work only in response to external demands and mostly deal with accurate interrupt handling. Real-time programs are usually reactive.

Real-time is a push-based world.

The idea: treat events as collections and manipulate sets of events with "operators".

Few applications are completely synchronous, and writing async code is necessary to keep applications responsive.

Imagine you have a normal array: you can filter, map, and reduce it as much as you like. But what happens if the array is asynchronous and streaming?
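As a baseline, here is the synchronous array version of that idea. It is a minimal sketch with made-up data: the `clicks` array, its fields, and the 1000px screen width are illustrative assumptions, not part of any API. The streaming version keeps the same shape, but the values arrive over time.

```javascript
// Hypothetical data: a plain array standing in for events that already happened.
const clicks = [
  { x: 10, y: 5 },
  { x: 700, y: 40 },
  { x: 820, y: 90 },
  { x: 30, y: 15 },
];

// Keep clicks on the right half of an assumed 1000px-wide screen,
// project each one to its x coordinate, then sum the coordinates.
const screenWidth = 1000;
const rightSideXs = clicks
  .filter(c => c.x > screenWidth / 2)
  .map(c => c.x);
const total = rightSideXs.reduce((sum, x) => sum + x, 0);

console.log(rightSideXs); // [700, 820]
console.log(total);       // 1520
```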

An application is all about "data flow".

We have many PUSH APIs:

  • DOM events
  • WebSockets
  • Node Streams
  • Service Workers
  • Server-Sent Events
  • XHR (1 value, good for Promises)
  • setInterval
  • Animation (Cancellable)

Each has its own way of dealing with asynchronous data, such as:

  • Callback functions - callback hell, poor concurrency
  • Promises - only ever yield a single value, not good for recurring events. Can't be cancelled?
  • Event emitters

What do SQL and spreadsheets have in common with a reactive system?

Thinking in Stream

You have to twist your mind and think in streams.

Imagine every variable is a stream. To get it, you need to figure out where it comes from. It could come from other variables, or it could come from an event. Figure out your side effects; these are the exit points for your data flow.

Memory Cleaning

No need for external state tracking. No need to clean up after yourself either.
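One way to read this: an operator such as take can end the sequence itself and release the underlying listener, so no counter or manual removal is needed at the call site. The following is a minimal sketch under that assumption. The `listeners` set, `emit`, `subscribeToSource`, and `takeFromSource` are hypothetical names for illustration, not RxJS functions.

```javascript
// Hypothetical event source that tracks its listeners so cleanup is visible.
const listeners = new Set();
function emit(value) {
  // Copy first so listeners can remove themselves while we iterate.
  Array.from(listeners).forEach(listener => listener(value));
}

// Registers `onNext` and returns a dispose function that removes it.
function subscribeToSource(onNext) {
  listeners.add(onNext);
  return () => listeners.delete(onNext);
}

// Similar in spirit to take(n): forwards the first n values, then disposes itself.
function takeFromSource(n, onNext) {
  let seen = 0;
  const dispose = subscribeToSource(value => {
    seen += 1;
    onNext(value);
    if (seen >= n) dispose();
  });
  return dispose;
}

const taken = [];
takeFromSource(2, v => taken.push(v));
emit('a');
emit('b');
emit('c'); // ignored: the listener has already been removed

console.log(taken);          // ['a', 'b']
console.log(listeners.size); // 0
```

The caller never stores a counter or removes the listener by hand; the operator owns that state.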

Observables

You can treat observables like arrays whose values have not arrived yet.

Note: Observables and Event Emitters are just different variations on the Observer design pattern.

Observable == Stream of events from a data source like mouse events, network requests, array of strings, etc.

Observer == Handlers or listeners that do "your things", such as combining or transforming values.

Observable is a datatype provided by Reactive Extensions (Rx). It abstracts a data source that represents a stream of events. We treat mouse, click, and network events as a "database" that we can query.

Observable has been proposed for a future version of ECMAScript (ES7 at the time of writing). What does an observable represent? It is a collection over time. Essentially, it is a stream of events.

RxJS is push-based, so the source of events (Observable) will push new values to the consumer (Observer), without the consumer needing to request the next value.
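To make the push/pull distinction concrete, here is a minimal sketch in plain JavaScript. The generator stands for a pull-based source, and `pushSource` is a hypothetical stand-in for an Observable, not the RxJS API.

```javascript
// Pull: the consumer decides when to ask for the next value.
function* pullSource() {
  yield 1;
  yield 2;
  yield 3;
}
const pulled = [];
const iterator = pullSource();
let step = iterator.next();   // the consumer requests a value
while (!step.done) {
  pulled.push(step.value);
  step = iterator.next();     // the consumer requests again
}

// Push: the producer decides when to deliver a value.
function pushSource(onNext) {
  [1, 2, 3].forEach(value => onNext(value)); // the producer calls the consumer
}
const pushed = [];
pushSource(value => pushed.push(value));

console.log(pulled); // [1, 2, 3]
console.log(pushed); // [1, 2, 3]
```

Both consumers end up with the same values; what differs is which side drives the loop.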

// An Observable always starts from a source that supplies its data.
// In this case, the data is just a simple click event generated by the user.
var clicks = Rx.Observable.fromEvent(button, 'click');

// Keep only clicks on the right half of the screen and log the first 10 of them
Rx.Observable.fromEvent(document, 'click')
  .filter(c => c.clientX > window.innerWidth / 2)
  .take(10)
  .subscribeOnNext(c => console.log(c.clientX, c.clientY));

We are subscribed to sequences, not only to discrete values.

Observables don't do anything until at least one Observer subscribes to them.
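This laziness is also why subscribing resembles invoking a function more than registering a listener. A minimal sketch of the idea, where `createObservable` is a hypothetical helper written for illustration and not the RxJS API:

```javascript
// Stores the producer function and runs it only when subscribe is called.
function createObservable(producer) {
  return {
    subscribe(onNext) {
      producer(onNext);
    },
  };
}

let producerRuns = 0;
const numbers = createObservable(onNext => {
  producerRuns += 1; // side effect that shows when the producer actually runs
  onNext(1);
  onNext(2);
});

// Nothing has run yet: defining the observable did not start the producer.
const runsBeforeSubscribe = producerRuns;

const received = [];
numbers.subscribe(x => received.push(x));

console.log(runsBeforeSubscribe); // 0
console.log(producerRuns);        // 1
console.log(received);            // [1, 2]
```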

Observable is not a replacement for Enumerable. I would not recommend trying to take something that is naturally pull-based and force it to be push-based.

// Autocomplete example
var q = document.querySelector('#q');
var resultList = document.querySelector('#result');

var Observable = Rx.Observable;

var keyups = Rx.Observable.fromEvent(q, 'keyup');

keyups.throttle(500)
  .map(() => q.value)
  .do(() => q.classList.add('spinner'))
  .flatMapLatest(query => Rx.DOM.ajax({
    method: 'GET',
    url: '/autocomplete?q=' + query,
    responseType: 'json'
  }))
  .do(() => q.classList.remove('spinner')) // Do side effect
  .map(r => r.response)
  .map(results => results.reduce((html, result) => `${html}<li>${result}</li>`, '')) // Start from '' so the first result is wrapped too
  .subscribe(resultsHTML => resultList.innerHTML = resultsHTML);

Observer

Observer is the handler.

var observer = Rx.Observer.create(function next(x) {
});

myObservable.subscribe(observer);
myObservable.subscribe(function next(x) {}); // Same

myObservable.subscribe(success, error, complete); // Handlers for next value, error, and completion
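The three handlers follow a simple contract: any number of values, then at most one error or completion, and nothing after that. The sketch below illustrates the contract with a hypothetical `createObservable` helper; it is not how RxJS implements it.

```javascript
// Wraps the handlers so that nothing is delivered after error or completion.
function createObservable(producer) {
  return {
    subscribe(onNext, onError, onCompleted) {
      let stopped = false;
      producer({
        onNext(x) { if (!stopped) onNext(x); },
        onError(e) { if (!stopped) { stopped = true; onError(e); } },
        onCompleted() { if (!stopped) { stopped = true; onCompleted(); } },
      });
    },
  };
}

const source = createObservable(observer => {
  observer.onNext(1);
  observer.onNext(2);
  observer.onCompleted();
  observer.onNext(3); // ignored: the sequence has already completed
});

const log = [];
source.subscribe(
  x => log.push('next ' + x),
  err => log.push('error ' + err.message),
  () => log.push('completed')
);

console.log(log); // ['next 1', 'next 2', 'completed']
```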

Operators

Operators are methods on Observable that allow you to compose new observables.

In RxJS, methods that transform (map) or query sequences are called operators.

create and fromEvent are both creation operators. They create Observables for common sources.

Observables are immutable, and every operator applied to them creates a new Observable.

  • map
  • flatMap
  • filter
  • reduce
  • catch
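To see why applying an operator leaves the source untouched, consider this minimal sketch. `makeObservable` is a hypothetical helper, and its `map` and `filter` only mimic the RxJS names in simplified form.

```javascript
// Each operator returns a new observable that subscribes to the previous one.
function makeObservable(subscribe) {
  return {
    subscribe,
    map(project) {
      return makeObservable(onNext => subscribe(x => onNext(project(x))));
    },
    filter(predicate) {
      return makeObservable(onNext =>
        subscribe(x => { if (predicate(x)) onNext(x); })
      );
    },
  };
}

const source = makeObservable(onNext => [1, 2, 3, 4].forEach(x => onNext(x)));
const evensTimesTen = source.filter(x => x % 2 === 0).map(x => x * 10);

const fromSource = [];
const fromDerived = [];
source.subscribe(x => fromSource.push(x));
evensTimesTen.subscribe(x => fromDerived.push(x));

console.log(fromSource);               // [1, 2, 3, 4]
console.log(fromDerived);              // [20, 40]
console.log(source !== evensTimesTen); // true
```

Subscribers to `source` still see every value, because the chain built a separate observable instead of modifying the original.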

Schedulers

RxJS with React

var Counter = React.createClass({
  componentWillMount() {
    this.onButtonClick = new Rx.Subject();
    
    this.subscription = (
      this.onButtonClick
      .startWith(0)
      .scan(acc => acc + 1)
      .subscribe(count => this.setState({count}))
    );
  },
  
  componentWillUnmount() {
    this.subscription.dispose();
  },
  
  render() {
    return (
      <div>
        {this.state.count}
        <button onClick={() => this.onButtonClick.onNext()}>Increment</button>
      </div>
    );
  }
});
// Auto-complete example
var words = Rx.Observable.fromEvent(input, "keyup")
            .map(function() { return input.value })
            .throttle(500)
            .distinctUntilChanged()
            .flatMapLatest(
              function(term) { return search(term) }
            );

words.subscribe(function(word) {});

Blog

Videos