Can a Collector's combiner function ever be used on sequential streams?
Sample program:
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.stream.Collector;
import java.util.stream.IntStream;

public final class CollectorTest
{
    private CollectorTest()
    {
    }

    private static <T> BinaryOperator<T> nope()
    {
        return (t, u) -> { throw new UnsupportedOperationException("nope"); };
    }

    public static void main(final String... args)
    {
        final Collector<Integer, ?, List<Integer>> c
            = Collector.of(ArrayList::new, List::add, nope());

        IntStream.range(0, 10_000_000).boxed().collect(c);
    }
}
To simplify matters, there is no final transformation here, so the resulting code is quite simple.
Now, IntStream.range() produces a sequential stream. I simply box the results into Integers, and then my contrived Collector collects them into a List<Integer>. Pretty simple.
And no matter how many times I run this sample program, the UnsupportedOperationException never hits, which means my dummy combiner is never called.
I kind of expected this, but then I have already misunderstood streams enough that I have to ask the question...
Can a Collector's combiner ever be called when the stream is guaranteed to be sequential?
A careful reading of the streams implementation code in ReduceOps.java reveals that the combine function is called only when a ReduceTask completes, and ReduceTask instances are used only when evaluating a pipeline in parallel. Thus, in the current implementation, the combiner is never called when evaluating a sequential pipeline.
There is nothing in the specification that guarantees this, however. A Collector is an interface that makes requirements on its implementations, and there are no exemptions granted for sequential streams. Personally, I find it difficult to imagine why sequential pipeline evaluation might need to call the combiner, but someone with more imagination than me might find a clever use for it, and implement it. The specification allows for it, and even though today's implementation doesn't do it, you still have to think about it.
This should not be surprising. The design center of the streams API is to support parallel execution on an equal footing with sequential execution. Of course, it is possible for a program to observe whether it is being executed sequentially or in parallel, but the design of the API is to support a style of programming that allows either.
If you're writing a collector and you find that it's impossible (or inconvenient, or difficult) to write an associative combiner function, leading you to want to restrict your stream to sequential execution, maybe this means you're heading in the wrong direction. It's time to step back a bit and think about approaching the problem a different way.
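For contrast, here is a minimal sketch (my own, not from the question) of what a real, associative combiner for the question's list-collecting example could look like. Merging partial lists by appending the right-hand one onto the left-hand one is associative, so the same collector works whether or not the combiner is ever invoked:
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.IntStream;

public final class AssociativeCollectorSketch
{
    public static void main(final String... args)
    {
        // The combiner actually merges two partial results instead of throwing.
        final Collector<Integer, ?, List<Integer>> c = Collector.of(
            ArrayList::new,
            List::add,
            (left, right) -> { left.addAll(right); return left; });

        // Works the same way sequentially and in parallel.
        final List<Integer> result =
            IntStream.range(0, 10_000_000).parallel().boxed().collect(c);
        System.out.println(result.size()); // 10000000
    }
}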
A common reduction-style operation that doesn't require an associative combiner function is called fold-left. The main characteristic is that the fold function is applied strictly left-to-right, proceeding one at a time. I'm not aware of a way to parallelize fold-left.
When people try to contort collectors the way we've been talking about, they're usually looking for something like fold-left. The Streams API doesn't have direct API support for this operation, but it's pretty easy to write. For example, suppose you want to reduce a list of strings using this operation: repeat the first string and then append the second. It's pretty easy to demonstrate that this operation isn't associative:
List<String> list = Arrays.asList("a", "b", "c", "d", "e");

System.out.println(list.stream()
    .collect(StringBuilder::new,
             (a, b) -> a.append(a.toString()).append(b),
             (a, b) -> a.append(a.toString()).append(b))); // BROKEN -- NOT ASSOCIATIVE
Run sequentially, this produces the desired output:
aabaabcaabaabcdaabaabcaabaabcde
But when run in parallel, it might produce something like this:
aabaabccdde
Since it "works" sequentially, we could enforce this by calling sequential()
and back this up by having the combiner throw an exception. In addition, the supplier must be called exactly once. There's no way to combine the intermediate results, so if the supplier is called twice, we're already in trouble. But since we "know" the supplier is called only once in sequential mode, most people don't worry about this. In fact, I've seen people write "suppliers" that return some existing object instead of creating a new one, in violation of the supplier contract.
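To make that anti-pattern concrete, here is a hypothetical sketch of that style of code, reusing the list from the example above (not something to imitate): the "supplier" hands back a pre-existing object and the combiner simply throws, so two of the three functions only work because we assume sequential execution:
StringBuilder existing = new StringBuilder();             // pre-existing result object

String result = list.stream()
    .sequential()                                         // "enforce" sequential execution
    .collect(() -> existing,                              // supplier contract broken: no fresh container
             (a, b) -> a.append(a.toString()).append(b),  // the accumulator does all the real work
             (a, b) -> { throw new IllegalStateException("parallel not supported"); })
    .toString();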
In this use of the 3-arg form of collect(), we have two out of the three functions breaking their contracts. Shouldn't this be telling us to do things a different way?
The main work here is being done by the accumulator function. To accomplish a fold-style reduction, we can apply this function in strict left-to-right order using forEachOrdered(). We have to do a bit of setup and finishing code before and after, but that's no problem:
StringBuilder a = new StringBuilder();
list.parallelStream()
    .forEachOrdered(b -> a.append(a.toString()).append(b));
System.out.println(a.toString());
Naturally, this works fine in parallel, though the performance benefits of running in parallel may be somewhat negated by the ordering requirements of forEachOrdered().
In summary, if you find yourself wanting to do a mutable reduction but you're lacking an associative combiner function, leading you to restrict your stream to sequential execution, recast the problem as a fold-left operation and use forEachOrdered() to apply your accumulator function.
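If this comes up repeatedly, the forEachOrdered() technique can be wrapped in a small helper. The following is a sketch of my own, not part of the Streams API; the name foldLeft and its signature are assumptions:
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Stream;

public final class FoldLeft
{
    // Hypothetical helper: applies f strictly left-to-right, one element at a time.
    // forEachOrdered() respects encounter order even if the stream happens to be parallel.
    public static <T, U> U foldLeft(final Stream<T> stream, final U identity,
                                    final BiFunction<U, ? super T, U> f)
    {
        final AtomicReference<U> acc = new AtomicReference<>(identity);
        stream.forEachOrdered(t -> acc.set(f.apply(acc.get(), t)));
        return acc.get();
    }
}
Used with the list from the example above, it produces the same output as the forEachOrdered() code:
System.out.println(FoldLeft.foldLeft(list.stream(), "", (acc, b) -> acc + acc + b));
// aabaabcaabaabcdaabaabcaabaabcde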
As observed in the previous comments from @MarkoTopolnik and @Duncan, there is no guarantee that Collector.combiner() will be called in sequential mode to produce the reduced result. In fact, the Javadoc is a little subjective on this point, which can lead to an inappropriate interpretation.
(...) A parallel implementation would partition the input, create a result container for each partition, accumulate the contents of each partition into a subresult for that partition, and then use the combiner function to merge the subresults into a combined result.
According to NoBlogDefFound, the combiner is used only in parallel mode. See the partial quotation below:
combiner() is used to join two accumulators together into one. It is used when collector is executed in parallel, splitting input Stream and collecting parts independently first.
To show this issue more clearly, I rewrote the first code sample to include both approaches (sequential and parallel).
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.stream.Collector;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public final class CollectorTest
{
    private CollectorTest()
    {
    }

    private static <T> BinaryOperator<T> nope()
    {
        return (t, u) -> { throw new UnsupportedOperationException("nope"); };
    }

    public static void main(final String... args)
    {
        final Collector<Integer, ?, List<Integer>> c =
            Collector.of(ArrayList::new, List::add, nope());

        // sequential approach
        Stream<Integer> sequential = IntStream.range(0, 10_000_000).boxed();
        System.out.println("isParallel:" + sequential.isParallel());
        sequential.collect(c);

        // parallel approach
        Stream<Integer> parallel = IntStream.range(0, 10_000_000).parallel().boxed();
        System.out.println("isParallel:" + parallel.isParallel());
        parallel.collect(c);
    }
}
After running this code, we get the following output:
isParallel:false
isParallel:true
Exception in thread "main" java.lang.UnsupportedOperationException: nope
at com.stackoverflow.lambda.CollectorTest.lambda$nope$0(CollectorTest.java:18)
at com.stackoverflow.lambda.CollectorTest$$Lambda$3/2001049719.apply(Unknown Source)
at java.util.stream.ReduceOps$3ReducingSink.combine(ReduceOps.java:174)
at java.util.stream.ReduceOps$3ReducingSink.combine(ReduceOps.java:160)
So, according to this result, we can infer that the Collector's combiner is called only during parallel execution.
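To see the combiner actually doing work in parallel mode, one variation (a sketch of my own, not from the original post) replaces nope() with a combiner that merges the partial lists and counts its own invocations; the counts vary from run to run because they depend on how the pipeline gets split:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.IntStream;

public final class CombinerCountTest
{
    public static void main(final String... args)
    {
        final AtomicInteger combinerCalls = new AtomicInteger();

        final Collector<Integer, ?, List<Integer>> counting = Collector.of(
            ArrayList::new,
            List::add,
            (left, right) -> {
                combinerCalls.incrementAndGet();   // count each merge of two partial results
                left.addAll(right);
                return left;
            });

        IntStream.range(0, 10_000_000).boxed().collect(counting);
        System.out.println("sequential combiner calls: " + combinerCalls.getAndSet(0)); // 0 in the current implementation

        IntStream.range(0, 10_000_000).parallel().boxed().collect(counting);
        System.out.println("parallel combiner calls:   " + combinerCalls.get()); // > 0 on a multi-core machine
    }
}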