RxJS One Observable Feeding into Another

I have a rather clunky-looking piece of code where the data from one observable is fed into another, like so:

let source = this.myService.getFoo()
   .subscribe(result => {
      let source2 = this.myService.getMoo(result)
         .subscribe(result2 => {
            // do stuff
         });
   });

I know that there are ways to combine and chain observables, but I need the data from source to be fed into source2. The nested subscribes look terrible and I'm pretty certain there is a better way to do this.

Thanks!


Solution 1:

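To transform each item emitted by an Observable into another Observable, you probably want the flatMap operator (an alias of mergeMap; newer RxJS versions deprecate the flatMap name in favor of mergeMap). For every source item it subscribes to an inner Observable and flattens that Observable's emissions into the outer stream.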

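import { flatMap } from 'rxjs/operators';

const source = this.myService
    .getFoo()
    .pipe(
        // map each result to the Observable returned by getMoo and flatten it
        flatMap(result => this.myService.getMoo(result))
    )
    .subscribe(result2 => {
        // do some stuff
    });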

Here are some flattening operators you can use; they differ in how they handle overlapping inner Observables:

  • flatMap/mergeMap - subscribes to an inner Observable immediately for every source item; all previous inner Observables are kept alive
  • concatMap - waits for the previous inner Observable to complete before subscribing to the next one
  • switchMap - for every source item, unsubscribes from the previous inner Observable and immediately subscribes to the next one
  • exhaustMap - maps to an inner Observable and ignores further source items until that inner Observable completes
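
The differences between these four operators can be sketched with a small, fully synchronous scenario. This is only an illustration under stated assumptions: the run helper, the pick function and the a/b Subjects are hypothetical names made up for the demo, and the inner streams are hot Subjects, so anything they emit before being subscribed to is simply missed.

```typescript
import { Observable, OperatorFunction, Subject } from "rxjs";
import { concatMap, exhaustMap, mergeMap, switchMap } from "rxjs/operators";

type Project = (key: string) => Observable<string>;

// Hypothetical helper: pushes one fixed sequence of events through the given
// flattening operator and returns everything the subscriber received.
function run(flatten: (project: Project) => OperatorFunction<string, string>): string[] {
  const outer = new Subject<string>();
  // Hot inner streams: values emitted before subscription are missed.
  const a = new Subject<string>();
  const b = new Subject<string>();
  const pick: Project = key => (key === "a" ? a : b);
  const seen: string[] = [];

  outer.pipe(flatten(pick)).subscribe(value => seen.push(value));

  outer.next("a"); // first source item
  outer.next("b"); // second source item arrives while inner "a" is still active
  a.next("a1");
  b.next("b1");
  a.complete();
  b.next("b2");
  b.complete();
  outer.complete();
  return seen;
}

// mergeMap keeps both inner streams alive.
const merged = run(p => mergeMap(p)); // ["a1", "b1", "b2"]
// concatMap subscribes to "b" only after "a" completes, so "b1" is missed.
const concatenated = run(p => concatMap(p)); // ["a1", "b2"]
// switchMap drops "a" as soon as "b" arrives.
const switched = run(p => switchMap(p)); // ["b1", "b2"]
// exhaustMap ignores "b" because "a" is still active.
const exhausted = run(p => exhaustMap(p)); // ["a1"]
```

For a one-shot HTTP-style call like getFoo(), where the source emits a single value, all four behave the same; the choice only matters once the source can emit again while an inner Observable is still running.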

Solution 2:

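You can use merge to "merge" the two streams, so that an emission from either Observable is sent to observers. Note that merge does not pass values from the first Observable to the second, so result must already be in scope (the snippet uses the older RxJS 5 chained-operator syntax):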

let source = this.myService.getFoo()
  .merge(this.myService.getMoo(result))
  .subscribe(...);

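... or, if you want to subscribe to the second Observable only after the first one has emitted a value, you can use mergeMapTo. It subscribes to the same inner Observable for every source emission and ignores the emitted value itself: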

let source = this.myService.getFoo()
  .mergeMapTo(this.myService.getMoo(result))
  .subscribe(...);

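You can do a similar thing with concatMap, which waits until the current inner Observable completes before subscribing to the next one. Unlike mergeMapTo, it takes a function, so the emitted value can be passed on to getMoo.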

However, it really depends on what you're trying to achieve and what you want to receive in your observers.
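
To make the difference concrete, here is a minimal sketch using pipeable operators. The getFoo and getMoo functions below are hypothetical stand-ins for the service methods (the real ones presumably make asynchronous calls), and collect is a helper invented for the demo that only works because these stand-ins emit synchronously.

```typescript
import { Observable, merge, of } from "rxjs";
import { concatMap, mergeMapTo } from "rxjs/operators";

// Hypothetical stand-ins for this.myService.getFoo() and getMoo().
const getFoo = (): Observable<number> => of(1, 2);
const getMoo = (n: number): Observable<string> => of(`moo-${n}`);

// Collects everything a synchronous Observable emits.
function collect<T>(source: Observable<T>): T[] {
  const values: T[] = [];
  source.subscribe(v => values.push(v));
  return values;
}

// merge: both streams are independent; getMoo needs its argument up front.
const viaMerge = collect(merge(getFoo(), getMoo(0)));
// [1, 2, "moo-0"]

// mergeMapTo: the same inner Observable is subscribed once per source value,
// and the source value itself is discarded.
const viaMergeMapTo = collect(getFoo().pipe(mergeMapTo(getMoo(0))));
// ["moo-0", "moo-0"]

// concatMap: each source value is handed to getMoo, one inner stream at a time.
const viaConcatMap = collect(getFoo().pipe(concatMap(n => getMoo(n))));
// ["moo-1", "moo-2"]
```

Only the last variant actually feeds the result of getFoo into getMoo, which is what the question asks for.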