Can I duplicate a Stream in Java 8?
Sometimes I want to perform a set of operations on a stream, and then process the resulting stream two different ways with other operations.
Can I do this without having to specify the common initial operations twice?
For example, I am hoping a dup() method such as the following exists:
IntStream[] desired_streams = IntStream.range(1, 100).filter(n -> n % 2 == 0).dup();
IntStream stream14 = desired_streams[0].filter(n -> n % 7 == 0); // multiples of 14
IntStream stream10 = desired_streams[1].filter(n -> n % 5 == 0); // multiples of 10
It is not possible to duplicate a stream in this way. However, you can avoid the code duplication by moving the common part into a method or lambda expression.
Supplier<IntStream> supplier = () ->
IntStream.range(1, 100).filter(n -> n % 2 == 0);
supplier.get().filter(...);
supplier.get().filter(...);
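To make the supplier approach concrete, here is a minimal runnable sketch. The class name SupplierExample and the helper evens() are illustrative names, not part of the answer above. Each call to get() builds a fresh pipeline, so the common operations are written once but executed once per consumer.

```java
import java.util.Arrays;
import java.util.function.Supplier;
import java.util.stream.IntStream;

final class SupplierExample {
    // Hypothetical helper: the shared part of the pipeline, written once.
    static Supplier<IntStream> evens() {
        return () -> IntStream.range(1, 100).filter(n -> n % 2 == 0);
    }

    public static void main(String[] args) {
        Supplier<IntStream> supplier = evens();

        // Each get() returns a new, unconsumed stream.
        int[] multiplesOf14 = supplier.get().filter(n -> n % 7 == 0).toArray();
        int[] multiplesOf10 = supplier.get().filter(n -> n % 5 == 0).toArray();

        System.out.println(Arrays.toString(multiplesOf14)); // [14, 28, 42, 56, 70, 84, 98]
        System.out.println(Arrays.toString(multiplesOf10)); // [10, 20, 30, 40, 50, 60, 70, 80, 90]
    }
}
```

Note that this only works when the source can be regenerated on demand, as a numeric range can. It does not help when the source can be read only once.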
It is not possible in general.
If you want to duplicate an input stream, or input iterator, you have two options:
A. Keep everything in a collection, say a List<>
Suppose you duplicate a stream into two streams s1 and s2. If you have advanced n1 elements in s1 and n2 elements in s2, you must keep |n2 - n1| elements in memory, just to keep pace. If your stream is infinite, there may be no upper bound for the storage required.
Take a look at Python's tee() to see what it takes:
This itertool may require significant auxiliary storage (depending on how much temporary data needs to be stored). In general, if one iterator uses most or all of the data before another iterator starts, it is faster to use list() instead of tee().
B. When possible: Copy the state of the generator that creates the elements
For this option to work, you'll probably need access to the inner workings of the stream. In other words, the generator - the part that creates the elements - should support copying in the first place. [OP: See this great answer, as an example of how this can be done for the example in the question]
It will not work on input from the user, since you'll have to copy the state of the entire "outside world". Java's Stream does not support copying, since it is designed to be as general as possible; for example, to work with files, network, keyboard, sensors, randomness etc. [OP: Another example is a stream that reads a temperature sensor on demand. It cannot be duplicated without storing a copy of the readings]
This is not only the case in Java; this is a general rule. You can see that std::istream in C++ only supports move semantics, not copy semantics ("copy constructor (deleted)"), for this reason (and others).
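As a sketch of option B, assume a hypothetical generator class EvenGenerator whose entire state is two ints. Because the state is a plain value, copying it is cheap, and the copy advances independently of the original. The class and its copy() method are made up for illustration; they are not a JDK or library API.

```java
import java.util.NoSuchElementException;
import java.util.PrimitiveIterator;

// Hypothetical generator: yields the even numbers in [start, end).
final class EvenGenerator implements PrimitiveIterator.OfInt {
    private int next;
    private final int end;

    EvenGenerator(int start, int end) {
        // Round an odd start up to the next even number.
        this.next = (start % 2 == 0) ? start : start + 1;
        this.end = end;
    }

    // Copying the generator means copying its state, nothing more.
    EvenGenerator copy() {
        return new EvenGenerator(next, end);
    }

    @Override
    public boolean hasNext() {
        return next < end;
    }

    @Override
    public int nextInt() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        int value = next;
        next += 2;
        return value;
    }

    public static void main(String[] args) {
        EvenGenerator original = new EvenGenerator(1, 100);
        original.nextInt();                      // consumes 2
        EvenGenerator duplicate = original.copy();
        System.out.println(original.nextInt());  // 4
        System.out.println(duplicate.nextInt()); // 4, independent of the original
    }
}
```

No buffer is needed here because the generator is deterministic and owns all of its state, which is exactly what a sensor or a keyboard cannot offer.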
It's possible if you're buffering elements that you've consumed in one duplicate, but not in the other yet.
We've implemented a duplicate() method for streams in jOOλ, an Open Source library that we created to improve integration testing for jOOQ. Essentially, you can just write:
Tuple2<Seq<Integer>, Seq<Integer>> desired_streams = Seq.seq(
IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed()
).duplicate();
(note: we currently need to box the stream, as we haven't implemented an IntSeq yet)
Internally, there is a LinkedList buffer storing all values that have been consumed from one stream but not from the other. That's probably as efficient as it gets if your two streams are consumed at about the same rate.
Here's how the algorithm works:
static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) {
    // Values consumed by the leading iterator but not yet by the other one
    final LinkedList<T> gap = new LinkedList<>();
    final Iterator<T> it = stream.iterator();

    // Holds whichever of the two iterators called next() first
    @SuppressWarnings("unchecked")
    final Iterator<T>[] ahead = new Iterator[] { null };

    class Duplicate implements Iterator<T> {
        @Override
        public boolean hasNext() {
            if (ahead[0] == null || ahead[0] == this)
                return it.hasNext();

            return !gap.isEmpty();
        }

        @Override
        public T next() {
            if (ahead[0] == null)
                ahead[0] = this;

            // The leading iterator reads from the source and fills the gap
            if (ahead[0] == this) {
                T value = it.next();
                gap.offer(value);
                return value;
            }

            // The trailing iterator drains the gap
            return gap.poll();
        }
    }

    return tuple(seq(new Duplicate()), seq(new Duplicate()));
}
More source code here
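For readers who want to try the buffering idea without jOOλ, here is a JDK-only sketch. It is a variation on the algorithm above, not the jOOλ implementation: it keeps one queue per branch, so either branch may take the lead at any time. The names StreamTee, Pair, Branch and tee() are made up for this example. It assumes single-threaded, sequential use and non-null elements (ArrayDeque rejects null).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class StreamTee {
    // Hypothetical holder for the two resulting streams.
    static final class Pair<T> {
        final Stream<T> first;
        final Stream<T> second;

        Pair(Stream<T> first, Stream<T> second) {
            this.first = first;
            this.second = second;
        }
    }

    // One side of the duplicated stream.
    private static final class Branch<T> implements Iterator<T> {
        private final Iterator<T> source;
        private final Deque<T> mine;   // pulled by the other branch, not yet seen here
        private final Deque<T> theirs; // pulled by this branch, not yet seen there

        Branch(Iterator<T> source, Deque<T> mine, Deque<T> theirs) {
            this.source = source;
            this.mine = mine;
            this.theirs = theirs;
        }

        @Override
        public boolean hasNext() {
            return !mine.isEmpty() || source.hasNext();
        }

        @Override
        public T next() {
            if (!mine.isEmpty()) {
                return mine.poll();
            }
            // This branch is in the lead: read the source and buffer for the other.
            T value = source.next();
            theirs.offer(value);
            return value;
        }
    }

    static <T> Pair<T> tee(Stream<T> stream) {
        Iterator<T> it = stream.iterator();
        Deque<T> forFirst = new ArrayDeque<>();
        Deque<T> forSecond = new ArrayDeque<>();
        return new Pair<>(
            toStream(new Branch<>(it, forFirst, forSecond)),
            toStream(new Branch<>(it, forSecond, forFirst)));
    }

    private static <T> Stream<T> toStream(Iterator<T> it) {
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
    }

    public static void main(String[] args) {
        Pair<Integer> evens = tee(IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed());
        List<Integer> by14 = evens.first.filter(n -> n % 7 == 0).collect(Collectors.toList());
        List<Integer> by10 = evens.second.filter(n -> n % 5 == 0).collect(Collectors.toList());
        System.out.println(by14); // [14, 28, 42, 56, 70, 84, 98]
        System.out.println(by10); // [10, 20, 30, 40, 50, 60, 70, 80, 90]
    }
}
```

When the first stream is fully consumed before the second one starts, as in main above, every element ends up in the buffer, so this costs as much memory as collecting into a list.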
In fact, using jOOλ, you'll be able to write a complete one-liner like so:
Tuple2<Seq<Integer>, Seq<Integer>> desired_streams = Seq.seq(
IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed()
).duplicate()
.map1(s -> s.filter(n -> n % 7 == 0))
.map2(s -> s.filter(n -> n % 5 == 0));
// This will yield 14, 28, 42, 56...
desired_streams.v1.forEach(System.out::println);
// This will yield 10, 20, 30, 40...
desired_streams.v2.forEach(System.out::println);
You can also move the stream generation into a separate method or function that returns the stream, and call it twice.
Either,
- Move the initialisation into a method, and simply call the method again
This has the advantage of being explicit about what you are doing, and also works for infinite streams.
- Collect the stream and then re-stream it
In your example:
final int[] arr = IntStream.range(1, 100).filter(n -> n % 2 == 0).toArray();
Then
final IntStream s = IntStream.of(arr);
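Put together, the collect-and-re-stream option might look like the sketch below. The class name RestreamExample and the helper evens() are illustrative. The common operations run exactly once; after that, the array can back as many streams as needed.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

final class RestreamExample {
    // Hypothetical helper: runs the common operations once and stores the result.
    static int[] evens() {
        return IntStream.range(1, 100).filter(n -> n % 2 == 0).toArray();
    }

    public static void main(String[] args) {
        final int[] arr = evens();

        // Each IntStream.of(arr) is a new stream over the same stored elements.
        int[] multiplesOf14 = IntStream.of(arr).filter(n -> n % 7 == 0).toArray();
        int[] multiplesOf10 = IntStream.of(arr).filter(n -> n % 5 == 0).toArray();

        System.out.println(Arrays.toString(multiplesOf14)); // [14, 28, 42, 56, 70, 84, 98]
        System.out.println(Arrays.toString(multiplesOf10)); // [10, 20, 30, 40, 50, 60, 70, 80, 90]
    }
}
```

Unlike calling the method again, this requires the stream to be finite and small enough to hold in memory.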