Parallel streams, collectors and thread safety

See the simple example below that counts the number of occurences of each word in a list:

Stream<String> words = Stream.of("a", "b", "a", "c");
Map<String, Integer> wordsCount = words.collect(toMap(s -> s, s -> 1,
                                                      (i, j) -> i + j));

At the end, wordsCount is {a=2, b=1, c=1}.

But my stream is very large and I want to parallelise the job, so I write:

Map<String, Integer> wordsCount = words.parallel()
                                       .collect(toMap(s -> s, s -> 1,
                                                      (i, j) -> i + j));

However I have noticed that wordsCount is a simple HashMap so I wonder if I need to explicitly ask for a concurrent map to ensure thread safety:

Map<String, Integer> wordsCount = words.parallel()
                                       .collect(toConcurrentMap(s -> s, s -> 1,
                                                                (i, j) -> i + j));

Can non-concurrent collectors be safely used with a parallel stream or should I only use the concurrent versions when collecting from a parallel stream?


Solution 1:

Can non-concurrent collectors be safely used with a parallel stream or should I only use the concurrent versions when collecting from a parallel stream?

It is safe to use a non-concurrent collector in a collect operation of a parallel stream.

In the specification of the Collector interface, in the section with half a dozen bullet points, is this:

For non-concurrent collectors, any result returned from the result supplier, accumulator, or combiner functions must be serially thread-confined. This enables collection to occur in parallel without the Collector needing to implement any additional synchronization. The reduction implementation must manage that the input is properly partitioned, that partitions are processed in isolation, and combining happens only after accumulation is complete.

This means that the various implementations provided by the Collectors class can be used with parallel streams, even though some of those implementations might not be concurrent collectors. This also applies to any of your own non-concurrent collectors that you might implement. They can be used safely with parallel streams, provided your collectors don't interfere with the stream source, are side-effect free, order independent, etc.

I also recommend reading the Mutable Reduction section of the java.util.stream package documentation. In the middle of this section is an example that is stated to be parallelizable, but which collects results into an ArrayList, which is not thread-safe.

The way this works is that a parallel stream ending in a non-concurrent collector makes sure that different threads are always operating on different instances of the intermediate result collections. That's why a collector has a Supplier function, for creating as many intermediate collections as there are threads, so each thread can accumulate into its own. When intermediate results are to be merged, they are handed off safely between threads, and at any given time only a single thread is merging any pair of intermediate results.

Solution 2:

All collectors, if they follow the rules in the specification, are safe to run in parallel or sequential. Parallel-readiness is a key part of the design here.

The distinction between concurrent and non-concurrent collectors have to do with the approach to parallelization.

An ordinary (non-concurrent) collector operates by merging sub-results. So the source is partitioned into a bunch of chunks, each chunk is collected into a result container (like a list or a map), and then the sub-results are merged into a bigger result container. This is safe and order-preserving, but for some kinds of containers -- especially maps -- can be expensive, since merging two maps by key is often expensive.

A concurrent collector instead creates one result container, whose insertion operations are guaranteed to be thread-safe, and blasts elements into it from multiple threads. With a highly concurrent result container like ConcurrentHashMap, this approach may well perform better than merging ordinary HashMaps.

So, the concurrent collectors are strictly optimizations over their ordinary counterparts. And they don't come without a cost; because elements are being blasted in from many threads, concurrent collectors generally cannot preserve encounter order. (But, often you don't care -- when creating a word count histogram, you don't care which instance of "foo" you counted first.)

Solution 3:

It is safe to use non-concurrent collections and non-atomic counters with parallel streams.

If you take a look at the documentation of Stream::collect, you find the following paragraph:

Like reduce(Object, BinaryOperator), collect operations can be parallelized without requiring additional synchronization.

And for the method Stream::reduce:

While this may seem a more roundabout way to perform an aggregation compared to simply mutating a running total in a loop, reduction operations parallelize more gracefully, without needing additional synchronization and with greatly reduced risk of data races.

This might be a bit surprising. However, note that parallel streams are based on a fork-join model. That means the concurrent execution works as follows:

  • split sequence into two parts with about the same size
  • process each part individually
  • collect the results of both parts and combine them into one result

In the second step, the three steps are recursively applied to the sub-sequences.

An example should make that clear. The

IntStream.range(0, 4)
    .parallel()
    .collect(Trace::new, Trace::accumulate, Trace::combine);

The only purpose of the class Trace is log the constructor and method calls. If you execute this statement, it prints the following lines:

thread:  9  /  operation: new
thread: 10  /  operation: new
thread: 10  /  operation: accumulate
thread:  1  /  operation: new
thread:  1  /  operation: accumulate
thread:  1  /  operation: combine
thread: 11  /  operation: new
thread: 11  /  operation: accumulate
thread:  9  /  operation: accumulate
thread:  9  /  operation: combine
thread:  9  /  operation: combine

You can see, that four Trace objects have been created, accumulate has been called once on each object, and combine has been used three times to combine the four objects into one. Each object can only be accesses by one thread at a time. That makes the code thread-safe, and the same applies to the method Collectors::toMap.