How to chain two Completable in RxJava2
You are looking for andThen
operator.
Returns a Completable that first runs this Completable and then the other completable.
firstCompletable
.andThen(secondCompletable)
In general, this operator is a "replacement" for a flatMap
on Completable
:
Completable andThen(CompletableSource next)
<T> Maybe<T> andThen(MaybeSource<T> next)
<T> Observable<T> andThen(ObservableSource<T> next)
<T> Flowable<T> andThen(Publisher<T> next)
<T> Single<T> andThen(SingleSource<T> next)
TL;DR: the other answers miss a subtlety. Use doThingA().andThen(doThingB())
if you want the equivalent of concat
, use doThingA().andThen(Completable.defer(() -> doThingB())
if you want the equivalent of flatMap
.
EDIT: a more complete reference
-
flatMap()
is the mapping version ofmerge()
-
concatMap()
is the mapping version ofconcat()
- For a
Completable
you needdefer()
to make function calls lazy like in the mapping functions forSingle
orObservable
(or preferably make it so that nothing happens until you hit subscribe - this is a good convention to follow and is used in the official Rx libraries as well as any Rx extensions I've encountered, for advanced users this refers to cold completables only but most people can ignore that). - the only difference between
concat(a, b)
anda.andThen(b)
is syntax
Some examples:
-
foo(a).andThen(bar(b))
will:- call
foo(a)
- immediately call
bar(b)
even if the completable returned by step1
returns an error - subscribe to whatever step
1
returns - Will then subscribe to the result of
bar(b)
only if the last step completed successfully
- call
-
foo(a).andThen(Completable.defer(() -> bar(b))
will:- call
foo(a)
- subscribe to the result of step
1
- only if the completable returned by by
foo(a)
succeeds then callsbar(b)
- call
I'm going to leave out the treatment of merge()
since it gets a bit more complicated, but long story short that's the one to call if you want "parallelism".
The above answers are sort of correct, but I found them misleading because they miss a subtlety about eager evaluation.
doThingA().andThen(doThingB())
will call doThingB()
immediately but only subscribe to the observable returned by doThingB()
when the observable returned by doThingA()
completes.
doThingA().andThen(Completable.defer(() -> doThingB())
will call doThingB()
only after thing A has completed.
This is important only if doThingB()
has side effects before a subscribe event. E.g. Single.just(sideEffect(1)).toCompletable()
An implementation that doesn't have side effects before the subscribe event (a true cold observable) might be Single.just(1).doOnSuccess(i -> sideEffect(i)).toCompletable()
.
In the case that's just bitten me thing A is some validation logic and doThingB()
kicks off an asynchronous database update immediately that completes a VertX ObservableFuture. This is bad. Arguably doThingB()
should be written to only update the database upon subscribe, and I'm going to try and design things that way in the future.
Try
Completable.concat
Returns a Completable which completes only when all sources complete, one after another.
http://reactivex.io/RxJava/javadoc/io/reactivex/Completable.html#concat(java.lang.Iterable)
Observe that the answer that has more votes here is a little bit misleading. See the example below, my idea is to show some testing scenarios and show how the completable logic with the operator andThen
behaves.
private fun doThingAFail(): Completable {
print("Do thingA Called\n")
return Completable.fromCallable {
print("calling stream A\n")
throw(Exception("The excep"))
}
}
private fun doThingB(): Completable {
print("Do thingB Called\n")
return Completable.fromCallable {
print("calling stream B\n")
}
}
private fun doThingA(): Completable {
print("Do thingA Called\n")
return Completable.fromCallable {
print("calling stream A\n")
}
}
Observe that for the test below:
@Test
fun testCallAPlusB() {
doThingA().andThen(doThingB())
}
the output would be:
Do thingA Called
Do thingB Called
Quick note here: Observe that we are not subscribing to these Completables in this snippet.
For the test:
@Test
fun theTestSubscribe() {
doThingA().andThen(doThingB()).subscribe()
}
The output would be:
Do thingA Called
Do thingB Called
calling stream A
calling stream B
And lastly, in case first completable fails, the second completable would not be executed.
@Test
fun theTestFailThingA() {
doThingAFail().andThen(doThingB()).subscribe()
}
the output would be:
Do thingA Called
Do thingB Called
calling stream A
The key concept here is that the logic inside the method and inside the observable are executed not at the same time.
"Do thingA Called" and "Do thingB Called" lines will be printed once we call doThingA()
and doThingB()
methods. While the "calling stream A" and
"calling stream B" lines will only be called when someone subscribes to doThingA
and doThingB
methods.
The second concept here is how the andThen operator will handle errors. In the example above, in case doThingA()
completable ends up with an error, the stream will end and not print the "calling stream B" line.