How do Kotlin coroutines work internally?
Coroutines are said to be a "lighter version" of threads, and I understand that they use threads internally to execute coroutines.
What happens when I start a coroutine using any of the builder functions?
This is my understanding of running this code:
GlobalScope.launch {        // (A)
    val y = loadData()      // (B) suspend fun loadData()
    println(y)              // (C)
    delay(1000)             // (D)
    println("completed")    // (E)
}
- Kotlin has a pre-defined ThreadPool at the beginning.
- At (A), Kotlin starts executing the coroutine in the next available free thread (say Thread01).
- At (B), Kotlin stops executing the current thread, and starts the suspending function loadData() in the next available free thread (Thread02).
- When (B) returns after execution, Kotlin continues the coroutine in the next available free thread (Thread03).
- (C) executes on Thread03.
- At (D), Thread03 is stopped.
- After 1000 ms, (E) is executed on the next free thread, say Thread01.
Am I understanding this correctly? Or are coroutines implemented in a different way?
Update (2021): Here's an excellent article by Manuel Vivo that complements all the answers below.
Coroutines are a completely separate thing from any scheduling policy that you describe. A coroutine is basically a call chain of suspend funs. Suspension is totally under your control: you just have to call suspendCoroutine. You'll get a callback object (the continuation), so you can call its resume method and get back to where you suspended.
Here's some code where you can see that suspension is a very direct and transparent mechanism, fully under your control:
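import kotlin.coroutines.*
import kotlinx.coroutines.*

// Filled in by suspendHere() each time the coroutine suspends.
var continuation: Continuation<String>? = null

fun main(args: Array<String>) {
    // Unconfined: no dispatching to threads; the coroutine runs wherever it is resumed.
    val job = GlobalScope.launch(Dispatchers.Unconfined) {
        while (true) {
            println(suspendHere())
        }
    }
    continuation!!.resume("Resumed first time")  // prints "Resumed first time"
    continuation!!.resume("Resumed second time") // prints "Resumed second time"
}

// Suspends the caller and stores its continuation so that main() can resume it.
suspend fun suspendHere() = suspendCancellableCoroutine<String> {
    continuation = it
}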
All the code above executes on the same thread, the main thread. There is no multithreading going on at all.
The coroutine you launch suspends itself each time it calls suspendHere(). It writes the continuation callback to the continuation property, and then you explicitly use that continuation to resume the coroutine.
The code uses the Unconfined coroutine dispatcher, which does no dispatching to threads at all; it just runs the coroutine code right there where you invoke continuation.resume().
With that in mind, let's revisit your diagram:
GlobalScope.launch {        // (A)
    val y = loadData()      // (B) suspend fun loadData()
    println(y)              // (C)
    delay(1000)             // (D)
    println("completed")    // (E)
}
- Kotlin has a pre-defined ThreadPool at the beginning.
It may or may not have a thread pool. A UI dispatcher works with a single thread.
The prerequisite for a thread to be the target of a coroutine dispatcher is that there is a concurrent queue associated with it and the thread runs a top-level loop that takes Runnable objects from this queue and executes them. A coroutine dispatcher simply puts the continuation on that queue.
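The queue-plus-loop prerequisite can be sketched in a few lines with the standard library. EventLoop, requeue() and interleave() are hypothetical names for this illustration, not kotlinx.coroutines API, and a real dispatcher also has to handle cancellation, exceptions and waking up an idle thread. Here "dispatching" a coroutine means nothing more than posting a Runnable that calls resume on its continuation.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.coroutines.*

// Sketch of a thread's top-level loop: take Runnables from a concurrent queue and run them.
class EventLoop {
    private val queue = ConcurrentLinkedQueue<Runnable>()
    fun post(task: Runnable) { queue.add(task) }
    fun runUntilIdle() {
        while (true) {
            val task = queue.poll() ?: return
            task.run()
        }
    }
}

// Hypothetical suspension point: instead of resuming right away, put the continuation on the queue.
suspend fun requeue(loop: EventLoop) = suspendCoroutine<Unit> { cont ->
    loop.post(Runnable { cont.resume(Unit) })
}

fun interleave(): List<String> {
    val loop = EventLoop()
    val log = mutableListOf<String>()
    for (name in listOf("A", "B")) {
        val body: suspend () -> Unit = {
            log += "${name}1"
            requeue(loop)
            log += "${name}2"
        }
        // Starting a coroutine is also just a task on the queue.
        loop.post(Runnable { body.startCoroutine(Continuation(EmptyCoroutineContext) { }) })
    }
    loop.runUntilIdle()   // a single thread runs both coroutines, interleaved
    return log
}
```

With one thread and one queue, interleave() returns [A1, B1, A2, B2]: each suspension just lets the loop pick the next runnable.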
- At (A), Kotlin starts executing the coroutine in the next available free thread (say Thread01).
It can also be the same thread where you called launch.
- At (B), Kotlin stops executing the current thread, and starts the suspending function loadData() in the next available free thread (Thread02).
Kotlin has no need to stop any threads in order to suspend a coroutine. In fact, the main point of coroutines is that threads don't get started or stopped. The thread's top-level loop will go on and pick another runnable to run.
Furthermore, the mere fact that you're calling a suspend fun has no significance. The coroutine will only suspend itself when some code in that call chain explicitly calls suspendCoroutine. The function may also simply return without suspension.
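A minimal sketch of a suspend fun that only sometimes suspends, assuming a hypothetical cache-backed loadData() (cached, pending, suspensions and loadTwice() are also made up for illustration). The first call really suspends until someone resumes it; the second one is an ordinary return from the cache.

```kotlin
import kotlin.coroutines.*

// Hypothetical cache-backed loader standing in for the question's loadData().
var cached: String? = null
var pending: Continuation<String>? = null
var suspensions = 0

suspend fun loadData(): String {
    cached?.let { return it }                           // fast path: plain return, no suspension
    suspensions++
    return suspendCoroutine { cont -> pending = cont }  // slow path: really suspends
}

fun loadTwice(): List<String> {
    val log = mutableListOf<String>()
    val body: suspend () -> Unit = {
        log += loadData()   // suspends until pending is resumed
        log += loadData()   // returns immediately from the cache
    }
    body.startCoroutine(Continuation(EmptyCoroutineContext) { })
    cached = "data"
    pending!!.resume("data")
    return log
}
```

Both calls look identical at the call site, yet only the first one suspends.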
But let's assume it did call suspendCoroutine. In that case the coroutine is no longer running on any thread. It is suspended and can't continue until some code, somewhere, calls continuation.resume(). That code could be running on any thread, any time in the future.
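A small stdlib-only sketch of "resumed from any thread": the coroutine parks its continuation, and a separately started thread resumes it. With no dispatcher in the context (EmptyCoroutineContext), the rest of the coroutine runs on the resuming thread. parked and resumeFromOtherThread() are hypothetical names.

```kotlin
import kotlin.concurrent.thread
import kotlin.coroutines.*

var parked: Continuation<Unit>? = null

fun resumeFromOtherThread(): Pair<String, String> {
    var before = ""
    var after = ""
    val body: suspend () -> Unit = {
        before = Thread.currentThread().name
        suspendCoroutine<Unit> { cont -> parked = cont }  // from here on, the coroutine runs on no thread
        after = Thread.currentThread().name
    }
    body.startCoroutine(Continuation(EmptyCoroutineContext) { })
    // Some other code, on some other thread, decides when to resume.
    val resumer = thread(name = "resumer") { parked!!.resume(Unit) }
    resumer.join()
    return before to after
}
```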
- When (B) returns after execution, Kotlin continues the coroutine in the next available free thread (Thread03).
(B) doesn't "return after execution": the coroutine resumes while it is still inside the body of loadData(). It may suspend and resume any number of times before loadData() returns.
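As a sketch of a single call that suspends and resumes several times before returning, assume a hypothetical loadChunked() that needs three separate values, each delivered through its own resume() call (waiting and driveChunks() are also illustrative names). The code after (B) runs only once all three have arrived.

```kotlin
import kotlin.coroutines.*

var waiting: Continuation<Int>? = null

// Hypothetical loadData() that needs three chunks and suspends once per chunk.
suspend fun loadChunked(): Int {
    var total = 0
    repeat(3) {
        total += suspendCoroutine<Int> { cont -> waiting = cont }
    }
    return total
}

fun driveChunks(): List<String> {
    val log = mutableListOf<String>()
    val body: suspend () -> Unit = {
        val y = loadChunked()          // (B): a single call site...
        log += "loadData returned $y"  // (C)
    }
    body.startCoroutine(Continuation(EmptyCoroutineContext) { })
    for (chunk in listOf(10, 20, 30)) {
        log += "resuming with $chunk"
        waiting!!.resume(chunk)        // ...but three separate resumptions
    }
    return log
}
```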
- (C) executes on Thread03.
- At (D), Thread03 is stopped.
- After 1000 ms, (E) is executed on the next free thread, say Thread01.
Again, no threads are being stopped. The coroutine gets suspended and a mechanism, usually specific to the dispatcher, is used to schedule its resumption after 1000 ms. At that point it will be added to the run queue associated with the dispatcher.
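For illustration, a delay can be built from suspendCoroutine plus a timer that schedules the resumption, so no thread sleeps while the coroutine waits. This is a hypothetical sketchDelay() using a java.util.concurrent ScheduledExecutorService as the timer, not how kotlinx.coroutines implements delay() (there the dispatcher usually provides the scheduling).

```kotlin
import java.util.Collections
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import kotlin.coroutines.*

// Hypothetical delay: suspend, and let a timer schedule the resumption after `ms` milliseconds.
suspend fun sketchDelay(ms: Long, timer: ScheduledExecutorService) = suspendCoroutine<Unit> { cont ->
    timer.schedule(Runnable { cont.resume(Unit) }, ms, TimeUnit.MILLISECONDS)
}

fun delayDemo(): List<String> {
    val timer = Executors.newSingleThreadScheduledExecutor()
    val log: MutableList<String> = Collections.synchronizedList(mutableListOf<String>())
    val done = CountDownLatch(1)
    val body: suspend () -> Unit = {
        log += "(D) suspending"
        sketchDelay(100, timer)   // no thread is blocked during these 100 ms
        log += "(E) completed"    // runs on the timer's thread, since there is no dispatcher
    }
    body.startCoroutine(Continuation(EmptyCoroutineContext) { done.countDown() })
    log += "caller thread is free" // startCoroutine has already returned
    done.await()
    timer.shutdown()
    return log.toList()
}
```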
To be concrete, let's see some examples of what kind of code it takes to dispatch a coroutine.
Swing UI dispatcher:
EventQueue.invokeLater { continuation.resume(value) }
Android UI dispatcher:
mainHandler.post { continuation.resume(value) }
ExecutorService dispatcher:
executor.submit { continuation.resume(value) }
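In the standard library, the hook behind such a dispatcher is ContinuationInterceptor: it wraps a coroutine's continuation so that every resumption can be redirected. A minimal sketch of the ExecutorService case, assuming a hypothetical ExecutorInterceptor class and runOnExecutor() demo (kotlinx.coroutines' real dispatchers build on the same interface but do much more). It uses Executor.execute(), which takes a plain Runnable.

```kotlin
import java.util.Collections
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.concurrent.thread
import kotlin.coroutines.*

// Hypothetical interceptor: every resumption of the coroutine is handed to the executor.
class ExecutorInterceptor(private val executor: ExecutorService) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext = continuation.context
            override fun resumeWith(result: Result<T>) {
                executor.execute { continuation.resumeWith(result) }
            }
        }
}

fun runOnExecutor(): List<String> {
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "worker") }
    val threads: MutableList<String> = Collections.synchronizedList(mutableListOf<String>())
    val done = CountDownLatch(1)
    val body: suspend () -> Unit = {
        threads += Thread.currentThread().name
        // Resume from a plain thread; the interceptor moves the coroutine back onto "worker".
        suspendCoroutine<Unit> { cont -> thread(name = "resumer") { cont.resume(Unit) } }
        threads += Thread.currentThread().name
    }
    body.startCoroutine(Continuation(ExecutorInterceptor(executor)) { done.countDown() })
    done.await()
    executor.shutdown()
    return threads.toList()
}
```

Even though the resumption comes from a thread named "resumer", both entries read "worker". Without an interceptor in the context, the coroutine would simply continue on the resuming thread.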
Coroutines work by compiling the coroutine body into a switch over its possible resume points:
class MyClass$Coroutine extends CoroutineImpl {
    public Object doResume(Object o, Throwable t) {
        switch (super.state) {   // which resume point are we at?
            default:
                throw new IllegalStateException("call to \"resume\" before \"invoke\" with coroutine");
            case 0: {
                // code before the first suspension point
                state = 1; // or something else depending on your branching
                break;
            }
            case 1: {
                // code after the first suspension point
                ...
            }
        }
        return null;
    }
}
The code that runs this coroutine then creates that instance and calls doResume() every time it needs to resume execution; how that is handled depends on the scheduler used for execution.
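To make the switch concrete, here is a hand-written Kotlin equivalent for the Before/delay/After coroutine compiled below. It is a sketch, not compiler output: BeforeAfterMachine and runStateMachine() are hypothetical, a plain ArrayDeque of callbacks stands in for the scheduler, and scheduling the next doResume() call stands in for delay(1000).

```kotlin
// Hypothetical hand-written version of the state machine generated for
//   launch { println("Before"); delay(1000); println("After") }
// `label` plays the role of `state` in the switch above.
class BeforeAfterMachine(
    private val scheduler: ArrayDeque<() -> Unit>,   // stand-in for a dispatcher's queue
    private val log: MutableList<String>
) {
    private var label = 0

    fun doResume() {
        when (label) {
            0 -> {
                log += "Before"
                label = 1                            // remember the resume point
                scheduler.addLast { doResume() }     // stand-in for delay(1000): schedule resumption
                return                               // "suspend": give the thread back
            }
            1 -> {
                log += "After"
                label = -1                           // finished
            }
            else -> throw IllegalStateException("unexpected state $label")
        }
    }
}

fun runStateMachine(): List<String> {
    val scheduler = ArrayDeque<() -> Unit>()
    val log = mutableListOf<String>()
    BeforeAfterMachine(scheduler, log).doResume()    // runs up to the "suspension", then returns
    log += "thread is free"
    while (scheduler.isNotEmpty()) {
        val task = scheduler.removeFirst()
        task()                                       // the scheduler resumes the machine
    }
    return log
}
```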
Here is an example compilation for a simple coroutine:
launch {
    println("Before")
    delay(1000)
    println("After")
}
This compiles to the following bytecode:
private kotlinx.coroutines.experimental.CoroutineScope p$;
public final java.lang.Object doResume(java.lang.Object, java.lang.Throwable);
Code:
0: invokestatic #18 // Method kotlin/coroutines/experimental/intrinsics/IntrinsicsKt.getCOROUTINE_SUSPENDED:()Ljava/lang/Object;
3: astore 5
5: aload_0
6: getfield #22 // Field kotlin/coroutines/experimental/jvm/internal/CoroutineImpl.label:I
9: tableswitch { // 0 to 1
0: 32
1: 77
default: 102
}
32: aload_2
33: dup
34: ifnull 38
37: athrow
38: pop
39: aload_0
40: getfield #24 // Field p$:Lkotlinx/coroutines/experimental/CoroutineScope;
43: astore_3
44: ldc #26 // String Before
46: astore 4
48: getstatic #32 // Field java/lang/System.out:Ljava/io/PrintStream;
51: aload 4
53: invokevirtual #38 // Method java/io/PrintStream.println:(Ljava/lang/Object;)V
56: sipush 1000
59: aload_0
60: aload_0
61: iconst_1
62: putfield #22 // Field kotlin/coroutines/experimental/jvm/internal/CoroutineImpl.label:I
65: invokestatic #44 // Method kotlinx/coroutines/experimental/DelayKt.delay:(ILkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
68: dup
69: aload 5
71: if_acmpne 85
74: aload 5
76: areturn
77: aload_2
78: dup
79: ifnull 83
82: athrow
83: pop
84: aload_1
85: pop
86: ldc #46 // String After
88: astore 4
90: getstatic #32 // Field java/lang/System.out:Ljava/io/PrintStream;
93: aload 4
95: invokevirtual #38 // Method java/io/PrintStream.println:(Ljava/lang/Object;)V
98: getstatic #52 // Field kotlin/Unit.INSTANCE:Lkotlin/Unit;
101: areturn
102: new #54 // class java/lang/IllegalStateException
105: dup
106: ldc #56 // String call to \'resume\' before \'invoke\' with coroutine
108: invokespecial #60 // Method java/lang/IllegalStateException."<init>":(Ljava/lang/String;)V
111: athrow
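I compiled this with kotlinc 1.2.41, when coroutines were still experimental; that is why the names above live in the kotlin.coroutines.experimental and kotlinx.coroutines.experimental packages.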
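From 32 to 76 is the code that prints "Before", sets the label to 1 and calls delay(1000). If delay returns the COROUTINE_SUSPENDED marker (the if_acmpne comparison at 71), doResume returns that marker at 74 to 76 and the coroutine is suspended; otherwise execution jumps straight to 85.
From 77 to 101 is the resume point for label 1: it rethrows a pending exception, if any, and then prints "After".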
From 102 to 111 is error handling for illegal resume states, as denoted by the default label in the switch table.
So, in summary, coroutines in Kotlin are simply state machines that are driven by some scheduler.
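The check at offsets 68 to 76 reflects the calling convention for suspend functions: the callee returns either its result or the special COROUTINE_SUSPENDED marker, and the caller's state machine passes that marker up. The standard library's kotlin.coroutines.intrinsics package exposes this convention directly; a minimal sketch with hypothetical maybeSuspend() and probe() functions:

```kotlin
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*

var parkedCont: Continuation<Unit>? = null

// Returns a plain value when `wait` is false; otherwise hands back the COROUTINE_SUSPENDED marker.
suspend fun maybeSuspend(wait: Boolean): Unit = suspendCoroutineUninterceptedOrReturn { cont ->
    if (wait) {
        parkedCont = cont
        COROUTINE_SUSPENDED   // the caller's state machine must return this marker as well
    } else {
        Unit                  // an ordinary result: the caller just keeps going
    }
}

// Runs a tiny coroutine and reports what its state machine returned on the first call.
fun probe(wait: Boolean): Any? {
    val body: suspend () -> String = { maybeSuspend(wait); "done" }
    return body.startCoroutineUninterceptedOrReturn(Continuation(EmptyCoroutineContext) { })
}
```

probe(false) gets the coroutine's actual result, while probe(true) gets the marker back, just like the areturn at offset 76.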