Parallel operations on Kotlin collections?

Solution 1:

The Kotlin standard library has no support for parallel operations. However, since Kotlin uses the standard Java collection classes, you can use the Java 8 stream API to perform parallel operations on Kotlin collections as well.

e.g.

myCollection.parallelStream()
        .map { ... }
        .filter { ... }
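Java streams are lazy, so the pipeline above only runs once a terminal operation such as collect is attached. A minimal sketch of a complete pipeline, where the function name evenSquares and the sample data are made up for illustration:

```kotlin
import java.util.stream.Collectors

// Hypothetical example function: squares the input in parallel and keeps the even results.
fun evenSquares(numbers: List<Int>): List<Int> =
    numbers.parallelStream()
        .map { it * it }                // may run on several worker threads
        .filter { it % 2 == 0 }
        .collect(Collectors.toList())   // terminal operation; keeps the list's encounter order

fun main() {
    println(evenSquares(listOf(1, 2, 3, 4, 5, 6))) // prints [4, 16, 36]
}
```

Because the source is an ordered List, the collected result keeps the input order even though the intermediate steps may execute out of order.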

Solution 2:

As of Kotlin 1.1, parallel operations can also be expressed quite elegantly in terms of coroutines. Here is pmap on lists:

fun <A, B> List<A>.pmap(f: suspend (A) -> B): List<B> = runBlocking {
    map { async(CommonPool) { f(it) } }.map { it.await() }
}

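Note that coroutines were still an experimental feature in Kotlin 1.1. They have been stable since Kotlin 1.3, and in current kotlinx.coroutines releases CommonPool has been replaced by Dispatchers.Default.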

Solution 3:

There is no official support in Kotlin's stdlib yet, but you could define an extension function to mimic par.map:

import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

fun <T, R> Iterable<T>.pmap(
    // leave two cores free, but never ask for fewer than one thread
    numThreads: Int = (Runtime.getRuntime().availableProcessors() - 2).coerceAtLeast(1),
    exec: ExecutorService = Executors.newFixedThreadPool(numThreads),
    transform: (T) -> R
): List<R> {
    try {
        // submit one task per item; the futures stay in submission order,
        // so the result preserves the order of the input
        val futures = map { item -> exec.submit(Callable<R> { transform(item) }) }
        // get() blocks until a result is ready and rethrows a failed task
        // as an ExecutionException
        return futures.map { it.get() }
    } finally {
        // note: this also shuts down an executor passed in by the caller
        exec.shutdown()
    }
}

(github source)

Here's a simple usage example:

val result = listOf("foo", "bar").pmap { it+"!" }.filter { it.contains("bar") }

If needed, you can tune the threading by passing the number of threads or even a specific java.util.concurrent.ExecutorService, e.g.:

listOf("foo", "bar").pmap(4, transform = { it + "!" })

Please note that this approach only parallelizes the map operation and does not affect any downstream steps. For example, the filter in the first example runs single-threaded. However, in many cases only the data transformation (i.e. map) needs parallelization. It would also be straightforward to extend this approach to other parts of the Kotlin collection API.
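As an illustration of such an extension, here is a rough sketch of a parallel filter built on the same executor idea. The name pfilter and its parameters are hypothetical; they are not part of the linked source or of any library.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical helper: evaluates the predicate in parallel, then filters in input order.
fun <T> Iterable<T>.pfilter(
    numThreads: Int = Runtime.getRuntime().availableProcessors(),
    predicate: (T) -> Boolean
): List<T> {
    val exec = Executors.newFixedThreadPool(numThreads)
    try {
        // Pair every item with the future that holds its predicate result.
        val tagged = map { item -> item to exec.submit(Callable<Boolean> { predicate(item) }) }
        // Walk the pairs sequentially, so the output keeps the input order.
        return tagged.filter { (_, keep) -> keep.get() }.map { (item, _) -> item }
    } finally {
        exec.shutdown()
    }
}

fun main() {
    println(listOf("foo", "bar", "baz").pfilter { it.startsWith("b") }) // prints [bar, baz]
}
```

This only pays off when the predicate itself is expensive; for cheap predicates the thread hand-off will likely cost more than it saves.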

Solution 4:

You can use this extension method:

suspend fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> = coroutineScope {
    map { async { f(it) } }.awaitAll()
}

See "Parallel Map in Kotlin" for more info.
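One thing to watch when calling this pmap: async without arguments inherits the caller's dispatcher, so inside a plain runBlocking all tasks share a single thread and run concurrently but not in parallel. A minimal sketch of calling it on a multi-threaded dispatcher, assuming kotlinx-coroutines-core is on the classpath (the sample data is made up):

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Same extension as above, repeated so the example is self-contained.
suspend fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> = coroutineScope {
    map { async { f(it) } }.awaitAll()
}

fun main() = runBlocking {
    // Switch to Dispatchers.Default so the async tasks can use a pool of worker threads.
    val lengths = withContext(Dispatchers.Default) {
        listOf("foo", "quux", "hello").pmap { it.length }
    }
    println(lengths) // prints [3, 4, 5]
}
```

awaitAll returns the results in the order of the deferred values, so the output order matches the input regardless of which task finishes first.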