我用Swift Concurrency实现了一个作业(job)调度器.这些工作只是关闭.此调度程序并行处理一定数量的作业(job),并要求其他作业(job)等待.它使用一个参与者来封装所有可变数据.
我设法让它发挥作用.但我觉得很麻烦.我怎样才能让它变得更好?我可以以不同的方式实现它吗?热烈欢迎所有建议.
class CustomJob {
var completion: () -> ()
init(completion: @escaping () -> Void) {
self.completion = completion
}
}
actor JobQueue {
private var maxRunningCount: Int
private var runningJobCount = 0
private var pendingJobs = [CustomJob]()
init(maxRunningCount: Int) {
self.maxRunningCount = maxRunningCount
}
func addJob(job: CustomJob) {
pendingJobs.append(job)
}
// I found that I need to increment the runningJobCount here.
func nextJob() -> CustomJob? {
if runningJobCount == maxRunningCount {
print("The next job needs to wait")
return nil
}
if runningJobCount < maxRunningCount && pendingJobs.count > 0 {
runningJobCount += 1
return pendingJobs.removeFirst()
} else {
return nil
}
}
func finishOneJob() {
runningJobCount -= 1
}
}
class JobScheduler {
let jobQueue: JobQueue
init(maxRunningCount: Int) {
jobQueue = JobQueue(maxRunningCount: maxRunningCount)
}
func scheduleJob(job: @escaping () -> ()) {
Task {
await jobQueue.addJob(job: CustomJob(completion: job))
run()
}
}
private func run() {
Task {
if let job = await jobQueue.nextJob() {
Task {
await self.executeJob(job: job)
await self.jobQueue.finishOneJob()
run()
}
}
}
}
private func executeJob(job: CustomJob) async {
return await withCheckedContinuation { continuation in
job.completion()
continuation.resume()
}
}
}
我使用一个调度小组安排了5项工作并进行测试.
// MARK: - TEST
let processor = JobScheduler(maxRunningCount: 2)
let group = DispatchGroup()
for job in 1...5 {
group.enter()
print("Job \(job) scheduled")
processor.scheduleJob {
print("Job \(job) starts")
sleep(2)
print("Job \(job) complete")
group.leave()
}
}
group.wait()
print("Done")