map vs flatMap in reactor
I've found a lot of answers regarding RxJava, but I want to understand how it works in Reactor.
My current understanding is very vague, i tend to think of map as being synchronous and flatMap to be asynchronous but I can't really get my had around it.
Here is an example:
files.flatMap { it ->
Mono.just(Paths.get(UPLOAD_ROOT, it.filename()).toFile())
.map {destFile ->
destFile.createNewFile()
destFile
}
.flatMap(it::transferTo)
}.then()
I have files (a Flux<FilePart>
) and i want to copy it to some UPLOAD_ROOT
on the server.
This example is taken from a book.
I can change all the .map
to .flatMap
and vice versa and everything still works. I wonder what the difference is.
Solution 1:
-
map
is for synchronous, non-blocking, 1-to-1 transformations -
flatMap
is for asynchronous (non-blocking) 1-to-N transformations
The difference is visible in the method signature:
-
map
takes aFunction<T, U>
and returns aFlux<U>
-
flatMap
takes aFunction<T, Publisher<V>>
and returns aFlux<V>
That's the major hint: you can pass a Function<T, Publisher<V>>
to a map
, but it wouldn't know what to do with the Publishers
, and that would result in a Flux<Publisher<V>>
, a sequence of inert publishers.
On the other hand, flatMap
expects a Publisher<V>
for each T
. It knows what to do with it: subscribe to it and propagate its elements in the output sequence. As a result, the return type is Flux<V>
: flatMap
will flatten each inner Publisher<V>
into the output sequence of all the V
s.
About the 1-N aspect:
for each <T>
input element, flatMap
maps it to a Publisher<V>
. In some cases (eg. an HTTP request), that publisher will emit only one item, in which case we're pretty close to an async map
.
But that's the degenerate case. The generic case is that a Publisher
can emit multiple elements, and flatMap
works just as well.
For an example, imagine you have a reactive database and you flatMap from a sequence of user IDs, with a request that returns a user's set of Badge
. You end up with a single Flux<Badge>
of all the badges of all these users.
Is map
really synchronous and non-blocking?
Yes: it is synchronous in the way the operator applies it (a simple method call, and then the operator emits the result) and non-blocking in the sense that the function itself shouldn't block the operator calling it. In other terms it shouldn't introduce latency. That's because a Flux
is still asynchronous as a whole. If it blocks mid-sequence, it will impact the rest of the Flux
processing, or even other Flux
.
If your map function is blocking/introduces latency but cannot be converted to return a Publisher
, consider publishOn
/subscribeOn
to offset that blocking work on a separate thread.
Solution 2:
The flatMap method is similar to the map method with the key difference that the supplier you provide to it should return a Mono<T>
or Flux<T>
.
Using the map method would result in a Mono<Mono<T>>
whereas using flatMap results in a Mono<T>
.
For example, it is useful when you have to make a network call to retrieve data, with a java API that returns a Mono, and then another network call that needs the result of the first one.
// Signature of the HttpClient.get method
Mono<JsonObject> get(String url);
// The two urls to call
String firstUserUrl = "my-api/first-user";
String userDetailsUrl = "my-api/users/details/"; // needs the id at the end
// Example with map
Mono<Mono<JsonObject>> result = HttpClient.get(firstUserUrl).
map(user -> HttpClient.get(userDetailsUrl + user.getId()));
// This results with a Mono<Mono<...>> because HttpClient.get(...)
// returns a Mono
// Same example with flatMap
Mono<JsonObject> bestResult = HttpClient.get(firstUserUrl).
flatMap(user -> HttpClient.get(userDetailsUrl + user.getId()));
// Now the result has the type we expected
Also, it allows for handling errors precisely:
public UserApi {
private HttpClient httpClient;
Mono<User> findUser(String username) {
String queryUrl = "http://my-api-address/users/" + username;
return Mono.fromCallable(() -> httpClient.get(queryUrl)).
flatMap(response -> {
if (response.statusCode == 404) return Mono.error(new NotFoundException("User " + username + " not found"));
else if (response.statusCode == 500) return Mono.error(new InternalServerErrorException());
else if (response.statusCode != 200) return Mono.error(new Exception("Unknown error calling my-api"));
return Mono.just(response.data);
});
}
}