RxSwift, 异步操作组合处理

响应式编程&链式编程

公司的几个共享项目, 有较多后台定时的网络请求,定位和蓝牙操作的组合.

原方案是通过闭包嵌套起来, 如此一来有些比较头疼的问题:

  • 闭包回调地狱, 每个组合操作的业务上有变动需要做大修.
  • 无法获取或取消上一次的请求/操作.
  • 异步响应不及时可能造成之前的请求后至, 让数据出错. 或在页面退出之后仍然在进行未完成的请求.

网上较推荐的解决方案, 就是使用响应式编程框架: ReactiveCocoa/RxSwift

刚好公司的项目里面本来已经使用了Swagger来自动生成网络请求业务的代码,
自带Moya框架并配搭了RxSwift.
理所当然用了RxSwift.


串行异步处理

举个场景: 设备列表界面

需求大致如下:

  • 列表下拉刷新.
  • 每次下拉刷新, 需要先更新坐标, 然后再获取附近的设备列表.
  • 考虑定位超时或者定位出错的情况(例如没有开启权限).

分析之后得出, 下拉刷新触发之后, 需要实现以下函数:

// 检查定位权限
func checkAuthStatus(completion: (Error?) -> Void)
// 更新坐标
func updateLocation(success: (CLLocation) -> Void, failure: (Error) -> Void)
// 请求设备
func fetchDevices(near location: CLLocation, success: ([Device]) -> Void, failure: (Error) -> Void)
// 更新数据以及界面
func update(with devices: [Device])
// 处理错误
func handlError(_ error: Error)

闭包实现的调用代码:

self.checkAuthStatus(completion: { [weak self] error in
    if let _error = error {
        self?.handleError(_error)
    }else {
        self?.updateLocation(
            success: { location in
                self?.fetchDevices(
                    near: location, 
                    success: {self?.update(with: $0)}
                    failure: {self?.handlError($0)}
                    )
            }, 
            failure: {
                 self?.handlError($0)
            }
        )
    }
})

是的, 它能跑, 但是它也很难看.

如果把这段代码放在列表下拉刷新的回调里面, 视觉上会更加让人崩溃.


这个时候搬出RxSwift. 我们先直接看一下, 接入后代码的样子:

self.tableView.rx_pullToRefresh
    .flatMap({[unowned self] in self.rx_checkAuthStatus()})
    .flatMap({[unowned self] in self.rx_updateLocation()})
    .flatMap({[unowned self] in self.rx_fetchDevices(near: $0)})
    .subscribe(onNext:  {[weak self] in self?.update(with: $0)},
               onError: {[weak self] in self?.handleError($0)})
    .disposed(by: self.disposeBag)

(以上出现的rx_xxx函数会在后面讨论如何实现. 同时, 这段代码仍未完善, 后面会继续讨论.)

这段代码关键点主要有:

  • 链式调用
    把原来通过闭包回调的函数都改造成了返回Observable的函数, 然后链式调用ObservableOperator(flatMap函数).

  • 函数式
    把处理回调的函数(匿名函数)作为参数传递给 Operator 函数和 subscribe 函数.

  • Error聚合处理
    用闭包回调的方式需要在每个函数调用的时候传入处理 Error 的函数.
    而使用 RxSwift 的话, Error 的处理入口可以聚合到1个地方. 就是在 subscribe 函数的 onError参数.

以上便是串行异步操作使用RxSwift处理的效果.


并行异步处理

照样举例: 下载多张图片并本地化.

大致需求:

  • 并发下载若干张图片(不考虑并发数量)
  • 全部图片下载完成后把数据本地化
  • 如果任意一张图片下载失败则不把数据本地化

分解之后有以下几个函数:

// 下载图片
func downloadImage(withURL: URL) -> (imageData: Data?, error: Error?)
// 处理下载好的图片, 更新临时数据
func handleDownloadImage(imageData: Data, url: URL)
// 处理错误
func handleError(error: Error)
// 把临时数据写入本地数据库
func updateDatabase()

原来的实现, 比较简单地遍历了全部URL并开启异步下载, 通过DispatchGroup做任务依赖:

let group = DispatchGroup()

urls.forEach { value in 
    DispatchQueue.global().async(group: group, execute: { [weak self] in
        let res = self?.downloadImage(withURL: value)
        if let data = res?.imageData {
            self?.handleDownloadImage(imageData: data, url: value)
        }else if let error = res?.error {
            group.leave() // 遇到错误, 停止group
            DispatchQueue.main.async { // 处理错误只需要执行一次
                self?.handleError(error: error)
            }
        }
    })
}

group.notify(queue: DispatchQueue.main) { [weak self] in
    self?.updateDatabase()
}

而接入RxSwift的做法, 利用Observablezip函数把多个下载任务聚合:

let observables = 
urls.map { [unowned self] url in
    self.rx_downloadImage(withURL: url).map { data in self.handleDownloadImage(imageData: data, url: url) }
}

Observable.zip(observables)
          .subscribe(onNext: {[unowned self] _ in self.updateDatabase()}
                     onError: {[unowned self] in self.handleError(error: $0)})
          .disposed(by: self.disposeBag)

对原函数进行改造

上面我们说了要引入 RxSwift做异步处理, 接着我们讨论如何把原来的函数改造并接入 RxSwift 体系.

Observable

ObservableRxSwift 中数据传递的载体.

以前, 我们从一个函数获得其执行结果, 一般是:

  • 同步函数情况下, 通过这个函数的返回值.
  • 异步函数情况下, 通过向函数传入回调闭包.
  • 其他方式(如传入回调代理, 或者监听广播通知等), 这些在这里先不讨论.

RxSwift 可以让我们在不传入回调闭包的情况下, 用类似调用同步函数的方式, 从返回值拿到异步函数的运行结果. 而这个结果, 就是通过Observable包装而来的.


因此我们要做的, 就是把运行结果用 Observable 包装起来返回.

RxSwift提供了一个比较简单的方式创建 Observable, 就是其静态函数create:

// Creates an observable sequence from a specified subscribe method implementation.
public static func create(_ subscribe: @escaping (RxSwift.AnyObserver<Self.E>) -> Disposable) -> RxSwift.Observable<Self.E>

这个函数只有一个闭包参数, 闭包会给我们提供一个Observer的实例, 我们的操作需要在这个闭包里头完成, 而操作结果则需要传给这个 Observer .

比方上面的下载图片函数:

func rx_downloadImage(withURL url: URL) -> Observable<Data> {
    return Observable.create({ [weak self] observer -> Disposable in 
        DispatchQueue.global().async({ // 子线程中调用
            let res = self?.downloadImage(withURL: url)
            if let data = res?.data {
                observer.onNext(data) // 把下载的数据传递给Observer
                observer.onCompleted() // 告诉Observer, 已经完成了
            }else if let error = res?.error {
                observer.onError(error) // 把错误传递给Observer
            }
        })
        return Disposables.create()
    })
}

这样就封装了一个在内部子线程调用旧函数downloadImage的函数. 对于子线程调用这个需求, 简单地使用了global队列来异步执行下载.


也许有人会有疑问, 那么这个下载操作实际上是在什么时候执行呢? Observable 被返回的时候吗? 还是Observable被创建的时候?

这里先直接给出答案:

就是这个Observable(或者通过它转换得来的新Observable)被subscribe的时候.

具体为什么, 这里暂时不探讨.


队列的切换

刚刚的我们实现了在函数内异步下载并回调的需求, 但是这个函数又有一个新问题:

下载操作必然是在DispatchQueue.global中执行, 调用者无法控制.

那我们有办法自己去控制下载任务在哪个队列中进行吗?

有的, 通过以下两个函数observeOnsubscribeOn:

// Wraps the source sequence in order to run its observer callbacks on the specified scheduler.
public func observeOn(_ scheduler: ImmediateSchedulerType) -> PrimitiveSequence<Trait, Element> 
// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified
public func subscribeOn(_ scheduler: ImmediateSchedulerType)

其中, observeOn是指定了Observable对应的任务在什么队列里执行.

subscribeOn则是指定了我们调用subscribe函数的时候, 回调任务在什么队列执行. 如果没有调用subscribeOn, 那么回调里任务执行的队列就和ObserveOn指定的保持一致.

这两个函数接收的参数ImmediateSchedulerType是RxSwift定义的协议, 说白了就是队列的信息, 以及执行任务的方式.

实际上我们有几个已经实现好结构体可以直接使用, 其中比较常用的是:

  • SerialDispatchQueueScheduler (串行队列)
  • ConcurrentDispatchQueueScheduler (并行队列)
  • ConcurrentMainScheduler (主队列, 是为了subscribeOn函数设计的)

Abstracts work that needs to be performed on MainThread. In case schedule methods are called from main thread, it will perform action immediately without scheduling.
This scheduler is optimized for subscribeOn operator. If you want to observe observable sequence elements on main thread using observeOn operator,
MainScheduler is more suitable for that purpose.

  • MainScheduler (主队列, 是为了observeOn函数设计的)

Abstracts work that needs to be performed on DispatchQueue.main. In case schedule methods are called from DispatchQueue.main, it will perform action immediately without scheduling.
This scheduler is usually used to perform UI work.
Main scheduler is a specialization of SerialDispatchQueueScheduler.
This scheduler is optimized for observeOn operator. To ensure observable sequence is subscribed on main thread using subscribeOn
operator please use ConcurrentMainScheduler because it is more optimized for that purpose.

我们可以在SerialDispatchQueueSchedulerConcurrentDispatchQueueScheduler的init函数中传入对应的队列. 或者直接指定qos, 类似DispatchQueue的global函数.

通过这2个函数, 我们就可以去掉上面封装的下载任务中的GCD调用, 在外部调用的时候自由切换队列:

func rx_downloadImage(withURL url: URL) -> Observable<Data> {
    return Observable.create({ [unowned self] observer -> Disposable in 
        let res = self.downloadImage(withURL: url)
            if let data = res.data {
                observer.onNext(data)
                observer.onCompleted()
            }else if let error = res.error {
                observer.onError(error)
            }
        return Disposables.create()
    })
}

let observables = ...

let observeScheduler = ConcurrentDispatchQueueScheduler(qos: DispatchQoS.default)

Observable.zip(observables)
          .observeOn(observeScheduler)
          .subscribeOn(MainScheduler.asyncInstance)
          .subscribe(onNext: {[unowned self] res in self.updateDatabase()}
                     onError: {[unowned self] in self.handleError(error: $0)})
          .disposed(by: self.disposeBag)

Observable对于Error报错处理的"缺陷"

在串行异步请求处理里面提到了这样的处理方式并不完善, 因为Observable有一个特点:

抛出Error之后就会自行销毁订阅

而这段代码里面, 下拉刷新的Observable是与另外两个异步操作聚合在一起的, 就是说如果网络请求或者定位的操作抛出Error, 那么用户下一次下拉刷新也是不会被处理的.

所以, 为了避免这个问题, 解决方式有两种:

  1. 不要把下拉刷新的Observable与另外两个异步操作的Observable聚合;

  2. 拦截另外两个Observable可能抛出的Error

第1个方案的解决代码:

self.tableView.rx_pullToRefresh
    .subscribe(onNext: { [unowned self] in
        _ = self.rx_updateLocation()
                .flatMap({self.rx_fetchDevices(near: $0)})
                .subscribe(onNext:  {self.update(with: $0)},
                           onError: {self.handleError($0)})
    })    
    .disposed(by: self.disposeBag)

这样就可以保证rx_pullToRefresh会一直被监听, 但是这个处理方式会让代码可读性又下降了.

所幸RxSwift还给我们提供了另外的选择, 利用ObservablecatchError函数或者catchErrorJustReturn函数, 我们就可以把以上2个异步操作的错误拦截.

Continues an observable sequence that is terminated by an error with the observable sequence produced by the handler.

public func catchError(_ handler: @escaping (Error) throws -> RxSwift.Observable<Self.E>) -> RxSwift.Observable<Self.E>

Continues an observable sequence that is terminated by an error with a single element.

public func catchErrorJustReturn(_ element: Self.E) -> RxSwift.Observable<Self.E>

其中catchErrorJustReturn函数可以让我们产生一个默认值来继续事件链,
catchError则接收一个闭包参数, 可以通过我们产生的Error来制定一个值给后续事件链或者直接在闭包里面处理Error.
显然这个场景我们是需要处理Error的, 所以选用catchError. 而这样的话, subscribe函数的onError就永远都不会执行了, 等于是把Error的处理提前到了catchError:

// 处理Error并且返回默认值
func rx_handleError(_ error: Error) -> Observable<[Device]> {
    // 处理Error, 生成默认参数
    ...
    return Observable.of(__defaultValue__)
}
// 请求列表的数据(聚合2个异步操作的Observable)
func rx_fetchTableViewData() -> Observable<[Device]> {
    return self.rx_updateLocation().flatMap({self.rx_fetchDevices(near: $0)})
}

self.tableView.rx_pullToRefresh
    .flatMap({ [unowned self] in 
        self.rx_fetchTableViewData().catchError({self.rx_handleError($0)})
    })
    .subscribe(onNext: {[weak self] in self?.update(with: $0)})
    .disposed(by: self.disposeBag)

以上这样操作算是把Error处理了, 但是还是存在问题, 我们还需要再改进一下, 不要让handleResponseValuehandleError的代码分散.

配合enum以及泛型更舒服地处理Error

Swift里面的枚举类型是可以带参数的, 我们可以把我们要的结果抽象出来, 定义一个枚举:

enum Result {
    case value([Device])
    case error(Error)
}

然后改动一下我们的fetch以及handle方式:

// 改进后的fetch函数把[Device]和Error转换成Result
func rx_fetchTableViewData() -> Observable<Result> {
    return self.rx_updateLocation()
               .flatMap({self.rx_fetchDevices(near: $0)})
               .map({Result.value($0)})
               .catchError({Observable.of(Result.error($0))})
}
// 统一处理Result
func handleResult(_ result: Result) {
    switch result {
        case .value(let value):
            self.update(with: value)
        case .error(let error):
            self.handleError(error)
    }
}

最后我们这个下拉刷新->更新定位坐标->获取附近设备->更新table的代码就可以变成:

self.tableView.rx_pullToRefresh
    .flatMap({[unowned self] in self.rx_fetchTableViewData()})
    .subscribe(onNext: {[weak self] in self?.handleResult($0)})
    .disposed(by: self.disposeBag)

但是, 实际上有这种场景的不止是设备列表, 还有比如附近的设备中心(Station), 这个Result显然可以承担更多的任务.

利用Swift的泛型改进, 让Result适用于通用的场景:

enum Result<T> {
    case value(T)
    case error(Error)
}
func rx_fetchDeviceTableViewData() -> Observable<Result<[Device]>> {
    return ...
}
func handleDeviceResult(_ result: Result<[Device]>) {
    ...
}
func rx_fetchStationTableViewData() -> Observable<Result<[Station]>> {
    return ...
}
func handleStationResult(_ result: Result<[Station]>) {
    ...
}

快速改造原来的GCD异步函数

如果在项目中途接入Rx, 原来项目中已经存在大量通过GCD回调的函数了.

这个时候把全部函数都改造是很高成本的, 而且部分函数可能在项目中被调用了很多次, 涉及的模块可能比较多, 但是不一定每个调用了这个函数的模块都有必要接入Rx.

在这种情况下通用可以使用Observable的create函数去封装原来的函数.

比如有一个加载本地数据的函数:

func loadDataFromLocal(filePath: URL, success: (Data)->Void, failure: (Error)->Void) {
    ...
    ...
    ...
}

在不改动原函数的情况下, 增加一个新的函数:

func rx_loadDataFromLocal(filePath: URL) -> Observable<Result<Data>> {
    
    return Observable.create({ observer in
    
        loadDataFromLocal(
            filePath: filePath,
            success: {observer.onNext(.value($0))}, 
            failure: {observer.onNext(.error($0))}
        )
                          
        return Disposables.create()
    })
}

这样的函数改造和同步函数的改造一样, 有一个类似的缺陷, 就是不可以再改变loadDataFromLocal函数在哪个队列执行.

相对地, 这个改造方式可以比较简便地复用现有的函数.

所以这是一个折中的改造方案.


取消任务

回顾上面的代码, 有一段

self.tableView.rx_pullToRefresh
    .flatMap({ [unowned self] in 
        self.rx_fetchTableViewData().catchError({self.rx_handleError($0)})
    })
    .subscribe(onNext: {[weak self] in self?.update(with: $0)})
    .disposed(by: self.disposeBag)

在订阅Observable之后, 我们又调用了其结果的disposed(by:)函数, 这里传入的参数就是用于管理这个订阅的生命周期.

这个参数是个DisposeBag类型, 当它被释放的时候, 对应它的订阅任务也会取消. 因此我们可以通过管理DisposeBag来取消异步任务.

比方说让ViewController持有一个DisposeBag, 在ViewController调用deinit的时候, 绑定在这个DisposeBag上的Observable就都不会继续处理了.

但是上面是一种间接取消, 我们也有直接取消的方式:

调用dispose函数, 如下:

let disposable = self.rx_fetchTableViewData().subscribe() // 保存订阅的任务
...
...
...
disposable.dispose() // 在适当的时候调用dispose取消订阅

小结

以上是项目中接入 RxSwift 对异步/并行操作的优化改进经历. 概括来说, 就是:

  • 降维闭包嵌套, 替换成链式的Observable变量或返回Observable的函数.

  • 使用RxSwift提供的Operator函数进行数据流处理.

  • Error聚合处理.

  • 通过observeOnsubscribeOn函数控制队列切换.

  • 通过DisposeBag类型, 或者dispose函数控制任务的销毁.


持续更新...

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容