对于这种情况,有一个专门的流构建器,即callbackFlow
:
fun API.toFlow() = callbackFlow {
val listener = Listener { newValue -> trySendBlocking(newValue) }
this@toFlow.listen(listener)
awaitClose { stopListening(listener) }
}
//.shareIn(...)
//.stateIn(...)
在lambda中,您定义并注册侦听器.对于应该发送到流的每个值,您需要调用send
个函数中的一个(或trySend
或trySendBlocking
,取决于错误处理的需要).
当流程完成时(在构建器块内调用cancel
或close
之后,或者取消收集协程之后),调用awaitClose
的块.这可用于在完成监听后进行清理,如取消注册监听程序.
这个流构建器创建一个cold流,这意味着每次收集流时,都会再次执行构建器块(即使只调用了API.toFlow()
一次).如果您想要为多个收集者共享此流量,您可以通过呼叫shareIn
将其转换为SharedFlow
.不过,您示例中的流的配置看起来很像StateFlow
,因此您可以直接调用stateIn
而不是shareIn
.
关于通过调用shareIn
或stateIn
来停止hot生成的流的 comments ,让我们来看看当收集callbackFlow
生成的流时实际上发生了什么.
让我们假设我们创建了一个StateFlow
(与一般的SharedFlow
相比,它将隐含地具有replay = 1
和onBufferOverflow = BufferOverflow.DROP_OLDEST
,外加distinctUntilChanged()
),如下所示:
.stateIn(
scope = flowScope,
started = SharingStarted.WhileSubscribed(),
initialValue = initial,
)
现在,我们需要一个flowScope
的值,它将是应该用于运行流的CoroutinesScope
.之所以需要它,是因为流现在是hot,即使没有人收集它,它也会运行(cold流不需要,因为收集协程作用域用于收集).
此外,我们需要一个initial
的值来立即为流提供一个值.AStateFlow
总是有一个值,即使底层流(这里是callbackFlow
)还没有发出新值.
例如,这两个值可以作为参数提供给API.toFlow()
函数.
关于您的问题,参数started
是最有趣的:它定义了底层流是如何启动的.对于SharingStarted.WhileSubscribed()
,当至少有一个收集器订阅StateFlow
时,基础流开始.当最后一个收集器取消订阅时,流将被停止.
对于callbackFlow
,这意味着每次当收集器订阅StateFlow
并且是唯一订阅StateFlow
的收集器时,都会执行构建器块.每个后续订阅者将收到与第一个订阅者接收到的值相同的值.这样,Collection 者就可以订阅和取消StateFlow
的订阅,这callbackFlow
人只会注意到第一个.
只有当最后一个收集器取消订阅时,callbackFlow
才会通知它应该关闭.在这种情况下,执行awaitClose
关闭波长,并且流程完成.
然而,StateFlow
仍然处于活动状态(StateFlow
永远不会完成).如果新的收集器订阅了流,则基础流将重新启动.对于我们的callbackFlow
,这意味着再次执行构建器块,并创建新的流(使用新的侦听器).
正如我所理解的,这是你想要的行为,SharingStarted.WhileSubscribed()
提供了这个开箱即用的.还有其他SharingStarted
种行为.
最后一点:如果你频繁地向你的StateFlow
添加和删除订户,你可能会遇到底层callbackFlow
反复停止和重新启动的情况,这可能会代价高昂,具体取决于你的Listener
的工作方式.在这种情况下,您可以将时间段指定为SharingStarted.WhileSubscribed()
的可选参数,这样即使在最后一个收集器取消订阅之后,底层流也会保持活动状态.如果在该时间段内有新的收集器订阅,则StateFlow
恢复,就好像什么都没有发生一样,底层callbackFlow
永远不会停止和重新启动.
常用值是5000毫秒,但您可以放入任何适合您的用例的值.