我还在Kotlin Slack上发布了同样的问题,并从Zach Klippenstein个我们如何检测反压的有趣答案中得到了.
首先,我们编写一个方法,它可以检测到它的lambda中的方法何时挂起.
suspend inline fun <T> onSuspended(
crossinline onSuspended: () -> Unit,
noinline block: suspend () -> T
): T = suspendCoroutineUninterceptedOrReturn { cont ->
val result = block.startCoroutineUninterceptedOrReturn(cont)
if (result == COROUTINE_SUSPENDED) {
onSuspended()
}
result
}
然后,使用这个方法,我们可以创建另一个方法来包装流生成器,以便在它每次挂起自己时发现它.
fun <T> Flow<T>.onBackpressure(
onBackpressure: () -> Unit
): Flow<T> = flow {
collect {
onSuspended(onBackpressure) {
emit(it)
}
}
}
使用onBackpressure
方法,我们有了编写一个方法所需的一切,该方法每次流生成器挂起时执行任意操作.为了提供一个示例,这里有一个方法,它包装了一个buffer()
(不放弃它的任何特性)和日志(log)if和how long,生产者自上次emits 以来一直被暂停.
suspend fun main() {
val bufferedFlow = createFlow().bufferLoggingBackpressure()
// Here, the collector is much slower than the producer
bufferedFlow.collect {
delay(100)
println("Collecting: $it")
}
}
// Produces numbers at a fast pace
fun createFlow(): Flow<Int> = (0..Integer.MAX_VALUE).asFlow().onEach {
delay(10)
println("Emitting: $it")
}
fun <T> Flow<T>.bufferLoggingBackpressure(
capacity: Int = Channel.BUFFERED,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
): Flow<T> {
var suspended = false
var lastSuspension = 0L
return onBackpressure {
suspended = true
lastSuspension = System.nanoTime()
}.buffer(capacity = capacity, onBufferOverflow = onBufferOverflow).onEach {
if (suspended) {
println("Backpressure detected, suspended for ${(System.nanoTime() - lastSuspension).nanoseconds.inWholeMicroseconds}us")
suspended = false
}
}
}
有了这段代码,我们就能够对流程进行反压,这就是我自己问题的答案.