Promise.all in RxJS

The Promise land – Promise.all in RxJS

Promise.all is an excellent feature in the promised land :-). It lets us start several async requests concurrently and notifies us once all of the promises have been fulfilled. In effect it maps an array (or any iterable) of promises to a single promise that we can await. If any of the input promises rejects, the new promise rejects immediately with that first rejection reason. Inputs that are not promises are treated as already fulfilled values.

Nowadays Promise.allSettled() is often the recommended choice when you need the outcome of every promise, including the failed ones. It returns a promise that fulfills after all of the given promises have either fulfilled or rejected, with an array of objects that each describe the outcome of one promise. Note that the return value is different: each result is an object with a status field plus either a value field (when fulfilled) or a reason field (when rejected):

{ status: "fulfilled", value: ... }   // status is either "fulfilled" or "rejected"
{ status: "rejected", reason: Error("some error") }
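
To make the difference concrete, here is a minimal sketch that runs the same three inputs through both methods. The delayed helper and the compare function are hypothetical names made up for this illustration; they are not part of any library.

```typescript
// Hypothetical helper: fulfills with `value` (or rejects) after `ms` milliseconds.
function delayed<T>(value: T, ms: number, fail = false): Promise<T> {
  return new Promise<T>((resolve, reject) =>
    setTimeout(
      () => (fail ? reject(new Error(`failed: ${String(value)}`)) : resolve(value)),
      ms
    )
  );
}

// Runs the same inputs through Promise.all and Promise.allSettled.
async function compare(): Promise<{ allError: string; settled: string[] }> {
  // Fresh promises for each call; 'b' is the one that rejects.
  const makeInputs = () => [delayed('a', 10), delayed('b', 5, true), delayed('c', 1)];

  let allError = '';
  try {
    await Promise.all(makeInputs()); // rejects as soon as 'b' rejects
  } catch (err) {
    allError = (err as Error).message;
  }

  // allSettled waits for every input and reports each outcome separately.
  const results = await Promise.allSettled(makeInputs());
  const settled = results.map(r =>
    r.status === 'fulfilled'
      ? `fulfilled:${r.value}`
      : `rejected:${(r.reason as Error).message}`
  );

  return { allError, settled };
}

compare().then(outcome => console.log(outcome));
```

With Promise.all only the first rejection survives, while Promise.allSettled keeps the results of 'a' and 'c' alongside the failure of 'b'.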

The RxJS land

RxJS follows a different philosophy: everything is a stream. Data flows over time instead of arriving as a single result at a single point in time. To find a replacement for Promise.all we therefore need to distinguish two cases: streams that complete and streams that do not.

Option 1 – forkJoin vs Promise.all

The forkJoin function takes an array of observables and returns a single observable that emits an array of the last value emitted by each of them. It emits exactly once, and only after all of the input observables have completed.

import { forkJoin, of } from 'rxjs';

const obs1 = of(1, 2, 3);
const obs2 = of(4, 5, 6);

// Pass the sources as an array; passing them as separate arguments is deprecated.
const result = forkJoin([obs1, obs2]);
result.subscribe(value => console.log(value));

// Logs [3, 6]

When we use forkJoin to simulate the Promise.all behavior, we are counting on every stream to emit the “complete” event, at which point we receive the last value emitted by each stream. This is a good fit for a list of async HTTP calls that we wish to run in parallel. Notice that we can pass forkJoin an object instead of an array, so that each result is mapped to a key, which is much more convenient in my opinion.

A real-life example when you need data from 3 different APIs concurrently

import { ajax } from 'rxjs/ajax';
import { forkJoin } from 'rxjs';

forkJoin(
  {
    url1: ajax.getJSON('https://url1.com'),
    url2: ajax.getJSON('https://url2.com'),
    url3: ajax.getJSON('https://url3.com'),
  }
)
  .subscribe(console.log);

// { url1: object, url2: object, url3: object }

NOTICE! If one of the inner observables emits an error, forkJoin errors immediately and you lose the results of the observables that have already completed. If this is not your intention, pipe each inner observable and handle the error with the catchError operator.

.pipe(catchError(error => of(null))); // handle the error here; of(null) is only a placeholder fallback

* catchError – Catches errors on the observable to be handled by returning a new observable or throwing an error.
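
As a minimal sketch of that advice, assuming RxJS 7.2 or later: ok$ and failing$ are hypothetical stand-ins for real requests, and of(null) is just one possible fallback value. Catching the error on the inner observable lets forkJoin still deliver the other result.

```typescript
import { catchError, forkJoin, of, throwError } from 'rxjs';

// Hypothetical stand-ins for two HTTP calls: one succeeds, one fails.
const ok$ = of('user data');
const failing$ = throwError(() => new Error('request failed'));

const guarded: Array<{ user: string; orders: string | null }> = [];
const unguardedErrors: string[] = [];

// Guarded: the failing source is replaced by a fallback value, so forkJoin still emits.
forkJoin({
  user: ok$,
  orders: failing$.pipe(catchError(() => of(null))),
}).subscribe(value => guarded.push(value));

// Unguarded: the error reaches forkJoin and the 'user data' result is lost.
forkJoin({ user: ok$, orders: failing$ }).subscribe({
  next: () => unguardedErrors.push('unexpected value'),
  error: (err: Error) => unguardedErrors.push(err.message),
});

console.log(guarded);         // [ { user: 'user data', orders: null } ]
console.log(unguardedErrors); // [ 'request failed' ]
```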

Make sure to use forkJoin only when you are certain that every input stream will complete! If one of them never completes, forkJoin never emits.
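
The following sketch illustrates why completion matters. It assumes RxJS 7, and the names finite$ and open$ are made up for this example. A Subject stands in for a stream that stays open until we explicitly complete it.

```typescript
import { forkJoin, of, Subject } from 'rxjs';

const finite$ = of('done');         // emits once and completes immediately
const open$ = new Subject<string>(); // stays open until complete() is called

const results: string[][] = [];
forkJoin([finite$, open$]).subscribe(value => results.push(value));

open$.next('first');
open$.next('second');
const emittedBeforeComplete = results.length; // still 0: open$ has not completed

open$.complete(); // only now does forkJoin emit the last value of each source

console.log(emittedBeforeComplete); // 0
console.log(results);               // [ [ 'done', 'second' ] ]
```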

Option 2 – zip vs Promise.all

The zip operator receives an array of observables as input and returns a new observable that you can subscribe to. It waits until every input observable has emitted at least once, and then emits their first values together as one array. After that it keeps pairing values by position: the second values together, the third values together, and so on. The zip stream completes as soon as one of the input streams has completed and all of its values have been paired. Make sure you handle error events in the inner streams, as before. The zip stream emits an array that you can unpack into named variables using array destructuring.

zip(stream1$, stream2$)
    .subscribe(([stream1Result, stream2Result]) => console.log(`${stream1Result} vs ${stream2Result}`));

Another example

import { zip, of } from 'rxjs';

const obs1 = of(1, 2, 3);
const obs2 = of(4, 5, 6);

const result = zip(obs1, obs2);
result.subscribe(value => console.log(value));

// Logs [1, 4], [2, 5], [3, 6]

In this example, obs1 and obs2 are two observables. The zip operator takes these observables as input and returns an observable that emits arrays, where each array pairs the values that obs1 and obs2 emitted at the same position. The resulting observable only emits once it has received a value from both obs1 and obs2.
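
One consequence of this pairing is worth a small sketch (assuming RxJS 7; the variable names are made up): when the sources have different lengths, values without a partner are never emitted, and zip completes once the shorter source runs out.

```typescript
import { of, zip } from 'rxjs';

const numbers$ = of(1, 2, 3);
const letters$ = of('a', 'b'); // one value shorter than numbers$

const log: string[] = [];

zip(numbers$, letters$).subscribe({
  next: pair => log.push(JSON.stringify(pair)),
  complete: () => log.push('complete'),
});

console.log(log); // [ '[1,"a"]', '[2,"b"]', 'complete' ]
```

The value 3 never shows up because letters$ has no third value to pair it with.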

Option 3 – combineLatest vs Promise.all

The combineLatest operator behaves similarly to zip: it also waits until every source observable has emitted at least once. But while zip emits only when each source has emitted a new item to pair up, combineLatest then emits whenever any of the source observables emits, combining the latest values from every stream.

// Pass the sources as an array; passing them as separate arguments is deprecated.
combineLatest([stream1$, stream2$])
    .subscribe(([stream1Result, stream2Result]) => console.log(`${stream1Result} vs ${stream2Result}`));
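
To see the two operators side by side, here is a small sketch assuming RxJS 7. The subjects a$ and b$ are hypothetical sources that we push values into by hand, so the order of emissions is fully under our control.

```typescript
import { combineLatest, Subject, zip } from 'rxjs';

const a$ = new Subject<number>();
const b$ = new Subject<string>();

const latest: Array<[number, string]> = [];
const zipped: Array<[number, string]> = [];

combineLatest([a$, b$]).subscribe(pair => latest.push(pair));
zip([a$, b$]).subscribe(pair => zipped.push(pair));

a$.next(1);   // neither emits yet: b$ has no value
b$.next('x'); // both emit [1, 'x']
a$.next(2);   // combineLatest emits [2, 'x']; zip waits for a partner from b$
b$.next('y'); // both emit [2, 'y']

console.log(latest); // [ [ 1, 'x' ], [ 2, 'x' ], [ 2, 'y' ] ]
console.log(zipped); // [ [ 1, 'x' ], [ 2, 'y' ] ]
```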

Option 4 – mergeWith vs Promise.all

The mergeWith operator in RxJS combines an observable with one or more other observables into a single observable by merging their emissions.

It can be used as an alternative to forkJoin in situations where you want to emit values from both observables as soon as they are available, rather than waiting for both observables to complete before emitting a single value.

import { mergeWith, of } from 'rxjs';

const obs1 = of(1, 2, 3);
const obs2 = of(4, 5, 6);

const result = obs1.pipe(mergeWith(obs2));
result.subscribe(value => console.log(value));

// Logs 1, 2, 3, 4, 5, 6

By piping a base observable through mergeWith with other observables as input, you merge the observables together, creating a new observable that emits every value from any of the sources. Once all of the sources have completed, the merged observable completes. If any of the merged observables errors, the resulting observable errors too.

import { fromEvent, map, mergeWith, interval } from 'rxjs';

const clicks$ = fromEvent(document, 'click').pipe(map(() => 'click'));
const numbers$ = interval(1000).pipe(map(() => 'tick 1'));
const numbers2$ = interval(2000).pipe(map(() => 'tick 2'));

clicks$
  .pipe(mergeWith(numbers$, numbers2$))
  .subscribe(x => console.log(x));

// Example output (the clicks depend on when the user clicks):
// 'tick 1'
// 'tick 1'
// 'tick 2'
// 'click'
// 'click'
// 'tick 1'
// 'tick 1'
// 'tick 2'
// 'click'

Any thoughts? Please share!

Read more about rxjs here

Finding The Alternatives to Promise.all in RXJS

Also published on Medium.

Yoni Amishav


Tech lead, blogger, node js Angular and more ...

