In RxJava, how to pass a variable along when chaining observables?
I am chaining async operations using RxJava, and I'd like to pass some variable downstream:
Observable
    .from(modifications)
    .flatMap( (data1) -> { return op1(data1); })
    ...
    .flatMap( (data2) -> {
        // How to access data1 here ?
        return op2(data2);
    })
It seems like a common pattern but I couldn't find information about it.
Solution 1:
The advice I got from the Couchbase forum is to use nested observables:
Observable
    .from(modifications)
    .flatMap( (data1) -> {
        return op1(data1)
            ...
            .flatMap( (data2) -> {
                // I can access data1 here
                return op2(data2);
            })
    });
EDIT: I'll mark this as the accepted answer as it seems to be the most recommended. If your processing is too complex to nest everything you can also check the solution with function calls.
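The reason nesting works is ordinary lambda scoping: the inner lambda is written inside the outer one, so it closes over data1. Below is a minimal sketch of that idea using java.util.stream instead of RxJava, so it runs on a plain JDK. The class name and the bodies of op1 and op2 are hypothetical stand-ins, not the operations from the question.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class NestedCaptureSketch {
    // Hypothetical stand-in for op1: expands one input into several values.
    static Stream<Integer> op1(String data1) {
        return Stream.of(data1.length(), data1.length() * 2);
    }

    // Hypothetical stand-in for op2: needs both the original and the intermediate value.
    static Stream<String> op2(String data1, int data2) {
        return Stream.of(data1 + ":" + data2);
    }

    static List<String> run(List<String> modifications) {
        return modifications.stream()
            .flatMap(data1 -> op1(data1)
                // This lambda is nested inside the outer one, so data1 is still in scope.
                .flatMap(data2 -> op2(data1, data2)))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("ab", "cde"))); // [ab:2, ab:4, cde:3, cde:6]
    }
}
```

The same shape should carry over to Observable.flatMap, with the trade-off that every extra variable you need adds another nesting level.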
Solution 2:
Another possibility is to map the result of op1 to an org.apache.commons.lang3.tuple.Pair that contains the variable and pass that along:
Observable
    .from(modifications)
    .flatMap( (data1) -> {
        return op1(data1).map( obj -> { return Pair.of(data1,obj); });
    })
    ...
    .flatMap( (dataPair) -> {
        // data1 is dataPair.getLeft()
        return op2(dataPair.getRight());
    })
It works but it feels a bit uncomfortable to have variables hidden inside a Pair/Triple/... and it gets very verbose if you use the Java 6 notation.
I wonder if there is a better solution, maybe some RxJava operator could help?
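One way to make the carried value less hidden is a small named carrier class instead of a generic Pair. The sketch below shows this with java.util.stream rather than RxJava so that it runs on a plain JDK. The Step class, its field names, and the body of op1 are assumptions made for illustration only.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class CarrySketch {
    // Hypothetical named carrier: keeps the original input next to op1's result.
    static final class Step {
        final String source; // the original data1
        final int result;    // what op1 produced for it

        Step(String source, int result) {
            this.source = source;
            this.result = result;
        }
    }

    // Hypothetical stand-in for op1.
    static Stream<Integer> op1(String data1) {
        return Stream.of(data1.length());
    }

    static List<String> run(List<String> modifications) {
        return modifications.stream()
            // Wrap each result together with the value it came from.
            .flatMap(data1 -> op1(data1).map(obj -> new Step(data1, obj)))
            // A later stage in the flat chain can read both fields by name.
            .map(step -> step.source + " -> " + step.result)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("ab", "cde"))); // [ab -> 2, cde -> 3]
    }
}
```

This keeps the chain flat like the Pair version. The cost is one extra class per combination of values you want to carry.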
Solution 3:
flatMap can take a second argument, a function that combines each source item with each item emitted by the inner observable:
Observable.just("foo")
    .flatMap(foo -> Observable.range(1, 5), Pair::of)
    // Pair::of receives (source item, inner item), so getFirst() is "foo"
    .subscribe(pair -> System.out.println("Foo: " + pair.getFirst() + " Result: " + pair.getSecond()));
source: https://medium.com/rxjava-tidbits/rxjava-tidbits-1-use-flatmap-and-retain-original-source-value-4ec6a2de52d4
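Conceptually, the two-argument overload behaves like mapping inside the flatMap and combining each inner item with the source item that produced it. The sketch below spells that out with java.util.stream so it runs without RxJava. The helper flatMapWith is hypothetical and is not part of the JDK or of RxJava.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class ResultSelectorSketch {
    // Hypothetical helper: flat-maps each source item, then pairs every inner
    // item with the source item it came from via the combiner.
    static <T, U, R> Stream<R> flatMapWith(Stream<T> source,
                                           Function<T, Stream<U>> mapper,
                                           BiFunction<T, U, R> combiner) {
        return source.flatMap(t -> mapper.apply(t).map(u -> combiner.apply(t, u)));
    }

    static List<String> run() {
        Stream<String> combined = flatMapWith(
            Stream.of("foo"),
            foo -> IntStream.rangeClosed(1, 3).boxed(),
            (foo, n) -> foo + "#" + n); // source item first, inner item second
        return combined.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run()); // [foo#1, foo#2, foo#3]
    }
}
```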
Solution 4:
One possibility would be to use a function call:
// T stands for whatever type op2 emits
private static Observable<T> myFunc(final Object data1) {
    return op1(data1)
        ...
        .flatMap( (data2) -> {
            // I can access data1 here
            return op2(data2);
        });
}

Observable
    .from(modifications)
    .flatMap( (data1) -> { return myFunc(data1); })
BUT: correct me if I'm wrong, but this doesn't feel like the reactive-programming way of doing it.
Solution 5:
There is also a library that simplifies such call chains:
https://github.com/pakoito/Komprehensions
Add it as a Gradle dependency:
implementation 'io.reactivex.rxjava2:rxjava:2.2.1'
implementation 'com.github.pakoito.Komprehensions:komprehensions-rx2:1.3.2'
Usage (Kotlin):
val observable = doFlatMap(
    // RxJava 2 has no Observable.from; fromIterable assumes modifications is an Iterable
    { Observable.fromIterable(modifications) },
    { data1 -> op1(data1) },
    { data1, data2 -> op2(data2) },
    { data1, data2, data3 -> op3(data1, data2, data3) }
)