As @Spitzbueb said, you could do something similar with CompletableDeferred.

However, if you don't need to support the clear() and count() methods, you could also probably simplify by replacing the ConcurrentHashMap with a simple MutableSharedFlow<Unit> that broadcasts "pings" from redis.

In onMessage, you could emit Unit into the mutable shared flow to notify subscribers, and then you can simply implement your request mechanism by awaiting the first element on the shared flow and making the readSubset request:

class AppMsgHandler(@Autowired private val appMsgRepo: AppMsgRepo) : MessageListener {

    private val events = MutableSharedFlow<Unit>()

    suspend fun requestMessages(start: Int, timeoutMillis: Long): List<AppMsg> {
        val currentMsgs = appMsgRepo.readSubset(start)
        if (currentMsgs.isNotEmpty()) {
            return currentMsgs
        }
        val newMessages = withTimeoutOrNull(timeoutMillis) {
            events.first()
            appMsgRepo.readSubset(start)
        }
        return newMessages ?: emptyList()
    }

    override fun onMessage(message: Message, pattern: ByteArray?) {
        LOG.info("RedisPub: {} on Channel: {}", String(message.body, UTF8), String(message.channel, UTF8))
        events.tryEmit(Unit)
    }

    companion object {
        private val LOG: Logger = LoggerFactory.getLogger(AppMsgHandler::class.java)
        private val UTF8: Charset = StandardCharsets.UTF_8
    }
}

The controller can then simply call requestMessages (provided you make your controller use suspend functions with Spring WebFlux).


In Kotlin coroutines there is the Deferred type, which is similar to CompletableFuture in the sense that it represents a value that is not yet available but probably will be in the future (if no error occurs/exception is thrown). @Joffrey pointed out that there is also a CompletableDeferred, which is even closer to ComplatableFuture enabling the user to manually call complete or exceptionallyComplete.

Deferreds can easily be created with the async extension function on CoroutineScope. If you want to set a timeout, Kotlin has you covered with the withTimeout function that cancels the block of code after a given time.

Note that withTimeout should be inside async and not the other way around.

Take a look at this example: https://pl.kotl.in/uYe12ds7g