RxJava delay for each item of list emitted
I'm struggling to implement something I assumed would be fairly simple in Rx.
I have a list of items, and I want to have each item emitted with a delay.
It seems the Rx delay() operator just shifts the emission of all items by the specified delay, not each individual item.
Here's some testing code. It groups items in a list. Each group should then have a delay applied before being emitted.
Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.delay(50, TimeUnit.MILLISECONDS)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
The result is:
154ms
[5]
155ms
[2]
155ms
[1]
155ms
[3]
155ms
[4]
But what I would expect to see is something like this:
174ms
[5]
230ms
[2]
285ms
[1]
345ms
[3]
399ms
[4]
What am I doing wrong?
Solution 1:
The simplest way to do this seems to be just using concatMap
and wrapping each item in a delayed Obserable.
long startTime = System.currentTimeMillis();
Observable.range(1, 5)
.concatMap(i-> Observable.just(i).delay(50, TimeUnit.MILLISECONDS))
.doOnNext(i-> System.out.println(
"Item: " + i + ", Time: " + (System.currentTimeMillis() - startTime) +"ms"))
.toCompletable().await();
Prints:
Item: 1, Time: 51ms
Item: 2, Time: 101ms
Item: 3, Time: 151ms
Item: 4, Time: 202ms
Item: 5, Time: 252ms
Solution 2:
One way to do it is to use zip
to combine your observable with an Interval
observable to delay the output.
Observable.zip(Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList()),
Observable.interval(50, TimeUnit.MILLISECONDS),
(obs, timer) -> obs)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();