Swift | GCD Review

1. 并发队列 + 同步任务

注意是主线程执行,要避免UI堵塞问题

let queue = DispatchQueue(label: "syncConcurrent", attributes: .concurrent)
        queue.sync {
            for i in 0...2 {
                Thread.sleep(forTimeInterval: 0.5)
                print("Sync 1 \(i) [\(Thread.current)]")
            }
        }
        queue.sync {
            for i in 0...2 {
                Thread.sleep(forTimeInterval: 0.5)
                print("Sync 2 \(i) [\(Thread.current)]")
            }
        }
        queue.sync {
            for i in 0...2 {
                Thread.sleep(forTimeInterval: 0.5)
                print("Sync 3 \(i) [\(Thread.current)]")
            }
        }
        print("All done")

输出:只有一条线程(主线程),所有任务依次有序执行

Sync 1 0 [<_NSMainThread: 0x600001d18380>{number = 1, name = main}]
Sync 1 1 [<_NSMainThread: 0x600001d18380>{number = 1, name = main}]
Sync 1 2 [<_NSMainThread: 0x600001d18380>{number = 1, name = main}]
Sync 2 0 [<_NSMainThread: 0x600001d18380>{number = 1, name = main}]
Sync 2 1 [<_NSMainThread: 0x600001d18380>{number = 1, name = main}]
Sync 2 2 [<_NSMainThread: 0x600001d18380>{number = 1, name = main}]
Sync 3 0 [<_NSMainThread: 0x600001d18380>{number = 1, name = main}]
Sync 3 1 [<_NSMainThread: 0x600001d18380>{number = 1, name = main}]
Sync 3 2 [<_NSMainThread: 0x600001d18380>{number = 1, name = main}]
All done

2. 并发队列 + 异步执行

let queue = DispatchQueue(label: "asyncConcurrent", attributes: .concurrent)
        queue.async {
            for i in 0...2 {
                Thread.sleep(forTimeInterval: 0.5)
                print("Async 1 \(i) [\(Thread.current)]")
            }
        }
        queue.async {
            for i in 0...2 {
                Thread.sleep(forTimeInterval: 0.5)
                print("Async 2 \(i) [\(Thread.current)]")
            }
        }
        queue.async {
            for i in 0...2 {
                Thread.sleep(forTimeInterval: 0.5)
                print("Async 3 \(i) [\(Thread.current)]")
            }
        }
        print("All done")

输出:多(3)条线程,线程间(无序)交替执行加入队列中的任务(有序)

All done
Async 3 0 [<NSThread: 0x600002b7e000>{number = 6, name = (null)}]
Async 2 0 [<NSThread: 0x600002b702c0>{number = 3, name = (null)}]
Async 1 0 [<NSThread: 0x600002b7e4c0>{number = 7, name = (null)}]
Async 2 1 [<NSThread: 0x600002b702c0>{number = 3, name = (null)}]
Async 3 1 [<NSThread: 0x600002b7e000>{number = 6, name = (null)}]
Async 1 1 [<NSThread: 0x600002b7e4c0>{number = 7, name = (null)}]
Async 2 2 [<NSThread: 0x600002b702c0>{number = 3, name = (null)}]
Async 1 2 [<NSThread: 0x600002b7e4c0>{number = 7, name = (null)}]
Async 3 2 [<NSThread: 0x600002b7e000>{number = 6, name = (null)}]

3. 串行队列 + 同步任务

let queue = DispatchQueue(label: "syncSerial")
        queue.sync {
            for i in 0...2 {
                Thread.sleep(forTimeInterval: 0.5)
                print("Sync Serial: 1 \(i) [\(Thread.current)]")
            }
        }
        queue.sync {
            for i in 0...2 {
                Thread.sleep(forTimeInterval: 0.5)
                print("Sync Serial: 2 \(i) [\(Thread.current)]")
            }
        }
        queue.sync {
            for i in 0...2 {
                Thread.sleep(forTimeInterval: 0.5)
                print("Sync Serial: 3 \(i) [\(Thread.current)]")
            }
        }
        print("All done")

输出:主线程+任务有序

Sync Serial: 1 0 [<_NSMainThread: 0x6000005f4000>{number = 1, name = main}]
Sync Serial: 1 1 [<_NSMainThread: 0x6000005f4000>{number = 1, name = main}]
Sync Serial: 1 2 [<_NSMainThread: 0x6000005f4000>{number = 1, name = main}]
Sync Serial: 2 0 [<_NSMainThread: 0x6000005f4000>{number = 1, name = main}]
Sync Serial: 2 1 [<_NSMainThread: 0x6000005f4000>{number = 1, name = main}]
Sync Serial: 2 2 [<_NSMainThread: 0x6000005f4000>{number = 1, name = main}]
Sync Serial: 3 0 [<_NSMainThread: 0x6000005f4000>{number = 1, name = main}]
Sync Serial: 3 1 [<_NSMainThread: 0x6000005f4000>{number = 1, name = main}]
Sync Serial: 3 2 [<_NSMainThread: 0x6000005f4000>{number = 1, name = main}]
All done

4. 串行队列 + 异步任务

let queue = DispatchQueue(label: "asyncSerial")
        queue.async {
            for i in 0...2 {
                Thread.sleep(forTimeInterval: 0.5)
                print("Async Serial: 1 \(i) [\(Thread.current)]")
            }
        }
        queue.async {
            for i in 0...2 {
                Thread.sleep(forTimeInterval: 0.5)
                print("Async Serial: 2 \(i) [\(Thread.current)]")
            }
        }
        queue.async {
            for i in 0...2 {
                Thread.sleep(forTimeInterval: 0.5)
                print("Async Serial: 3 \(i) [\(Thread.current)]")
            }
        }
        print("All done")

输出: 一条线程(非主线程)+任务有序

Async Serial: 1 0 [<NSThread: 0x600000869a00>{number = 6, name = (null)}]
Async Serial: 1 1 [<NSThread: 0x600000869a00>{number = 6, name = (null)}]
Async Serial: 1 2 [<NSThread: 0x600000869a00>{number = 6, name = (null)}]
Async Serial: 2 0 [<NSThread: 0x600000869a00>{number = 6, name = (null)}]
Async Serial: 2 1 [<NSThread: 0x600000869a00>{number = 6, name = (null)}]
Async Serial: 2 2 [<NSThread: 0x600000869a00>{number = 6, name = (null)}]
Async Serial: 3 0 [<NSThread: 0x600000869a00>{number = 6, name = (null)}]
Async Serial: 3 1 [<NSThread: 0x600000869a00>{number = 6, name = (null)}]
Async Serial: 3 2 [<NSThread: 0x600000869a00>{number = 6, name = (null)}]

5. GCD Group

func request(task: Int, time: TimeInterval, completion: @escaping ()->Void) {
        DispatchQueue.global(qos: .background).async {
            print("Requesting task: \(task) ... started")
            Thread.sleep(forTimeInterval: time)
            print("Requesting task: \(task) ... finished")
            DispatchQueue.main.async(execute: completion)
        }
    }

let group = DispatchGroup()
        for i in 1...3 {
            group.enter()
            request(task: i, time: Double(3-i)) {
                group.leave()
            }
        }
        group.notify(queue: .main) { [weak self] in
            print("All requests are finished!")
            self?.view.backgroundColor = .systemGreen
        }

输出:

Requesting task: 1 ... started
Requesting task: 3 ... started
Requesting task: 2 ... started
Requesting task: 3 ... finished
Requesting task: 2 ... finished
Requesting task: 1 ... finished
All requests are finished!

参考:https://blog.csdn.net/Hello_Hwc/article/details/54293280

wait()阻塞group线程

for i in 1...3 {
/// 等待一秒再执行下一个任务
            let _ = group.wait(timeout: .now() + .seconds(1))
            
            group.enter()
            request(task: i, time: Double(3-i)) {
                group.leave()
            }
        }

输出:

Requesting task: 1 ... started
Requesting task: 2 ... started
Requesting task: 3 ... started
Requesting task: 3 ... finished
Requesting task: 2 ... finished
Requesting task: 1 ... finished
All requests are finished!

串行队列执行异步group任务

let group = DispatchGroup()
        let queue = DispatchQueue(label: "requestTaskSerial", qos: .background)
        for i in 1...3 {
            group.enter()
            queue.async(group: group){ [weak self] in
                self?.request(task: i, time: 1) {
                    group.leave()
                }
            }
        }
        group.notify(queue: .main) { [weak self] in
            print("All requests are finished!")
            self?.view.backgroundColor = .systemGreen
        }

输出:

Requesting task: 1 ... started
Requesting task: 2 ... started
Requesting task: 3 ... started
Requesting task: 1 ... finished
Requesting task: 2 ... finished
Requesting task: 3 ... finished
All requests are finished!

Semaphore

DispatchSemaphore provides an efficient implementation of a traditional counting semaphore, which can be used to control access to a resource across multiple execution contexts.
DispatchSemaphore是传统计数信号量的封装,用来控制资源被多任务访问的情况。

let group = DispatchGroup()
        let queue = DispatchQueue(label: "requestTaskSerial", qos: .background, attributes: .concurrent)
/*
        // 创建一个新的信号量,参数value代表信号量资源池的初始数量。
        //    value < 0, 返回NULL
        //    value = 0, 多线程在等待某个特定线程的结束。
        //    value > 0, 资源数量,可以由多个线程使用。
*/
        let semaphore = DispatchSemaphore(value: 2)
        for i in 1...3 {
            group.enter()
            queue.async(group: group){ [weak self] in
                semaphore.wait()
                self?.request(task: i, time: Double(3-i)) {
                    group.leave()
                    semaphore.signal()
                }
            }
        }
        group.notify(queue: .main) { [weak self] in
            print("All requests are finished!")
            self?.view.backgroundColor = .systemGreen
        }

输出:同时最多只能有2个异步任务在执行

Requesting task: 3 ... started
Requesting task: 3 ... finished
Requesting task: 1 ... started
Requesting task: 2 ... started
Requesting task: 2 ... finished
Requesting task: 1 ... finished
All requests are finished!

Barrier

barrier翻译过来就是屏障。在一个并行queue里,很多时候,我们提交一个新的任务需要这样做。

  • queue里已有任务执行完了新任务才开始
  • 新任务开始后提交的任务都要等待新任务执行完毕才能继续执行

特别注意:

以barrier flag提交的任务能够保证其在并行队列执行的时候,是唯一的一个任务。(只对自己创建的队列有效,对gloablQueue无效)

let group = DispatchGroup()
        let queue = DispatchQueue(label: "requestTaskSerial", qos: .background, attributes: .concurrent)
        
        for i in 1...3 {
            group.enter()
            queue.async(flags: .barrier){
                print("Requesting task: \(i) ... started")
                Thread.sleep(forTimeInterval: Double(3-i))
                print("Requesting task: \(i) ... finished")
                group.leave()
            }
        }
        group.notify(queue: .main) { [weak self] in
            print("All requests are finished!")
            self?.view.backgroundColor = .systemGreen
        }

输出:类似同步任务顺序执行

Requesting task: 1 ... started
Requesting task: 1 ... finished
Requesting task: 2 ... started
Requesting task: 2 ... finished
Requesting task: 3 ... started
Requesting task: 3 ... finished
All requests are finished!

let queue = DispatchQueue(label: "requestTaskSerial", qos: .background, attributes: .concurrent)
        
        for i in 1...3 {
            group.enter()
            queue.async(flags: .barrier){ [weak self] in
                self?.request(task: i, time: Double(3-i), completion: {
                    group.leave()
                })
            }
        }
        group.notify(queue: .main) { [weak self] in
            print("All requests are finished!")
            self?.view.backgroundColor = .systemGreen
        }

输出:异步任务顺序执行

Requesting task: 3 ... started
Requesting task: 3 ... finished
Requesting task: 2 ... started
Requesting task: 1 ... started
Requesting task: 2 ... finished
Requesting task: 1 ... finished
All requests are finished!

Operation & OperationQueue

Operation

自定义同步operation只需要重写main()方法,然后当main方法体执行完后,就isFinished,一般用于同步操作(如数据读取)

自定义异步operation时,需要手动维护isFinishedisExecuting等状态,所以较为复杂

class RequestOperation: Operation {
    var task: Int
    var time: TimeInterval
    
    init (task: Int, time: TimeInterval, completion: @escaping ()->Void) {
        self.task = task
        self.time = time
        
        super.init()
        self.completionBlock = completion
    }
    
    private var _finished: Bool = false {
        willSet{
            willChangeValue(forKey: "isFinished")
        }
        didSet{
            didChangeValue(forKey: "isFinished")
        }
    }
    private var _executing: Bool = false {
        willSet{
            willChangeValue(forKey: "isExecuting")
        }
        didSet{
            didChangeValue(forKey: "isExecuting")
        }
    }
    
    override var isFinished: Bool {
        return _finished
    }
    override var isExecuting: Bool {
        return _executing
    }
    override var isAsynchronous: Bool {
        return true
    }
    
    private func done() {
        super.cancel()
        
        _executing = false
        _finished = true
        completionBlock?()
    }
    
// 注意加锁,避免多次调用
    override func cancel() {
        objc_sync_enter(self)
        done()
        objc_sync_exit(self)
    }
    
    override func start() {
        guard !isCancelled else {
            done()
            return
        }
        _executing = true
        request(task: task, time: time) { [weak self] in
            self?.done()
        }
    }
}

使用:

func request(task: Int, time: TimeInterval, completion: (()->Void)?) {
    DispatchQueue.global().async {
        print("Requesting task: \(task) ... started")
        Thread.sleep(forTimeInterval: time)
        print("Requesting task: \(task) ... finished")
        DispatchQueue.main.async(execute: completion ?? {})
    }
}

@IBAction func startOneByOneRequesting(_ sender: UIControl) {
        let queue = OperationQueue()
        
        var previousOperation: Operation?
        for i in 1...3 {
            let op = RequestOperation(task: i, time: Double(3-i), completion: {
                print("Operation \(i) is completed!")
                if i == 3 {
                    print("All tasks are done !!!")
                }
            })
            if let previous = previousOperation {
                op.addDependency(previous)
            }
            queue.addOperation(op)
            previousOperation = op
        }
    }

输出:

Requesting task: 1 ... started
Requesting task: 1 ... finished
Operation 1 is completed!
Requesting task: 2 ... started
Requesting task: 2 ... finished
Operation 2 is completed!
Requesting task: 3 ... started
Requesting task: 3 ... finished
Operation 3 is completed!
All tasks are done !!!

OperationQueue

  • 最大并发操作数:maxConcurrentOperationCount

    • maxConcurrentOperationCount 默认情况下为-1,表示不进行限制,可进行并发执行。
    • maxConcurrentOperationCount 为1时,队列为串行队列。只能串行执行。
  • queuePriority 属性决定了进入准备就绪状态下的操作之间的开始执行顺序。并且,优先级不能取代依赖关系。

  • 线程间的通信

在 iOS 开发过程中,我们一般在主线程里边进行 UI 刷新,例如:点击、滚动、拖拽等事件。我们通常把一些耗时的操作放在其他线程,比如说图片下载、文件上传等耗时操作。而当我们有时候在其他线程完成了耗时操作时,需要回到主线程,那么就用到了线程之间的通讯。

  • 线程同步:

    • 可理解为线程 A 和 线程 B 一块配合,A 执行到一定程度时要依靠线程 B 的某个结果,于是停下来,示意 B 运行;B 依言执行,再将结果给 A;A 再继续操作。
    • 若每个线程中对全局变量、静态变量只有读操作,而无写操作,一般来说,这个全局变量是线程安全的;若有多个线程同时执行写操作(更改变量),一般都需要考虑线程同步,否则的话就可能影响线程安全。
  • 线程安全:

如果你的代码所在的进程中有多个线程在同时运行,而这些线程可能会同时运行这段代码。如果每次运行结果和单线程运行的结果是一样的,而且其他的变量的值也和预期的是一样的,就是线程安全的。

  • 线程安全解决方案:
    • 可以给线程加锁,在一个线程执行该操作的时候,不允许其他线程进行操作。iOS 实现线程加锁有很多种方式。@synchronized、 NSLock、NSRecursiveLock、NSCondition、NSConditionLock、pthread_mutex、dispatch_semaphore、OSSpinLock、atomic(property) set/ge等等各种方式。这里我们使用 NSLock 对象来解决线程同步问题。NSLock 对象可以通过进入锁时调用 lock 方法,解锁时调用 unlock 方法来保证线程安全。

var ticketSurplusCount = 50
    override func viewDidLoad() {
        super.viewDidLoad()
        
        ///1.1 创建代表北京火车票售卖窗口
        let operationForBeiJing = OperationQueue()
        operationForBeiJing.maxConcurrentOperationCount = 1;
        ///1.2 创建卖票操作 op1
        let op1 = BlockOperation{ 
            self.saleTicketSafe()
        }
        ///1.3 添加操作
        operationForBeiJing.addOperation(op1)
        
        ///2.1创建代表上海火车票售卖窗口
        let operationForShangHai = OperationQueue()
        operationForShangHai.maxConcurrentOperationCount = 1;
        ///2.2创建卖票操作 op2
        let op2 = BlockOperation{
            self.saleTicketSafe()
        }
        ///2.3 添加操作
        operationForShangHai.addOperation(op2)
    }

private func saleTicketSafe(){
        while true {
            objc_sync_enter(self)
            if self.ticketSurplusCount > 0 {
                self.ticketSurplusCount-=1;
                print("剩余票数:\(self.ticketSurplusCount) 窗口:\(Thread.current)")
                sleep(2)
            }
            objc_sync_exit(self)
            
            if self.ticketSurplusCount <= 0 {
                print("所有火车票均已售完")
                break
            }
        }
    }

迭代任务

如果一个任务可以分解为多个相似但独立的子任务,那么迭代任务是提高性能最适合的选择。

let queue = DispatchQueue.global() // 全局并发队列
queue.async {
    DispatchQueue.concurrentPerform(iterations: 100) {(index) -> Void in
        // do something
    }
    //可以转至主线程执行其他任务
    DispatchQueue.main.async {
        // do something
    }
}
  • 示例

    var divideds = [Int]()
    lazy var nums = Array(1...100000)
    
    @IBAction func syncConcurrent(_ sender: UIControl) {
        let queue = DispatchQueue.global(qos: .userInitiated)
        
        func isNumber(_ value: Int, canBeDividedBy divisor: Int) -> Bool {
            guard divisor != 0 else {
                fatalError("Divisor can'not be zero!")
            }
            return value % divisor == 0
        }
        
        queue.async {
            print("Started \(Date().timeIntervalSince1970)")
            DispatchQueue.concurrentPerform(iterations: self.nums.count) { index in
                let number = self.nums[index]
                if isNumber(number, canBeDividedBy: 13) {
//                    print("Current Thread: \(Thread.current)")
                    DispatchQueue.main.async {
                        self.divideds.append(number)
                    }
                }
            }
            
            print("Ended \(Date().timeIntervalSince1970)")
            DispatchQueue.main.async {
                print(self.divideds.map{ String($0) }.joined(separator: ", "))
            }
        }
        
    }

1kw数计算耗时
Start 1648028143.466887
Ended 1648028153.098256

DispatchSource

GCD 中提供了一个 DispatchSource 类,它可以帮你监听系统底层一些对象的活动,例如这些对象: Mach port、Unix descriptor、Unix signal、VFS node,并允许你在这些活动发生时,向队列提交一个任务以进行异步处理。

  • 示例:监听指定目录下文件变化(增、删)
class DispatchSourceTest {
    var filePath: String
    var counter = 0
    let queue = DispatchQueue.global()
    
    init() {
        filePath = "\(NSTemporaryDirectory())"
        startObserve {
            print("File was changed")
        }
    }
    
    func startObserve(closure: @escaping () -> Void) {
        let fileURL = URL(fileURLWithPath: filePath)
        let monitoredDirectoryFileDescriptor = open(fileURL.path, O_EVTONLY)
        
        let source = DispatchSource.makeFileSystemObjectSource(
            fileDescriptor: monitoredDirectoryFileDescriptor,
            eventMask: .write, queue: queue)
        source.setEventHandler(handler: closure)
        source.setCancelHandler {
            close(monitoredDirectoryFileDescriptor)
        }
        source.resume()
    }
    
    func changeFile() {
        DispatchSourceTest.createFile(name: "DispatchSourceTest.md", filePath: NSTemporaryDirectory())
        counter += 1
        let text = "\(counter)"
        try! text.write(toFile: "\(filePath)/DispatchSourceTest.md", atomically: true, encoding: String.Encoding.utf8)
        print("file writed.")
    }
    
    static func createFile(name: String, filePath: String){
        let manager = FileManager.default
        let fileBaseUrl = URL(fileURLWithPath: filePath)
        let file = fileBaseUrl.appendingPathComponent(name)
        print("文件: \(file)")
        
        // 写入 "hello world"
        let exist = manager.fileExists(atPath: file.path)
        if !exist {
            let data = Data(base64Encoded:"aGVsbG8gd29ybGQ=" ,options:.ignoreUnknownCharacters)
            let createSuccess = manager.createFile(atPath: file.path,contents:data,attributes:nil)
            print("文件创建结果: \(createSuccess)")
        }
    }
}

DispatchIO

DispatchIO 对象提供一个操作文件描述符的通道。简单讲你可以利用多线程异步高效地读写文件。

发起读写操作一般步骤如下:

  • 创建 DispatchIO 对象,或者说创建一个通道,并设置结束处理闭包。
  • 调用 read / write 方法
  • 调用 close 方法关闭通道
  • 在 close 方法后系统将自动调用结束处理闭包
let filePath: NSString = "test.zip"
// 创建一个可读写的文件描述符
let fileDescriptor = open(filePath.utf8String!, (O_RDWR | O_CREAT | O_APPEND), (S_IRWXU | S_IRWXG))
let queue = DispatchQueue(label: "com.sinkingsoul.DispatchQueueTest.serialQueue")
let cleanupHandler: (Int32) -> Void = { errorNumber in
}
let io = DispatchIO(type: .stream, fileDescriptor: fileDescriptor, queue: queue, cleanupHandler: cleanupHandler)
  • 设置阈值
io.setLimit(highWater: 1024*1024)
io.setLimit(lowWater: 1024*1024)
  • 读操作
io.read(offset: 0, length: Int.max, queue: ioReadQueue) 
{ doneReading, data, error in 
    if (error > 0) {
            print("读取发生错误了,错误码:\(error)")
            return
        }
    if (data != nil) {
            // 使用数据
       }
    if (doneReading) {
           ioRead.close()
       }
}
  • 写操作
io.write(offset: 0, data: data!, queue: ioWriteQueue)
 { doneWriting, data, error in 
    if (error > 0) {
        print("写入发生错误了,错误码:\(error)")
        return
    }
    if doneWriting {
        //...
        ioWrite.close()
    }
}
  • 示例:大文件合并
class DispatchIOTest {
    /// 利用很小的内存空间及同一队列读写方式合并文件
    static func combineFileWithOneQueue() {
        let files: NSArray = ["/Users/xxx/Downloads/gcd.mp4.zip.001",
                              "/Users/xxx/Downloads/gcd.mp4.zip.002"]
        let outFile: NSString = "/Users/xxx/Downloads/gcd.mp4.zip"
        let ioQueue = DispatchQueue(
            label: "com.sinkingsoul.DispatchQueueTest.serialQueue")
        let queueGroup = DispatchGroup()
        
        let ioWriteCleanupHandler: (Int32) -> Void = { errorNumber in
            print("写入文件完成 @\(Date())。")
        }
        
        let ioReadCleanupHandler: (Int32) -> Void = { errorNumber in
            print("读取文件完成。")
        }
        
        let ioWrite = DispatchIO(type: .stream,
                                 path: outFile.utf8String!,
                                 oflag: (O_RDWR | O_CREAT | O_APPEND),
                                 mode: (S_IRWXU | S_IRWXG),
                                 queue: ioQueue,
                                 cleanupHandler: ioWriteCleanupHandler)
        ioWrite?.setLimit(highWater: 1024*1024)
        
//        print("开始操作 @\(Date()).")
        
        files.enumerateObjects { fileName, index, stop in
            if stop.pointee.boolValue {
                return
            }
            queueGroup.enter()
            
            let ioRead = DispatchIO(type: .stream,
                                    path: (fileName as! NSString).utf8String!,
                                    oflag: O_RDONLY,
                                    mode: 0,
                                    queue: ioQueue,
                                    cleanupHandler: ioReadCleanupHandler)
            ioRead?.setLimit(highWater: 1024*1024)
            
            print("开始读取文件: \(fileName) 的数据")
            
            ioRead?.read(offset: 0, length: Int.max, queue: ioQueue) { doneReading, data, error in
                print("当前读线程:\(Thread.current)--->")
                if (error > 0 || stop.pointee.boolValue) {
                    print("读取发生错误了,错误码:\(error)")
                    ioWrite?.close()
                    stop.pointee = true
                    return
                }
                
                if (data != nil) {
                    let bytesRead: size_t = data!.count
                    if (bytesRead > 0) {
                        queueGroup.enter()
                        ioWrite?.write(offset: 0, data: data!, queue: ioQueue) {
                            doneWriting, data, error in
                            print("当前写线程:\(Thread.current)--->")
                            if (error > 0 || stop.pointee.boolValue) {
                                print("写入发生错误了,错误码:\(error)")
                                ioRead?.close()
                                stop.pointee = true
                                queueGroup.leave()
                                return
                            }
                            if doneWriting {
                                queueGroup.leave()
                            }
                            print("--->当前写线程:\(Thread.current)")
                        }
                    }
                }
                
                if (doneReading) {
                    ioRead?.close()
                    if (files.count == (index+1)) {
                        ioWrite?.close()
                    }
                    queueGroup.leave()
                }
                print("--->当前读线程:\(Thread.current)")
            }
            _ = queueGroup.wait(timeout: .distantFuture)
        }
    }
}

可以发现在读写过程中额外占用了 3M 左右内存,用时 2s 左右。这个结果中,内存占用比单队列大(这个比较好理解),但速度还更慢了,性能瓶颈很有可能是在磁盘读写上。所以涉及文件写操作时,并不是线程越多越快,要考虑传输速度、文件大小等因素。

DispatchData

DispatchData 对象可以管理基于内存的数据缓冲区。这个数据缓冲区对外表现为连续的内存区域,但内部可能由多个独立的内存区域组成。
DispatchData 对象很多特性类似于 Data 对象,且 Data 对象可以转换为 DispatchData 对象,而通过 DispatchIO 的 read 方法获得的数据也是封装为 DispatchData 对象的。

  • 首先将两个文件转换为 Data 对象,再转换为 DispatchData 对象,然后拼接两个对象为一个 DispatchData 对象,最后通过 DispatchIO 的 write 方法写入文件中。看起来有多次的转换过程,实际上 Data 类型读取文件时支持虚拟隐射的方式,而 DispatchData 类型更是支持多个数据块虚拟拼接,也不占用什么内存。

  • 示例:多文件合并

/// 利用 DispatchData 类型快速合并文件
static func combineFileWithDispatchData() {
    let filePathArray = ["/Users/xxx/Downloads/gcd.mp4.zip.001",
                          "/Users/xxx/Downloads/gcd.mp4.zip.002"]
    let outputFilePath: NSString = "/Users/xxx/Downloads/gcd.mp4.zip"
    let ioWriteQueue = DispatchQueue(
        label: "com.sinkingsoul.DispatchQueueTest.serialQueue")
    
    let ioWriteCleanupHandler: (Int32) -> Void = { errorNumber in
        print("写入文件完成 @\(Date()).")
    }
    let ioWrite = DispatchIO(type: .stream,
                             path: outputFilePath.utf8String!,
                             oflag: (O_RDWR | O_CREAT | O_APPEND),
                             mode: (S_IRWXU | S_IRWXG),
                             queue: ioWriteQueue,
                             cleanupHandler: ioWriteCleanupHandler)
    ioWrite?.setLimit(highWater: 1024*1024*2)
    
    print("开始操作 @\(Date()).")
    
    // 将所有文件合并为一个 DispatchData 对象
    let dispatchData = filePathArray.reduce(DispatchData.empty) { data, filePath in
        // 将文件转换为 Data
        let url = URL(fileURLWithPath: filePath)
        let fileData = try! Data(contentsOf: url, options: .mappedIfSafe)
        var tempData = data
        // 将 Data 转换为 DispatchData
        let dispatchData = fileData.withUnsafeBytes {
            (u8Ptr: UnsafePointer<UInt8>) -> DispatchData in
            let rawPtr = UnsafeRawPointer(u8Ptr)
            let innerData = Unmanaged.passRetained(fileData as NSData)
            return DispatchData(bytesNoCopy:
                UnsafeRawBufferPointer(start: rawPtr, count: fileData.count),
                                deallocator: .custom(nil, innerData.release))
        }
        // 拼接 DispatchData
        tempData.append(dispatchData)
        return tempData
    }
    
    //将 DispatchData 对象写入结果文件中
    ioWrite?.write(offset: 0, data: dispatchData, queue: ioWriteQueue) {
        doneWriting, data, error in
        if (error > 0) {
            print("写入发生错误了,错误码:\(error)")
            return
        }
        
        if data != nil {
//                print("正在写入文件,剩余大小:\(data!.count) bytes.")
        }
        
        if (doneWriting) {
            ioWrite?.close()
        }
    }
}

可以发现在整个读写过程中几乎没有额外占用内存,速度很快在 1s 左右,这个读写方案堪称完美,这要归功于 DispatchData 的虚拟拼接和 DispatchIO 的分块读写大小控制。这里顺便提一下 DispatchIO 数据阀值上限 highWater,经过测试,如果设置为 1M,将耗时 4s 左右,设为 2M 及以上时,耗时均为 1s 左右,非常快速,而所有阀值的内存占用都很少。所以设置合理的阀值,对性能的改善也是有帮助的。

DispatchWorkItem

let workItem = DispatchWorkItem(qos: .default, flags: DispatchWorkItemFlags()) {
    // Do something
}
let queue = DispatchQueue.global()
queue.async(execute: workItem)
// 或
workItem.perform()
workItem.wait()
workItem.wait(timeout: DispatchTime) // 指定等待时间
workItem.wait(wallTimeout: DispatchWallTime) // 指定等待时间
  • 示例
func execute(work: @escaping ()->Void, completion: @escaping ()->Void) {
    let item = DispatchWorkItem(block: work)
    DispatchQueue.global().async(execute: item)
    item.wait()
    completion()
}

execute {
    print("[\(Date().timeIntervalSince1970)] I start sleeping and will finish after 5s...")
    Thread.sleep(forTimeInterval: 5)
} completion: {
    print("[\(Date().timeIntervalSince1970)] I'm done!")
}
/*
[1648089354.896827] I start sleeping and will finish after 5s...
[1648089359.901157] I'm done!
*/

参考:https://www.cnblogs.com/lxlx1798/articles/15040615.html
https://www.jianshu.com/p/52457e86cfe0

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,837评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,551评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,417评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,448评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,524评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,554评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,569评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,316评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,766评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,077评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,240评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,912评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,560评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,176评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,425评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,114评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,114评论 2 352

推荐阅读更多精彩内容