Observables RxJS
About
RxJS is a library for composing asynchronous and event-based programs by using observable sequences.
It combines the Observer pattern with the Iterator pattern and functional programming with collections.
It provides:
One core type: Observable.
Satellite types: Observer, Schedulers, Subjects.
Operators inspired by Array methods, to allow handling asynchronous events as collections.
Essential Concepts
Observable
Represents the idea of an invokable collection of future values or events.
Observer
Is a collection of callbacks that knows how to listen to values delivered by the Observable.
Subscription
Represents the execution of an Observable; it is primarily useful for cancelling the execution.
Operators
Are pure functions that enable a functional programming style of dealing with collections, with operations like map, filter, concat, reduce, etc.
Subject
Is equivalent to an EventEmitter, and the only way of multicasting a value or event to multiple Observers.
Schedulers
Are centralized dispatchers to control concurrency, allowing us to coordinate when computation happens, e.g. on setTimeout, requestAnimationFrame, or others.
Basic Example
Normally you register event listeners.
document.addEventListener("click", () => console.log("Clicked!"));
With RxJS you create an observable instead.
import { fromEvent } from "rxjs";
fromEvent(document, "click").subscribe(() => console.log("Clicked!"));
Observable
Observables are Push collections of multiple values.
Inside an Observable you may call these methods on the subscriber:
next(): sends a value to observers.
error(): sends a JavaScript error or exception and finishes the Observable execution.
complete(): does not send any value and finishes the Observable execution.
Since Observables may be infinite, subscribing returns a Subscription object, which can be used to unsubscribe for cleanup purposes.
import { Observable } from "rxjs";
// This Observable synchronously pushes 3 values, then pushes a last one after 1 second.
const observable = new Observable((subscriber) => {
  try {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    setTimeout(() => {
      subscriber.next(4);
      subscriber.complete();
    }, 1000);
  } catch (err) {
    subscriber.error(err);
  }
});
const subscription = observable.subscribe((value) => console.log(value));
// ...
// Stop receiving values and release resources
subscription.unsubscribe();
You can also return a custom unsubscribe function.
import { Observable } from "rxjs";
const observable = new Observable((subscriber) => {
  // Declared with let so it can be assigned inside the try block
  let timeoutId;
  try {
    timeoutId = setTimeout(() => {
      subscriber.next(1);
      subscriber.complete();
    }, 1000);
  } catch (err) {
    subscriber.error(err);
  }
  // Custom function for disposing resources; it cancels the pending timer
  return function unsubscribe() {
    clearTimeout(timeoutId);
  };
});
const subscription = observable.subscribe((value) => console.log(value));
// ...
subscription.unsubscribe();
Push vs Pull
Push and Pull are two different protocols that describe how a data Producer can communicate with a Consumer.
Pull
In Pull systems, the Consumer determines when it receives data from the Producer.
The Producer itself is unaware of when the data will be delivered to the Consumer.
Every JavaScript function is a Pull system.
Push
In Push systems, the Producer determines when to send data to the Consumer.
The Consumer is unaware of when it will receive that data.
Promises are the most common type of Push system in JavaScript today.
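The contrast can be sketched in plain TypeScript, without RxJS. The names `pullNext`, `pushNumbers`, `pulled`, and `pushed` are hypothetical and exist only for this illustration: a plain function hands over a value when the consumer calls it, while a callback-based producer decides by itself when to deliver.

```typescript
// Pull: a plain function produces a value only when the consumer calls it.
let counter = 0;
function pullNext(): number {
  counter += 1;
  return counter;
}

// The consumer chooses the moment of each call.
const pulled: number[] = [pullNext(), pullNext(), pullNext()];

// Push: the consumer only registers a callback; the producer decides
// when (and how often) to invoke it.
function pushNumbers(consumer: (value: number) => void): void {
  consumer(1);
  consumer(2);
  consumer(3);
}

const pushed: number[] = [];
pushNumbers((value) => {
  pushed.push(value);
});

console.log(pulled, pushed);
```

Both arrays end up holding the same values; what differs is which side was in control of the timing.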
Operators
Operators are functions.
Pipeable Operators
Are the kind that can be piped, using the syntax observableInstance.pipe(operator).
With pipe you may create a chain of operators.
A Pipeable Operator is essentially a pure function that takes one Observable as input and returns another Observable as output; the input Observable stays unmodified.
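To make the "function from Observable to Observable" shape concrete, here is a toy model in plain TypeScript. It is not RxJS and not how RxJS is implemented: `ToyObservable`, `toyOf`, `toyMap`, and `toyFilter` are hypothetical names invented for this sketch, and the toy ignores errors, completion, and unsubscription.

```typescript
// Toy stand-in for an Observable: a function that pushes values to a callback.
type ToyObservable<T> = (next: (value: T) => void) => void;

// Creation helper, loosely analogous to of(1, 2, 3).
function toyOf<T>(...values: T[]): ToyObservable<T> {
  return (next) => values.forEach((v) => next(v));
}

// An operator takes its configuration and returns a function
// from one observable to a NEW observable.
function toyMap<T, R>(project: (value: T) => R) {
  return (source: ToyObservable<T>): ToyObservable<R> =>
    (next) => source((value) => next(project(value)));
}

function toyFilter<T>(predicate: (value: T) => boolean) {
  return (source: ToyObservable<T>): ToyObservable<T> =>
    (next) =>
      source((value) => {
        if (predicate(value)) next(value);
      });
}

const source = toyOf(1, 2, 3, 4);
// Chain by hand: keep the even numbers, then double them.
const evens = toyFilter((n: number) => n % 2 === 0)(source);
const doubledEvens = toyMap((n: number) => n * 2)(evens);

const fromSource: number[] = [];
const fromPiped: number[] = [];
// Subscribing to the original still yields every value: it was not modified.
source((v) => {
  fromSource.push(v);
});
doubledEvens((v) => {
  fromPiped.push(v);
});

console.log(fromSource, fromPiped);
```

With the real library, the same chain would be written as of(1, 2, 3, 4).pipe(filter(...), map(...)), with pipe doing the hand-chaining shown here.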
Creation Operators
Are another kind of operator, which can be called as standalone functions to create a new Observable, for example of(1, 2, 3).
Subjects
A Subject is a special type of Observable that allows values to be multicast to many Observers.
Subjects maintain a registry of many listeners.
Regular Observables are unicast.
From the perspective of an Observer, it cannot tell whether the Observable execution is coming from a unicast Observable or a Subject.
In an Observable you call next() from inside the Observable function, while in a Subject you can call it from outside. (Subjects are both Observable and Observer at the same time.)
import { Subject } from "rxjs";
// You create one Subject
const subject = new Subject<number>();
// You can make multiple subscriptions, doing different things
subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});
// And you trigger values from the Subject object
subject.next(1);
subject.next(2);
They are similar to EventEmitters, and are very useful for cross-component event emitting.
There are several variants of Subject.
BehaviorSubject
It has a notion of "the current value".
It stores the latest value emitted to its consumers, and whenever a new Observer subscribes, it will immediately receive the "current value".
import { BehaviorSubject } from "rxjs";
const subject = new BehaviorSubject(0); // 0 is the initial value
subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});
subject.next(1);
subject.next(2);
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});
subject.next(3);
// Logs
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
ReplaySubject
Similar to BehaviorSubject, in that it can send old values to new subscribers.
But instead of just the latest value, you can choose a buffer size of latest values.
Besides the buffer size, you can also specify a window time in milliseconds to determine how old the recorded values can be.
import { ReplaySubject } from "rxjs";
const subject = new ReplaySubject(3); // buffer 3 values for new subscribers
// Alternative: buffer 3 values for new subscribers, each at most 500ms old
// const subject = new ReplaySubject(3, 500);
subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});
subject.next(5);
// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5
AsyncSubject
Here, only the last value of the Observable is sent to its Observers, and only when the execution completes.
import { AsyncSubject } from "rxjs";
const subject = new AsyncSubject();
subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});
subject.next(5);
subject.complete();
// Logs:
// observerA: 5
// observerB: 5
Void Subject
Sometimes the emitted value doesn't matter as much as the fact that a value was emitted.
Void subjects are used to emit signals whose value is not important.
import { Subject } from "rxjs";
const subject = new Subject<void>();
setTimeout(() => subject.next(), 1000);
Scheduler
A Scheduler controls when a subscription starts and when notifications are delivered.
It consists of three components:
A scheduler is a data structure: it knows how to store and queue tasks based on priority or other criteria.
A scheduler is an execution context: it denotes where and when the task is executed.
A scheduler has a (virtual) clock: it provides a notion of "time" through the getter method now() on the scheduler.
A scheduler lets you define in what execution context an Observable will deliver notifications to its Observer.
import { Observable, observeOn, asyncScheduler } from "rxjs";
const observable = new Observable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
}).pipe(observeOn(asyncScheduler));
console.log("just before subscribe");
observable.subscribe({
  next(x) {
    console.log("got value " + x);
  },
  error(err) {
    console.error("something wrong occurred: " + err);
  },
  complete() {
    console.log("done");
  },
});
console.log("just after subscribe");
// Logs (notifications are delivered asynchronously, after the synchronous code):
// just before subscribe
// just after subscribe
// got value 1
// got value 2
// got value 3
// done
Examples in Angular
For Emitting New Data
@Component({ ... })
export class Component implements OnInit, OnDestroy {
  private subscription: Subscription;
  ngOnInit() {
    // Here 'interval' will be the Observable
    this.subscription = interval(1000).subscribe((count: number) => {
      console.log(count);
    });
  }
  ngOnDestroy() {
    this.subscription.unsubscribe();
  }
}
// Writing the Observable from scratch
export class Component implements OnInit, OnDestroy {
  private subscription: Subscription;
  ngOnInit() {
    // Observable.create is deprecated; the constructor is used instead
    const customObservable = new Observable<number>((observer) => {
      let count = 0;
      const intervalId = setInterval(() => {
        observer.next(count);
        count++;
      }, 1000);
      // Stop the timer when the subscriber unsubscribes
      return () => clearInterval(intervalId);
    });
    this.subscription = customObservable.subscribe((data: number) => {
      console.log(data);
    });
  }
  ngOnDestroy() {
    this.subscription.unsubscribe();
  }
}
For errors
The Observable execution ends when an error is emitted; no further values are delivered.
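export class Component implements OnInit, OnDestroy {
  private subscription: Subscription;
  ngOnInit() {
    // Observable.create is deprecated; the constructor is used instead
    const customObservable = new Observable<number>((observer) => {
      let count = 0;
      const intervalId = setInterval(() => {
        observer.next(count);
        if (count > 3) {
          observer.error(new Error("Some error occurred!!"));
        }
        count++;
      }, 1000);
      // Stop the timer on error or unsubscribe
      return () => clearInterval(intervalId);
    });
    // An observer object replaces the deprecated positional callbacks
    this.subscription = customObservable.subscribe({
      next: (data: number) => {
        console.log(data);
      },
      error: (error: Error) => {
        console.error(error.message);
      },
    });
  }
  ngOnDestroy() {
    this.subscription.unsubscribe();
  }
}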
For completion
The completion callback does not run when the Observable ends with an error.
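export class Component implements OnInit, OnDestroy {
  private subscription: Subscription;
  ngOnInit() {
    // Observable.create is deprecated; the constructor is used instead
    const customObservable = new Observable<number>((observer) => {
      let count = 0;
      const intervalId = setInterval(() => {
        observer.next(count);
        if (count === 3) {
          observer.complete();
        }
        count++;
      }, 1000);
      // Stop the timer on completion or unsubscribe
      return () => clearInterval(intervalId);
    });
    // An observer object replaces the deprecated positional callbacks
    this.subscription = customObservable.subscribe({
      next: (data: number) => {
        console.log(data);
      },
      error: (error: Error) => {
        console.error(error.message);
      },
      complete: () => {
        console.log("Completed");
      },
    });
  }
  ngOnDestroy() {
    this.subscription.unsubscribe();
  }
}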