Observables RxJS

Last updated 5 months ago

About

RxJS is a library for composing asynchronous and event-based programs by using observable sequences.

It combines the Observer pattern with the Iterator pattern and functional programming with collections.

It provides:

  • One core type: Observable.

  • Satellite types: Observer, Schedulers, Subjects.

  • Operators inspired by Array methods, which allow handling asynchronous events as collections.

Essential Concepts

Observable

Represents the idea of an invokable collection of future values or events.

Observer

Is a collection of callbacks that knows how to listen to values delivered by the Observable.

Subscription

Represents the execution of an Observable; it is primarily useful for cancelling that execution.

Operators

Are pure functions that enable a functional programming style of dealing with collections with operations like map, filter, concat, reduce, etc.

Subject

Is equivalent to an EventEmitter, and the only way of multicasting a value or event to multiple Observers.

Schedulers

Are centralized dispatchers to control concurrency, allowing us to coordinate when computation happens, e.g. on setTimeout, requestAnimationFrame, or others.

Basic Example

Normally you register event listeners.

document.addEventListener("click", () => console.log("Clicked!"));

With RxJS you create an observable instead.

import { fromEvent } from "rxjs";

fromEvent(document, "click").subscribe(() => console.log("Clicked!"));

Observable

Observables are lazy Push collections of multiple values.

Inside an Observable's subscribe function you may call the following on the subscriber:

  • next(): sends a value to observers.

  • error(): sends a JavaScript error or exception and finishes the Observable execution.

  • complete(): does not send any value and finishes the Observable execution.

Since Observables may be infinite, subscribing returns a Subscription object, which you can use to unsubscribe and clean up resources.

import { Observable } from "rxjs";

// This Observable synchronously pushes 3 values, then pushes a fourth after 1 second and completes.
const observable = new Observable<number>((subscriber) => {
    try {
        subscriber.next(1);
        subscriber.next(2);
        subscriber.next(3);
        setTimeout(() => {
            subscriber.next(4);
            subscriber.complete();
        }, 1000);
    } catch (err) {
        subscriber.error(err);
    }
});

const subscription = observable.subscribe((value) => console.log(value));

...

subscription.unsubscribe();

You can also return a custom teardown function, which runs when the subscription is unsubscribed.

const observable = new Observable<number>((subscriber) => {
    // Keep the timer handle so the teardown function can cancel it
    let timeoutId: ReturnType<typeof setTimeout> | undefined;

    try {
        timeoutId = setTimeout(() => {
            subscriber.next(1);
            subscriber.complete();
        }, 1000);
    } catch (err) {
        subscriber.error(err);
    }

    // Custom function for disposing resources
    return function unsubscribe() {
        clearTimeout(timeoutId);
    };
});

const subscription = observable.subscribe((value) => console.log(value));

...

subscription.unsubscribe();

Push vs Pull

Push and Pull are two different protocols that describe how a data Producer can communicate with a Consumer.

Pull

In Pull systems, the Consumer determines when it receives data from the Producer.

The Producer itself is unaware of when the data will be delivered to the Consumer.

  • Every JavaScript function is a Pull system.

Push

In Push systems, the Producer determines when to send data to the Consumer.

The Consumer is unaware of when it will receive that data.

  • Promises are the most common type of Push system.
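The difference between the two protocols can be sketched in plain TypeScript, without RxJS. The names `pullNumbers`, `pulled`, and `pushed` are made up for this illustration. The generator acts as a Pull producer, because the consumer asks for each value with `next()`; the Promise acts as a Push producer, because it decides when the consumer's callback runs.

```typescript
// Pull: the consumer decides when to take each value.
function* pullNumbers(): Generator<number> {
    yield 1;
    yield 2;
    yield 3;
}

const iterator = pullNumbers();
const pulled: number[] = [];
// Each call to next() asks the producer for exactly one value.
pulled.push(iterator.next().value);
pulled.push(iterator.next().value);
console.log(pulled); // [1, 2]

// Push: the producer decides when the consumer's callback runs.
const pushed: number[] = [];
const promise: Promise<number> = new Promise((resolve) => {
    setTimeout(() => resolve(42), 100);
});
// The consumer only registers a callback; it cannot ask for the value early.
promise.then((value) => pushed.push(value));
console.log(pushed); // [] (nothing has been pushed yet)
```

An Observable combines both ideas: like the Promise it pushes, but like the generator it can deliver many values.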

Operators

Operators are functions.

Pipeable Operators

Are the kind that can be piped, using the syntax observableInstance.pipe(operator).

With pipe you may create a chain of operators.

A Pipeable Operator is a function that takes an Observable as its input and returns another Observable. It is a pure operation: the previous Observable stays unmodified.

  • A Pipeable Operator is essentially a pure function which takes one Observable as input and generates another Observable as output.
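As a small sketch of chaining operators with pipe, the example below filters and maps a short sequence. It assumes RxJS 7.2 or later, where operators can be imported from "rxjs" directly (earlier versions import them from "rxjs/operators"); the variable names are illustrative only.

```typescript
import { filter, map, of } from "rxjs";

const source$ = of(1, 2, 3, 4, 5);

// pipe() returns a new Observable; source$ itself is left unmodified.
const evenSquares$ = source$.pipe(
    filter((n) => n % 2 === 0), // keep even numbers
    map((n) => n * n), // square each one
);

const results: number[] = [];
evenSquares$.subscribe((value) => results.push(value));
console.log(results); // [4, 16]

// Subscribing to the original Observable still yields every value.
const original: number[] = [];
source$.subscribe((value) => original.push(value));
console.log(original); // [1, 2, 3, 4, 5]
```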

Creation Operators

Are another kind of operator, which can be called as standalone functions to create a new Observable, for example of(1, 2, 3).
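A few creation operators side by side, as a minimal sketch. `of`, `from`, and `range` are standard RxJS exports; the array names used to collect the values are illustrative only.

```typescript
import { from, of, range } from "rxjs";

const fromOf: number[] = [];
const fromArray: number[] = [];
const fromRange: number[] = [];

// of(): emits its arguments one after another, then completes.
of(1, 2, 3).subscribe((v) => fromOf.push(v));

// from(): converts an array (or a Promise, or an iterable) into an Observable.
from([10, 20, 30]).subscribe((v) => fromArray.push(v));

// range(start, count): emits `count` consecutive integers beginning at `start`.
range(1, 4).subscribe((v) => fromRange.push(v));

console.log(fromOf); // [1, 2, 3]
console.log(fromArray); // [10, 20, 30]
console.log(fromRange); // [1, 2, 3, 4]
```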

Subjects

A Subject is a special type of Observable that allows values to be multicast to many Observers.

Subjects maintain a registry of many listeners.

Regular Observables are unicast: each subscribed Observer owns an independent execution of the Observable.

From the perspective of an Observer, it cannot tell whether the Observable execution is coming from a unicast Observable or a Subject.

In an Observable you call next() from inside the Observable function; in a Subject you can call it from outside. (A Subject is both an Observable and an Observer at the same time.)

import { Subject } from "rxjs";

// You create one Subject
const subject = new Subject<number>();

// You can make multiple subscriptions, doing different things
subject.subscribe({
    next: (v) => console.log(`observerA: ${v}`),
});
subject.subscribe({
    next: (v) => console.log(`observerB: ${v}`),
});

// And you trigger values from the Subject object
subject.next(1);
subject.next(2);
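To make the unicast versus multicast distinction concrete, the sketch below counts how often a plain Observable's subscribe function runs, then shows a Subject sharing a single value. The counter and array names are illustrative only.

```typescript
import { Observable, Subject } from "rxjs";

let executions = 0;

// A plain Observable: the subscribe function runs once per subscriber.
const unicast$ = new Observable<number>((subscriber) => {
    executions++;
    subscriber.next(Math.random());
    subscriber.complete();
});

const unicastValues: number[] = [];
unicast$.subscribe((v) => unicastValues.push(v));
unicast$.subscribe((v) => unicastValues.push(v));
console.log(executions); // 2: each subscriber triggered its own execution

// A Subject: one next() call is delivered to every current subscriber.
const subject = new Subject<number>();
const multicastValues: number[] = [];
subject.subscribe((v) => multicastValues.push(v));
subject.subscribe((v) => multicastValues.push(v));
subject.next(Math.random());
console.log(multicastValues[0] === multicastValues[1]); // true: the same value was shared
```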

They are similar to EventEmitters and are especially useful for emitting events across components.

There are several variants of Subject.

BehaviorSubject

It has a notion of "the current value".

It stores the latest value emitted to its consumers, and whenever a new Observer subscribes, it immediately receives the "current value".

import { BehaviorSubject } from "rxjs";
const subject = new BehaviorSubject(0); // 0 is the initial value

subject.subscribe({
    next: (v) => console.log(`observerA: ${v}`),
});

subject.next(1);
subject.next(2);

subject.subscribe({
    next: (v) => console.log(`observerB: ${v}`),
});

subject.next(3);

// Logs
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

ReplaySubject

Similar to BehaviorSubject in that it can send old values to new subscribers.

Instead of just the latest value, you choose a buffer size: how many of the latest values are replayed.

Besides the buffer size, you can also specify a window time in milliseconds to determine how old the recorded values can be.

import { ReplaySubject } from "rxjs";
const subject = new ReplaySubject<number>(3); // buffer 3 values for new subscribers
// Alternative: buffer 3 values, but only replay those emitted within the last 500ms
// const subject = new ReplaySubject<number>(3, 500);

subject.subscribe({
    next: (v) => console.log(`observerA: ${v}`),
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
    next: (v) => console.log(`observerB: ${v}`),
});

subject.next(5);

// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5

AsyncSubject

Here, only the last value of the Observable is sent to its Observers, and only when the execution completes.

import { AsyncSubject } from "rxjs";
const subject = new AsyncSubject();

subject.subscribe({
    next: (v) => console.log(`observerA: ${v}`),
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
    next: (v) => console.log(`observerB: ${v}`),
});

subject.next(5);
subject.complete();

// Logs:
// observerA: 5
// observerB: 5

Void Subject

Sometimes the emitted value doesn't matter as much as the fact that a value was emitted.

Void subjects are used to emit signals whose value is not important.

const subject = new Subject<void>();
setTimeout(() => subject.next(), 1000);

Scheduler

It controls when a subscription starts and when notifications are delivered.

A scheduler has three aspects:

  • A scheduler is a data structure: it knows how to store and queue tasks based on priority or other criteria.

  • A scheduler is an execution context: it denotes where and when the task is executed.

  • A scheduler has a (virtual) clock: it provides a notion of "time" through the getter method now() on the scheduler.

A scheduler lets you define in what execution context an Observable will deliver notifications to its Observer.

import { Observable, observeOn, asyncScheduler } from "rxjs";

const observable = new Observable((observer) => {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
}).pipe(observeOn(asyncScheduler));

console.log("just before subscribe");
observable.subscribe({
    next(x) {
        console.log("got value " + x);
    },
    error(err) {
        console.error("something wrong occurred: " + err);
    },
    complete() {
        console.log("done");
    },
});
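console.log("just after subscribe");

// Logs:
// just before subscribe
// just after subscribe
// got value 1
// got value 2
// got value 3
// done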

Examples in Angular

For Emitting New Data

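import { OnDestroy, OnInit } from "@angular/core";
import { interval, Subscription } from "rxjs";

@Component({ ... })
export class Component implements OnInit, OnDestroy {
    private subscription?: Subscription;

    ngOnInit() {
        // interval(1000) is the Observable: it emits 0, 1, 2, ... once per second
        this.subscription = interval(1000).subscribe((count: number) => {
            console.log(count);
        });
    }

    ngOnDestroy() {
        // Stop the interval when the component is destroyed
        this.subscription?.unsubscribe();
    }
}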

// Writing the Observable from scratch

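import { OnDestroy, OnInit } from "@angular/core";
import { Observable, Subscription } from "rxjs";

export class Component implements OnInit, OnDestroy {
    private subscription?: Subscription;

    ngOnInit() {
        // new Observable(...) replaces the deprecated Observable.create(...)
        const customObservable = new Observable<number>((observer) => {
            let count = 0;
            const intervalId = setInterval(() => {
                observer.next(count);
                count++;
            }, 1000);

            // Teardown: runs on unsubscribe, so the timer does not leak
            return () => clearInterval(intervalId);
        });

        this.subscription = customObservable.subscribe((data: number) => {
            console.log(data);
        });
    }

    ngOnDestroy() {
        this.subscription?.unsubscribe();
    }
}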

For errors

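The Observable execution ends as soon as an error is emitted: no further values and no completion notification are delivered.

import { OnDestroy, OnInit } from "@angular/core";
import { Observable, Subscription } from "rxjs";

export class Component implements OnInit, OnDestroy {
    private subscription?: Subscription;

    ngOnInit() {
        // new Observable(...) replaces the deprecated Observable.create(...)
        const customObservable = new Observable<number>((observer) => {
            let count = 0;
            const intervalId = setInterval(() => {
                observer.next(count);
                if (count > 3) {
                    observer.error(new Error("Some error occurred!!"));
                }
                count++;
            }, 1000);

            // Teardown: runs on error, completion, or unsubscribe
            return () => clearInterval(intervalId);
        });

        // An observer object replaces the deprecated multi-callback subscribe signature
        this.subscription = customObservable.subscribe({
            next: (data: number) => {
                console.log(data);
            },
            error: (error: Error) => {
                console.error(error.message);
            },
        });
    }

    ngOnDestroy() {
        this.subscription?.unsubscribe();
    }
}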

For completion

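The complete callback does not run if the Observable emits an error first.

import { OnDestroy, OnInit } from "@angular/core";
import { Observable, Subscription } from "rxjs";

export class Component implements OnInit, OnDestroy {
    private subscription?: Subscription;

    ngOnInit() {
        // new Observable(...) replaces the deprecated Observable.create(...)
        const customObservable = new Observable<number>((observer) => {
            let count = 0;
            const intervalId = setInterval(() => {
                observer.next(count);
                if (count === 3) {
                    observer.complete();
                }
                count++;
            }, 1000);

            // Teardown: runs on error, completion, or unsubscribe
            return () => clearInterval(intervalId);
        });

        // An observer object replaces the deprecated multi-callback subscribe signature
        this.subscription = customObservable.subscribe({
            next: (data: number) => {
                console.log(data);
            },
            error: (error: Error) => {
                console.error(error.message);
            },
            complete: () => {
                console.log("Completed");
            },
        });
    }

    ngOnDestroy() {
        this.subscription?.unsubscribe();
    }
}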