需求
有A、B、C、D、E、F 六个任务,D依赖A、B的结果,E依赖B、C的结果,F依赖D、E的结果,A、B、C互不依赖,D、F之间不依赖,可以并发,整体要尽快执行完成。
参考实现
一、通过 OperationQueue 添加依赖关系实现
/// 通过 OperationQueue 控制异步任务调度
func testOperation() {
// 创建异步任务
let operaA = BlockOperation {
self.execTask("A")
}
let operaB = BlockOperation {
self.execTask("B")
}
let operaC = BlockOperation {
self.execTask("C")
}
let operaD = BlockOperation {
self.execTask("D")
}
let operaE = BlockOperation {
self.execTask("E")
}
let operaF = BlockOperation {
self.execTask("F")
}
let operaG = BlockOperation {
self.printf("\nFinished\n")
}
// 添加依赖关系
operaD.addDependency(operaA)
operaD.addDependency(operaB)
operaE.addDependency(operaB)
operaE.addDependency(operaC)
operaF.addDependency(operaD)
operaF.addDependency(operaE)
operaG.addDependency(operaF)
let queue = OperationQueue()
// Adds the specified operation to the receiver.
queue.addOperation(operaA)
queue.addOperation(operaB)
queue.addOperation(operaC)
queue.addOperation(operaD)
queue.addOperation(operaE)
queue.addOperation(operaF)
queue.addOperation(operaG)
// 设置最大并发数
queue.maxConcurrentOperationCount = queue.operations.count
}
二、通过 DispatchGroup 控制异步任务调度
/// 通过 DispatchGroup 控制异步任务调度
func testDispatchGroup() {
// 创建DispatchGroup
let groupD = DispatchGroup() // A、B -> D
let groupE = DispatchGroup() // B、C -> E
let groupF = DispatchGroup() // D、E -> F
let groupFinish = DispatchGroup() // 完成
// 创建异步任务
let queueA = DispatchQueue(label: "A")
let queueB = DispatchQueue(label: "B")
let queueC = DispatchQueue(label: "C")
let queueD = DispatchQueue(label: "D")
let queueE = DispatchQueue(label: "E")
let queueF = DispatchQueue(label: "F")
groupD.enter()
queueA.async {
self.execTask("A")
groupD.leave()
}
groupD.enter()
groupE.enter()
queueB.async {
self.execTask("B")
groupD.leave()
groupE.leave()
}
groupE.enter()
queueC.async {
self.execTask("C")
groupE.leave()
}
groupF.enter()
groupD.notify(queue: DispatchQueue.global()) {
// A、B任务已完成,开始D任务
queueD.async {
self.execTask("D")
groupF.leave()
}
}
groupF.enter()
groupE.notify(queue: DispatchQueue.global()) {
// B、C任务已完成,开始E任务
queueE.async {
self.execTask("E")
groupF.leave()
}
}
groupFinish.enter()
groupF.notify(queue: DispatchQueue.global()) {
// D、E任务已完成,开始F任务
queueF.async {
self.execTask("F")
groupFinish.leave()
}
}
groupFinish.notify(queue: DispatchQueue.global()) {
self.printf("\nFinished\n")
}
}
三、通过信号量控制异步任务调度
/// 通过信号量控制异步任务调度
func testDispatchSemaphore() {
let value = 0
let semaphoreD = DispatchSemaphore(value: value) // A、B -> D
let semaphoreE = DispatchSemaphore(value: value) // B、C -> E
let semaphoreF = DispatchSemaphore(value: value) // E、D -> F
let semaphoreG = DispatchSemaphore(value: value) // 完成
// 创建异步任务
let queueA = DispatchQueue(label: "A")
let queueB = DispatchQueue(label: "B")
let queueC = DispatchQueue(label: "C")
let queueD = DispatchQueue(label: "D")
let queueE = DispatchQueue(label: "E")
let queueF = DispatchQueue(label: "F")
queueA.async {
self.execTask("A")
}
queueB.async {
self.execTask("B")
semaphoreD.signal()
}
queueC.async {
self.execTask("C")
semaphoreE.signal()
}
semaphoreD.wait()
queueD.async {
Thread.sleep(forTimeInterval: 2)
self.execTask("D")
}
semaphoreE.wait()
queueE.async {
Thread.sleep(forTimeInterval: 2)
self.execTask("E")
semaphoreF.signal()
}
semaphoreF.wait()
queueF.async {
self.execTask("F")
semaphoreG.signal()
}
semaphoreG.wait()
printf("\nFinished\n")
}
四、ReactiveSwift信号流
typealias SignalProducerHandler = SignalProducer<String, Error>
/// 通过ReactiveSwift信号流控制异步任务调度
private func testSignalProducer() {
/// 创建异步任务
let signalA = setupSignal("A")
let signalB = setupSignal("B")
let signalC = setupSignal("C")
let signalD = setupSignal("D")
let signalE = setupSignal("E")
let signalF = setupSignal("F")
/// A、B、C并发请求
SignalProducer.merge(signalA, signalB, signalC).collect().on(
failed: { error in
self.printf("\(error)")
}
).flatMap(.concat, { (_) -> SignalProducer<[String], Error> in
// D、E并发请求
return SignalProducer.merge(signalD, signalE).collect().on(
failed: { error in
self.printf("\(error)")
}
)
}).flatMap(.concat) { (data) -> SignalProducerHandler in
// 请求F
return signalF
}.on(
failed: { error in
self.printf("\(error)")
},
completed: {
self.printf("\nFinished\n")
},
value: { data in
self.printf("\ndata: \(data)\n")
}
).start()
}
问题:
这里实现的是【A、B、C并发请求】,成功后再串联【D、E并发请求】的结果,最后再请求F,与题目中的“
整体要尽快执行完成
”描述不太相符,目前没有想到怎么控制请求B重复请求的办法,做标记还是用其他方法,待求解。。。
私有成员
private let _time = 3
// mock异步任务
private func execTask(_ taskName: String) {
for _ in 0..<_time { printf(taskName) }
}
/// 打印log
private func printf(_ items: String) {
print(items, terminator: "\t")
}
// 创建异步请求信号
private func setupSignal(_ taskName: String) -> SignalProducerHandler {
return SignalProducerHandler() {[weak self] sink, _ in
DispatchQueue(label: taskName).async {
self?.execTask(taskName)
}
sink.send(value: taskName)
sink.sendCompleted()
}
}
参考log输出
A A A C C C B B B E D D D E E F F F
Finished
A A A B B B C C C D D D E E E F F F
Finished
B C A A A C C B B D E D E E D F F F
Finished