Kotlin Coroutine/Flow Timeout without cancelling the running coroutine?
Solution 1:
One way to do this is with a simple select clause:
import kotlinx.coroutines.selects.*

val someFlow = MutableStateFlow("someInitialValue")
val deferred = async {
    someFlow.value = someNetworkCall()
}
// await the first of the 2 things, without cancelling anything
select<Unit> {
    deferred.onAwait {}
    onTimeout(SOME_NUMBER_MILLIS) {
        someFlow.value = someDefaultValue
    }
}
You have to watch out for race conditions, though, if this runs on a multi-threaded dispatcher. If the async finishes just after the timeout fires, the default value can overwrite the network response.
One way to prevent that, if you know the network can't return the same value as the initial value (and no other coroutine is changing the state), is to use the atomic update method:
val deferred = async {
    val networkCallValue = someNetworkCall()
    someFlow.update { networkCallValue }
}
// await the first of the 2 things, without cancelling anything
val initialValue = someFlow.value
select<Unit> {
    deferred.onAwait {}
    onTimeout(300) {
        someFlow.update { current ->
            if (current == initialValue) {
                "someDefaultValue"
            } else {
                current // don't overwrite the network result
            }
        }
    }
}
If you can't rely on comparisons of the state, you can protect access to the flow with a Mutex and a boolean:
val someFlow = MutableStateFlow("someInitialValue")
val mutex = Mutex()
var networkCallDone = false
val deferred = async {
    val networkCallValue = someNetworkCall()
    mutex.withLock {
        someFlow.value = networkCallValue
        networkCallDone = true
    }
}
// await the first of the 2 things, without cancelling anything
select<Unit> {
    deferred.onAwait {}
    onTimeout(300) {
        mutex.withLock {
            if (!networkCallDone) {
                someFlow.value = "someDefaultValue"
            }
        }
    }
}
Solution 2:
Probably the easiest way to solve the race condition is to use select() as in @Joffrey's answer. select() guarantees that only a single branch executes.
However, I believe mutating a shared flow concurrently complicates the situation and introduces another race condition that we need to solve. Instead, we can do it much more simply:
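flow {
    // async needs a CoroutineScope, which flow { } does not provide by itself
    coroutineScope {
        val network = async { someNetworkCall() }
        select<Unit> {
            network.onAwait { emit(it) }
            onTimeout(1000) {
                emit("initial")
                emit(network.await())
            }
        }
    }
}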
There are no race conditions to handle. We have just two simple execution branches, depending on what happened first.
If we need a StateFlow, then we can use stateIn() to convert a regular flow. Or we can use a MutableStateFlow as in the question, but mutate it only inside select(), similarly to above:
select {
    network.onAwait { someFlow.value = it }
    onTimeout(1000) {
        someFlow.value = "initial"
        someFlow.value = network.await()
    }
}
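The stateIn() option mentioned above is not shown in code, so here is a minimal sketch of how it might look. The names someNetworkCall (a stand-in that just delays), valueWithFallback, and the literal values are hypothetical and only there for illustration. The flow body is wrapped in coroutineScope on the assumption that async should be a child of the collecting coroutine.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.selects.*

// Hypothetical stand-in for the real network call: just waits, then returns a value.
suspend fun someNetworkCall(): String {
    delay(300)
    return "networkValue"
}

// Emits the network result; if it takes longer than timeoutMillis,
// emits a default first and the network result once it arrives.
fun valueWithFallback(timeoutMillis: Long): Flow<String> = flow {
    coroutineScope {
        val network = async { someNetworkCall() }
        select<Unit> {
            network.onAwait { emit(it) }
            onTimeout(timeoutMillis) {
                emit("someDefaultValue")
                emit(network.await()) // the call keeps running, it is not cancelled
            }
        }
    }
}

fun main() = runBlocking {
    // Convert the cold flow into a StateFlow with an initial value.
    val state: StateFlow<String> = valueWithFallback(timeoutMillis = 100)
        .stateIn(this, SharingStarted.Eagerly, "someInitialValue")

    // Print the states seen before the network result arrives, then the final state.
    state.takeWhile { it != "networkValue" }.collect { println(it) }
    println(state.value)
}
```

With this shape only the single collecting coroutine ever writes to the state, so no Mutex or compare-and-update logic is needed.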
Solution 3:
You can launch two coroutines simultaneously and, in the second one, cancel the Job of the first one, which is responsible for emitting the default value:
val someFlow = MutableStateFlow("someInitialValue")
val firstJob = launch {
    delay(SOME_NUMBER_MILLIS)
    ensureActive() // Ensures that current Job is active.
    someFlow.update { "DefaultValue" }
}
launch {
    val networkCallValue = someNetworkCall()
    firstJob.cancelAndJoin()
    someFlow.update { networkCallValue }
}