我是Kotlin Channel的新手.

下面的代码是produce扩展函数的实践代码,它返回ReceiveChannel.对于repeat(10)的每一次迭代,用新的ReceiveChannel重新分配变量cur.这就是我在filter扩展函数中调用cancel()方法的原因.

我本以为这会正常结束,但事实并非如此:

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infite stream of integers from start
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
    numbers.cancel() // Canceled here
}

fun main() {
    runBlocking {
        var cur = numbersFrom(2) // infinitely send numbers...
        repeat(10) {
            val prime = cur.receive() // retrieve only one element
            println(prime)
            cur = filter(cur, prime) // new Receiver
        }
        cur.cancel() // Canceled here
    }
}

我try 了以下几种方法,但得到了这样的例外:

线程"main"中出现异常kotlinx.coroutines.JobCancerationException:BlockingCoroutine已取消;JOB=BlockingCoroutine{取消}@26a7b76d

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infite stream of integers from start
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
    numbers.cancel()
}

fun main() {
    runBlocking {
        var cur = numbersFrom(2) // infinitely send numbers...
        repeat(10) {
            val prime = cur.receive() // retrieve only one element
            println(prime)
            cur = filter(cur, prime) // new Receiver
        }
        cur.cancel()
        coroutineContext.cancel()
    }
}

此外,此代码自然结束,因为它取消了所有子代协程:

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infite stream of integers from start
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
    numbers.cancel()
}

fun main() {
    runBlocking {
        var cur = numbersFrom(2) // infinitely send numbers...
        repeat(10) {
            val prime = cur.receive() // retrieve only one element
            println(prime)
            cur = filter(cur, prime) // new Receiver
        }
        coroutineContext.cancelChildren()
    }
}

有没有不用coroutineContext.cancelChildren()就能自然完成的方法?

推荐答案

问题出在filter方法上.

你循环所有的数字,then取消数字频道. 由于Numbers通道创建了一个unlimited系列的数字,因此循环永远不会结束,代码中的下一行,也就是通道被取消的地方,永远不会到达.

我真的不明白您试图用代码实现什么,所以我不能建议一个具体的解决方案--除了将调用放到其他地方的cancel(),在那里它实际上将被执行.

Kotlin相关问答推荐

最好的方法来创建一个 map 在kotlin从两个列表

在Kotlin项目中使用Free Fair AspectJ插件(使用Gradle)

为什么Kotlin不用static inner class来实现带有object关键字的单例呢?

Kotlin:我可以将函数分配给 main 的伴随对象中的变量吗?

在 kotlin 中重载函数时,我在一些非常基本的代码上不断收到类型不匹配

使用 LazyListScope 嵌套可组合项

如何在 Compose 中创建可重用的修饰符?

kotlin 单例异常是好是坏?

Dagger 2 ContributesAndroidInjector 为模块提供活动

Kotlin - 当表达式返回函数类型

如何禁用智能投射突出显示 Kotlin?

Dagger2 Qualifier 不适用于 Kotlin?

未在IntelliJ IDEA上运行临时文件

如果我可以将 Flow 和 StateFlow 与生命周期范围 \ viewLifecycleOwner.lifecycleScope 一起使用,那么在 ViewModel 中使用 LiveData 有什么意义

Kapt不适用于Android Studio 3.0中的AutoValue

Kotlin中具有多个参数的绑定适配器

在Kotlin中使用@Service时引发异常

如何在Kotlin中将字符串转换为InputStream?

Failure delivering result on activity result

Kotlin 是否支持部分应用程序?