Swift 实现:三级下载队列

1. 自定义的硬队列

下载逻辑的三级队列,第一级取3个、第二季取2个、第三级取1个并追加至正在执行的队列中,依此循环,异步下载方法从正在执行的队列的头部读取任务,依次执行。

代码如下:

import Foundation
import Combine
import Moya

extension WorkPreload {
    
    class PreloadItem {
        
        var localPath: String?
        var isCompleted = false
        // var progress: Float = 0.0
        
        let url: URL
        let taskId: String
        let priority: PreloadPriority
        
        init(url: URL, taskId: String, priority: PreloadPriority) {
            self.url = url
            self.taskId = taskId
            self.priority = priority
        }
        
    }
    
}

class WorkPreload {
    
    enum PreloadPriority: Int {
        case high = 1
        case medium = 2
        case low = 3
    }
    
    static let shared = WorkPreload.init(concurrentCount: 3)
    
    @ThreadSafetyWrapper private var highList: [PreloadItem] = []
    @ThreadSafetyWrapper private var mediumList: [PreloadItem] = []
    @ThreadSafetyWrapper private var lowList: [PreloadItem] = []
    
    @ThreadSafetyWrapper private var activeDownloads: [PreloadItem] = []
    @ThreadSafetyWrapper private var concurrentCount: Int = 3
    
    @ThreadSafetyWrapper private var cyclePosition = 0
    @ThreadSafetyWrapper private var paused = false
    
    @ThreadSafetyWrapper private var cancellables: [String: AnyCancellable] = [:]
    
    init(concurrentCount: Int) {
        self.concurrentCount = concurrentCount
    }
    
    // MARK: - Add DownloadItem
    
    private func addDownloadItem(url: URL, taskId: String, priority: PreloadPriority = .medium) {
        HXLogger.info("[VideoDownloadService] [addDownloadItem] >> url: \(url), taskId: \(taskId), priority: \(priority)")
        guard !isAlreadyExist(url: url) else { return }
        
        let item = PreloadItem(url: url, taskId: taskId, priority: priority)
        switch priority {
        case .high:
            highList.append(item)
        case .medium:
            mediumList.append(item)
        case .low:
            lowList.append(item)
        }
        processNextItem()
    }
    
    private func processNextItem() {
        guard !paused, activeDownloads.count < concurrentCount else { return }
        var itemsToProcess = [PreloadItem]()
        
        switch cyclePosition {
        case 0:
            itemsToProcess = takeFromList(&highList, count: 3)
            cyclePosition = 1
        case 1:
            itemsToProcess = takeFromList(&mediumList, count: 2)
            cyclePosition = 2
        case 2:
            itemsToProcess = takeFromList(&lowList, count: 1)
            cyclePosition = 0
        default:
            break
        }
        itemsToProcess.forEach {
            executeDownload($0)
        }
    }
    
    private func takeFromList(_ list: inout [PreloadItem], count: Int) -> [PreloadItem] {
        let items = Array(list.prefix(count))
        list.removeFirst(min(count, list.count))
        return items
    }
    
    private func isAlreadyExist(url: URL) -> Bool {
        return highList.contains(where: { $0.url == url }) ||
               mediumList.contains(where: { $0.url == url }) ||
               lowList.contains(where: { $0.url == url }) ||
               activeDownloads.contains(where: { $0.url == url })
    }
    
    // MARK: - ExecuteDownload
    
    private func executeDownload(_ item: PreloadItem) {
        let cancellable = AssetLoader.shared
            .publisher(.videoURL(item.url))
            .map({ $0.path })
            .sink(receiveCompletion: { [unowned self] com in
                if case .failure = com {
                    downloadFailed(item)
                } else {
                    downloadCompleted(item)
                }
            }, receiveValue: { [unowned self] in
                item.localPath = $0
                updateTask(item)
            })
        cancellables[item.taskId] = cancellable
        activeDownloads.append(item)
    }
    
    private func downloadCompleted(_ item: PreloadItem) {
        item.isCompleted = true
        activeDownloads.removeAll { $0 === item }
        cancellables.removeValue(forKey: item.taskId)
        processNextItem()
    }
    
    private func downloadFailed(_ item: PreloadItem) {
        activeDownloads.removeAll { $0 === item }
        cancellables.removeValue(forKey: item.taskId)
        processNextItem()
    }
    
    private func updateTask(_ item: PreloadItem) {
        guard let localPath = item.localPath else {
            return
        }
        guard let task = TaskManager.shared.allTaskList.first(where: { $0.taskId == item.taskId }) else {
            return
        }
        Logger.info("[VideoDownloadService] [updateTask] >> localPath: \(localPath)")
        task.workVideoLocalPath = localPath
        task.save()
    }
    
    // MARK: - Actions
    
    func pauseSerivce() {
        paused = true
    }
    
    func resumeSerivce() {
        paused = false
        processNextItem()
    }
    
    func cancelDownload(taskId: String) {
        highList.removeAll { $0.taskId == taskId }
        mediumList.removeAll { $0.taskId == taskId }
        lowList.removeAll { $0.taskId == taskId }
        activeDownloads.removeAll { $0.taskId == taskId }
        let cancellable = cancellables[taskId]
        cancellables.removeValue(forKey: taskId)
        cancellable?.cancel()
    }
    
}

注意:上面逻辑中的三级队列,如果某个队列为空时,容易停止,怎么办呢? 如下:

private func processNextItem() {
        let listEmpty = highList.isEmpty && mediumList.isEmpty && lowList.isEmpty
        guard !listEmpty else {
            Logger.info("[WorkPreload] [processNextItem] >> listEmpty")
            return
        }
        guard !paused, activeDownloads.count < concurrentCount else {
            Logger.info("[WorkPreload] [processNextItem] >> waiting")
            return
        }
        
        var itemsToProcess: [PreloadItem] = []
        switch cyclePosition {
        case 0:
            itemsToProcess = takeFromList(&highList, count: 3)
            cyclePosition = 1
        case 1:
            itemsToProcess = takeFromList(&mediumList, count: 2)
            cyclePosition = 2
        case 2:
            itemsToProcess = takeFromList(&lowList, count: 1)
            cyclePosition = 0
        default:
            break
        }
        
        if !itemsToProcess.isEmpty {
            itemsToProcess.forEach { executeDownload($0) }
        } else {
            processNextItem()
        }
    }

2. 基于Operation实现的三级队列

import Foundation

// 定义下载操作
class DownloadOperation: Operation {
    let url: URL

    init(url: URL) {
        self.url = url
    }

    override func main() {
        if isCancelled {
            return
        }

        // 模拟下载任务
        downloadFile(from: url)
    }

    private func downloadFile(from url: URL) {
        print("Downloading from \(url)")
        // 模拟下载时间
        sleep(2)
        print("Finished downloading from \(url)")
    }
}

// 创建下载队列管理器
class DownloadQueueManager {
    static let shared = DownloadQueueManager()

    private let highPriorityQueue: OperationQueue
    private let mediumPriorityQueue: OperationQueue
    private let lowPriorityQueue: OperationQueue

    private init() {
        highPriorityQueue = OperationQueue()
        mediumPriorityQueue = OperationQueue()
        lowPriorityQueue = OperationQueue()

        highPriorityQueue.qualityOfService = .userInitiated
        mediumPriorityQueue.qualityOfService = .utility
        lowPriorityQueue.qualityOfService = .background
    }

    func addDownload(url: URL, priority: DownloadPriority) {
        let operation = DownloadOperation(url: url)
        switch priority {
        case .high:
            highPriorityQueue.addOperation(operation)
        case .medium:
            mediumPriorityQueue.addOperation(operation)
        case .low:
            lowPriorityQueue.addOperation(operation)
        }
    }
}

enum DownloadPriority {
    case high
    case medium
    case low
}

// 示例使用
let urls = [
    URL(string: "https://example.com/high1")!,
    URL(string: "https://example.com/medium1")!,
    URL(string: "https://example.com/low1")!,
    URL(string: "https://example.com/high2")!,
    URL(string: "https://example.com/medium2")!,
    URL(string: "https://example.com/low2")!
]

DownloadQueueManager.shared.addDownload(url: urls[0], priority: .high)
DownloadQueueManager.shared.addDownload(url: urls[1], priority: .medium)
DownloadQueueManager.shared.addDownload(url: urls[2], priority: .low)
DownloadQueueManager.shared.addDownload(url: urls[3], priority: .high)
DownloadQueueManager.shared.addDownload(url: urls[4], priority: .medium)
DownloadQueueManager.shared.addDownload(url: urls[5], priority: .low)

// 保持程序运行直到所有任务完成
DispatchQueue.global().asyncAfter(deadline: .now() + 10) {
    print("All downloads should be complete by now.")
    exit(0)
}

RunLoop.main.run()

DownloadOperation:这是一个自定义的 Operation 子类,代表一个下载任务。main 方法包含实际的下载逻辑。
DownloadQueueManager:这是一个单例类,用于管理高、中、低优先级的下载队列。每个队列使用不同的 qualityOfService 来代表优先级。
DownloadPriority:这是一个枚举,定义了三种下载优先级:高、中、低。
示例使用:创建了一些示例 URL,并将它们添加到不同优先级的下载队列中。最后使用 RunLoop.main.run() 保持程序运行,直到所有任务完成。

参考链接:

Swift 使用 Operation 实现
https://github.com/showmylym/FileDownloaderDemo

Swift 使用 sync/await 实现
https://www.jb51.net/program/2903196bf.htm

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容