Observable vs Flowable in RxJava 2

I have been looking at the new RxJava 2 and I'm no longer sure I understand the idea of backpressure.

I'm aware that we have Observable, which does not have backpressure support, and Flowable, which has it.

So, as an example, let's say I have a Flowable with interval:

        Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do something
                }
            });

This is going to crash after around 128 values, and that's pretty obvious: I am consuming items more slowly than I am getting them.

But then we have the same with Observable:

        Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do something
                }
            });

This will not crash at all; even when I put some delay on consuming, it still works. To make the Flowable work, let's say I add the onBackpressureDrop operator: the crash is gone, but not all values are emitted either.

So the basic question I cannot currently answer is: why should I care about backpressure when I can use a plain Observable and still receive all values without managing the buffer? Or, from the other side, what advantages does backpressure give me when it comes to managing and handling consumption?


Solution 1:

In practice, backpressure manifests as bounded buffers: Flowable.observeOn has a buffer of 128 elements that gets drained as fast as the downstream can take it. You can increase this buffer size individually to handle a bursty source, and all the backpressure-management practices from 1.x still apply. Observable.observeOn has an unbounded buffer that keeps collecting the elements, and your app may run out of memory.
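As a rough illustration of that difference, here is a plain-JDK model, not RxJava code. The names `BufferModel`, `peakQueueSize` and `OverflowException` are made up for this sketch. It feeds a slow consumer through either an unbounded queue or one capped at 128 items. In RxJava 2 itself, the size of the bounded buffer is the bufferSize argument of the observeOn(Scheduler, boolean, int) overload.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Plain-JDK model of the buffer between a fast producer and a slow consumer.
// Hypothetical names throughout; this only mimics the idea, not RxJava internals.
class BufferModel {

    /** Thrown by the bounded model when the producer outruns the consumer. */
    static class OverflowException extends RuntimeException {
        OverflowException(String message) { super(message); }
    }

    /**
     * Produces `produced` items while the consumer takes one item for every
     * `consumeEvery` items produced. A capacity <= 0 means "unbounded".
     * Returns the largest backlog that was ever waiting in the queue.
     */
    static int peakQueueSize(int produced, int consumeEvery, int capacity) {
        Queue<Long> queue = new ArrayDeque<>();
        int peak = 0;
        for (long i = 0; i < produced; i++) {
            if (capacity > 0 && queue.size() == capacity) {
                // Bounded buffer: fail fast instead of growing.
                throw new OverflowException("buffer full at item " + i);
            }
            queue.add(i);
            peak = Math.max(peak, queue.size());
            if ((i + 1) % consumeEvery == 0) {
                queue.poll(); // the slow consumer takes one item
            }
        }
        return peak;
    }

    public static void main(String[] args) {
        // Unbounded (Observable-like): never fails, but the backlog keeps growing.
        System.out.println("unbounded peak: " + peakQueueSize(10_000, 10, 0));
        // Bounded at 128 (Flowable-like): stops once 128 items are waiting.
        try {
            peakQueueSize(10_000, 10, 128);
        } catch (OverflowException e) {
            System.out.println("bounded: " + e.getMessage());
        }
    }
}
```

With one item consumed per ten produced, the unbounded variant never fails but its backlog grows with the length of the run, while the bounded variant stops as soon as 128 items are waiting.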

You may use Observable, for example, when:

  • handling GUI events
  • working with short sequences (less than 1000 elements total)

You may use Flowable, for example, with:

  • cold and non-timed sources
  • generator-like sources
  • network and database accessors

Solution 2:

Backpressure is when your observable (publisher) is creating more events than your subscriber can handle. So you can get subscribers missing events, or you can get a huge queue of events which eventually leads to running out of memory. Flowable takes backpressure into consideration. Observable does not. That's it.

It reminds me of a funnel that overflows when it has too much liquid. Flowable can help prevent that:

[Image: with tremendous backpressure]

[Image: using Flowable, with much less backpressure]

RxJava 2 has a few backpressure strategies you can use depending on your use case. By strategy I mean that RxJava 2 supplies a way to handle the items that cannot be processed because of the overflow (backpressure).

The strategies are the constants of the BackpressureStrategy enum (MISSING, ERROR, BUFFER, DROP, LATEST). I won't go through them all, but, for example, if you don't want to worry about the items that overflow, you can use a drop strategy like this:

    observable.toFlowable(BackpressureStrategy.DROP)
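To make the difference between the strategies more concrete, here is a small plain-JDK model, not RxJava code. `StrategyModel`, its `Strategy` enum and `run` are hypothetical names for this sketch, and the model is deliberately simplified: the consumer is assumed to ask for a few items up front and to catch up only after the source has finished emitting.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of what happens to items that arrive while the consumer
// has no outstanding demand. Hypothetical names; not RxJava's implementation.
class StrategyModel {

    enum Strategy { DROP, LATEST, BUFFER, ERROR }

    /**
     * Emits 0..emitted-1 to a consumer that requested only `initialDemand`
     * items up front, then lets the consumer take whatever the strategy kept.
     * Returns the items the consumer ends up seeing, in order.
     */
    static List<Integer> run(Strategy strategy, int emitted, int initialDemand) {
        List<Integer> received = new ArrayList<>();
        List<Integer> pending = new ArrayList<>(); // what the strategy holds back
        int demand = initialDemand;
        for (int item = 0; item < emitted; item++) {
            if (demand > 0) {
                received.add(item); // consumer is ready: pass straight through
                demand--;
                continue;
            }
            switch (strategy) {
                case DROP:
                    break; // discard the item
                case LATEST:
                    pending.clear(); // keep only the most recent item
                    pending.add(item);
                    break;
                case BUFFER:
                    pending.add(item); // keep everything (memory grows)
                    break;
                case ERROR:
                    throw new IllegalStateException("no demand for item " + item);
            }
        }
        received.addAll(pending); // the consumer finally catches up
        return received;
    }

    public static void main(String[] args) {
        for (Strategy s : new Strategy[] {Strategy.DROP, Strategy.LATEST, Strategy.BUFFER}) {
            System.out.println(s + ": " + run(s, 5, 2));
        }
    }
}
```

In this model, five emitted items against an initial demand of two leave the consumer with two items under DROP, three under LATEST (the first two plus the newest), and all five under BUFFER; ERROR fails on the first item that has no demand.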

As far as I know, there should be a 128-item limit on the queue; after that there can be an overflow (backpressure). Even if it's not exactly 128, it's close to that number. Hope this helps someone.

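If you need to change the buffer size from 128, it looks like it can be done with onBackpressureBuffer (but watch any memory constraints); note that the similarly named buffer(256) batches items into lists rather than resizing the queue:

    myObservable.toFlowable(BackpressureStrategy.MISSING).onBackpressureBuffer(256); // but using MISSING might be slower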

In software development, a backpressure strategy usually means you're telling the emitter to slow down a bit because the consumer cannot handle the velocity at which you're emitting events.
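Flowable follows the Reactive Streams protocol, in which the consumer "tells the emitter to slow down" by signalling demand through request(n). The JDK ships equivalent interfaces as java.util.concurrent.Flow (Java 9+), so a minimal sketch of the idea needs no extra dependency. `PullModel`, `CountingPublisher` and `ManualSubscriber` are names invented for this example, and the publisher is intentionally naive: it is synchronous and skips the validation and reentrancy rules a real implementation must follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Sketch of demand-driven emission using the JDK's Flow interfaces.
// Hypothetical names; not a spec-compliant publisher.
class PullModel {

    /** Emits 0, 1, 2, ... but only as many items as have been requested. */
    static class CountingPublisher implements Flow.Publisher<Integer> {
        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;
                private boolean cancelled = false;

                @Override
                public void request(long n) {
                    // Emit exactly n items, never more than the consumer asked for.
                    for (long i = 0; i < n && !cancelled; i++) {
                        subscriber.onNext(next++);
                    }
                }

                @Override
                public void cancel() {
                    cancelled = true;
                }
            });
        }
    }

    /** Stores what it receives; the caller decides when to request more. */
    static class ManualSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { throw new RuntimeException(t); }
        @Override public void onComplete() { }
    }

    public static void main(String[] args) {
        ManualSubscriber subscriber = new ManualSubscriber();
        new CountingPublisher().subscribe(subscriber);
        System.out.println("before any request: " + subscriber.received);
        subscriber.subscription.request(3); // the consumer is ready for three items
        System.out.println("after request(3): " + subscriber.received);
    }
}
```

Nothing is emitted until the subscriber asks for it, which is the part Observable has no equivalent for: an Observable source pushes items regardless of whether the consumer is ready.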

Solution 3:

The fact that your Flowable crashed after emitting 128 values without backpressure handling doesn't mean it will always crash after exactly 128 values: sometimes it will crash after 10, and sometimes it will not crash at all. I believe this is what happened when you tried the example with Observable: there happened to be no backpressure, so your code worked normally, but next time it may not. The difference in RxJava 2 is that there is no concept of backpressure in Observable anymore, and no way to handle it. If you're designing a reactive sequence that will probably require explicit backpressure handling, then Flowable is your best choice.