Parallel operations on Kotlin collections?
Solution 1:
The Kotlin standard library has no support for parallel operations. However, since Kotlin uses the standard Java collection classes, you can use the Java 8 stream API to perform parallel operations on Kotlin collections as well.
For example:
myCollection.parallelStream()
    .map { ... }
    .filter { ... }
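A stream pipeline yields a Stream rather than a List, so a terminal operation is needed to get a collection back. Here is a minimal sketch of a complete pipeline; evenSquares is a made-up name used only for illustration:

```kotlin
import java.util.stream.Collectors

// Hypothetical helper: squares every element in parallel and keeps the even squares.
fun evenSquares(input: List<Int>): List<Int> =
    input.parallelStream()
        .map { it * it }               // may run on several threads of the common fork-join pool
        .filter { it % 2 == 0 }        // downstream stages stay part of the parallel pipeline
        .collect(Collectors.toList())  // terminal operation; keeps the list's encounter order
```

Calling evenSquares(listOf(1, 2, 3, 4)) gives a list in the same order as the input, because a List is an ordered stream source.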
Solution 2:
As of Kotlin 1.1, parallel operations can also be expressed quite elegantly in terms of coroutines. Here is pmap on lists:
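fun <A, B> List<A>.pmap(f: suspend (A) -> B): List<B> = runBlocking {
    map { async(Dispatchers.Default) { f(it) } }.map { it.await() }
}
Note that coroutines were experimental in Kotlin 1.1 and have been stable since Kotlin 1.3. Older kotlinx.coroutines versions used CommonPool where current ones use Dispatchers.Default.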
Solution 3:
There is no official support in Kotlin's stdlib yet, but you could define an extension function to mimic par.map:
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

fun <T, R> Iterable<T>.pmap(
    // at least one thread, even on machines with one or two cores
    numThreads: Int = maxOf(Runtime.getRuntime().availableProcessors() - 2, 1),
    exec: ExecutorService = Executors.newFixedThreadPool(numThreads),
    transform: (T) -> R): List<R> {
    // submit one task per element; keeping the futures in input order preserves the result order
    val futures = this.map { item -> exec.submit(Callable { transform(item) }) }
    // note: this also shuts down an executor passed in by the caller
    exec.shutdown()
    // get() blocks until the task is done and rethrows a failure as ExecutionException
    return futures.map { it.get() }
}
(adapted from the github source)
Here's a simple usage example:
val result = listOf("foo", "bar").pmap { it+"!" }.filter { it.contains("bar") }
If needed, you can tweak the threading by passing the number of threads or even a specific java.util.concurrent.ExecutorService. E.g.
listOf("foo", "bar").pmap(4, transform = { it + "!" })
Please note that this approach only parallelizes the map operation and does not affect any downstream steps. E.g. the filter in the first example would run single-threaded. However, in many cases only the data transformation (i.e. map) requires parallelization. Furthermore, it would be straightforward to extend the approach above to other elements of the Kotlin collection API.
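As an illustration of such an extension, here is a rough sketch of a parallel filter built on the same executor idea. The name pfilter and its parameters are assumptions made for this example, not part of any library:

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical pfilter: evaluates the predicate in parallel and keeps matches in input order.
fun <T> Iterable<T>.pfilter(
    numThreads: Int = maxOf(Runtime.getRuntime().availableProcessors() - 2, 1),
    predicate: (T) -> Boolean
): List<T> {
    val exec = Executors.newFixedThreadPool(numThreads)
    try {
        // One task per element; each future carries the element together with its verdict.
        val futures = map { item -> exec.submit(Callable { item to predicate(item) }) }
        // get() blocks until the task is done; iterating the futures follows the input order.
        return futures.map { it.get() }.filter { it.second }.map { it.first }
    } finally {
        // Let the worker threads exit even if a predicate call threw.
        exec.shutdown()
    }
}
```

With this in place, listOf("foo", "bar", "baz").pfilter { it.startsWith("ba") } evaluates the predicate on the pool. This sketch creates its own pool per call, which is simple but only pays off when the predicate is expensive.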
Solution 4:
You can use this extension method:
suspend fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> = coroutineScope {
    map { async { f(it) } }.awaitAll()
}
See Parallel Map in Kotlin for more info
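Because pmap is a suspend function, it has to be called from a coroutine, and async inside coroutineScope inherits the caller's dispatcher. Under a plain runBlocking that is a single thread, so CPU-bound work only runs in parallel if a multi-threaded dispatcher is chosen. A small sketch of one way to handle both points, assuming kotlinx.coroutines is on the classpath; pmapDefault and lengthsOf are hypothetical names:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking

// Same shape as pmap above, but every task is started on the shared Dispatchers.Default pool.
suspend fun <A, B> Iterable<A>.pmapDefault(f: suspend (A) -> B): List<B> = coroutineScope {
    map { async(Dispatchers.Default) { f(it) } }.awaitAll()
}

// Hypothetical blocking entry point for callers that are not already inside a coroutine.
fun lengthsOf(words: List<String>): List<Int> = runBlocking {
    words.pmapDefault { it.length }
}
```

awaitAll returns the results in the order of the deferred values, so the output lines up with the input regardless of which task finishes first.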