RxJava delay for each item of list emitted

I'm struggling to implement something I assumed would be fairly simple in Rx.

I have a list of items, and I want to have each item emitted with a delay.

It seems the Rx delay() operator just shifts the emission of all items by the specified delay, not each individual item.

Here's some test code. It groups the items into lists; each list should then be delayed before it is emitted.

long timeNow = System.currentTimeMillis();
Observable.range(1, 5)
    .groupBy(n -> n % 5)
    .flatMap(g -> g.toList())
    .delay(50, TimeUnit.MILLISECONDS)
    .doOnNext(item -> {
        // Print the elapsed time since the start, then the emitted list
        System.out.println((System.currentTimeMillis() - timeNow) + "ms");
        System.out.println(item);
        System.out.println(" ");
    }).toList().toBlocking().first();

The result is:

154ms
[5]

155ms
[2]

155ms
[1]

155ms
[3]

155ms
[4]

But what I would expect to see is something like this:

174ms
[5]

230ms
[2]

285ms
[1]

345ms
[3]

399ms
[4]

What am I doing wrong?


Solution 1:

The simplest way to do this seems to be to use concatMap and wrap each item in a delayed Observable.

long startTime = System.currentTimeMillis();
Observable.range(1, 5)
        .concatMap(i -> Observable.just(i).delay(50, TimeUnit.MILLISECONDS))
        .doOnNext(i -> System.out.println(
                "Item: " + i + ", Time: " + (System.currentTimeMillis() - startTime) + "ms"))
        .toCompletable().await();

Prints:

Item: 1, Time: 51ms
Item: 2, Time: 101ms
Item: 3, Time: 151ms
Item: 4, Time: 202ms
Item: 5, Time: 252ms
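
As for why this works: delay() shifts every item forward by the same amount, so items that were produced together still arrive together. concatMap subscribes to the next inner Observable only after the previous one has completed, so the individual delays add up. Here is a rough plain-Java timing model of that difference. It is not RxJava code; the class and method names (DelayTimingModel, shiftAll, sequential) are made up for illustration, and it assumes all source items are ready at time 0 and ignores scheduling overhead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DelayTimingModel {

    // Models source.delay(d): each item keeps its original spacing
    // and is simply shifted forward by the same delay.
    public static List<Long> shiftAll(List<Long> sourceTimesMs, long delayMs) {
        List<Long> out = new ArrayList<>();
        for (long t : sourceTimesMs) {
            out.add(t + delayMs);
        }
        return out;
    }

    // Models concatMap(i -> Observable.just(i).delay(d)) for a source whose
    // items are all ready at time 0 (an assumption of this sketch): each
    // inner delay starts only after the previous one has finished.
    public static List<Long> sequential(int itemCount, long delayMs) {
        List<Long> out = new ArrayList<>();
        long clock = 0;
        for (int i = 0; i < itemCount; i++) {
            clock += delayMs;
            out.add(clock);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> allAtOnce = Arrays.asList(0L, 0L, 0L, 0L, 0L);
        System.out.println(shiftAll(allAtOnce, 50)); // [50, 50, 50, 50, 50]
        System.out.println(sequential(5, 50));       // [50, 100, 150, 200, 250]
    }
}
```

The second line lines up with the 51ms, 101ms, 151ms, ... printed above, give or take scheduling jitter.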

Solution 2:

One way to do it is to use zip to combine your Observable with an interval Observable, so that each item is paired with a timer tick and is emitted no earlier than that tick.

long timeNow = System.currentTimeMillis();
Observable.zip(Observable.range(1, 5)
        .groupBy(n -> n % 5)
        .flatMap(g -> g.toList()),
    Observable.interval(50, TimeUnit.MILLISECONDS),
    // Keep the list and discard the timer tick
    (obs, timer) -> obs)
    .doOnNext(item -> {
        System.out.println((System.currentTimeMillis() - timeNow) + "ms");
        System.out.println(item);
        System.out.println(" ");
    }).toList().toBlocking().first();
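
One thing to keep in mind with the zip approach: the k-th result is emitted as soon as both the k-th source item and the k-th tick are available. If the source is slower than the timer, ticks pile up, and later items can come out back to back with no spacing between them. Below is a rough plain-Java model of that timing. It is not RxJava code; ZipIntervalModel and zipWithInterval are hypothetical names, and the model assumes the first tick fires one period after subscription and ignores backpressure and scheduling overhead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ZipIntervalModel {

    // The k-th output (0-based) is emitted once both the k-th source item
    // and the k-th timer tick exist, i.e. at the later of the two times.
    public static List<Long> zipWithInterval(List<Long> sourceTimesMs, long periodMs) {
        List<Long> out = new ArrayList<>();
        for (int k = 0; k < sourceTimesMs.size(); k++) {
            long tick = (k + 1) * periodMs; // assumed: first tick after one period
            out.add(Math.max(sourceTimesMs.get(k), tick));
        }
        return out;
    }

    public static void main(String[] args) {
        // Source ready immediately: the timer paces the output.
        System.out.println(zipWithInterval(Arrays.asList(0L, 0L, 0L, 0L, 0L), 50));
        // prints [50, 100, 150, 200, 250]

        // Slow source: ticks 3 and 4 are already waiting, so the last two
        // items come out together.
        System.out.println(zipWithInterval(Arrays.asList(0L, 0L, 300L, 300L), 50));
        // prints [50, 100, 300, 300]
    }
}
```

If you need a guaranteed minimum gap between items regardless of how fast the source is, the concatMap approach from Solution 1 is the safer choice.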