我使用了PublishSubject,我正在向它发送消息,并且我还在监听结果.它工作得无懈可击,但现在我不确定如何使用Kotlin的协程(流或通道)来做同样的事情.

private val subject = PublishProcessor.create<Boolean>>()

...

fun someMethod(b: Boolean) {
    subject.onNext(b)
}

fun observe() {
    subject.debounce(500, TimeUnit.MILLISECONDS)
           .subscribe { /* value received */ }
}

Since I need the debounce operator I really wanted to do the same thing with flows so I created a channel and then I tried to create a flow from that channel and listen to changes, but I'm not getting any results.

private val channel = Channel<Boolean>()

...

fun someMethod(b: Boolean) {
    channel.send(b)
}

fun observe() {
    flow {
         channel.consumeEach { value ->
            emit(value)
         }
    }.debounce(500, TimeUnit.MILLISECONDS)
    .onEach {
        // value received
    }
}

怎么了?

推荐答案

Flow是一个冷的异步流,就像Observable一样.

流上的所有转换(例如mapfilter)都不会触发流收集或执行,只有终端操作符(例如single)才会触发它.

onEach方法只是一种转换.因此,您应该将其替换为终端流运算符collect.此外,您还可以使用BroadcastChannel来获得更干净的代码:

private val channel = BroadcastChannel<Boolean>(1)

suspend fun someMethod(b: Boolean) {
    channel.send(b)
}

suspend fun observe() {
  channel
    .asFlow()
    .debounce(500)
    .collect {
        // value received
    }
}

Update:问题提出时,有两个参数(如问题中所示)的过载为debounce.再也没有了.但是现在有一个以毫秒(长)为单位的参数

Kotlin相关问答推荐

创建具有共同父类型的两种不同类型对象的列表的最有效方法是什么?

Kotlin—从列表中枚举属性计算不同值的数量

Kotlin:将泛型添加到列表任何>

如何在操作系统版本上正确获取Room数据库的路径>;=26 sdk?

Kotlin 中的 maxOf() 和 max() 方法有什么区别?

init中的NPE抽象函数变量

如何使用 Firebase 和 Kotlin 在文本 (Jetpack Compose) 中显示当前用户名?

如何在 Kotlin 中使用具有继承的泛型

有没有办法重用 Job 实例?

Kotlin 方法重载

Kotlin 具体化的泛型不会按计划保持类型

使用 Paging 3 时保存并保留 LazyColumn 滚动位置

用mockk验证属性设置程序吗?

使用主构造函数时使用Kotlin getter/setter

Kotlin中保留的关键字是什么?

Kotlin扩展函数与成员函数?

递归方法调用在 kotlin 中导致 StackOverFlowError 但在 java 中没有

Kotlin中的函数接口

你如何在 Kotlin 中注释 Pair 参数?

Kotlin:获取文件的扩展名,例如.txt