I've got a Sequence (from File.walkTopDown) and I need to run a long-running operation on each of them. I'd like to use Kotlin best practices / coroutines, but I either get no parallelism, or way too much parallelism and hit a "too many open files" IO error.

File("/Users/me/Pictures/").walkTopDown()
    .onFail { file, ex -> println("ERROR: $file caused $ex") }
    .filter { ... only big images... }
    .map { file ->
        async { // I *think* I want async and not "launch"...
            ImageProcessor.fromFile(file)
        }
    }

This doesn't seem to run it in parallel, and my multi-core CPU never goes above 1 CPU's worth. Is there a way with coroutines to run "NumberOfCores parallel operations" worth of Deferred jobs?

I looked at Multithreading using Kotlin Coroutines which first creates ALL the jobs then joins them, but that means completing the Sequence/file tree walk completly bfore the heavy processing join step, and that seems... iffy! Splitting it into a collect and a process step means the collection could run way ahead of the processing.

val jobs = ... the Sequence above...
    .toSet()
println("Found ${jobs.size}")
jobs.forEach { it.await() }

推荐答案

我让它和一个频道一起工作.但也许我对你的方式是多余的?

val pipe = ArrayChannel<Deferred<ImageFile>>(20)
launch {
    while (!(pipe.isEmpty && pipe.isClosedForSend)) {
        imageFiles.add(pipe.receive().await())
    }
    println("pipe closed")
}
File("/Users/me/").walkTopDown()
        .onFail { file, ex -> println("ERROR: $file caused $ex") }
        .forEach { pipe.send(async { ImageFile.fromFile(it) }) }
pipe.close()

Kotlin相关问答推荐

Kotlin 海峡没有结束

在Jetpack Compose中创建波浪式文本动画:顺序中断问题

Kotlin:将泛型添加到列表任何>

两个LocalDateTime之间的Kotlin差异

在 detekt 配置文件中找不到某些属性

使用 StateFlow 时如何移除监听器?

为 Gradle 子项目配置 Kotlin 扩展

类型是什么意

创建首选项屏幕时找不到androidx.preference.PreferenceScreen

retrofit 响应代码 405 并带有消息method not allowed here

将 Firebase 数据快照反序列化为 Kotlin 数据类

带有迭代器函数的 Kotlin 无限序列

哪里可以找到aapt2日志(log)?

如何在使用 Gradle 的 AppEngine 项目中使用 Kotlin

在Kotlin中不带类直接引用枚举实例

是否在Kotlin中重写enum toString()?

Kotlin:具有多个不同类型设置器的单个属性

Kotlin,什么时候按map授权?

Kotlin - 错误:Could not find or load main class _DefaultPackage

Kotlin - 为什么我会得到 KotlinNullPointerException