我有一个简单的API的外部服务,基于注册:

  • API.listen(listener)将注册监听程序以在它们进来时接收更新
  • API.stopListening(listener)将删除监听器,该监听器将不再接收更新

我认为Kotlin共享流将是一种很好的表示,并且使用起来更方便,所以我正在try 在这两个API之间架起桥梁.

但是,我不知道何时/如何删除监听程序,何时应该停止流.

fun API.toFlow(): SharedFlow<Int> {
  val flow = MutableSharedFlow(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
  val listener = Listener { newValue -> flow.tryEmit(newValue) }
  this.listen(listener)
  // TODO: remove listener at some point
  return flow
}

推荐答案

对于这种情况,有一个专门的流构建器,即callbackFlow:

fun API.toFlow() = callbackFlow {
    val listener = Listener { newValue -> trySendBlocking(newValue) }
    this@toFlow.listen(listener)

    awaitClose { stopListening(listener) }
}
//.shareIn(...)
//.stateIn(...)

在lambda中,您定义并注册侦听器.对于应该发送到流的每个值,您需要调用send个函数中的一个(或trySendtrySendBlocking,取决于错误处理的需要).

当流程完成时(在构建器块内调用cancelclose之后,或者取消收集协程之后),调用awaitClose的块.这可用于在完成监听后进行清理,如取消注册监听程序.

这个流构建器创建一个cold流,这意味着每次收集流时,都会再次执行构建器块(即使只调用了API.toFlow()一次).如果您想要为多个收集者共享此流量,您可以通过呼叫shareIn将其转换为SharedFlow.不过,您示例中的流的配置看起来很像StateFlow,因此您可以直接调用stateIn而不是shareIn.

关于通过调用shareInstateIn来停止hot生成的流的 comments ,让我们来看看当收集callbackFlow生成的流时实际上发生了什么.

让我们假设我们创建了一个StateFlow(与一般的SharedFlow相比,它将隐含地具有replay = 1onBufferOverflow = BufferOverflow.DROP_OLDEST,外加distinctUntilChanged()),如下所示:

.stateIn(
    scope = flowScope,
    started = SharingStarted.WhileSubscribed(),
    initialValue = initial,
)

现在,我们需要一个flowScope的值,它将是应该用于运行流的CoroutinesScope.之所以需要它,是因为流现在是hot,即使没有人收集它,它也会运行(cold流不需要,因为收集协程作用域用于收集).

此外,我们需要一个initial的值来立即为流提供一个值.AStateFlow总是有一个值,即使底层流(这里是callbackFlow)还没有发出新值.

例如,这两个值可以作为参数提供给API.toFlow()函数.

关于您的问题,参数started是最有趣的:它定义了底层流是如何启动的.对于SharingStarted.WhileSubscribed(),当至少有一个收集器订阅StateFlow时,基础流开始.当最后一个收集器取消订阅时,流将被停止.

对于callbackFlow,这意味着每次当收集器订阅StateFlow并且是唯一订阅StateFlow的收集器时,都会执行构建器块.每个后续订阅者将收到与第一个订阅者接收到的值相同的值.这样,Collection 者就可以订阅和取消StateFlow的订阅,这callbackFlow人只会注意到第一个.

只有当最后一个收集器取消订阅时,callbackFlow才会通知它应该关闭.在这种情况下,执行awaitClose关闭波长,并且流程完成.

然而,StateFlow仍然处于活动状态(StateFlow永远不会完成).如果新的收集器订阅了流,则基础流将重新启动.对于我们的callbackFlow,这意味着再次执行构建器块,并创建新的流(使用新的侦听器).

正如我所理解的,这是你想要的行为,SharingStarted.WhileSubscribed()提供了这个开箱即用的.还有其他SharingStarted种行为.

最后一点:如果你频繁地向你的StateFlow添加和删除订户,你可能会遇到底层callbackFlow反复停止和重新启动的情况,这可能会代价高昂,具体取决于你的Listener的工作方式.在这种情况下,您可以将时间段指定为SharingStarted.WhileSubscribed()的可选参数,这样即使在最后一个收集器取消订阅之后,底层流也会保持活动状态.如果在该时间段内有新的收集器订阅,则StateFlow恢复,就好像什么都没有发生一样,底层callbackFlow永远不会停止和重新启动. 常用值是5000毫秒,但您可以放入任何适合您的用例的值.

Kotlin相关问答推荐

Kotlin 海峡没有结束

Kotlin:有限的并行性并不是限制并行性

如何接受任何派生类KClass

Scala与Kotlin中的迭代

为什么 Kotlin 中没有 init 块的注释

在 kotlin 中使具体化字段可选

在 Kotlin 协程中切换 IO 和 UI 的正确方法是什么?

如何将jooq multiset的结果映射到Hashmap(Java Map)?

嵌套数组 indexOf()

如何使用 Kotlin KClass 属性 simpleName 生成空值

Kotlin 无法找到或加载主类

@InlineOnly 注释是什么意思?

使用最新的 com.jakewharton.rxbinding3:rxbinding:3.0.0-alpha2 库时未找到 RxTextView 和其他小部件

为什么我在使用 Jetpack Compose clickable-Modifier 时收到后端内部错误:Exception during IR lowering error?

Foo::class.java 和 Foo::javaClass 有什么区别?

在 Koin 中提供一个 Instance 作为其接口

如何在 Kotlin 中创建一个打开新活动(Android Studio)的按钮?

Kotlin中的属性(properties)和参数(parameters)有什么区别?

IllegalStateException:function = , count = 3, index = 3

Kotlin中对象和数据类的区别是什么?