I wanted to know how can I send/emit items to a Kotlin.Flow, so my use case is:

在使用者/视图模型/演示者中,我可以subscribe使用collect函数:

fun observe() {
 coroutineScope.launch {
    // 1. Send event
    reopsitory.observe().collect {
      println(it)
    }
  }
}

But the issue is in the Repository side, with RxJava we could use a Behaviorsubject expose it as an Observable/Flowable and emit new items like this:

behaviourSubject.onNext(true)

但每当我建立一个新的流程:

flow {

}

我只能collect岁.如何将值发送到流?

推荐答案

如果您想获得订阅/Collection 的latest值,则应使用ConflatedBroadcastChannel:

private val channel = ConflatedBroadcastChannel<Boolean>()

这将复制BehaviourSubject,以将通道作为流公开:

// Repository
fun observe() {
  return channel.asFlow()
}

Now to send an event/value to that exposed Flow simple send to this channel.

// Repository
fun someLogicalOp() {
  channel.send(错误) // This gets sent to the ViewModel/Presenter and printed.
}

慰问:

错误

如果您只希望接收after个值,那么开始收集时应该使用BroadcastChannel.

To make it clear:

Behaves as an Rx's PublishedSubject

private val channel = BroadcastChannel<Boolean>(1)

fun broadcastChannelTest() {
  // 1. Send event
  channel.send(符合事实的)

  // 2. Start collecting
  channel
    .asFlow()
    .collect {
      println(it)
    }

  // 3. Send another event
  channel.send(错误)
}

错误

Only 错误 gets printed as the first event was sent before collect { }.


Behaves as an Rx's BehaviourSubject

private val confChannel = ConflatedBroadcastChannel<Boolean>()

fun conflatedBroadcastChannelTest() {
  // 1. Send event
  confChannel.send(符合事实的)

  // 2. Start collecting
  confChannel
    .asFlow()
    .collect {
      println(it)
    }

  // 3. Send another event
  confChannel.send(错误)
}

符合事实的

错误

两个事件都会打印出来,您总是会得到latest的值(如果存在).

Also, want to mention Kotlin's team development on DataFlow (name pending):

哪个似乎更适合这个用例(因为它将是cold stream).

Kotlin相关问答推荐

使用数据存储首选项Kotlin Jetpack Compose

为什么onEach不是挂起函数,而Collect是?

Kotlin中函数引用的否定

将文本与文本字段的内容对齐

禁用 Gradle执行消息

将 Integer 转换为 Unit 编译成功

如何在kotlin中使用协程每秒调用一个函数

Kotlin JS JSON 反序列化

使用 Hilt 注入 CoroutineWorker

kotlin,如何从函数返回类类型

Map.mapTo 到另一个map

有没有办法在spring webflux和spring data react中实现分页

如何在主线程上使用 Kotlin 协程 await()

无法解决:androidx.lifecycle:lifecycle-viewmodel-ktx:1.1.1

如何在使用 Gradle 的 AppEngine 项目中使用 Kotlin

Kotlin:如何从另一个类访问字段?

如何在Kotlin中使用ViewModelProviders

Kotlin:使用自定义设置器时没有lateinit的解决方法?

旋转 kotlin 数组

任务':app:kaptDebugKotlin'的Kotlin执行失败