RxJS Tutorial from Codingcompiler. RxJS is a responsive programming library that uses Observables to make writing asynchronous or callback-based code easier. This project is a rewrite of Reactive-Extensions/RxJS (RxJS 4) with better performance, better modularity, better debug call stack, while maintaining most backwards compatibility, with only some destructive changes. (breaking changes) is to reduce the outer layer of the API.
RxJS Tutorial – Getting Started
RxJS is a library that writes asynchronous and event-based programs by using observable sequences. It provides a core type of Observable , dependent types (Observer, Schedulers, Subjects) and operators inspired by [Array#extras] (map, filter, reduce, every, etc.). These array operators can take asynchronous events as Collection to handle.
RxJS can be used as a handle events Lodash .
ReactiveX combined observer pattern , the iterative mode and functional programming set used to satisfy all to an ideal way to manage the sequence of events required.
The basic concepts used in RxJS to solve asynchronous event management are:
- Observable: Represents a concept that is a collection of future values or events that can be called.
- Observer: A collection of callback functions that know how to listen for values provided by Observable.
- Subscription: Indicates the execution of the Observable, mainly used to cancel the execution of the Observable.
- Operators: A pure function that uses a functional programming style to handle collections using operators such as map, filter, concat, , flatMapand so on.
- Subject: Equivalent to EventEmitter and is the only way to push values or events to multiple Observers.
- Schedulers (scheduler): used to control the concurrency and centralized dispatcher, allows us to calculate the coordinate occurs when, for example setTimeout, or requestAnimationFrame other.
RxJS First Example
Register the regular way of writing event listeners.
var button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));
To use RxJS, create an observable instead.
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
.subscribe(() => console.log('Clicked!'));
Purity
What makes RxJS powerful is its ability to use pure functions to generate values. This means your code is less prone to errors.
Usually you will create a non-pure function, and use the shared variable code outside of this function, which will make your application state mess.
var count = 0;
var button = document.querySelector('button');
button.addEventListener('click', () => console.log(`Clicked ${++count} times`));
With RxJS, you will isolate the application state.
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
.scan(count => count + 1, 0)
.subscribe(count => console.log(`Clicked ${count} times`));
The scan operator works like the array’s reduce . It takes an initial value that is exposed to the callback function as a parameter. The return value after each callback function runs will be used as the argument for the next callback function run.
Liquidity (Flow)
RxJS provides a set of operators to help you control how events flow through observables.
The following code shows how to control up to one click in a second, first look at using normal JavaScript:
var count = 0;
var rate = 1000;
var lastClick = Date.now() - rate;
var button = document.querySelector('button');
button.addEventListener('click', () => {
if (Date.now() - lastClick >= rate) {
console.log(`Clicked ${++count} times`);
lastClick = Date.now();
}
});
Use RxJS:
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
.throttleTime(1000)
.scan(count => count + 1, 0)
.subscribe(count => console.log(`Clicked ${count} times`));
Other flow control operators are filter , delay , debounceTime , take , takeUntil , distinct , distinctUntilChanged , and so on.
Values
For values that flow through observables, you can convert them.
The following code shows how to accumulate mouse x coordinates for each click, first look at using normal JavaScript:
var count = 0;
var rate = 1000;
var lastClick = Date.now() - rate;
var button = document.querySelector('button');
button.addEventListener('click', (event) => {
if (Date.now() - lastClick >= rate) {
count += event.clientX;
console.log(count)
lastClick = Date.now();
}
});
Use RxJS:
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
.throttleTime(1000)
.map(event => event.clientX)
.scan((count, clientX) => count + clientX, 0)
.subscribe(count => console.log(count));
Other operators that produce values are pluck , pairwise , sample , and so on.
Observable (Observable Object)
Observables are lazy push collections of multiple values. It fills in the blanks in the table below:
Single value | Multiple values | |
Pull | Function | Iterator |
Push | Promise | Observable |
Example – When you subscribe to the Observable in the code below, the value 1, 2, 3, is pushed immediately (synchronously) , then the value is pushed after 1 second 4, and then the stream is completed:
var observable = Rx.Observable.create(function (observer) {
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
}, 1000);
});
To call the Observable and see these values, we need to subscribe to the Observable:
var observable = Rx.Observable.create(function (observer) {
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
}, 1000);
});
console.log('just before subscribe');
observable.subscribe({
next: x => console.log('got value ' + x),
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done'),
});
console.log('just after subscribe');
The result of the console execution:
just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done
Pull vs Push in RxJS
Pull and push are two different protocols that describe how a data producer communicates with a data consumer .
What is pull?
In the pull system, it is up to the consumer to decide when to receive data from the producer. The producer itself does not know when the data was delivered to the consumer.
Every JavaScript function is a pull system. A function is the producer of the data, and the code that calls the function consumes the function by “fetching” a single return value from the function call .
ES2015 introduces the generator function and iterators ( function*), which is another type of pull system. Call iterator.next() code is that consumers, it was “removed” from the iterator (producer) multiple values.
Producer | consumer | |
Pull | Passive: Generates data when requested. | Proactive: Decide when to request data. |
Push | Active: Generate data at your own pace. | Passive: Respond to the data received. |
What is push?
In the push system, the producer decides when to send the data to the consumer. The consumer itself does not know when it will receive the data.
In today’s JavaScript world, Promises is the most common type of push system. Promise (producer) passes a parsed value to the registered callback function (consumer), but unlike the function, it is up to Promise to decide when to “push” the value to the callback function.
RxJS introduces Observables, a new JavaScript push system. Observable is the producer of multiple values and “pushes” the value to the observer (consumer).
- Function is a lazy evaluation operation that returns a single value synchronously when called.
- The Generator is a lazy evaluation operation that synchronously returns zero to (possibly) an infinite number of values.
- Promise is an operation that may or may not return a single value.
- An Observable is a lazy evaluation operation that returns zero (possibly) an infinite number of values synchronously or asynchronously from the moment it is called.
Observables as a generalization of functions
Contrary to popular sayings, Observables are neither like Event Emitters nor Promises with multiple values. In some cases, when using RxJS’s Subjects for multicasting, Observables may behave like EventEmitters, but usually Observables behave like EventEmitters.
Observables are functions that have no parameters but can be generalized to multiple values.
Consider the following code:
function foo() {
console.log('Hello');
return 42;
}
var x = foo.call(); // Equivalent to foo()
console.log(x);
var y = foo.call(); // Equivalent to foo()
console.log(y);
The output we are looking forward to seeing:
"Hello"
42
"Hello"
42
You can override the above code with Observables:
var foo = Rx.Observable.create(function (observer) {
console.log('Hello');
observer.next(42);
});
foo.subscribe(function (x) {
console.log(x);
});
foo.subscribe(function (y) {
console.log(y);
});
The output is the same:
"Hello"
42
"Hello"
42
This is because both functions and Observables are lazy. If you don’t call the function, console.log(‘Hello’) it won’t execute. The same is true for Observables, and if you don’t “call” it (using it subscribe), console.log(‘Hello’) it won’t execute.
In addition, “call” or “subscribe” is a stand-alone operation: two function calls trigger two separate side effects, and two Observable subscriptions also trigger two separate side effects.
EventEmitters share side effects and will execute as early as possible regardless of whether a subscriber exists or not. Observables, on the other hand, does not share side effects and is delayed.
Subscribing to Observable is similar to calling a function.
Some people claim that Observables are asynchronous. that is not true. If you surround a function call with a log, like this:
console.log('before');
console.log(foo.call());
console.log('after');
You will see an output like this:
"before"
"Hello"
42
"after"
Use Observables to do the same thing:
console.log('before');
foo.subscribe(function (x) {
console.log(x);
});
console.log('after');
The output is:
"before"
"Hello"
42
"after"
This proves that foothe subscription is fully synchronous, just like a function.
Observables pass values can be synchronous or asynchronous.
So what is the difference between Observable and function? Observable can “return” multiple values over time , which is not possible with functions. You can’t do this:
function foo() {
console.log('Hello');
return 42;
return 100; // Dead code, never executed
}
The function can only return one value. But Observables can do this:
var foo = Rx.Observable.create(function (observer) {
console.log('Hello');
observer.next(42);
observer.next(100); // "return" another value
observer.next(200); // You can also "return" the value
});
console.log('before');
foo.subscribe(function (x) {
console.log(x);
});
console.log('after');
Synchronous output:
"before"
"Hello"
42
100
200
"after"
But you can also “return” the value asynchronously:
var foo = Rx.Observable.create(function (observer) {
console.log('Hello');
observer.next(42);
observer.next(100);
observer.next(200);
setTimeout(() => {
observer.next(300); // Asynchronous execution
}, 1000);
});
console.log('before');
foo.subscribe(function (x) {
console.log(x);
});
console.log('after');
Output:
"before"
"Hello"
42
100
200
"after"
300
in conclusion:
- func.call() Means ” Give me a value synchronously “
- observable.subscribe() Means ” Give me any number of values, whether synchronous or asynchronous “
Anatomy of Observable
Observables to use Rx.Observable.create or create the operator to create and use the viewer to subscribe to it, and then execute it and send next/ error/ complete notification to the observer, and execution may be cleaning up . All four aspects are encoded in the Observables instance, but some aspects are related to other types, such as Observer and Subscription.
The core concerns of Observable:
- Create Observables
- Subscribe to Observables
- Executing Observables
- Clean up Observables
Create Observables
Rx.Observable.create is Observable an alias constructor that takes a single argument: subscribe the function.
The following example creates an Observable that sends a string to the observer every second ‘hi’.
var observable = Rx.Observable.create(function subscribe(observer) {
var id = setInterval(() => {
observer.next('hi')
}, 1000);
});
Observables can use create to create, but usually we use so-called creation operators , like of, , from, interval and so on.
In the example above, the subscribe function is the most important piece to describe the Observable. Let’s see what the subscription means.
Subscribe to Observables
Observable examples of objects observable can subscribe to , like this:
observable.subscribe(x => console.log(x));
Observable.subscribe And Observable.create(function subscribe(observer) {…})the subscribe same with the name, this is not a coincidence. In the library, they are different, but from reality, you can think that they are conceptually equivalent.
This indicates that subscribecalls between multiple observers same Observable is not shared. When a call observer observable.subscribe when Observable.create(function subscribe(observer) {…})the subscribe function is to serve only given observer. For observable.subscribe each call to set up an independent triggers for a given observer.
Subscribing to an Observable is like calling a function and providing a callback function that receives the data.
This is completely different from the event handling method API like addEventListener/ removeEventListener. Use observable.subscribe, does not register a given observer as a listener in the Observable. Observable won’t even maintain an additional list of observers.
subscribe A call is a simple way to start an “Observable Execution” and pass a value or event to the observer of this execution.
Executing Observables
Observable.create(function subscribe(observer) {…})…The code in the code stands for “Observable Execution”, which is a lazy operation that is executed only after each observer subscribes. Over time, execution produces multiple values in a synchronous or asynchronous manner.
An Observable implementation can pass three types of values:
- “Next” notification: Send a value such as a number, a string, an object, and so on.
- “Error” notification: Send a JavaScript error or exception.
- “Complete” notification: No values are sent anymore.
The “Next” notification is the most important and most common type: they represent the actual data passed to the observer. The “Error” and “Complete” notifications may only occur once during the execution of the Observable and only one of them will be executed.
These constraints are best expressed in the so-called Observable syntax or contract , written as a regular expression like this:
next*(error|complete)?
In the Observable execution, zero to an infinite number of “Next” notifications may be sent. If you send an “Error” or “Complete” notification, then no further notifications will be sent.
The following is an example of Observable execution, which sends three “Next” notifications, followed by a “Complete” notification:
var observable = Rx.Observable.create(function subscribe(observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
});
Observable strictly adheres to its own specifications, so the following code will not send a “Next” notification 4:
var observable = Rx.Observable.create(function subscribe(observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
observer.next(4); //Because of violation of the statute,So will not send
});
In subscribe using try/catch code block to wrap arbitrary code is a good idea if the captured abnormal, sends “Error” notification:
var observable = Rx.Observable.create(function subscribe(observer) {
try {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
} catch (err) {
observer.error(err); // Send an error if an exception is caught
}
});
Clean up Observable execution
Because Observable execution can be infinite, and observers usually want to abort execution for a limited amount of time, we need an API to cancel execution. Because each execution is specific to its corresponding observer, once the observer completes the received value, it must have a way to stop execution to avoid wasting computing power or memory resources.
When invoked observable.subscribe, the observer is attached to the newly created Observable implementation. This call also returns an object, ie Subscription(subscription):
var subscription = observable.subscribe(x => console.log(x));
Subscription represents ongoing execution and it has a minimal API to allow you to cancel execution. Want to know more subscribe to relevant content, see Subscriptiontype . Use subscription.unsubscribe()You can cancel the execution of:
var observable = Rx.Observable.from([10, 20, 30]);
var subscription = observable.subscribe(x => console.log(x));
//Later:
subscription.unsubscribe();
When you subscribe to the Observable, you get a Subscription that indicates the execution in progress. Just call the unsubscribe()method can cancel the execution.
When we use create()the time to create a method Observable, Observable must define how to clean up resources for implementation. You can in function subscribe()return a self-defined unsubscribe function.
For example, this is how we used to clean up set Interval the implementation of the collection interval:
var observable = Rx.Observable.create(function subscribe(observer) {
// Tracking interval resource
var intervalID = setInterval(() => {
observer.next('hi');
}, 1000);
// Provide a way to cancel and clean up the interval resource
return function unsubscribe() {
clearInterval(intervalID);
};
});
As observable.subscribe similar Observable.create(function subscribe() {…}), from subscribe returning unsubscribe be equivalent to the concept subscription.unsubscribe. In fact, if we leave the ReactiveX type around these concepts, the only thing left is pretty simple JavaScript.
function subscribe(observer) {
var intervalID = setInterval(() => {
observer.next('hi');
}, 1000);
return function unsubscribe() {
clearInterval(intervalID);
};
}
var unsubscribe = subscribe({next: (x) => console.log(x)});
//Later:
unsubscribe(); // Clean up resources
Why do we want to use Rx types like Observable, Observer, and Subscription? The reason is to ensure the security of the code (such as the Observable protocol) and the combinability of the operators.
RxJS Observer
What is an observer?
The observer is the consumer of the value sent by the Observable. The observer is just a collection of callback functions, each of which corresponds to a notification type sent by the Observable: next, error and complete. The following example is a typical observer object:
var observer = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
To an observer, it needs to be provided to the Observable subscribe methods:
observable.subscribe(observer);
The observer is just an object with three callback functions, each of which corresponds to a notification type sent by the Observable.
Observers in RxJS may also be partial . If you don’t provide a callback function, the execution of the Observable will work, but some notification types will be ignored because there is no corresponding callback function in the observer.
The following example is not complete the callback function observer:
var observer = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
};
When subscribing to an Observable, you might only provide a callback function as a parameter and not attach it to the observer object, for example:
observable.subscribe(x => console.log('Observer got a next value: ' + x));
In the observable.subscribe interior, it creates a viewer object and use the first callback parameter as the next processing method. Three types of callback functions can be provided directly as arguments:
observable.subscribe(
x => console.log('Observer got a next value: ' + x),
err => console.error('Observer got an error: ' + err),
() => console.log('Observer got a complete notification')
);
RxJS Subscription
What is a Subscription?
Subscription is an object that represents a cleanable resource, usually the execution of an Observable. Subscription has an important way, that is unsubscribe, it does not require any parameters, just to clean up the resources occupied by the Subscription. In the previous version of RxJS, the Subscription was called “Disposable“.
var observable = Rx.Observable.interval(1000);
var subscription = observable.subscribe(x => console.log(x));
// Later:
// This will cancel the ongoing Observable carried out
// Observable execution is initiated by using the observer to call the subscribe method.
subscription.unsubscribe();
Subscription is basically only a unsubscribe() function that is used to release resources or to cancel Observable execution.
Subscription can also be combined, so a Subscription to call unsubscribe() a method, there may be multiple Subscription unsubscribe. You can do this by adding a Subscription to another:
var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);
var subscription = observable1.subscribe(x => console.log('first: ' + x));
var childSubscription = observable2.subscribe(x => console.log('second: ' + x));
subscription.add(childSubscription);
setTimeout(() => {
// subscription with childSubscription Will cancel the subscription
subscription.unsubscribe();
}, 1000);
When executed, we saw in the console:
second: 0
first: 0
second: 1
first: 1
second: 2
Subscriptions there is a remove (other Subscription) way to undo a sub Subscription has been added.