Correct way of throwing exceptions with Reactor

There are several convenient ways to throw an exception in a Reactor pipeline:

Handle your element using Flux/Mono.handle

One way to simplify the handling of an element that may result in an error or an empty stream is the handle operator.

The following code shows how we can use it in order to solve our problem:

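Mono.just(userId)
    .map(repo::findById)
    .handle((user, sink) -> {
        if (!isValid(user)) {
            // invalid element: terminate the stream with an error
            sink.error(new InvalidUserException());
        } else if (isSendable(user)) {
            // valid and sendable: pass the element downstream
            sink.next(user);
        } else {
            // just ignore the element
        }
    })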

As we can see, the .handle operator takes a BiConsumer<T, SynchronousSink<R>> to process each element. The BiConsumer has two parameters: the first is the element from upstream, and the second is a SynchronousSink that lets us supply an element to downstream synchronously. This technique lets a single operator produce different outcomes from processing an element.

For example, if the element is invalid, we can send an error to the same SynchronousSink, which cancels upstream and produces an onError signal downstream. We can also "filter" with the same handle operator: when the BiConsumer returns without supplying an element, Reactor treats that as filtering and requests an additional element for us. Finally, if the element is valid, we can call SynchronousSink#next to propagate it downstream, optionally applying a mapping first, so handle doubles as the map operator here.

Moreover, we can use this operator safely with no performance impact while performing complex element verification, such as validating an element or sending an error downstream.
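The three outcomes described above (error, filter, map) can be tried in a small self-contained sketch. Here the "user" is just a String, and the rules in isValid and isSendable, as well as the InvalidUserException class, are assumptions made up for illustration; they are not part of Reactor.

```java
import java.util.List;
import reactor.core.publisher.Flux;

final class HandleExample {

    // Hypothetical exception type, mirroring the name used in the text.
    static final class InvalidUserException extends RuntimeException {
        InvalidUserException(String name) {
            super("invalid user: '" + name + "'");
        }
    }

    // Assumed rule for this sketch: a blank name is invalid.
    static boolean isValid(String name) {
        return !name.trim().isEmpty();
    }

    // Assumed rule for this sketch: names starting with '_' are valid but not sendable.
    static boolean isSendable(String name) {
        return !name.startsWith("_");
    }

    static Flux<String> sendable(Flux<String> names) {
        return names.<String>handle((name, sink) -> {
            if (!isValid(name)) {
                sink.error(new InvalidUserException(name)); // terminate with onError
            } else if (isSendable(name)) {
                sink.next(name.toUpperCase());              // emit, mapping on the way
            }
            // otherwise nothing is emitted and the element is dropped
        });
    }

    public static void main(String[] args) {
        List<String> ok = sendable(Flux.just("ann", "_sys", "bob")).collectList().block();
        System.out.println(ok); // [ANN, BOB]

        try {
            sendable(Flux.just("ann", " ", "bob")).collectList().block();
        } catch (InvalidUserException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

The explicit `<String>` type witness on handle is optional in most call sites; it is spelled out here only to make the output type obvious.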

Throw using concatMap + Mono.error

One option for throwing an exception during mapping is to replace map with concatMap. In essence, concatMap does almost the same thing as flatMap; the only difference is that concatMap allows only one substream at a time. That restriction simplifies the internal implementation a lot and does not hurt performance. So we can use the following code to throw an exception in a more functional way:

// concatMap is defined on Flux; on a Mono, flatMap plays the same role
Flux.just(userId)
    .map(repo::findById)
    .concatMap(user -> {
        if (!isValid(user)) {
            return Mono.error(new InvalidUserException());
        }
        return Mono.just(user);
    })

In the sample above, an invalid user is reported by returning Mono.error. The inner publisher can equally be a Flux, in which case we return Flux.error:

Flux.just(userId1, userId2, userId3)
    .map(repo::findById)
    .concatMap(user -> {
        if (!isValid(user)) {
            return Flux.error(new InvalidUserException());
        }
        return Mono.just(user);
    })

Note that in both cases we return a cold stream with at most one element. Reactor has a couple of optimizations that improve performance when the returned stream is a cold scalar stream. Thus, it is recommended to use concatMap together with Flux/Mono .just, .empty, or .error as the result when we need a more complex mapping that could otherwise end with return null or throw new ....
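A minimal sketch of that recommendation follows, with a mapping function that picks between Mono.just, Mono.empty, and Mono.error. The id rules and the InvalidUserException class are hypothetical and exist only to give the three branches something to decide on.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class ConcatMapExample {

    // Hypothetical exception type, mirroring the name used in the text.
    static final class InvalidUserException extends RuntimeException {
        InvalidUserException(int id) {
            super("invalid id: " + id);
        }
    }

    // Assumed rules for this sketch: negative ids are invalid,
    // id 0 means "nothing to emit", anything else maps to a user name.
    static Mono<String> toUser(int id) {
        if (id < 0) {
            return Mono.error(new InvalidUserException(id)); // instead of "throw new ..."
        }
        if (id == 0) {
            return Mono.empty();                             // instead of "return null"
        }
        return Mono.just("user-" + id);
    }

    static Flux<String> users(Flux<Integer> ids) {
        return ids.concatMap(ConcatMapExample::toUser);
    }

    public static void main(String[] args) {
        List<String> found = users(Flux.just(1, 0, 2)).collectList().block();
        System.out.println(found); // [user-1, user-2]

        try {
            users(Flux.just(1, -1, 2)).collectList().block();
        } catch (InvalidUserException e) {
            System.out.println("failed: " + e.getMessage()); // failed: invalid id: -1
        }
    }
}
```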

Attention! Never check an incoming element for null. Project Reactor will never send you a null value, since that would violate the Reactive Streams spec (see Rule 2.13). Thus, if repo.findById returns null, Reactor will throw a NullPointerException for you.
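To see the effect of a null-returning mapper, consider the sketch below. The findById lookup is a hypothetical stand-in for a blocking repository call. The second method shows one possible alternative, Mono.justOrEmpty, which turns a null into an empty Mono instead of an error.

```java
import reactor.core.publisher.Mono;

final class NullMappingExample {

    // Hypothetical blocking lookup: returns null for unknown ids.
    static String findById(String id) {
        return "42".equals(id) ? "Alice" : null;
    }

    // A mapper that returns null makes the Mono terminate with a NullPointerException.
    static Mono<String> unsafe(String id) {
        return Mono.just(id).map(NullMappingExample::findById);
    }

    // Wrapping the nullable result in justOrEmpty yields an empty Mono for null.
    static Mono<String> safe(String id) {
        return Mono.just(id).flatMap(i -> Mono.justOrEmpty(findById(i)));
    }

    public static void main(String[] args) {
        System.out.println(safe("42").block()); // Alice
        System.out.println(safe("0").block());  // null, because block() returns null for an empty Mono

        try {
            unsafe("0").block();
        } catch (NullPointerException e) {
            System.out.println("mapper returned null, got NullPointerException");
        }
    }
}
```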

Wait, why is concatMap better than flatMap?

In essence, flatMap is designed to merge elements from multiple substreams that execute at the same time. That means flatMap expects asynchronous streams underneath, which could process data on multiple threads or represent several network calls. Such expectations affect the implementation a lot: flatMap has to handle data from multiple streams (threads), which means using concurrent data structures; it has to enqueue elements while another stream is being drained, which means allocating additional memory for a queue per substream; and it must not violate the Reactive Streams specification rules, which makes the implementation really complex.

Given all this, and given that we are only replacing a plain map operation (which is synchronous) with a more convenient way of throwing an exception via Flux/Mono.error (which does not change the synchronicity of execution), we do not need such a complex operator. We can use the much simpler concatMap, which is designed for asynchronous handling of a single stream at a time and has a couple of optimizations for handling a scalar, cold stream.
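As a rough illustration of the point that nothing asynchronous is going on here, the sketch below runs the same scalar mapping through both operators and compares the resulting signals. The validate helper and its rule are hypothetical. The sketch only shows that the observable result is the same for synchronous Mono.just / Mono.error inners; it says nothing about internal cost.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class ScalarInnerExample {

    // Hypothetical validation: negative numbers are rejected, others are multiplied by 10.
    static Mono<Integer> validate(int n) {
        if (n < 0) {
            return Mono.error(new IllegalArgumentException("negative: " + n));
        }
        return Mono.just(n * 10);
    }

    // Renders emitted values as text and replaces a terminal error with the marker "error".
    static String render(Flux<Integer> flux) {
        return flux.map(i -> "" + i)
                   .onErrorResume(e -> Mono.just("error"))
                   .collectList()
                   .block()
                   .toString();
    }

    static String viaConcatMap(Integer... input) {
        return render(Flux.just(input).concatMap(ScalarInnerExample::validate));
    }

    static String viaFlatMap(Integer... input) {
        return render(Flux.just(input).flatMap(ScalarInnerExample::validate));
    }

    public static void main(String[] args) {
        System.out.println(viaConcatMap(1, 2, 3));  // [10, 20, 30]
        System.out.println(viaFlatMap(1, 2, 3));    // [10, 20, 30]
        System.out.println(viaConcatMap(1, -2, 3)); // [10, error]
        System.out.println(viaFlatMap(1, -2, 3));   // [10, error]
    }
}
```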

Throw an exception using switchIfEmpty

Another approach, for throwing an exception when the result is empty, is the switchIfEmpty operator. The following code demonstrates how we can use it:

Mono.just(userId)
    .flatMap(repo::findById)
    .switchIfEmpty(Mono.error(new UserNotFoundException()))

As we can see, in this case repo::findById should have Mono of User as its return type. Therefore, if a User instance is not found, the resulting stream will be empty, and Reactor will subscribe to the alternative Mono passed as the switchIfEmpty parameter.
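A self-contained variant of this pattern might look as follows. The in-memory USERS map, the findById method, and the UserNotFoundException class are assumptions standing in for a real reactive repository.

```java
import java.util.Collections;
import java.util.Map;
import reactor.core.publisher.Mono;

final class SwitchIfEmptyExample {

    // Hypothetical exception type for a missing user.
    static final class UserNotFoundException extends RuntimeException {
        UserNotFoundException(String id) {
            super("no user with id " + id);
        }
    }

    // Hypothetical data store with a single known user.
    static final Map<String, String> USERS = Collections.singletonMap("42", "Alice");

    // Hypothetical reactive repository call: completes empty when the id is unknown.
    static Mono<String> findById(String id) {
        return Mono.justOrEmpty(USERS.get(id));
    }

    static Mono<String> requireUser(String id) {
        return Mono.just(id)
                   .flatMap(SwitchIfEmptyExample::findById)
                   .switchIfEmpty(Mono.error(new UserNotFoundException(id)));
    }

    public static void main(String[] args) {
        System.out.println(requireUser("42").block()); // Alice

        try {
            requireUser("7").block();
        } catch (UserNotFoundException e) {
            System.out.println("failed: " + e.getMessage()); // failed: no user with id 7
        }
    }
}
```

Note that the exception object is created eagerly when the pipeline is assembled, as in the snippet above it; wrapping the alternative in Mono.defer would postpone that until the empty case actually occurs.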

Throw your exception as is (e.g. in your map, filter and other similar operators)

It may be considered less readable code or bad practice (my own opinion), but with Project Reactor you can throw your exception as is (e.g. .map(v -> throw ...)). In a way, doing so can violate the Reactive Streams specification. The violation here is semantic: under the hood your operator is a Subscriber in a chain of Subscribers, so throwing an exception in the lambda maps to throwing an exception in the onNext method, which violates rule 2.13 of the spec. However, since Reactor catches the thrown exception for you and propagates it as an onError signal to your downstream, doing this is not prohibited.
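The behaviour described here, an exception thrown inside map arriving downstream as onError, can be observed with a short sketch. The parse and describe helpers are hypothetical names chosen for this example.

```java
import reactor.core.publisher.Mono;

final class ThrowInMapExample {

    // Throws directly inside the map lambda; Reactor turns this into an onError signal.
    static Mono<Integer> parse(String raw) {
        return Mono.just(raw).map(s -> {
            if (s.isEmpty()) {
                throw new IllegalArgumentException("empty input");
            }
            return Integer.parseInt(s);
        });
    }

    // Downstream operators see the thrown exception as a regular error signal.
    static String describe(String raw) {
        return parse(raw)
                .map(i -> "value " + i)
                .onErrorResume(e -> Mono.just("error: " + e.getMessage()))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(describe("12")); // value 12
        System.out.println(describe(""));   // error: empty input
    }
}
```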

Takeaways

  1. Use the .handle operator to provide complex element processing.
  2. Use concatMap + Mono.error when you need to throw an exception during mapping, although this technique is best suited to asynchronous element processing.
  3. Use flatMap + Mono.error when a flatMap is already in place.
  4. null is forbidden as a return value, so instead of a null element your downstream will receive an unexpected onError carrying a NullPointerException.
  5. Use switchIfEmpty whenever you need to send an error signal because calling some function resulted in an empty stream.