我有一个List<Flow<T>>,想生成一个Flow<List<T>>.这几乎就是combine所做的——除了combine等待每一个Flow发出一个初始值,这不是我想要的.以这段代码为例:

val a = flow {
  repeat(3) {
    emit("a$it")
    delay(100)
  }
}
val b = flow {
  repeat(3) {
    delay(150)
    emit("b$it")
  }
}
val c = flow {
  delay(400)
  emit("c")
}
val flows = listOf(a, b, c)
runBlocking {
  combine(flows) {
    it.toList()
  }.collect { println(it) }
}

With combine (and hence as-is), this is the output:

[a2, b1, c]
[a2, b2, c]

Whereas I'm interested in all the intermediary steps too. This is what I want from those three flows:

[]
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]

现在我有两个变通办法,但没有一个是很好的……第一个很难看,并且不适用于可为空的类型:

val flows = listOf(a, b, c).map {
  flow {
    emit(null)
    it.collect { emit(it) }
  }
}
runBlocking {
  combine(flows) {
    it.filterNotNull()
  }.collect { println(it) }
}

通过强制所有流发出第一个不相关的值,combine transformer确实被调用,并允许我删除我知道不是实际值的空值.重复这一点,可读性更强但更重:

sealed class FlowValueHolder {
  object None : FlowValueHolder()
  data class Some<T>(val value: T) : FlowValueHolder()
}
val flows = listOf(a, b, c).map {
  flow {
    emit(FlowValueHolder.None)
    it.collect { emit(FlowValueHolder.Some(it)) }
  }
}
runBlocking {
  combine(flows) {
    it.filterIsInstance(FlowValueHolder.Some::class.java)
      .map { it.value }
  }.collect { println(it) }
}

现在这个不错,但是我还是觉得我做得有点过头了.协程程序图书馆里有没有我遗漏的方法?

推荐答案

How about this:

inline fun <reified T> instantCombine(vararg flows: Flow<T>) = channelFlow {
    val array= Array(flows.size) {
        false to (null as T?) // first element stands for "present"
    }

    flows.forEachIndexed { index, flow ->
        launch {
            flow.collect { emittedElement ->
                array[index] = true to emittedElement
                send(array.filter { it.first }.map { it.second })
            }
        }
    }
}

It solves a few problems:

  • no need to introduce a new type
  • [] is not in the resulting Flow
  • abstracts away null-handling (or however it is solved) from the call-site, the resulting Flow deals with it itself

So, you won't notice any implementation specific workarounds, because you don't have to deal with it during collection:

runBlocking {
    instantCombine(a, b, c).collect {
        println(it)
    }
}

Output:

[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]

Try it out here!

Edit: Updated answer to handle Flows which emit null values too.


*使用的低级数组是线程安全的.就像您在处理单个变量一样.

Kotlin相关问答推荐

在Kotlin中处理结果的高阶函数

"Kotlin中的表达式

用A*搜索算法解决特修斯和米诺陶尔难题

如何使用 Kotlin Maven 更改 Minecraft 插件中的 Shulker GUI 标题

为什么 Kotlin 在 sumOf 函数 lambda 中默认不将数字视为Int?

有没有一种简单的方法可以将数组/列表中的每个元素相互相乘 - Kotlin?

为 Gradle 子项目配置 Kotlin 扩展

T except one class

Jetpack BottomNavigation - java.lang.IllegalStateException:Already attached to lifecycleOwner

零安全的好处

如果我可以将 Flow 和 StateFlow 与生命周期范围 \ viewLifecycleOwner.lifecycleScope 一起使用,那么在 ViewModel 中使用 LiveData 有什么意义

Kotlin的web-framework框架

Jacoco在Gradle 7.0.2和Kotlin 1.5.10上失败

Kotlin:使用Gradle进行增量编译

使用范围的稀疏sparse值列表

Kotlin中的属性(properties)和参数(parameters)有什么区别?

如何在 Gradle Kotlin DSL 中使用来自 gradle.properties 的插件版本?

在 IntelliJ Idea 中未为 Kotlin @ConfigurationProperties 类生成 spring-configuration-metadata.json 文件

带有注释为@RegisterExtension的字段的 JUnit 5 测试在 Kotlin 中不起作用

Kotlin反射不可用