在Kotlin Coroutines中,如何验证ifwhen是否发生任何流背压,即,如果我有一个流正在使用buffer(),我如何判断和记录每次生产者和/或消费者由于背压而暂停自己?

我想这样做是为了微调流缓冲区大小,以及消费者端的并行性.为了示例起见,我将提供一个简单的示例:

suspend fun main() {
    val bufferedFlow = createFlow().buffer()

    // Here, the collector is much slower than the producer
    bufferedFlow.collect {
        delay(100)
        println("Collecting: $it")
    }
}

// Produces numbers at a fast pace
fun createFlow(): Flow<Int> = (0..Integer.MAX_VALUE).asFlow().onEach {
    delay(10)
    println("Emitting: $it")
}

在本例中,消费者被生产者淹没,但是由于生产者支持背压,所以它被暂停,直到消费者消费缓冲区中的一个项目.当然,在这个例子中,哪一边是fast,哪一边是slow是显而易见的,但有些情况下事情并不那么清楚.

如果一个人想检测和记录每次生产者或消费者由于来自另一方的反压力而暂停,他如何做到这一点?

推荐答案

我还在Kotlin Slack上发布了同样的问题,并从Zach Klippenstein个我们如何检测反压的有趣答案中得到了.

首先,我们编写一个方法,它可以检测到它的lambda中的方法何时挂起.

suspend inline fun <T> onSuspended(
    crossinline onSuspended: () -> Unit,
    noinline block: suspend () -> T
): T = suspendCoroutineUninterceptedOrReturn { cont ->
    val result = block.startCoroutineUninterceptedOrReturn(cont)
    if (result == COROUTINE_SUSPENDED) {
        onSuspended()
    }
    result
}

然后,使用这个方法,我们可以创建另一个方法来包装流生成器,以便在它每次挂起自己时发现它.

fun <T> Flow<T>.onBackpressure(
    onBackpressure: () -> Unit
): Flow<T> = flow {
    collect {
        onSuspended(onBackpressure) {
            emit(it)
        }
    }
}

使用onBackpressure方法,我们有了编写一个方法所需的一切,该方法每次流生成器挂起时执行任意操作.为了提供一个示例,这里有一个方法,它包装了一个buffer()(不放弃它的任何特性)和日志(log)ifhow long,生产者自上次emits 以来一直被暂停.

suspend fun main() {
    val bufferedFlow = createFlow().bufferLoggingBackpressure()

    // Here, the collector is much slower than the producer
    bufferedFlow.collect {
        delay(100)
        println("Collecting: $it")
    }
}

// Produces numbers at a fast pace
fun createFlow(): Flow<Int> = (0..Integer.MAX_VALUE).asFlow().onEach {
    delay(10)
    println("Emitting: $it")
}

fun <T> Flow<T>.bufferLoggingBackpressure(
    capacity: Int = Channel.BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
): Flow<T> {
    var suspended = false
    var lastSuspension = 0L

    return onBackpressure {
        suspended = true
        lastSuspension = System.nanoTime()
    }.buffer(capacity = capacity, onBufferOverflow = onBufferOverflow).onEach {
        if (suspended) {
            println("Backpressure detected, suspended for ${(System.nanoTime() - lastSuspension).nanoseconds.inWholeMicroseconds}us")
            suspended = false
        }
    }
}

有了这段代码,我们就能够对流程进行反压,这就是我自己问题的答案.

Kotlin相关问答推荐

如何在Kotlin中反射多个对象以查找特定类型的属性

当我通过媒体通知更改音乐时不要更新我的 UI

如何优雅地声明一个StateFlow?

kotlin中如何使用子类调用父静态方法?

如何在 kotlin 中的数据类中为变量提供多种类型

Kotlin supervisorScope 即使包裹在 try catch 中也会失败

Kotlin 函数中接收者和参数的类型相同

如何将glide显示的图像下载到设备存储中.Kotlin

嵌套数组 indexOf()

Kotlin 有垃圾收集器吗?如果是这样,它基于哪种算法?

如何为 material.Slider 视图创建绑定适配器?

OnClickListener 未在 ConstraintLayout 上触发

将 @Component.Builder 与构造函数参数一起使用

Kotlin suspend fun

如何在kotlin语言中将字符转换为ascii值

使用范围的稀疏sparse值列表

如何在Kotlin中获得KType?

不推荐使用仅限生命周期的LifecycleEvent

通过在 kotlin-gradle 中使用子项目Unresolved reference: implementation

Kotlin Flow 收集后无法执行代码