1. 并发队列 + 同步任务
注意是主线程执行,要避免UI堵塞问题
let queue = DispatchQueue(label: "syncConcurrent", attributes: .concurrent)
queue.sync {
for i in 0...2 {
Thread.sleep(forTimeInterval: 0.5)
print("Sync 1 \(i) [\(Thread.current)]")
}
}
queue.sync {
for i in 0...2 {
Thread.sleep(forTimeInterval: 0.5)
print("Sync 2 \(i) [\(Thread.current)]")
}
}
queue.sync {
for i in 0...2 {
Thread.sleep(forTimeInterval: 0.5)
print("Sync 3 \(i) [\(Thread.current)]")
}
}
print("All done")
输出:只有一条线程(主线程),所有任务依次有序执行
Sync 1 0 [<_NSMainThread: 0x600001d18380>{number = 1, name = main}]
Sync 1 1 [<_NSMainThread: 0x600001d18380>{number = 1, name = main}]
Sync 1 2 [<_NSMainThread: 0x600001d18380>{number = 1, name = main}]
Sync 2 0 [<_NSMainThread: 0x600001d18380>{number = 1, name = main}]
Sync 2 1 [<_NSMainThread: 0x600001d18380>{number = 1, name = main}]
Sync 2 2 [<_NSMainThread: 0x600001d18380>{number = 1, name = main}]
Sync 3 0 [<_NSMainThread: 0x600001d18380>{number = 1, name = main}]
Sync 3 1 [<_NSMainThread: 0x600001d18380>{number = 1, name = main}]
Sync 3 2 [<_NSMainThread: 0x600001d18380>{number = 1, name = main}]
All done
2. 并发队列 + 异步执行
let queue = DispatchQueue(label: "asyncConcurrent", attributes: .concurrent)
queue.async {
for i in 0...2 {
Thread.sleep(forTimeInterval: 0.5)
print("Async 1 \(i) [\(Thread.current)]")
}
}
queue.async {
for i in 0...2 {
Thread.sleep(forTimeInterval: 0.5)
print("Async 2 \(i) [\(Thread.current)]")
}
}
queue.async {
for i in 0...2 {
Thread.sleep(forTimeInterval: 0.5)
print("Async 3 \(i) [\(Thread.current)]")
}
}
print("All done")
输出:多(3)条线程,线程间(无序)交替执行加入队列中的任务(有序)
All done
Async 3 0 [<NSThread: 0x600002b7e000>{number = 6, name = (null)}]
Async 2 0 [<NSThread: 0x600002b702c0>{number = 3, name = (null)}]
Async 1 0 [<NSThread: 0x600002b7e4c0>{number = 7, name = (null)}]
Async 2 1 [<NSThread: 0x600002b702c0>{number = 3, name = (null)}]
Async 3 1 [<NSThread: 0x600002b7e000>{number = 6, name = (null)}]
Async 1 1 [<NSThread: 0x600002b7e4c0>{number = 7, name = (null)}]
Async 2 2 [<NSThread: 0x600002b702c0>{number = 3, name = (null)}]
Async 1 2 [<NSThread: 0x600002b7e4c0>{number = 7, name = (null)}]
Async 3 2 [<NSThread: 0x600002b7e000>{number = 6, name = (null)}]
3. 串行队列 + 同步任务
let queue = DispatchQueue(label: "syncSerial")
queue.sync {
for i in 0...2 {
Thread.sleep(forTimeInterval: 0.5)
print("Sync Serial: 1 \(i) [\(Thread.current)]")
}
}
queue.sync {
for i in 0...2 {
Thread.sleep(forTimeInterval: 0.5)
print("Sync Serial: 2 \(i) [\(Thread.current)]")
}
}
queue.sync {
for i in 0...2 {
Thread.sleep(forTimeInterval: 0.5)
print("Sync Serial: 3 \(i) [\(Thread.current)]")
}
}
print("All done")
输出:主线程+任务有序
Sync Serial: 1 0 [<_NSMainThread: 0x6000005f4000>{number = 1, name = main}]
Sync Serial: 1 1 [<_NSMainThread: 0x6000005f4000>{number = 1, name = main}]
Sync Serial: 1 2 [<_NSMainThread: 0x6000005f4000>{number = 1, name = main}]
Sync Serial: 2 0 [<_NSMainThread: 0x6000005f4000>{number = 1, name = main}]
Sync Serial: 2 1 [<_NSMainThread: 0x6000005f4000>{number = 1, name = main}]
Sync Serial: 2 2 [<_NSMainThread: 0x6000005f4000>{number = 1, name = main}]
Sync Serial: 3 0 [<_NSMainThread: 0x6000005f4000>{number = 1, name = main}]
Sync Serial: 3 1 [<_NSMainThread: 0x6000005f4000>{number = 1, name = main}]
Sync Serial: 3 2 [<_NSMainThread: 0x6000005f4000>{number = 1, name = main}]
All done
4. 串行队列 + 异步任务
let queue = DispatchQueue(label: "asyncSerial")
queue.async {
for i in 0...2 {
Thread.sleep(forTimeInterval: 0.5)
print("Async Serial: 1 \(i) [\(Thread.current)]")
}
}
queue.async {
for i in 0...2 {
Thread.sleep(forTimeInterval: 0.5)
print("Async Serial: 2 \(i) [\(Thread.current)]")
}
}
queue.async {
for i in 0...2 {
Thread.sleep(forTimeInterval: 0.5)
print("Async Serial: 3 \(i) [\(Thread.current)]")
}
}
print("All done")
输出: 一条线程(非主线程)+任务有序
Async Serial: 1 0 [<NSThread: 0x600000869a00>{number = 6, name = (null)}]
Async Serial: 1 1 [<NSThread: 0x600000869a00>{number = 6, name = (null)}]
Async Serial: 1 2 [<NSThread: 0x600000869a00>{number = 6, name = (null)}]
Async Serial: 2 0 [<NSThread: 0x600000869a00>{number = 6, name = (null)}]
Async Serial: 2 1 [<NSThread: 0x600000869a00>{number = 6, name = (null)}]
Async Serial: 2 2 [<NSThread: 0x600000869a00>{number = 6, name = (null)}]
Async Serial: 3 0 [<NSThread: 0x600000869a00>{number = 6, name = (null)}]
Async Serial: 3 1 [<NSThread: 0x600000869a00>{number = 6, name = (null)}]
Async Serial: 3 2 [<NSThread: 0x600000869a00>{number = 6, name = (null)}]
5. GCD Group
func request(task: Int, time: TimeInterval, completion: @escaping ()->Void) {
DispatchQueue.global(qos: .background).async {
print("Requesting task: \(task) ... started")
Thread.sleep(forTimeInterval: time)
print("Requesting task: \(task) ... finished")
DispatchQueue.main.async(execute: completion)
}
}
let group = DispatchGroup()
for i in 1...3 {
group.enter()
request(task: i, time: Double(3-i)) {
group.leave()
}
}
group.notify(queue: .main) { [weak self] in
print("All requests are finished!")
self?.view.backgroundColor = .systemGreen
}
输出:
Requesting task: 1 ... started
Requesting task: 3 ... started
Requesting task: 2 ... started
Requesting task: 3 ... finished
Requesting task: 2 ... finished
Requesting task: 1 ... finished
All requests are finished!
wait()
阻塞group
线程
for i in 1...3 {
/// 等待一秒再执行下一个任务
let _ = group.wait(timeout: .now() + .seconds(1))
group.enter()
request(task: i, time: Double(3-i)) {
group.leave()
}
}
输出:
Requesting task: 1 ... started
Requesting task: 2 ... started
Requesting task: 3 ... started
Requesting task: 3 ... finished
Requesting task: 2 ... finished
Requesting task: 1 ... finished
All requests are finished!
串行队列执行异步group任务
let group = DispatchGroup()
let queue = DispatchQueue(label: "requestTaskSerial", qos: .background)
for i in 1...3 {
group.enter()
queue.async(group: group){ [weak self] in
self?.request(task: i, time: 1) {
group.leave()
}
}
}
group.notify(queue: .main) { [weak self] in
print("All requests are finished!")
self?.view.backgroundColor = .systemGreen
}
输出:
Requesting task: 1 ... started
Requesting task: 2 ... started
Requesting task: 3 ... started
Requesting task: 1 ... finished
Requesting task: 2 ... finished
Requesting task: 3 ... finished
All requests are finished!
Semaphore
DispatchSemaphore provides an efficient implementation of a traditional counting semaphore, which can be used to control access to a resource across multiple execution contexts.
DispatchSemaphore是传统计数信号量的封装,用来控制资源被多任务访问的情况。
let group = DispatchGroup()
let queue = DispatchQueue(label: "requestTaskSerial", qos: .background, attributes: .concurrent)
/*
// 创建一个新的信号量,参数value代表信号量资源池的初始数量。
// value < 0, 返回NULL
// value = 0, 多线程在等待某个特定线程的结束。
// value > 0, 资源数量,可以由多个线程使用。
*/
let semaphore = DispatchSemaphore(value: 2)
for i in 1...3 {
group.enter()
queue.async(group: group){ [weak self] in
semaphore.wait()
self?.request(task: i, time: Double(3-i)) {
group.leave()
semaphore.signal()
}
}
}
group.notify(queue: .main) { [weak self] in
print("All requests are finished!")
self?.view.backgroundColor = .systemGreen
}
输出:同时最多只能有2个异步任务在执行
Requesting task: 3 ... started
Requesting task: 3 ... finished
Requesting task: 1 ... started
Requesting task: 2 ... started
Requesting task: 2 ... finished
Requesting task: 1 ... finished
All requests are finished!
Barrier
barrier翻译过来就是屏障。在一个并行queue里,很多时候,我们提交一个新的任务需要这样做。
- queue里已有任务执行完了新任务才开始
- 新任务开始后提交的任务都要等待新任务执行完毕才能继续执行
特别注意:
以barrier flag提交的任务能够保证其在并行队列执行的时候,是唯一的一个任务。
(只对自己创建的队列有效,对gloablQueue无效)
let group = DispatchGroup()
let queue = DispatchQueue(label: "requestTaskSerial", qos: .background, attributes: .concurrent)
for i in 1...3 {
group.enter()
queue.async(flags: .barrier){
print("Requesting task: \(i) ... started")
Thread.sleep(forTimeInterval: Double(3-i))
print("Requesting task: \(i) ... finished")
group.leave()
}
}
group.notify(queue: .main) { [weak self] in
print("All requests are finished!")
self?.view.backgroundColor = .systemGreen
}
输出:类似同步任务顺序执行
Requesting task: 1 ... started
Requesting task: 1 ... finished
Requesting task: 2 ... started
Requesting task: 2 ... finished
Requesting task: 3 ... started
Requesting task: 3 ... finished
All requests are finished!
let queue = DispatchQueue(label: "requestTaskSerial", qos: .background, attributes: .concurrent)
for i in 1...3 {
group.enter()
queue.async(flags: .barrier){ [weak self] in
self?.request(task: i, time: Double(3-i), completion: {
group.leave()
})
}
}
group.notify(queue: .main) { [weak self] in
print("All requests are finished!")
self?.view.backgroundColor = .systemGreen
}
输出:异步任务顺序执行
Requesting task: 3 ... started
Requesting task: 3 ... finished
Requesting task: 2 ... started
Requesting task: 1 ... started
Requesting task: 2 ... finished
Requesting task: 1 ... finished
All requests are finished!
Operation & OperationQueue
Operation
自定义
同步
operation只需要重写main()
方法,然后当main方法体执行完后,就isFinished
,一般用于同步操作(如数据读取)
自定义
异步
operation时,需要手动维护isFinished
、isExecuting
等状态,所以较为复杂
class RequestOperation: Operation {
var task: Int
var time: TimeInterval
init (task: Int, time: TimeInterval, completion: @escaping ()->Void) {
self.task = task
self.time = time
super.init()
self.completionBlock = completion
}
private var _finished: Bool = false {
willSet{
willChangeValue(forKey: "isFinished")
}
didSet{
didChangeValue(forKey: "isFinished")
}
}
private var _executing: Bool = false {
willSet{
willChangeValue(forKey: "isExecuting")
}
didSet{
didChangeValue(forKey: "isExecuting")
}
}
override var isFinished: Bool {
return _finished
}
override var isExecuting: Bool {
return _executing
}
override var isAsynchronous: Bool {
return true
}
private func done() {
super.cancel()
_executing = false
_finished = true
completionBlock?()
}
// 注意加锁,避免多次调用
override func cancel() {
objc_sync_enter(self)
done()
objc_sync_exit(self)
}
override func start() {
guard !isCancelled else {
done()
return
}
_executing = true
request(task: task, time: time) { [weak self] in
self?.done()
}
}
}
使用:
func request(task: Int, time: TimeInterval, completion: (()->Void)?) {
DispatchQueue.global().async {
print("Requesting task: \(task) ... started")
Thread.sleep(forTimeInterval: time)
print("Requesting task: \(task) ... finished")
DispatchQueue.main.async(execute: completion ?? {})
}
}
@IBAction func startOneByOneRequesting(_ sender: UIControl) {
let queue = OperationQueue()
var previousOperation: Operation?
for i in 1...3 {
let op = RequestOperation(task: i, time: Double(3-i), completion: {
print("Operation \(i) is completed!")
if i == 3 {
print("All tasks are done !!!")
}
})
if let previous = previousOperation {
op.addDependency(previous)
}
queue.addOperation(op)
previousOperation = op
}
}
输出:
Requesting task: 1 ... started
Requesting task: 1 ... finished
Operation 1 is completed!
Requesting task: 2 ... started
Requesting task: 2 ... finished
Operation 2 is completed!
Requesting task: 3 ... started
Requesting task: 3 ... finished
Operation 3 is completed!
All tasks are done !!!
OperationQueue
-
最大并发操作数:
maxConcurrentOperationCount
-
maxConcurrentOperationCount
默认情况下为-1,表示不进行限制,可进行并发执行。 -
maxConcurrentOperationCount
为1时,队列为串行队列。只能串行执行。
-
queuePriority
属性决定了进入准备就绪状态下的操作之间的开始执行顺序。并且,优先级不能取代依赖关系。线程间的通信
在 iOS 开发过程中,我们一般在主线程里边进行 UI 刷新,例如:点击、滚动、拖拽等事件。我们通常把一些耗时的操作放在其他线程,比如说图片下载、文件上传等耗时操作。而当我们有时候在其他线程完成了耗时操作时,需要回到主线程,那么就用到了线程之间的通讯。
-
线程同步:
- 可理解为线程 A 和 线程 B 一块配合,A 执行到一定程度时要依靠线程 B 的某个结果,于是停下来,示意 B 运行;B 依言执行,再将结果给 A;A 再继续操作。
- 若每个线程中对全局变量、静态变量只有读操作,而无写操作,一般来说,这个全局变量是线程安全的;若有多个线程同时执行写操作(更改变量),一般都需要考虑线程同步,否则的话就可能影响线程安全。
线程安全:
如果你的代码所在的进程中有多个线程在同时运行,而这些线程可能会同时运行这段代码。如果每次运行结果和单线程运行的结果是一样的,而且其他的变量的值也和预期的是一样的,就是线程安全的。
- 线程安全解决方案:
- 可以给线程加锁,在一个线程执行该操作的时候,不允许其他线程进行操作。iOS 实现线程加锁有很多种方式。@synchronized、 NSLock、NSRecursiveLock、NSCondition、NSConditionLock、pthread_mutex、dispatch_semaphore、OSSpinLock、atomic(property) set/ge等等各种方式。这里我们使用 NSLock 对象来解决线程同步问题。NSLock 对象可以通过进入锁时调用 lock 方法,解锁时调用 unlock 方法来保证线程安全。
var ticketSurplusCount = 50
override func viewDidLoad() {
super.viewDidLoad()
///1.1 创建代表北京火车票售卖窗口
let operationForBeiJing = OperationQueue()
operationForBeiJing.maxConcurrentOperationCount = 1;
///1.2 创建卖票操作 op1
let op1 = BlockOperation{
self.saleTicketSafe()
}
///1.3 添加操作
operationForBeiJing.addOperation(op1)
///2.1创建代表上海火车票售卖窗口
let operationForShangHai = OperationQueue()
operationForShangHai.maxConcurrentOperationCount = 1;
///2.2创建卖票操作 op2
let op2 = BlockOperation{
self.saleTicketSafe()
}
///2.3 添加操作
operationForShangHai.addOperation(op2)
}
private func saleTicketSafe(){
while true {
objc_sync_enter(self)
if self.ticketSurplusCount > 0 {
self.ticketSurplusCount-=1;
print("剩余票数:\(self.ticketSurplusCount) 窗口:\(Thread.current)")
sleep(2)
}
objc_sync_exit(self)
if self.ticketSurplusCount <= 0 {
print("所有火车票均已售完")
break
}
}
}
迭代任务
如果一个任务可以分解为多个相似但独立的子任务,那么迭代任务是提高性能最适合的选择。
let queue = DispatchQueue.global() // 全局并发队列
queue.async {
DispatchQueue.concurrentPerform(iterations: 100) {(index) -> Void in
// do something
}
//可以转至主线程执行其他任务
DispatchQueue.main.async {
// do something
}
}
- 示例
var divideds = [Int]()
lazy var nums = Array(1...100000)
@IBAction func syncConcurrent(_ sender: UIControl) {
let queue = DispatchQueue.global(qos: .userInitiated)
func isNumber(_ value: Int, canBeDividedBy divisor: Int) -> Bool {
guard divisor != 0 else {
fatalError("Divisor can'not be zero!")
}
return value % divisor == 0
}
queue.async {
print("Started \(Date().timeIntervalSince1970)")
DispatchQueue.concurrentPerform(iterations: self.nums.count) { index in
let number = self.nums[index]
if isNumber(number, canBeDividedBy: 13) {
// print("Current Thread: \(Thread.current)")
DispatchQueue.main.async {
self.divideds.append(number)
}
}
}
print("Ended \(Date().timeIntervalSince1970)")
DispatchQueue.main.async {
print(self.divideds.map{ String($0) }.joined(separator: ", "))
}
}
}
1kw数计算耗时
Start 1648028143.466887
Ended 1648028153.098256
DispatchSource
GCD 中提供了一个
DispatchSource
类,它可以帮你监听系统底层一些对象的活动,例如这些对象:Mach port、Unix descriptor、Unix signal、VFS node
,并允许你在这些活动发生时,向队列提交一个任务以进行异步处理。
- 示例:
监听指定目录下文件变化(增、删)
class DispatchSourceTest {
var filePath: String
var counter = 0
let queue = DispatchQueue.global()
init() {
filePath = "\(NSTemporaryDirectory())"
startObserve {
print("File was changed")
}
}
func startObserve(closure: @escaping () -> Void) {
let fileURL = URL(fileURLWithPath: filePath)
let monitoredDirectoryFileDescriptor = open(fileURL.path, O_EVTONLY)
let source = DispatchSource.makeFileSystemObjectSource(
fileDescriptor: monitoredDirectoryFileDescriptor,
eventMask: .write, queue: queue)
source.setEventHandler(handler: closure)
source.setCancelHandler {
close(monitoredDirectoryFileDescriptor)
}
source.resume()
}
func changeFile() {
DispatchSourceTest.createFile(name: "DispatchSourceTest.md", filePath: NSTemporaryDirectory())
counter += 1
let text = "\(counter)"
try! text.write(toFile: "\(filePath)/DispatchSourceTest.md", atomically: true, encoding: String.Encoding.utf8)
print("file writed.")
}
static func createFile(name: String, filePath: String){
let manager = FileManager.default
let fileBaseUrl = URL(fileURLWithPath: filePath)
let file = fileBaseUrl.appendingPathComponent(name)
print("文件: \(file)")
// 写入 "hello world"
let exist = manager.fileExists(atPath: file.path)
if !exist {
let data = Data(base64Encoded:"aGVsbG8gd29ybGQ=" ,options:.ignoreUnknownCharacters)
let createSuccess = manager.createFile(atPath: file.path,contents:data,attributes:nil)
print("文件创建结果: \(createSuccess)")
}
}
}
DispatchIO
DispatchIO 对象提供一个操作文件描述符的通道。简单讲你可以利用多线程异步高效地读写文件。
发起读写操作一般步骤如下:
- 创建 DispatchIO 对象,或者说创建一个通道,并设置结束处理闭包。
- 调用 read / write 方法
- 调用 close 方法关闭通道
- 在 close 方法后系统将自动调用结束处理闭包
let filePath: NSString = "test.zip"
// 创建一个可读写的文件描述符
let fileDescriptor = open(filePath.utf8String!, (O_RDWR | O_CREAT | O_APPEND), (S_IRWXU | S_IRWXG))
let queue = DispatchQueue(label: "com.sinkingsoul.DispatchQueueTest.serialQueue")
let cleanupHandler: (Int32) -> Void = { errorNumber in
}
let io = DispatchIO(type: .stream, fileDescriptor: fileDescriptor, queue: queue, cleanupHandler: cleanupHandler)
- 设置阈值
io.setLimit(highWater: 1024*1024)
io.setLimit(lowWater: 1024*1024)
- 读操作
io.read(offset: 0, length: Int.max, queue: ioReadQueue)
{ doneReading, data, error in
if (error > 0) {
print("读取发生错误了,错误码:\(error)")
return
}
if (data != nil) {
// 使用数据
}
if (doneReading) {
ioRead.close()
}
}
- 写操作
io.write(offset: 0, data: data!, queue: ioWriteQueue)
{ doneWriting, data, error in
if (error > 0) {
print("写入发生错误了,错误码:\(error)")
return
}
if doneWriting {
//...
ioWrite.close()
}
}
- 示例:
大文件合并
class DispatchIOTest {
/// 利用很小的内存空间及同一队列读写方式合并文件
static func combineFileWithOneQueue() {
let files: NSArray = ["/Users/xxx/Downloads/gcd.mp4.zip.001",
"/Users/xxx/Downloads/gcd.mp4.zip.002"]
let outFile: NSString = "/Users/xxx/Downloads/gcd.mp4.zip"
let ioQueue = DispatchQueue(
label: "com.sinkingsoul.DispatchQueueTest.serialQueue")
let queueGroup = DispatchGroup()
let ioWriteCleanupHandler: (Int32) -> Void = { errorNumber in
print("写入文件完成 @\(Date())。")
}
let ioReadCleanupHandler: (Int32) -> Void = { errorNumber in
print("读取文件完成。")
}
let ioWrite = DispatchIO(type: .stream,
path: outFile.utf8String!,
oflag: (O_RDWR | O_CREAT | O_APPEND),
mode: (S_IRWXU | S_IRWXG),
queue: ioQueue,
cleanupHandler: ioWriteCleanupHandler)
ioWrite?.setLimit(highWater: 1024*1024)
// print("开始操作 @\(Date()).")
files.enumerateObjects { fileName, index, stop in
if stop.pointee.boolValue {
return
}
queueGroup.enter()
let ioRead = DispatchIO(type: .stream,
path: (fileName as! NSString).utf8String!,
oflag: O_RDONLY,
mode: 0,
queue: ioQueue,
cleanupHandler: ioReadCleanupHandler)
ioRead?.setLimit(highWater: 1024*1024)
print("开始读取文件: \(fileName) 的数据")
ioRead?.read(offset: 0, length: Int.max, queue: ioQueue) { doneReading, data, error in
print("当前读线程:\(Thread.current)--->")
if (error > 0 || stop.pointee.boolValue) {
print("读取发生错误了,错误码:\(error)")
ioWrite?.close()
stop.pointee = true
return
}
if (data != nil) {
let bytesRead: size_t = data!.count
if (bytesRead > 0) {
queueGroup.enter()
ioWrite?.write(offset: 0, data: data!, queue: ioQueue) {
doneWriting, data, error in
print("当前写线程:\(Thread.current)--->")
if (error > 0 || stop.pointee.boolValue) {
print("写入发生错误了,错误码:\(error)")
ioRead?.close()
stop.pointee = true
queueGroup.leave()
return
}
if doneWriting {
queueGroup.leave()
}
print("--->当前写线程:\(Thread.current)")
}
}
}
if (doneReading) {
ioRead?.close()
if (files.count == (index+1)) {
ioWrite?.close()
}
queueGroup.leave()
}
print("--->当前读线程:\(Thread.current)")
}
_ = queueGroup.wait(timeout: .distantFuture)
}
}
}
可以发现在读写过程中额外占用了 3M 左右内存,用时 2s 左右。这个结果中,内存占用比单队列大(这个比较好理解),但速度还更慢了,性能瓶颈很有可能是在磁盘读写上。所以涉及文件写操作时,并不是线程越多越快,要考虑传输速度、文件大小等因素。
DispatchData
DispatchData 对象可以管理基于内存的数据缓冲区。这个数据缓冲区对外表现为连续的内存区域,但内部可能由多个独立的内存区域组成。
DispatchData 对象很多特性类似于 Data 对象,且 Data 对象可以转换为 DispatchData 对象,而通过 DispatchIO 的 read 方法获得的数据也是封装为 DispatchData 对象的。
首先将两个文件转换为 Data 对象,再转换为 DispatchData 对象,然后拼接两个对象为一个 DispatchData 对象,最后通过 DispatchIO 的 write 方法写入文件中。看起来有多次的转换过程,实际上 Data 类型读取文件时支持虚拟隐射的方式,而 DispatchData 类型更是支持多个数据块虚拟拼接,也不占用什么内存。
示例:
多文件合并
/// 利用 DispatchData 类型快速合并文件
static func combineFileWithDispatchData() {
let filePathArray = ["/Users/xxx/Downloads/gcd.mp4.zip.001",
"/Users/xxx/Downloads/gcd.mp4.zip.002"]
let outputFilePath: NSString = "/Users/xxx/Downloads/gcd.mp4.zip"
let ioWriteQueue = DispatchQueue(
label: "com.sinkingsoul.DispatchQueueTest.serialQueue")
let ioWriteCleanupHandler: (Int32) -> Void = { errorNumber in
print("写入文件完成 @\(Date()).")
}
let ioWrite = DispatchIO(type: .stream,
path: outputFilePath.utf8String!,
oflag: (O_RDWR | O_CREAT | O_APPEND),
mode: (S_IRWXU | S_IRWXG),
queue: ioWriteQueue,
cleanupHandler: ioWriteCleanupHandler)
ioWrite?.setLimit(highWater: 1024*1024*2)
print("开始操作 @\(Date()).")
// 将所有文件合并为一个 DispatchData 对象
let dispatchData = filePathArray.reduce(DispatchData.empty) { data, filePath in
// 将文件转换为 Data
let url = URL(fileURLWithPath: filePath)
let fileData = try! Data(contentsOf: url, options: .mappedIfSafe)
var tempData = data
// 将 Data 转换为 DispatchData
let dispatchData = fileData.withUnsafeBytes {
(u8Ptr: UnsafePointer<UInt8>) -> DispatchData in
let rawPtr = UnsafeRawPointer(u8Ptr)
let innerData = Unmanaged.passRetained(fileData as NSData)
return DispatchData(bytesNoCopy:
UnsafeRawBufferPointer(start: rawPtr, count: fileData.count),
deallocator: .custom(nil, innerData.release))
}
// 拼接 DispatchData
tempData.append(dispatchData)
return tempData
}
//将 DispatchData 对象写入结果文件中
ioWrite?.write(offset: 0, data: dispatchData, queue: ioWriteQueue) {
doneWriting, data, error in
if (error > 0) {
print("写入发生错误了,错误码:\(error)")
return
}
if data != nil {
// print("正在写入文件,剩余大小:\(data!.count) bytes.")
}
if (doneWriting) {
ioWrite?.close()
}
}
}
可以发现在整个读写过程中几乎没有额外占用内存,速度很快在 1s 左右,这个读写方案堪称完美,这要归功于 DispatchData 的虚拟拼接和 DispatchIO 的分块读写大小控制。这里顺便提一下 DispatchIO 数据阀值上限 highWater,经过测试,如果设置为 1M,将耗时 4s 左右,设为 2M 及以上时,耗时均为 1s 左右,非常快速,而所有阀值的内存占用都很少。所以设置合理的阀值,对性能的改善也是有帮助的。
DispatchWorkItem
let workItem = DispatchWorkItem(qos: .default, flags: DispatchWorkItemFlags()) {
// Do something
}
let queue = DispatchQueue.global()
queue.async(execute: workItem)
// 或
workItem.perform()
workItem.wait()
workItem.wait(timeout: DispatchTime) // 指定等待时间
workItem.wait(wallTimeout: DispatchWallTime) // 指定等待时间
- 示例
func execute(work: @escaping ()->Void, completion: @escaping ()->Void) {
let item = DispatchWorkItem(block: work)
DispatchQueue.global().async(execute: item)
item.wait()
completion()
}
execute {
print("[\(Date().timeIntervalSince1970)] I start sleeping and will finish after 5s...")
Thread.sleep(forTimeInterval: 5)
} completion: {
print("[\(Date().timeIntervalSince1970)] I'm done!")
}
/*
[1648089354.896827] I start sleeping and will finish after 5s...
[1648089359.901157] I'm done!
*/
参考:https://www.cnblogs.com/lxlx1798/articles/15040615.html
https://www.jianshu.com/p/52457e86cfe0