我用Swift Concurrency实现了一个作业(job)调度器.这些工作只是关闭.此调度程序并行处理一定数量的作业(job),并要求其他作业(job)等待.它使用一个参与者来封装所有可变数据.

我设法让它发挥作用.但我觉得很麻烦.我怎样才能让它变得更好?我可以以不同的方式实现它吗?热烈欢迎所有建议.

class CustomJob {
    var completion: () -> ()
    init(completion: @escaping () -> Void) {
        self.completion = completion
    }
}

actor JobQueue {
    private var maxRunningCount: Int
    private var runningJobCount = 0
    private var pendingJobs = [CustomJob]()
    
    init(maxRunningCount: Int) {
        self.maxRunningCount = maxRunningCount
    }
    
    func addJob(job: CustomJob) {
        pendingJobs.append(job)
    }
    
    // I found that I need to increment the runningJobCount here.
    func nextJob() -> CustomJob? {
        if runningJobCount == maxRunningCount {
            print("The next job needs to wait")
            return nil
        }
        if runningJobCount < maxRunningCount && pendingJobs.count > 0 {
            runningJobCount += 1
            return pendingJobs.removeFirst()
        } else {
            return nil
        }
    }
    
    
    func finishOneJob() {
        runningJobCount -= 1
    }
}

class JobScheduler {
    
    let jobQueue: JobQueue
    
    init(maxRunningCount: Int) {
        jobQueue = JobQueue(maxRunningCount: maxRunningCount)
    }
    
    func scheduleJob(job: @escaping () -> ()) {
        Task {
            await jobQueue.addJob(job: CustomJob(completion: job))
            run()
        }
    }
    
    private func run() {
        Task {
            if let job = await jobQueue.nextJob() {
                Task {
                    await self.executeJob(job: job)
                    await self.jobQueue.finishOneJob()
                    run()
                }
            }
        }
    }
    
    private func executeJob(job: CustomJob) async {
        return await withCheckedContinuation { continuation in
            job.completion()
            continuation.resume()
        }
    }
}

我使用一个调度小组安排了5项工作并进行测试.

// MARK: - TEST

let processor = JobScheduler(maxRunningCount: 2)
let group = DispatchGroup()

for job in 1...5 {
    group.enter()
    print("Job \(job) scheduled")
    processor.scheduleJob {
        print("Job \(job) starts")
        sleep(2)
        print("Job \(job) complete")
        group.leave()
    }
}

group.wait()
print("Done")

推荐答案

我将首先说明如何在Swift并发中通过同步工作做到这一点,但然后解释为什么您可能想通过同步工作做到这一点.不幸的现实是,正确的解决方案将根据所启动的工作类型而有所不同.您可能不想要一刀切的通用调度器,但要真正考虑应用程序的特定需求,并为您的特定用例 Select 合适的模式.


那么,首先,我们如何在Swift并发中针对同步任务做到这一点?

  1. 限制Swift并发性内的并发性.

    为了限制Swift并发中的并发性,我们将使用任务组,但每次超过一定阈值的迭代都调用next:

    await withTaskGroup(of: Void.self) { group in
        var index = 0
        for await work in sequence {
            index += 1
            if index > 4 { await group.next() }
    
            group.addTask { await work() }
        }
    }
    

    看到How to constrain concurrency (like maxConcurrentOperationCount) with Swift Concurrency?

  2. 事后我们如何将工作添加到AsyncSequence中?

    将作业(job)添加到AsyncSequence的一种简单方法是使用Swift Async 算法rithms包中的AsyncChannel.例如:

    actor Scheduler {
        typealias Work = @Sendable () async -> Void
    
        private let channel = AsyncChannel<Work>()
    
        func start() async {
            await withTaskGroup(of: Void.self) { group in
                var index = 0
                for await work in channel {
                    index += 1
                    if index > 4 { await group.next() }
    
                    group.addTask { await work() }
                }
            }
        }
    
        func addWork(_ work: @escaping Work) async {
            await channel.send(work)
        }
    }
    

    然后我们可以做这样的事情:

    struct ContentView: View {
        let scheduler = Scheduler()
    
        var body: some View {
            VStack {
                Button("Channel") {
                    Task {
                        await scheduler.addWork {
                            …
                        }
                    }
                }
            }
            .task {
                await scheduler.start()
            }
        }
    }
    

    因此,它会在视图启动时启动调度程序,并在每次点击按钮时添加工作.例如,当我在仪器中运行该功能时,我在每次点击按钮时添加了一个路标RST,并在工作运行时添加了一个"间隔".然后我运行该应用程序,连续点击按钮十次,您可以看到它一次只运行四个任务:

    enter image description here


在概述了如何在Swift并发中处理同步工作后,我们应该注意到您的示例是用于处理同步任务的.但我们必须避免从Swift并发中调用缓慢的同步函数.Swift并发的合作线程池每个中央处理器核心只有一个线程.整个并发系统基于一个契约,永远不会阻碍协作线程池中任何线程的向前推进.

有两种可能的解决方案:

首先,如果这是您自己的某种计算算法,您可以周期性地yield,从而防止Swift并发系统出现僵局.

其次,如果这不可能,Apple明确建议将其移至Swift并发系统之外(例如,将其移至GCD).然后,您可以将该遗留模式包装在"延续"中,以桥回Swift并发代码库.(通常,我们建议避免将GCD与Swift并发混合,但这是规则的例外.)参见百分之三.有关更多信息,请参阅WWDC 2022视频Visualize and optimize Swift concurrency,其中讨论了从Swift并发系统中获取缓慢的同步工作.


无论如何,如果您确实试图限制缓慢、同步任务的并发性,那么简单的遗留解决方案是operation queue:

let queue = OperationQueue()
queue.maxConcurrentOperationCount = 4

并且:

queue.addOperation { … }

这是同步工作的受约束并行性的简单解决方案,您可能会以实时方式启动工作.这会产生完全相同的行为:

enter image description here

另一种遗留方法是GCD的concurrentPerform,但仅在预先启动固定数量的同步工作项时使用.或者在Combine中,我们可能会使用flatMap(maxPublishers:transform:).有关一些遗留选项的调查,请参阅How to avoid thread explosion when using DispatchSemaphores?

Swift相关问答推荐

如何分解阿拉伯字母?

它是RxSwift中直接用于可扩展视图的有效的可扩展BehaviorRelay值?

从AppKit打开SwiftUI设置

可选,类似于自定义类型的初始化

Swift Property Wrappers-Initialization 失败并显示无法将‘Double’类型的值转换为预期的参数类型

有没有一种方法可以访问封闭实例ObservableObject以从属性包装器中的任何位置调用objectWillChange.send()

TabView 无法在屏幕上正确显示

我如何读取并通知可可 macos 中某个类的计算(computed)属性?

为什么 NumberFormatter 在 Swift 中将 19,999,999,999,999,999 格式化为 20,000,000,000,000,000?

有什么方法可以快速为泛型参数分配默认值?

使用 Swiftui 水平修剪 Shape()

在 Swift 中实现自定义异步序列

使用 Date.ParseStrategy 将字符串解析为日期

Apple 的自然语言 API 返回意外结果

在运行时访问 UIView 宽度

让我的函数计算数组 Swift 的平均值

Swift 中的 CommonHMAC

Swift 2.0 中的 do { } catch 不处理从这里抛出的错误

WKWebView 确实从本地文档文件夹加载资源

Void 函数中意外的非 Void 返回值 (Swift 2.0)