Reinventing the wheel is often necessary in order to understand how a library works under the hood and to better learn something complex.
It is a very good option, contrary to popular belief, in the case of a developer wanting to improve his/her knowledge.
Max Gallo, at Codemotion Rome 2019, performed a live coding session, where he showed us how we can reverse engineer the RxJS library, the Reactive Extensions for JavaScript language which allows you to write reactive programs with a functional approach.
Max Gallo is an engineer working at DAZN, and he is experienced as a Full Stack Developer, iOS Developer and UI/UX Designer.
RxJS is a library with a clear API to work with both asynchronous and synchronous code, thanks to pipeline operators and using the concept of Observable.
The observer pattern is a software design pattern in which an object, called the subject, maintains a list of its dependents, called observers, and automatically notifies them of any state changes (usually by calling one of their methods). (Wikipedia)
What is the goal of this topic? A lot of people are afraid of Observable and RxJS library, so Max’s intention was to show us how we can build something like RxJS, of course, not all of the standard library, just some line of working demo code, and he did with a VI editor. Cool! We can now create custom operators.
An operator is simple a function, they are the horse-power behind observables, providing an elegant, declarative and common API to complex tasks in both async/sync world.
If you’ve used RxJS before and want to better understand some internal aspects of the library and the magic behind Observables, as well as the operators, then this post is for you.
Firstly, Max began with showing us some RxJS code, how to create an Observable from an Array with some operators within the pipeline operator, introduced with the latest RxJS 6 release (it was actually already added in RxJS 5.5), in substitution of dot-chaining operators. Then, a step forward, he rewrote a couple of the most common function operators: from and map, fully compatible with RxJS. So we can now write our custom operators! Wow!
RxJS
Let’s explore this. The purpose of the library is to offer a wide range of operators to transform, manipulate and create a stream of data from a source such as an event from UI, from a XHR response or from some sort of data (an array, a collection or whatever you want to stream and observe).
A stream is a collection of future values and it’s called Subject or Observable; it can emit three different kinds of event with a value (object, array, primitive value), an error or a completed event when the full cycle of values are finished.
Observable is an Object with a subscribe method on it. This subscribe method takes the observer as an argument and is used to invoke the execution of the Observable. In the RxJS library, the subscribe method returns a reference to an unsubscribe, which allows us to spot listen to the values emitted by Observable.
Observables can either be asynchronous or synchronous, depending on the underlying implementation. An example of an asynchronous observable is one that wraps an XHR call or a timer function, while a stream of values comes from an array can be synchronous.
However, we capture these emitted events only asynchronously, with an Observer, by defining three parameters:
- a function for the value emitted is succeeded;
- a function when an error is emitted;
- a function when ‘completed’ is emitted.
Observer is a collection of callback functions, which reacts to the value arriving via Observable.
You can recap the relationship from Observable, Observer, Subscribe and Subscription with a type definition in TypeScript:
Observable.subscribe(observer:Observer):Subscription;
So far, we have read what is an Observable and how to subscribed to them. With RxJS we can program in a functional way with asynchronous values, we can use Array’s map, filter or any other methods and operators to modify the original Observable.
Those operators are chainable and composable and to work with them we have a method on the Observable object named pipe. This pipe method takes single or multiple operators as an input and returns a new Observable.
Here is an example of using a filter and map operator in RxJS:
const { from } = require('rxjs');
const { map, filter } = require('rxjs/operators');
const observable = from([1, 2, 3, 4, 5])
.pipe(
map ( x => x + 1),
filter( x => x % 2 === 0),
map( x => x - 1)
);
observable.subscribe(
val => console.log('odd:', val),
error => console.error(error),
() => console.log('complete')
);
The syntax looks like very familiar to the functional world where we have:
- Explicit subscription with Observable.subscribe;
- A set of operators;
- Observable and Pipeline operators are library-specific today but can become language-specific in the future;
Reinventing RxJS
Let’s explore RxJS by starting writing own operator. We will start with from operator to generate a stream of values.
To be compliant with RxJS specification, take some initial data and return an object with the pipe function. The pipe method accepts a set of functions as parameters. We are going to use the rest operators from ES6 because we don’t know how many functions we will have. The output of the pipe is an Observable object with a subscribe method used for consuming it:
function from(initialData) {
return {
pipe(...pipeFunction) {
return {
subscribe(onNext, onError, onComplete) {
}
}
}
}
}
The subscribe method has an object called dataObservable with a subscribe method too, which takes the function that will be the function we pass to the subscribe method to take the emitted value. This innermost subscribe method emits the value with a forEach passing the next method, in this case a simple console.log function. At this time we don’t apply any pipe function, we do it early.
Before we move on and made our code more reusable, we must define a function called createObservable:
function createObservable(operator) {
return {
subscribe(next) {
operator(next);
}
}
}
As we can see from the picture, RxJS can chain more than one operator, where we got an observable source and produce another observable for the next operator. So we have the previous, the current and the next observable. When we have to produce the next value, we want to apply a pipe function, so we need to define it inside our from function:
function from(initialData){
return {
pipe: (...pipeFunctions) => {
return {
subscribe: (onNext, onError, onComplete) => {
const dataObservable = createObservable(x => initialData.forEach(x))
let currentObservable = dataObservable;
pipeFunctions.forEach(pipeFunc => {
currentObservable = pipeFunc(currentObservable)
})
currentObservable.subscribe(onNext);
onComplete();
}
}
}
}
}
Now we can define our custom map and filter:
function map(mapFunction){
return sourceObservable => {
const currentObservable = createObservable(destinationNext => {
sourceObservable.subscribe(value => {
const newValue = mapFunction(value)
destinationNext(newValue);
})
})
return currentObservable;
}
}
function filter(filterFunction){
return sourceObservable => {
const currentObservable = createObservable(destinationNext => {
sourceObservable.subscribe(value => {
if(filterFunction(value)){
destinationNext(value);
}
})
})
return currentObservable;
}
}
The filter function takes one parameter that returns a source observable, we construct the observable and subscribe to it where we need to check if my filter function of value is true and, if that is true, we get to the next destination with this value.
So to recap, I create an observable from my initial data, I pipe all my functions one with another, and at the end I subscribe to the final observable.
So far, all that we see is synchronous, we don’t talk about order and speed. How I can control the order and the speed of the emission of my value.
Scheduler in RxJS controls the order of event emissions and the speed of those event emissions.
In RxJS we have five kinds of schedulers available, three of them are very useful and common: asyncScheduler, queueScheduler and asapScheduler.
We can use Virtual Scheduler to control the speed of our Observable, especially in test ambient:
const { interval } = require('rxjs');
const { take, filter } = require('rxjs/operators');
const source = interval(1000)
.pipe(take(3600));
source.subscribe(
console.log,
console.err,
() => console.log('--> Completed!'),
);
console.log('--> Subscribed!')
So, to better understand how something works under the hood, try to disassemble, simplify and challenge it by reinventing the wheel to learn – and please share it with us. Good luck!