我是Kotlin Channel
的新手.
下面的代码是produce
扩展函数的实践代码,它返回ReceiveChannel
.对于repeat(10)
的每一次迭代,用新的ReceiveChannel
重新分配变量cur
.这就是我在filter
扩展函数中调用cancel()
方法的原因.
我本以为这会正常结束,但事实并非如此:
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++) // infite stream of integers from start
}
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
for (x in numbers) if (x % prime != 0) send(x)
numbers.cancel() // Canceled here
}
fun main() {
runBlocking {
var cur = numbersFrom(2) // infinitely send numbers...
repeat(10) {
val prime = cur.receive() // retrieve only one element
println(prime)
cur = filter(cur, prime) // new Receiver
}
cur.cancel() // Canceled here
}
}
我try 了以下几种方法,但得到了这样的例外:
线程"main"中出现异常kotlinx.coroutines.JobCancerationException:BlockingCoroutine已取消;JOB=BlockingCoroutine{取消}@26a7b76d
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++) // infite stream of integers from start
}
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
for (x in numbers) if (x % prime != 0) send(x)
numbers.cancel()
}
fun main() {
runBlocking {
var cur = numbersFrom(2) // infinitely send numbers...
repeat(10) {
val prime = cur.receive() // retrieve only one element
println(prime)
cur = filter(cur, prime) // new Receiver
}
cur.cancel()
coroutineContext.cancel()
}
}
此外,此代码自然结束,因为它取消了所有子代协程:
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++) // infite stream of integers from start
}
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
for (x in numbers) if (x % prime != 0) send(x)
numbers.cancel()
}
fun main() {
runBlocking {
var cur = numbersFrom(2) // infinitely send numbers...
repeat(10) {
val prime = cur.receive() // retrieve only one element
println(prime)
cur = filter(cur, prime) // new Receiver
}
coroutineContext.cancelChildren()
}
}
有没有不用coroutineContext.cancelChildren()
就能自然完成的方法?