Is there an elegant way to process a stream in chunks?
My exact scenario is inserting data into a database in batches, so I want to accumulate DOM objects and flush them every 1000.
I implemented it by putting code in the accumulator to detect fullness then flush, but that seems wrong - the flush control should come from the caller.
I could convert the stream to a List then use subList in an iterative fashion, but that too seems clunky.
Is there a neat way to take action every n elements and then continue with the stream, while processing the stream only once?
Solution 1:
Elegance is in the eye of the beholder. If you don't mind using a stateful function in groupingBy, you can do this:
AtomicInteger counter = new AtomicInteger();
stream.collect(groupingBy(x->counter.getAndIncrement()/chunkSize))
.values()
.forEach(database::flushChunk);
This doesn't win any performance or memory usage points over your original solution because it will still materialize the entire stream before doing anything.
If you want to avoid materializing the list, the Stream API will not help you. You will have to get the stream's iterator or spliterator and do something like this:
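Spliterator<Integer> split = stream.spliterator();
int chunkSize = 1000;
while (true) {
    // Pre-size each chunk to the batch size.
    List<Integer> chunk = new ArrayList<>(chunkSize);
    // tryAdvance adds one element per call and returns false once the stream is exhausted.
    for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++) {
    }
    if (chunk.isEmpty()) break;
    database.flushChunk(chunk);
}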
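Since the question asks for flush control to come from the caller, the loop above could be wrapped in a small helper that takes the flush action as a parameter. This is only a sketch: the class name ChunkedConsumer and the method forEachChunk are hypothetical names chosen for illustration, and the println in main stands in for a call such as database::flushChunk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class ChunkedConsumer {

    // Hypothetical helper: passes each full chunk (and the final partial one) to the caller's action.
    static <T> void forEachChunk(Stream<T> stream, int chunkSize, Consumer<List<T>> action) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        Spliterator<T> split = stream.spliterator();
        while (true) {
            List<T> chunk = new ArrayList<>(chunkSize);
            // Pull elements one at a time until the chunk is full or the stream ends.
            while (chunk.size() < chunkSize && split.tryAdvance(chunk::add)) {
                // tryAdvance has already added the element to the chunk
            }
            if (chunk.isEmpty()) {
                break;
            }
            action.accept(chunk);
        }
    }

    public static void main(String[] args) {
        // The lambda is a stand-in for something like database::flushChunk.
        forEachChunk(IntStream.range(0, 10).boxed(), 4,
                chunk -> System.out.println("Flushing " + chunk));
    }
}
```

Only one chunk is held in memory at a time, and the stream is traversed once.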
Solution 2:
If your project has a Guava dependency, you could do this:
StreamSupport.stream(Iterables.partition(simpleList, 1000).spliterator(), false).forEach(...);
See https://google.github.io/guava/releases/23.0/api/docs/com/google/common/collect/Lists.html#partition-java.util.List-int-
Solution 3:
You can create a stream of chunks (List<T>) from a stream of items and a given chunk size by
- grouping the items by the chunk index (element index / chunk size)
- ordering the chunks by their index
- reducing the map to its values only, in chunk order
Code:
public static <T> Stream<List<T>> chunked(Stream<T> stream, int chunkSize) {
    // Running element index; the classifier is stateful, so use a sequential stream only.
    AtomicInteger index = new AtomicInteger(0);
    return stream.collect(Collectors.groupingBy(x -> index.getAndIncrement() / chunkSize))
            .entrySet().stream()
            // Restore chunk order, then keep only the lists.
            .sorted(Map.Entry.comparingByKey())
            .map(Map.Entry::getValue);
}
Example usage:
Stream<Integer> stream = IntStream.range(0, 100).mapToObj(Integer::valueOf);
Stream<List<Integer>> chunked = chunked(stream, 8);
chunked.forEach(chunk -> System.out.println("Chunk: " + chunk));
Output:
Chunk: [0, 1, 2, 3, 4, 5, 6, 7]
Chunk: [8, 9, 10, 11, 12, 13, 14, 15]
Chunk: [16, 17, 18, 19, 20, 21, 22, 23]
Chunk: [24, 25, 26, 27, 28, 29, 30, 31]
Chunk: [32, 33, 34, 35, 36, 37, 38, 39]
Chunk: [40, 41, 42, 43, 44, 45, 46, 47]
Chunk: [48, 49, 50, 51, 52, 53, 54, 55]
Chunk: [56, 57, 58, 59, 60, 61, 62, 63]
Chunk: [64, 65, 66, 67, 68, 69, 70, 71]
Chunk: [72, 73, 74, 75, 76, 77, 78, 79]
Chunk: [80, 81, 82, 83, 84, 85, 86, 87]
Chunk: [88, 89, 90, 91, 92, 93, 94, 95]
Chunk: [96, 97, 98, 99]
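Note that the grouping approach collects the whole input before the first chunk is emitted. If that matters, a lazy variant can be sketched by wrapping the source stream's iterator, so that each chunk is built only when the downstream asks for it. This is a different technique from the groupingBy version above, not a drop-in equivalent; the names LazyChunks and chunkedLazily are hypothetical, and the sketch assumes a sequential stream.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class LazyChunks {

    // Hypothetical helper: returns a sequential stream whose elements are lists of up to chunkSize items.
    static <T> Stream<List<T>> chunkedLazily(Stream<T> stream, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        Iterator<T> source = stream.iterator();
        Iterator<List<T>> chunks = new Iterator<List<T>>() {
            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public List<T> next() {
                if (!source.hasNext()) {
                    throw new NoSuchElementException();
                }
                // Fill one chunk on demand; the last chunk may be shorter.
                List<T> chunk = new ArrayList<>(chunkSize);
                while (chunk.size() < chunkSize && source.hasNext()) {
                    chunk.add(source.next());
                }
                return chunk;
            }
        };
        return StreamSupport.stream(
                        Spliterators.spliteratorUnknownSize(chunks, Spliterator.ORDERED | Spliterator.NONNULL),
                        false)
                // Closing the chunk stream also closes the source stream.
                .onClose(stream::close);
    }

    public static void main(String[] args) {
        chunkedLazily(IntStream.range(0, 10).boxed(), 4)
                .forEach(chunk -> System.out.println("Chunk: " + chunk));
    }
}
```

Because chunks are produced on demand, this also works with short-circuiting operations such as limit on a very large or unbounded source.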