[toc]
前言
本文来自拉勾网课程整理
在 iOS
开发中,随着 App
功能不断增强,处理各种异步事件,保持程序状态实时更新,也变得越来越困难。
以 ViewController
来为例,我们需要处理许多异步事件,比如来自 Delegate
和 DataSource
的回调,来自 NotificationCenter
的通知消息,来自 View
的 Target-Action
事件,等等。
由于它们随机发生且可能来自不同的线程,本身就会比较复杂,再加上其他新事件的引入,代码处理的逻辑会呈指数式增长。那么,怎样才能从根本上解决这些问题呢?这一讲我们所介绍的响应式编程就可以解决。
响应式编程与 RxSwift
所谓响应式编程,就是使用异步数据流(Asynchronous data streams
)进行编程。在传统的指令式编程语言里,代码不仅要告诉程序做什么,还要告诉程序什么时候做。而在响应式编程里,我们只需要处理各个事件,程序会自动响应状态的更新。而且,这些事件可以单独封装
,能有效提高
代码复用性并简化错误处理的逻辑。
现在,响应式编程已慢慢成为主流的编程范式,比如Android
平台的 Architecture Components
提供了支持响应式编程的 LiveData
,SwiftUI
也配套了 Combine
框架。在 Moments App
中,我采用的也是响应式编程模式
。
目前比较流行的响应式编程框架有 ReactiveKit
、ReactiveSwift
和 Combine
。在这里,我们推荐使用RxSwift
。因为 RxSwift
遵循了 ReactiveX
的 API
标准,由于 ReactiveX
提供了多种语言的实现,学会 RxSwift
能有效把知识迁移到其他平台。还有 RxSwift
项目非常活跃,也比较成熟。更重要的是,RxSwift
提供的 RxCocoa
能帮助我们为UIKit
扩展响应式编程的能力,而Combine
所对应的 CombineCocoa
还不成熟。
为了让 App
可以自动更新状态,我们在Moments App
里面使用RxSwift
把 MVVM
各层连接起来。
从上图可以看出,当用户打开朋友圈页面,App
会使用后台排程器向 BFF
发起一个网络请求,Networking
模块把返回结果通过Observable
序列发送给 Repository
模块。Repository
模块订阅接收后,把数据发送到Subject
里面,然后经过map
操作符转换,原先的 Model
类型转换成了 ViewModel
类型。 ViewModel
模块订阅经过操作符转换的数据,发送给下一个Subject
,之后,这个数据被 ViewController
订阅,并通过主排程器更新了 UI
。
整个过程中,Repository
模块、 ViewModel
模块、ViewController
都是订阅者,分别接收来自前一层的信息。就这样,当App
得到网络返回数据时,就能自动更新每一层的状态信息,也能实时更新 UI
显示。
这其中的 Observable
序列、订阅者、Subject
、操作符、排程器属于 RxSwift
中的关键概念,它们该如何理解,如何使用呢?接下来我就一一介绍下。
异步数据序列 Observable
为了保证程序状态的同步,我们需要把各种异步事件都发送到异步数据流里,供响应式编程发挥作用。在 RxSwfit
中,异步数据流称为 Observable
序列,它表示可观察的异步数据序列,也可以理解为消息发送序列。
在实际应用中,我们通常使用Observable
序列作为入口,把外部事件连接到响应式编程框架里面。比如在 Moments App
,我通过Observable
把网络请求的结果连接进 MVVM
架构中。
那么怎样创建 Observable
序列呢?为方便我们生成 Observable
序列, RxSwfit
的Observable
类型提供了如下几个工厂方法:
-
just
方法,用于生成只有一个事件的Observable
序列; -
of
方法,生成包含多个事件的Observable
序列; -
from
方法,和of
方法一样,from
方法也能生成包含多个事件的Observable
序列,但它只接受数组为入口参数。
let observable1: Observable<Int> = Observable.just(1) // 序列包含 1
let observable2: Observable<Int> = Observable.of(1, 2, 3) // 序列包含 1, 2, 3
let observable3: Observable<Int> = Observable.from([1, 2, 3]) // 序列包含 1, 2, 3
let observable4: Observable<[Int]> = Observable.of([1, 2, 3]) // 序列包含 [1, 2, 3]
当你需要生成只有一个事件的 Observable
序列时,可以使用just
方法,如observable1
只包含了1
。
当需要生成包含多个事件的Observable
序列时,可以使用of
或者from
方法。它们的区别是,of
接收多个参数而from
只接收一个数组。如上所示,我们分别使用了of
和from
方法来生成observable2
和observable3
,它们都包含了1、2 和 3
三个事件。
这里需要注意,of
方法也能接收数组作为参数的。与from
方法会拆分数组为独立元素的做法不同,of
方法只是把这个数组当成唯一的事件,例如observable4
只包含值为[1, 2, 3]
的一个事件。
在开发当中,Observable
序列不仅仅存放数值,比如 Moments App
的异步数据流就需要存放朋友圈信息来更新 UI
,Observable
也支持存放任意类型的数据。像在下面的例子中,peopleObservable
就存放了两条类型为Person
的数据,其中 Jake
的收入是10
而Ken
的收入是20
。
struct Person {
在响应式编程模式里,订阅者是一个重要的角色。在 Moments App 里面,上层模块都担任订阅者角色,主要订阅下层模块的 Observable 序列。那订阅者怎样才能订阅和接收数据呢?
let name: String
let income: Int
}
let peopleObservable = Observable.of(Person(name: "Jake", income: 10), Person(name: "Ken", income: 20))
订阅者
在响应式编程模式里,订阅者是一个重要的角色。在Moments App
里面,上层模块都担任订阅者角色,主要订阅下层模块的Observable
序列。那订阅者怎样才能订阅和接收数据呢?
在 RxSwift
中,订阅者可以调用Observable
对象的subscribe
方法来订阅。如下所示。
let observable = Observable.of(1, 2, 3)
observable.subscribe { event in
print(event)
}
订阅者调用subscribe
方法订阅observable
,并接收事件,当程序执行时会打印以下信息:
next(1)
next(2)
next(3)
completed
你可能会问上面的next
和completed
是什么呢?其实它们都是事件,用来表示异步数据流上的一条信息。RxSwift
使用了Event
枚举来表示事件,定义如下。
public enum Event<Element> {
/// Next element is produced.
case next(Element)
/// Sequence terminated with an error.
case error(Swift.Error)
/// Sequence completed successfully.
case completed
}
-
.next(value: T)
:用于装载数据的事件。当Observable
序列发送数据时,订阅者会收到next事件,我们可以从该事件中取出实际的数据。 -
.error(error: Error)
:用于装载错误事件。当发生错误的时候,Observable
序列会发出error
事件并关闭该序列,订阅者一旦收到error
事件后就无法接收其他事件了。 -
.completed
:用于正常关闭序列的事件。当Observable
序列发出completed
事件时就会关闭自己,订阅者在收到completed
事件以后就无法收到任何其他事件了。
怎么理解呢?下面我通过两个例子来介绍下。由于之前讲过的of
和from
等方法都不能发出error
和completed
事件 ,在这里我就使用了create
方法来创建 Observable
序列。
首先我们看一下发送error
事件的例子。
Observable<Int>.create { observer in
observer.onNext(1)
observer.onNext(2)
observer.onError(MyError.anError)
observer.onNext(3)
return Disposables.create()
}.subscribe { event in
print(event)
}
在这个例子中,我们调用了create
方法来生成一个 Observable
序列,该 Observable
发出next(1)
、next(2)
、error
和next(3)
事件。由于next(3)
事件在错误事件之后,因此订阅者无法接收到next(3)
事件。程序执行时会打印下面的日志。
next(1)
next(2)
error(anError)
接着我们看一下发送completed
事件的例子。
Observable<Int>.create { observer in
observer.onNext(1)
observer.onCompleted()
observer.onNext(2)
observer.onNext(3)
return Disposables.create()
}.subscribe { event in
print(event)
}
在这里,我调用create
方法来生成一个 Observable
序列,该 Observable
发出了next(1)、completed、next(2)和next(3)
事件。因为next(2)和next(3)
都在完成事件之后发出的,所以订阅者也无法接收它们,程序执行时会打印如下的日志。
next(1)
completed
在现实生活中,当我们订阅了报刊时可以自己选择退订,却无法让发行方停刊。在 RxSwift
里面也一样,订阅者无法强行让 Observable
序列发出completed事
件来关闭数据流。那订阅者该怎样取消订阅呢?
如果你仔细观察就会发现,subscribe
方法返回的类型为Disposable
的对象,我们可以通过调用该对象的dispose
方法来取消订阅。
为了更好地理解dispose
方法的作用和触发时机,我通过subscribe()
方法来打印出各个事件,如下所示。
let disposable = Observable.of(1, 2).subscribe { element in
print(element) // next event
} onError: { error in
print(error)
} onCompleted: {
print("Completed")
} onDisposed: {
print("Disposed")
}
disposable.dispose()
我们在onNext
闭包里面处理next
事件;在onError
闭包里处理error
事件;在onCompleted
闭包里处理completed
事件;而在onDisposed
闭包里处理退订事件。
在这里,我们调用subscribe
方法后,它又马上调用了dispose
方法,因此程序会在调用onCompleted
之后立刻调用onDisposed
。其执行效果如下:
1
2
Completed
Disposed
假如我在订阅前调用delay
方法,那么所有的事件都会延时两秒钟后才通知订阅者,代码如下:
let disposableWithDelay = Observable.of(1, 2).delay(.seconds(2), scheduler: MainScheduler.instance).subscribe { element in
print(element) // next event
} onError: { error in
print(error)
} onCompleted: {
print("Completed")
} onDisposed: {
print("Disposed")
}
disposableWithDelay.dispose()
和上面没有延时的例子一样,我们在调用subscribe
方法以后马上调用了dispose
方法,由于 Observable
序列上所有事件还在延时等待中,程序会直接调用onDisposed
并退订了disposableWithDelay
序列,因此没办法再收到两秒钟后所发出的next(1)、next(2)
和completed
事件了。 其执行效果如下:
Disposed
在很多时候,订阅后马上退订并不是我们想要的结果,我们希望订阅者一直监听事件直到自身消亡的时候才取消订阅。那有什么好的办法能做到这一点呢?
RxSwift
为我们提供了DisposeBag
类型,方便存放和管理各个Disposable
对象。其用法也非常简单,只需调用Disposable的disposed(by:)
方法即可。代码如下:
let disposeBag: DisposeBag = .init()
Observable.just(1).subscribe { event in
print(event)
}.disposed(by: disposeBag)
Observable.of("a", "b").subscribe { event in
print(event)
}.disposed(by: disposeBag)
代码中的disposeBag
存放了两个Disposable
对象。当订阅者调用其deinit
方法时,同时也会调用disposeBag
的deinit
方法。在这时候,disposeBag
会取出存放的所有Disposable
对象,并调用它们的dispose
方法来取消所有订阅。
在实际情况下,我建议只需为一个订阅者定义一个disposeBag
即可。例如 Repository
模块同时订阅了 Networking
模块和 DataStore
模块,但它只定义了一个disposeBag
来管理所有的订阅。
事件中转 Subject
以上是如何生成、订阅和退订 Observable
序列。使用Observable
的工厂方法所生成的对象都是“只读
”,一旦生成,就无法添加新的事件。但很多时候,我们需要往 Observable
序列增加事件,比如要把用户点击UI
的事件添加到 Observable
中,或者把底层模块的事件加工并添加到上层模块的序列中。
那么,有什么好办法能为异步数据序列添加新的事件呢?RxSwift
为我们提供的 Subject
及其onNext
方法可以完成这项操作。
具体来说,Subject
作为一种特殊的Observable
序列,它既能接收又能发送,我们一般用它来做事件的中转。在Moments App
的 MVVM
架构里面,我们就大量使用Subject
发挥这一作用。 比如,当 Repository
模块从 Networking
模块中接收到事件时,会把该事件转送到自身的 Subject
来通知 ViewModel
,从而保证 ViewModel
的状态同步。
那么,都有哪些常见的Subject
呢?一般有 PublishSubject、BehaviorSubject 和 ReplaySubject
。它们的区别在于订阅者能否收到订阅前的事件。
-
PublishSubject
:如果你想订阅者只收到订阅后的事件,可以使用PublishSubject
。 -
BehaviorSubject
:如果你想订阅者在订阅时能收到订阅前最后一条事件,可以使用BehaviorSubject
。 -
ReplaySubject
:如果你想订阅者在订阅的时候能收到订阅前的 N 条事件,那么可以使用ReplaySubject
。
在订阅以后,它们的行为都是一致的,当 Subject
发出error
或者completed
事件以后,订阅者将无法接收到新的事件。
操作符
操作符(Operator
)是RxSwift
另外一个重要的概念,它能帮助订阅者在接收事件之前把Observable
序列中的事件进行过滤、转换或者合并。
例如在 Moments App
里面,我们使用 map
操作符把 Model
数据转换成 ViewModel
类型来更新UI
。这里的 map
操作符就属于转换操作符,能帮助我们从一种数据类型转变成另外一种类型。除了map
,compactMap
和flapMap
也属于转换操作符。
此外还有 filter
和 distinctUntilChanged
等过滤操作符,我们可以使用过滤操作符把订阅者不关心的事件给过滤掉。还有合并操作符如 startWith,concat
,merge,combineLatest
和 zip
,可用于组装与合并多个 Observable
序列。
除了上面提到过的常用操作符,RxSwift
还为我们提供了 50 多个操作符,那怎样才能学会它们呢?我推荐你到 rxmarbles.com
或者到App Store
下载 RxMarbles App
,然后打开各个操作符并修改里面的参数,通过输入的事件和执行的结果来理解这些操作的作用。
排程器
保持程序状态自动更新之所以困难,很大原因在于处理并发的异步事件是一件烦琐的事情
。为了方便处理来自不同线程的并发异步事件,RxSwift
为我们提供了排程器。它可以帮我们把繁重的任务调度到后台排程器完成,并能指定其运行方式(如是串行还是并发),也能保证 UI 的任务都在主线程上执行。
比如在Moments App
里面,Networking
和 DataStore
模块都在后台排程器上执行,而 View
模块都在主排程器上执行。
根据串行或者并发来归类,我们可以把排程器分成两大类串行的排程器和并发的排程器。
串行的排程器包括 CurrentThreadScheduler、MainScheduler、SerialDispatchQueueScheduler
。
其中,CurrentThreadScheduler
可以把任务安排在当前的线程上执行,这是默认的排程器。当我们不指定排程器的时候,RxSwift
都会使用 CurrentThreadScheduler
把任务放在当前线程里串行执行;MainScheduler
是把任务调度到主线程MainThread
里并马上执行,它主要用于执行 UI
相关的任务;而SerialDispatchQueueScheduler
则会把任务放在dispatch_queue_t里
面并串行执行。
并发的排程器包括 ConcurrentDispatchQueueScheduler
和 OperationQueueScheduler
。
其中,ConcurrentDispatchQueueScheduler
把任务安排到dispatch_queue_t
里面,且以并发的方式执行。该排程器一般用于执行后台任务,例如网络访问和数据缓存等等。在创建的时候,我们可以指定DispatchQueue
的类型,例如使用ConcurrentDispatchQueueScheduler(qos: .background)
来指定使用后台线程执行任务。
OperationQueueScheduler
是把任务放在NSOperationQueue
里面,以并发的方式执行。这个排程器一般用于执行繁重的后台任务,并通过设置maxConcurrentOperationCount
来控制所执行并发任务的最大数量。它可以用于下载大文件。
那么,如何用排程器进行调度,处理好不同线程的并发异步事件呢?请看下面的代码实现。
Observable.of(1, 2, 3, 4)
.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .background))
.dumpObservable()
.map { "\(getThreadName()): \($0)" }
.observeOn(MainScheduler.instance)
.dumpObserver()
.disposed(by: disposeBag)
首先我们传入ConcurrentDispatchQueueScheduler(qos: .background)
来调用subscribeOn
方法,把 Observable
序列发出事件的执行代码都调度到后台排程器去执行。然后通过传入MainScheduler.instance
来调用observeOn
,把订阅者执行的逻辑都调度主排程器去执行。
这是一种常用的模式,我们通常使用后台排程器来进行网络访问并处理返回数据,然后通过主排程器把数据呈现到UI
中去。
由于后台线程不能保证执行的顺序,其执行效果如下,当你执行的时候可能会有点变化。
[Observable] 1 emitted on Unnamed Thread
[Observable] 2 emitted on Unnamed Thread
[Observer] Unnamed Thread: 1 received on Main Thread
[Observable] 3 emitted on Unnamed Thread
[Observer] Unnamed Thread: 2 received on Main Thread
[Observable] 4 emitted on Unnamed Thread
[Observer] Unnamed Thread: 3 received on Main Thread
[Observer] Unnamed Thread: 4 received on Main Thread
总结
在这一讲中我们介绍了RxSwift
的五个关键概念:Observable 序列
、订阅者
、Subject
、操作符
以及排程器
。我把本讲的代码都放在 Moments App
项目中的RxSwift Playground
文件里面,希望你能多练习,把五个概念融会贯通。
以下是我在实际工作中使用RxSwift
的一些经验总结,希望能帮助到你。
当我们拿到需求的时候,先把任务进行分解,找出哪个部分是事件发布者,哪部分是事件订阅者,例如一个新功能页面,网络请求部分一般是事件发布者,当得到网络请求的返回结果时会发出事件,而 UI 部分一般为事件订阅者,通过订阅事件来保持UI
的自动更新。
找到事件发布者以后,要分析事件发布的频率与间隔。如果只是发布一次,可以使用Obervable
;如果需要多次发布,可以使用Subject
;如果需要缓存之前多个事件,可以使用ReplaySubject
。
当我们有了事件发布者和订阅者以后,接着可以分析发送和订阅事件的类型差异,选择合适的操作符来进行转换。我们可以先使用本讲中提到的常用操作符,如果它们还不能解决你的问题,可以查看 RxMarbles
来寻找合适的操作符。
最后,我们可以根据事件发布者和订阅者所执行的任务性质,通过排程器进行调度。例如把网络请求和数据缓存任务都安排在后台排程器,而 UI
更新任务放在主排程器。
源码地址:
RxSwift Playground
文件地址:https://github.com/lagoueduCol/iOS-linyongjian/blob/main/Playgrounds/RxSwiftPlayground.playground/Contents.swift