Is this a bug in Files.lines(), or am I misunderstanding something about parallel streams?
Since the current state of the issue is quite the opposite of the earlier statements made here, it should be noted that there is now an explicit statement by Brian Goetz that the back-propagation of the unordered characteristic past a skip operation is considered a bug. It's also stated that it is now considered that there should be no back-propagation of the unordered characteristic of a terminal operation at all.
There is also a related bug report, JDK-8129120, whose status is "fixed in Java 9", and the fix has been backported to Java 8, update 60.
I did some tests with jdk1.8.0_60 and it seems that the implementation now indeed exhibits the more intuitive behavior.
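For reference, here is essentially the test I ran (it's the example from the question; the class name is mine). On 8u60 and later it should reliably print only the second line:

import java.io.BufferedReader;
import java.io.StringReader;

public class SkipAfterFix {
    public static void main(String[] args) {
        // On jdk1.8.0_60+, forEach's unordered-ness no longer back-propagates
        // past skip, so "Foo" is always the line that gets skipped.
        new BufferedReader(new StringReader("Foo\nBar")).lines()
                .skip(1L)
                .parallel()
                .forEach(System.out::println); // prints: Bar
    }
}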
THIS ANSWER IS OUTDATED - READ THE UPDATE ABOVE INSTEAD!
To quickly answer the question: The observed behavior is intended! There is no bug and everything is happening according to the documentation. But let it be said that this behavior should be documented and communicated better. It should be made more obvious how forEach ignores ordering.
I'll first cover the concepts which allow the observed behavior. This provides the background for dissecting one of the examples given in the question. I will do this on a high level and then again on a very low level.
[TL;DR: Read on its own, the high level explanation will give a rough answer.]
Concept
Instead of talking about Streams, which is the type operated on or returned by stream-related methods, let's talk about stream operations and stream pipelines. The method calls lines, skip and parallel are stream operations which build a stream pipeline[1] and - as others have noted - that pipeline is processed as a whole when the terminal operation forEach is called[2].
A pipeline could be thought of as a series of operations which, one after another, are executed on the whole stream (e.g. filter all elements, map remaining elements to numbers, sum all numbers). But this is misleading! A better metaphor is that the terminal operation pulls single elements through each operation[3] (e.g. get the next unfiltered element, map it, add it to the sum, request the next element). Some intermediate operations may need to traverse several (e.g. skip) or maybe even all (e.g. sorted) elements before they can return the requested next element, and this is one of the sources of state in an operation.
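This pull behavior is easy to observe. A minimal sequential sketch of my own (not from the original answer): the printouts interleave per element instead of completing one stage at a time.

import java.util.stream.Stream;

public class PullMetaphorDemo {
    public static void main(String[] args) {
        // If stages ran one after another over the whole stream, all "filter"
        // lines would print before any "map" line. Instead, forEach pulls each
        // element through the complete pipeline before requesting the next one.
        Stream.of("a", "bb", "ccc")
                .filter(s -> { System.out.println("filter: " + s); return true; })
                .map(s -> { System.out.println("map:    " + s); return s.length(); })
                .forEach(len -> System.out.println("forEach: " + len));
    }
}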
Each operation signals its characteristics with these StreamOpFlags:
- DISTINCT
- SORTED
- ORDERED
- SIZED
- SHORT_CIRCUIT
They are combined across the stream source, the intermediate operations and the terminal operation and make up the characteristics of the pipeline (as a whole), which are then used for optimizations[4]. Similarly, whether a pipeline is executed in parallel or not is a property of the entire pipeline[5].
So whenever you are making assumptions regarding these characteristics, you have to look carefully at all operations building the pipeline, regardless of the order in which they are applied, and what guarantees they make. When doing so keep in mind how the terminal operation pulls each individual element through the pipeline.
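You can inspect the combined characteristics of a pipeline from the outside via Stream.spliterator. A little sketch of my own:

import java.util.Spliterator;
import java.util.stream.Stream;

public class CombinedFlagsDemo {
    public static void main(String[] args) {
        // A stream over an array-backed source reports ORDERED...
        Spliterator<String> plain = Stream.of("a", "b", "c").spliterator();
        System.out.println(plain.hasCharacteristics(Spliterator.ORDERED)); // true

        // ...but a single unordered() call anywhere in the pipeline clears the
        // flag for the pipeline as a whole, regardless of later operations.
        Spliterator<String> cleared = Stream.of("a", "b", "c")
                .unordered()
                .map(s -> s)
                .spliterator();
        System.out.println(cleared.hasCharacteristics(Spliterator.ORDERED)); // false
    }
}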
Example
Let's look at this special case:
BufferedReader fooBarReader = new BufferedReader(new StringReader("Foo\nBar"));
fooBarReader.lines()
.skip(1L)
.parallel()
.forEach(System.out::println);
High Level
Regardless of whether your stream source is ordered or not (it is), by calling forEach (instead of forEachOrdered) you declare that order doesn't matter to you[6], which effectively reduces skip from "skip the first n elements" to "skip any n elements"[7] (because without order the former becomes meaningless).
So you give the pipeline the right to ignore order if that promises a speedup. For parallel execution it apparently thinks so, which is why you get the observed output. Hence what you observe is the intended behavior and no bug.
Note that this does not conflict with skip being stateful! As described above, being stateful does not imply that it somehow caches the whole stream (minus the skipped elements) and everything that follows is executed on these elements. It just means that the operation has some state - namely the number of skipped elements (well, it's not actually that easy but with my limited understanding of what's going on, I'd say it's a fair simplification).
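To make the difference between the two terminal operations tangible, a quick demo of my own:

import java.util.stream.IntStream;

public class TerminalOpOrderDemo {
    public static void main(String[] args) {
        // forEach on a parallel stream may print in any order, e.g. "5 6 2 ..."
        IntStream.range(0, 10).parallel()
                .forEach(i -> System.out.print(i + " "));
        System.out.println();

        // forEachOrdered respects the encounter order ("0 1 2 ..."), trading
        // away some of the parallel speedup for that guarantee.
        IntStream.range(0, 10).parallel()
                .forEachOrdered(i -> System.out.print(i + " "));
        System.out.println();
    }
}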
Low Level
Let's look at it in more detail:
- BufferedReader.lines creates the Stream, let's call it _lines:
  - creates an ordered Spliterator
  - hands it to StreamSupport.stream, which creates a ReferencePipeline.Head and transforms the spliterator flag to a stream op flag
- .skip creates a new Stream, let's call it _skip:
  - calls ReferencePipeline.skip
  - which constructs a "slice" operation (a generalization of skip & limit) with SliceOps.makeRef
  - this creates an anonymous instance of ReferencePipeline.StatefulOp, which references _lines as its source
- .parallel sets the parallel flag for the entire pipeline as described above
- .forEach actually starts the execution
So let's see how the pipeline is executed:
- Calling _skip.forEach creates a ForEachOp (let's call it _forEach) and hands it to _skip.evaluate, which does two things:
  - calls sourceSpliterator to create a spliterator around the source for this pipeline stage:
    - calls opEvaluateParallelLazy on itself (as it turns out)
    - this determines that the stream is unordered and creates an UnorderedSliceSpliterator (let's call it _sliceSpliterator) with skip = 1 and no limit
  - calls _forEach.evaluateParallel, which creates a ForEachTask (because it is unordered; let's call it _forEachTask) and invokes it
- In _forEachTask.compute the task splits off the first 1024 lines, creates a new task for it (let's call it _forEachTask2), realizes there are no lines left and finishes.
- Inside the fork join pool, _forEachTask2.compute gets called, vainly tries to split again and finally starts copying its elements into the sink (a stream-aware wrapper around System.out.println) by calling _skip.copyInto.
- This essentially delegates the task to the specified spliterator. This is _sliceSpliterator, which was created above! So _sliceSpliterator.forEachRemaining is responsible for handing the non-skipped elements to the println-sink:
  - it gets a chunk (in this case all) of the lines into a buffer and counts them
  - it tries to acquire as many permits (I assume due to parallelization) via acquirePermits
  - with two elements in the source and one to be skipped, there is only one permit which it acquires (in general, let's say n)
  - it lets the buffer put the first n elements (so in this case only the first) into the sink
So UnorderedSliceSpliterator.OfRef.forEachRemaining is where the order is finally and truly ignored. I did not compare this to the ordered variant, but these are my assumptions as to why it is done this way:
- under parallelization, shoveling the spliterator's elements into the buffer may interleave with other tasks doing the same
- this will make tracking their order extremely hard
- doing that, or preventing the interleaving, degrades performance and is pointless if order is irrelevant
- if the order is lost, there is little else to do but to process the first n permitted elements
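To illustrate just the permit idea, here is a toy sketch of my own - it is not the JDK's UnorderedSliceSpliterator code, and skipAny is a hypothetical helper. Each arriving element tries to grab one of the n "skip permits"; once they are used up, everything else passes through, so exactly n elements are dropped but which ones depends on timing.

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.stream.IntStream;

public class SkipAnyDemo {
    // Hypothetical helper: drops the first n elements that *arrive*, i.e.
    // "skip any n elements" instead of "skip the first n in encounter order".
    static <T> Predicate<T> skipAny(long n) {
        AtomicLong permits = new AtomicLong(n);
        return element -> {
            for (long remaining; (remaining = permits.get()) > 0; ) {
                if (permits.compareAndSet(remaining, remaining - 1)) {
                    return false; // this element consumed a skip permit
                }
            }
            return true; // no permits left, element passes through
        };
    }

    public static void main(String[] args) {
        // Always prints 7 of the 10 numbers, but which 3 are missing varies.
        IntStream.range(0, 10).boxed().parallel()
                .filter(skipAny(3))
                .forEach(i -> System.out.print(i + " "));
    }
}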
Any questions? ;) Sorry for going on for so long. Maybe I should leave out the details and make a blog post of it....
Sources
[1] java.util.stream - Stream operations and pipelines:
Stream operations are divided into intermediate and terminal operations, and are combined to form stream pipelines.
[2] java.util.stream - Stream operations and pipelines:
Traversal of the pipeline source does not begin until the terminal operation of the pipeline is executed.
[3] This metaphor represents my understanding of streams. The main source, beside the code, is this quote from java.util.stream - Stream operations and pipelines (highlighting mine):
Processing streams lazily allows for significant efficiencies; in a pipeline such as the filter-map-sum example above, filtering, mapping, and summing can be fused into a single pass on the data, with minimal intermediate state. Laziness also allows avoiding examining all the data when it is not necessary; for operations such as "find the first string longer than 1000 characters", it is only necessary to examine just enough strings to find one that has the desired characteristics without examining all of the strings available from the source.
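A scaled-down, runnable version of that "find the first long string" example (my own sketch); peek shows that elements after the first match are never examined:

import java.util.stream.Stream;

public class ShortCircuitDemo {
    public static void main(String[] args) {
        // Prints "examining: a", "examining: bbbb", "found: bbbb" - the last
        // element is never pulled because findFirst already has its result.
        Stream.of("a", "bbbb", "ccccc")
                .peek(s -> System.out.println("examining: " + s))
                .filter(s -> s.length() > 3)
                .findFirst()
                .ifPresent(s -> System.out.println("found: " + s));
    }
}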
[4] java.util.stream.StreamOpFlag:
At each stage of the pipeline, a combined stream and operation flags can be calculated [... jadda, jadda, jadda about how flags are combined across source, intermediate and terminal operations ...] to produce the flags output from the pipeline. Those flags can then be used to apply optimizations.
In code you can see this in AbstractPipeline.combinedFlags, which is set during construction (and on a few other occasions) by combining the flags of the previous and the new operation.
[5] java.util.stream - Parallelism (to which I cannot directly link - scroll down a little):
When the terminal operation is initiated, the stream pipeline is executed sequentially or in parallel depending on the orientation of the stream on which it is invoked.
In code you can see this in AbstractPipeline.sequential, parallel, and isParallel, which set/check a boolean flag on the stream source, making it irrelevant when the setters are called while constructing a stream.
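A small demo of my own to show that consequence, i.e. that only the last call before the terminal operation wins:

import java.util.stream.IntStream;

public class LastCallWinsDemo {
    public static void main(String[] args) {
        // parallel() and sequential() each just set the one boolean flag, so
        // this pipeline runs sequentially despite the earlier parallel() call.
        IntStream.range(0, 5)
                .parallel()
                .map(i -> i * i)
                .sequential()
                .forEach(i -> System.out.print(i + " ")); // 0 1 4 9 16, in order
    }
}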
[6] java.util.stream.Stream.forEach:
Performs an action for each element of this stream. [...] The behavior of this operation is explicitly nondeterministic.
Contrast this with java.util.stream.Stream.forEachOrdered:
Performs an action for each element of this stream, in the encounter order of the stream if the stream has a defined encounter order.
[7] This is also not clearly documented, but my interpretation of this comment on Stream.skip (heavily shortened by me):
[...] skip() [...] can be quite expensive on ordered parallel pipelines [...] since skip(n) is constrained to skip not just any n elements, but the first n elements in the encounter order. [...] [R]emoving the ordering constraint [...] may result in significant speedups of skip() in parallel pipelines
The problem is that you are using a parallel stream together with forEach, and you are expecting the skip operation to rely on the correct element order, which is not the case here. Excerpt from the forEach documentation:
For parallel stream pipelines, this operation does not guarantee to respect the encounter order of the stream, as doing so would sacrifice the benefit of parallelism.
I guess what basically happens is that the skip operation is first performed on the second line, not on the first. If you make the stream sequential or use forEachOrdered, you can see that it then produces the expected result. Another approach would be to use Collectors, as sketched below.
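For instance (my own sketch of the Collectors approach), collect is an ordered terminal operation here, so skip keeps its "first n elements" meaning even on the parallel stream:

import java.io.BufferedReader;
import java.io.StringReader;
import java.util.List;
import java.util.stream.Collectors;

public class CollectKeepsOrder {
    public static void main(String[] args) {
        List<String> lines = new BufferedReader(new StringReader("Foo\nBar\nBaz")).lines()
                .skip(1)
                .parallel()
                .collect(Collectors.toList());
        System.out.println(lines); // [Bar, Baz]
    }
}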
Let me quote something of relevance, the Javadoc of skip:
While skip() is generally a cheap operation on sequential stream pipelines, it can be quite expensive on ordered parallel pipelines, especially for large values of n, since skip(n) is constrained to skip not just any n elements, but the first n elements in the encounter order.
Now, it is quite certain that Files.lines() has a well-defined encounter order and is an ORDERED stream (if it were not, there would be no guarantee even in sequential operation that the encounter order matches file order), therefore it is guaranteed that the resulting stream will deterministically consist of just the second line in your example.
Whether or not there is something else to this, the guarantee is definitely there.
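You can verify the ORDERED claim directly; a quick check of my own using a temporary file (the class name is mine):

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Spliterator;
import java.util.stream.Stream;

public class FilesLinesOrderedCheck {
    public static void main(String[] args) throws Exception {
        Path tmp = Files.createTempFile("ordered-check", ".txt");
        Files.write(tmp, Arrays.asList("Foo", "Bar"));
        try (Stream<String> lines = Files.lines(tmp)) {
            // the spliterator of Files.lines reports the ORDERED characteristic
            System.out.println(lines.spliterator()
                    .hasCharacteristics(Spliterator.ORDERED)); // true
        } finally {
            Files.delete(tmp);
        }
    }
}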
I have an idea how to work around this problem, which I don't see in the previous discussion: you can recreate the stream, splitting the pipeline into two pipelines while keeping the whole thing lazy.
public static <T> Stream<T> recreate(Stream<T> stream) {
return StreamSupport.stream(stream.spliterator(), stream.isParallel())
.onClose(stream::close);
}
public static void main(String[] args) {
recreate(new BufferedReader(new StringReader("JUNK\n1\n2\n3\n4\n5")).lines()
.skip(1).parallel()).forEach(System.out::println);
}
When you recreate the stream from the initial stream's spliterator, you effectively create a new pipeline. In most cases recreate will behave as a no-op, but the thing is that the first and second pipelines don't share the parallel and unordered states. So even if you are using forEach (or any other unordered terminal operation), only the second stream becomes unordered.
Internally, a pretty similar thing is done by concatenating your stream with an empty stream:
Stream.concat(Stream.empty(),
new BufferedReader(new StringReader("JUNK\n1\n2\n3\n4\n5"))
.lines().skip(1).parallel()).forEach(System.out::println);
Though it has a little more overhead.