Reader#lines() parallelizes badly due to nonconfigurable batch size policy in its spliterator
I cannot achieve good parallelization of stream processing when the stream source is a Reader. Running the code below on a quad-core CPU I observe 3 cores being used at first, then a sudden drop to just two, then one core. Overall CPU utilization hovers around 50%.
Note the following characteristics of the example:
- there are just 6,000 lines;
- each line takes about 20 ms to process;
- the whole procedure takes about a minute.
That means that all the pressure is on the CPU and I/O is minimal. The example is a sitting duck for automatic parallelization.
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
... class imports elided ...
public class Main
{
    static final AtomicLong totalTime = new AtomicLong();

    public static void main(String[] args) throws IOException {
        final long start = System.nanoTime();
        final Path inputPath = createInput();
        System.out.println("Start processing");
        try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")))) {
            Files.lines(inputPath).parallel().map(Main::processLine)
                 .forEach(w::println);
        }
        final double cpuTime = totalTime.get(),
                     realTime = System.nanoTime() - start;
        final int cores = Runtime.getRuntime().availableProcessors();
        System.out.println(" Cores: " + cores);
        System.out.format(" CPU time: %.2f s\n", cpuTime / SECONDS.toNanos(1));
        System.out.format(" Real time: %.2f s\n", realTime / SECONDS.toNanos(1));
        System.out.format("CPU utilization: %.2f%%", 100.0 * cpuTime / realTime / cores);
    }

    private static String processLine(String line) {
        final long localStart = System.nanoTime();
        double ret = 0;
        for (int i = 0; i < line.length(); i++)
            for (int j = 0; j < line.length(); j++)
                ret += Math.pow(line.charAt(i), line.charAt(j) / 32.0);
        final long took = System.nanoTime() - localStart;
        totalTime.getAndAdd(took);
        return NANOSECONDS.toMillis(took) + " " + ret;
    }

    private static Path createInput() throws IOException {
        final Path inputPath = Paths.get("input.txt");
        try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(inputPath))) {
            for (int i = 0; i < 6_000; i++) {
                final String text = String.valueOf(System.nanoTime());
                for (int j = 0; j < 25; j++) w.print(text);
                w.println();
            }
        }
        return inputPath;
    }
}
My typical output:
Cores: 4
CPU time: 110.23 s
Real time: 53.60 s
CPU utilization: 51.41%
For comparison, if I use a slightly modified variant where I first collect into a list and then process:
Files.lines(inputPath).collect(toList()).parallelStream().map(Main::processLine)
     .forEach(w::println);
I get this typical output:
Cores: 4
CPU time: 138.43 s
Real time: 35.00 s
CPU utilization: 98.87%
What could explain that effect, and how can I work around it to get full utilization?
Note that I originally observed this on a reader of a servlet input stream, so it's not specific to FileReader.
Here is the answer, spelled out in the source code of Spliterators.IteratorSpliterator, the spliterator used by BufferedReader#lines():
@Override
public Spliterator<T> trySplit() {
    /*
     * Split into arrays of arithmetically increasing batch
     * sizes. This will only improve parallel performance if
     * per-element Consumer actions are more costly than
     * transferring them into an array. The use of an
     * arithmetic progression in split sizes provides overhead
     * vs parallelism bounds that do not particularly favor or
     * penalize cases of lightweight vs heavyweight element
     * operations, across combinations of #elements vs #cores,
     * whether or not either are known. We generate
     * O(sqrt(#elements)) splits, allowing O(sqrt(#cores))
     * potential speedup.
     */
    Iterator<? extends T> i;
    long s;
    if ((i = it) == null) {
        i = it = collection.iterator();
        s = est = (long) collection.size();
    }
    else
        s = est;
    if (s > 1 && i.hasNext()) {
        int n = batch + BATCH_UNIT;
        if (n > s)
            n = (int) s;
        if (n > MAX_BATCH)
            n = MAX_BATCH;
        Object[] a = new Object[n];
        int j = 0;
        do { a[j] = i.next(); } while (++j < n && i.hasNext());
        batch = j;
        if (est != Long.MAX_VALUE)
            est -= j;
        return new ArraySpliterator<>(a, 0, j, characteristics);
    }
    return null;
}
Also noteworthy are the constants:
static final int BATCH_UNIT = 1 << 10; // batch array size increment
static final int MAX_BATCH = 1 << 25; // max batch array size;
So in my example of 6,000 lines, the whole input is consumed in just three batches: the batch size starts at 1024 and grows by another 1024 on each split, so the batches hold roughly 1,024, 2,048 and 2,928 elements. That precisely explains my observation that initially three cores are used, dropping to two and then one as the smaller batches complete. I have since tried a modified example with 60,000 elements, and there I get almost 100% CPU utilization.
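To make that arithmetic concrete, here is a minimal sketch (my own demo code, not JDK code) that just replays the batch-size progression implied by the constants above for 6,000 elements:

public class BatchSizeDemo {
    static final int BATCH_UNIT = 1 << 10; // 1024, as in IteratorSpliterator
    static final int MAX_BATCH  = 1 << 25;

    public static void main(String[] args) {
        long remaining = 6_000;
        int batch = 0;
        int split = 0;
        while (remaining > 1) {
            int n = batch + BATCH_UNIT;            // arithmetically increasing batch size
            if (n > remaining) n = (int) remaining;
            if (n > MAX_BATCH) n = MAX_BATCH;
            batch = n;
            remaining -= n;
            System.out.println("split " + (++split) + ": " + n + " elements");
        }
        // Prints: split 1: 1024, split 2: 2048, split 3: 2928
    }
}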
To solve my problem I have developed the code below, which allows me to turn any existing stream into one whose Spliterator#trySplit partitions it into batches of a specified size. The simplest way to use it for the use case from my question is this:
toFixedBatchStream(Files.newBufferedReader(inputPath).lines(), 20)
On a lower level, the class below is a spliterator wrapper that changes the wrapped spliterator's trySplit behavior and leaves all other aspects unchanged.
import static java.util.Spliterators.spliterator;
import static java.util.stream.StreamSupport.stream;
import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
public class FixedBatchSpliteratorWrapper<T> implements Spliterator<T> {
    private final Spliterator<T> spliterator;
    private final int batchSize;
    private final int characteristics;
    private long est;

    public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, long est, int batchSize) {
        final int c = toWrap.characteristics();
        this.characteristics = (c & SIZED) != 0 ? c | SUBSIZED : c;
        this.spliterator = toWrap;
        this.est = est;
        this.batchSize = batchSize;
    }
    public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, int batchSize) {
        this(toWrap, toWrap.estimateSize(), batchSize);
    }

    public static <T> Stream<T> toFixedBatchStream(Stream<T> in, int batchSize) {
        return stream(new FixedBatchSpliteratorWrapper<>(in.spliterator(), batchSize), true);
    }
    @Override public Spliterator<T> trySplit() {
        final HoldingConsumer<T> holder = new HoldingConsumer<>();
        if (!spliterator.tryAdvance(holder)) return null;
        // Drain up to batchSize elements from the wrapped spliterator into an array...
        final Object[] a = new Object[batchSize];
        int j = 0;
        do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
        if (est != Long.MAX_VALUE) est -= j;
        // ...and hand the batch out as an array-backed spliterator; this wrapper keeps the rest.
        return spliterator(a, 0, j, characteristics());
    }
    @Override public boolean tryAdvance(Consumer<? super T> action) {
        return spliterator.tryAdvance(action);
    }
    @Override public void forEachRemaining(Consumer<? super T> action) {
        spliterator.forEachRemaining(action);
    }
    @Override public Comparator<? super T> getComparator() {
        if (hasCharacteristics(SORTED)) return null;
        throw new IllegalStateException();
    }
    @Override public long estimateSize() { return est; }
    @Override public int characteristics() { return characteristics; }

    // Captures the single element passed to accept(), so trySplit can read it back.
    static final class HoldingConsumer<T> implements Consumer<T> {
        Object value;
        @Override public void accept(T value) { this.value = value; }
    }
}
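For completeness, here is a sketch of how the wrapper slots into the pipeline from my question (the batch size of 20 is just a guess that works well for ~20 ms per line; adjust to taste):

try (BufferedReader reader = Files.newBufferedReader(inputPath);
     PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")))) {
    // Wrap the lines() stream so trySplit hands out fixed batches of 20 lines,
    // keeping all worker threads busy instead of the 1024/2048/... progression.
    FixedBatchSpliteratorWrapper
        .toFixedBatchStream(reader.lines(), 20)
        .map(Main::processLine)
        .forEach(w::println);
}

Note that toFixedBatchStream already requests a parallel stream (it passes true to StreamSupport.stream), so there is no need to call parallel() again.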
This problem is to some extent fixed in the Java 9 early-access builds. Files.lines was rewritten and now, upon splitting, it actually jumps into the middle of the memory-mapped file. Here are the results on my machine (which has 4 HyperThreading cores = 8 hardware threads):
Java 8u60:
Start processing
Cores: 8
CPU time: 73,50 s
Real time: 36,54 s
CPU utilization: 25,15%
Java 9b82:
Start processing
Cores: 8
CPU time: 79,64 s
Real time: 10,48 s
CPU utilization: 94,95%
As you can see, both real time and CPU utilization are greatly improved.
This optimization has some limitations, though. Currently it works only for a few encodings (namely UTF-8, ISO_8859_1 and US_ASCII), because for an arbitrary encoding you don't know exactly how a line break is encoded. It's limited to files of no more than 2 GB (due to limitations of MappedByteBuffer in Java) and of course does not work for some non-regular files (like character devices or named pipes, which cannot be memory-mapped). In such cases the old implementation is used as the fallback.
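For illustration, here is a minimal sketch of the case that the optimized path covers on Java 9+ (a regular, memory-mappable file under 2 GB read with one of the supported charsets); the file name and the filtering step are just placeholders:

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class Java9LinesDemo {
    public static void main(String[] args) throws Exception {
        Path input = Paths.get("input.txt"); // a regular file, under 2 GB
        // With UTF-8 (or ISO-8859-1 / US-ASCII) and a regular file, Java 9's Files.lines
        // can split by seeking into the memory-mapped region; otherwise it falls back
        // to the old BufferedReader-based spliterator.
        try (Stream<String> lines = Files.lines(input, StandardCharsets.UTF_8)) {
            long count = lines.parallel().filter(line -> !line.isEmpty()).count();
            System.out.println(count + " non-empty lines");
        }
    }
}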
The parallel execution of streams is based on a fork/join model. For ordered streams, parallel execution only works if the stream can be split into parts that strictly follow one another. In general, that's not possible with streams generated by BufferedReader. However, in theory parallel execution should be possible for unordered streams:
BufferedReader reader = ...;
reader.lines().unordered().map(...);
I am not sure if the stream returned by BufferedReader supports this kind of parallel execution. A very simple alternative is to create an intermediate list:
BufferedReader reader = ...;
reader.lines().collect(toList()).parallelStream().map(...);
In this case, the parallel execution starts only after all lines have been read. This might be a problem if reading the lines takes a long time. If so, I recommend using an ExecutorService for the parallel execution instead of parallel streams:
ExecutorService executor = ...;
BufferedReader reader = ...;
reader.lines()
      .map(line -> executor.submit(() -> ... line ...))
      .collect(toList())
      .stream()
      .map(future -> future.get())
      .map(...);
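Since Future.get() throws checked exceptions, a lambda cannot call it directly as written above; here is one way the sketch could be fleshed out into compilable code (processLine, the file name, and the pool size are my placeholders, not part of the original answer):

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class ExecutorLinesDemo {
    // Placeholder for the per-line work from the question.
    static String processLine(String line) {
        return line.toUpperCase();
    }

    public static void main(String[] args) throws IOException {
        ExecutorService executor =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try (BufferedReader reader = Files.newBufferedReader(Paths.get("input.txt"))) {
            // Submit each line as soon as it is read, so processing overlaps with reading.
            List<Future<String>> futures = reader.lines()
                    .map(line -> executor.submit(() -> processLine(line)))
                    .collect(Collectors.toList());
            // Collect results in input order, unwrapping the checked exceptions of get().
            for (Future<String> future : futures) {
                try {
                    System.out.println(future.get());
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
        } finally {
            executor.shutdown();
        }
    }
}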