Long polling with kotlin coroutines
As @Spitzbueb said, you could do something similar with CompletableDeferred
.
However, if you don't need to support the clear()
and count()
methods, you could also probably simplify by replacing the ConcurrentHashMap
with a simple MutableSharedFlow<Unit>
that broadcasts "pings" from redis.
In onMessage
, you could emit Unit
into the mutable shared flow to notify subscribers, and then you can simply implement your request mechanism by awaiting the first element on the shared flow and making the readSubset
request:
class AppMsgHandler(@Autowired private val appMsgRepo: AppMsgRepo) : MessageListener {
private val events = MutableSharedFlow<Unit>()
suspend fun requestMessages(start: Int, timeoutMillis: Long): List<AppMsg> {
val currentMsgs = appMsgRepo.readSubset(start)
if (currentMsgs.isNotEmpty()) {
return currentMsgs
}
val newMessages = withTimeoutOrNull(timeoutMillis) {
events.first()
appMsgRepo.readSubset(start)
}
return newMessages ?: emptyList()
}
override fun onMessage(message: Message, pattern: ByteArray?) {
LOG.info("RedisPub: {} on Channel: {}", String(message.body, UTF8), String(message.channel, UTF8))
events.tryEmit(Unit)
}
companion object {
private val LOG: Logger = LoggerFactory.getLogger(AppMsgHandler::class.java)
private val UTF8: Charset = StandardCharsets.UTF_8
}
}
The controller can then simply call requestMessages
(provided you make your controller use suspend
functions with Spring WebFlux).
In Kotlin coroutines there is the Deferred
type, which is similar to CompletableFuture
in the sense that it represents a value that is not yet available but probably will be in the future (if no error occurs/exception is thrown). @Joffrey pointed out that there is also a CompletableDeferred
, which is even closer to ComplatableFuture
enabling the user to manually call complete
or exceptionallyComplete
.
Deferreds can easily be created with the async
extension function on CoroutineScope
. If you want to set a timeout, Kotlin has you covered with the withTimeout
function that cancels the block of code after a given time.
Note that withTimeout
should be inside async
and not the other way around.
Take a look at this example: https://pl.kotl.in/uYe12ds7g