这里有sourceFlow,我使用flatMapLatest根据sourceFlow的值生成新的流. 而collect()没有任何价值.此代码不打印任何内容.

fun main() {

    fun createNewFlow(): MutableSharedFlow<String> = MutableSharedFlow()
    runBlocking {
        val sourceFlow = MutableSharedFlow<Int>()

        // flow for transform
        val set = mapOf(
            1 to createNewFlow(),
            2 to createNewFlow(),
            3 to createNewFlow()
        )


        launch {
            // get flow from set by number from sourceFlow
            val res = sourceFlow.flatMapLatest { set[it]!! }
            res.collect {
                println(it)
            }
        }

        launch {
            sourceFlow.emit(1)
            set[1]!!.emit("1")
            set[1]!!.emit("1")
            sourceFlow.emit(2)
            set[2]!!.emit("2")
            set[2]!!.emit("2")
        }
    }
}

如果我将flatMapLatest更改为flatMapConcat,我会看到1两次,这显然是因为从集合[1]开始的流没有完成,但当我使用flatMapLatest时,我正在等待1, 1, 2, 2, 3, 3.我的样品有什么问题?

UPD: 我只是增加了延迟以得到println的结果. 因此,这实际上是一场代码竞赛

launch {
        delay(3000)
        sourceFlow.emit(1)
        set[1]!!.emit("1")
        set[1]!!.emit("1")
    }

推荐答案

正如@Louis-Wasserman在 comments 中指出的那样,这是一场竞赛(尽管不是一场线程不安全的数据竞赛).

flatMapLatest内部collect(并取消)子流.默认情况下,MutableSharedFlow在订阅之前不重放元素.

因此,这两个协议线之间存在着一场竞争.如果第一协程在第二协程呼叫emit之后成为订户,则不会收到任何值.您可以指定replay参数,以确保collect个调用在订阅之前接收元素.

Kotlin相关问答推荐

为什么可组合对象看似无状态(唯一传递的参数是函数,而不是状态),但会进行重组

可组合项在返回后返回时组合导航句柄

循环中的每个元素都应填充行中可用的所有可用空间

Eclipse:无法安装 Kotlin 插件

如何将 `throw` 放置在辅助函数中但仍然具有空安全性?

Kotlin 列表扩展功能

kotest 更改环境变量

JobIntentService 被销毁,当应用程序被销毁时

Kotlin 插件错误:无法为类 org.jetbrains.kotlin.gradle.tasks.KotlinCompile 生成代理类

@InlineOnly 注释是什么意思?

是否可以在 kotlin 中嵌套数据类?

Kotlin - mutableMapOf() 会保留我输入的顺序

Kotlin 有 array.indexOf 但我无法弄清楚如何做 array.indexOfBy { lambda }

如何在 Jetpack Compose 的 LazyColumn/LazyRow 中禁用和启用滚动?

Kotlin - 是否可以在类中的 init 块之前初始化伴随对象?

如何在 Gradle Kotlin DSL 中使用来自 gradle.properties 的插件版本?

比较Kotlin的NaN

从 java 活动 *.java 启动 kotlin 活动 *.kt?

函数引用和lambdas

Android Jetpack Compose - 图像无法zoom 到框的宽度和高度