RxSwift的使用三步曲
第一步:创建序列
let ob = Observable<Any>.create { observer in
return Disposables.create()
}
第二步:订阅序列
ob.subscribe(onNext: { text in
print("订阅信息: \(text)")
}, onError: { error in
print("error: \(error)")
}, onCompleted: {
print("订阅结束")
}) {
print("已销毁")
}
第三步:发送信号
let ob = Observable<Any>.create { observer in
obserber.onNext("你好明天")
return Disposables.create()
}
整体代码
let ob = Observable<Any>.create { observer in
obserber.onNext("你好明天")
return Disposables.create()
}
ob.subscribe(onNext: { text in
print("订阅信息: \(text)")
}, onError: { error in
print("error: \(error)")
}, onCompleted: {
print("订阅结束")
}) {
print("已销毁")
}
.disposed(by: disposeBag)
分析代码
- 1:创建序列后,RxSwift返回了一个observer,在这个闭包内返回了Disposables.create(),创建的序列。
- 2:订阅序列的各个信息,发送成功,结束等等的信号
- 3:发送的信号由RxSwift返回的observer来发送,这样形成一个完整的环。
那么这个observer怎么来的?在onNext中发送的“你好明天”是怎么到订阅的text中的呢?以及,这些信号怎么发送给订阅者的呢? - 4:猜想:以UIButton为例子,UI序列的相应,比如UIButton它的点击事件,监听的就是点击事件,这个事件由UIButton来发送,而UIButton的事件响应,是依赖于target,在对应的target去实现响应的event。
那么在RxSwift中,UIControl为什么能够监听到UI层,猜测是UIControl.addTarget(rx内部类),事件的响应就由rx内部类来响应,类似于中间类或者是proxy。
同理,self.button.rx.tap.subscribe(),这个点击事件就是通过UIControl响应层去添加响应,而且这个响应不再是原始的UI事件的响应了,也不在控制器中了,在rx的内部类中,所以在事件中可以直接调用observer的onNext方法,来发送响应的信号。
还有一个在onNext中发送的“你好明天”是怎么到订阅的text中的呢?这个问题我们没有找到答案,继续去查看RxSwift的源码。
RxSwift源码分析
前提因素:首先,通过creat来创建了一个序列,所以应该先去探索creat这个方法,而能订阅到“你好明天”这段字符串,依赖于发送信号obserber.onNext("你好明天"),而代码顺序是从上往下执行的,那在ob.subscribe中能打印onNext发送的“你好明天”,那说明在ob.subscribe中应该暗藏一句代码,执行onNext。即subscribe的闭包执行依赖发送信号onNext的执行。带着这些前提,去分析查看源码。
一 首先来看一下creat序列的创建
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
AnonymousObservable(subscribe)
}
creat创建了一个匿名序列AnonymousObservable
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self.subscribeHandler = subscribeHandler
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
这个匿名序列继承自Producer,而且init方法保存了通过create传递的self.subscribeHandler。
Producer继承自Observable即序列的基类,Observable遵循了协议ObservableType,ObservableType协议提供了subscribe方法。该协议是贯穿全剧的,可扩展协议可以根据不同的内容需求扩展。
匿名序列的run方法,就是重写的父类的方法。
二 subscribe订阅信号的内容
public func subscribe(_ on: @escaping (Event<Element>) -> Void) -> Disposable {
let observer = AnonymousObserver { e in
on(e)
}
return self.asObservable().subscribe(observer)
}
在订阅的时候,创建了一个匿名内部类AnonymousObserver ,AnonymousObserver继承自继承ObserverBase,ObserverBase遵循了协议Disposable, ObserverType,而AnonymousObserver在初始化的时候保存了eventHandler。self.asObservable().subscribe(observer),.asObservable这个是可观察序列基类Observerble的方法,是采用的接口隔离原则。事实上万物皆可序列,而比如以UISwitch为例:
UISwitch().rx.value.asObservable()
.subscribe { bool in
}
.disposed(by: disposeBag)
为什么需要调用asObservable呢?
public var value: ControlProperty<Bool> {
return base.rx.controlPropertyWithDefaultEvents(
getter: { uiSwitch in
uiSwitch.isOn
}, setter: { uiSwitch, value in
uiSwitch.isOn = value
}
)
}
从源码中看到,value是ControlProperty类型,ControlProperty遵循了协议ControlPropertyType,不是可观察序列,通过调用asObservable强转为可观察序列,然后利用可观察序列的各种方法,来达到相应的需求,这也是万物皆可序列的第二种解释。
而asObservable返回的就是一个可观察序列,返回的是self.values,类型就是 Observable<PropertyType>
let values: Observable<PropertyType>
public func asObservable() -> Observable<Element> {
self.values
}
继续探究self.asObservable().subscribe(observer),subscribe的内容。subscribe执行了父类Produver实现的协议方法subscribe,然后执行了self.run方法,即执行了匿名序列AnonymousObservable的run方法
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
匿名序列AnonymousObservable的run方法,执行了AnonymousObservableSink的run方法
func run(_ parent: Parent) -> Disposable {
parent.subscribeHandler(AnyObserver(self))
}
AnonymousObservable 调用把AnonymousObservableSink作为参数,调用创建的闭包,执行了保存的subscribeHandler即是在create的时候保存的闭包,并将AnyObserver(self)传递了进去,这就是在create的闭包中传来的observer。流程如下图所示:
三 onNext发送信号
extension ObserverType {
/// Convenience method equivalent to `on(.next(element: Element))`
///
/// - parameter element: Next element to send to observer(s)
public func onNext(_ element: Element) {
self.on(.next(element))
}
}
执行了匿名类AnonymousObserver的父类ObserverBase的on方法
func on(_ event: Event<Element>) {
switch event {
case .next:
if load(self.isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self.isStopped, 1) == 0 {
self.onCore(event)
}
}
}
然后执行了匿名类AnonymousObserver的onCore方法
override func onCore(_ event: Event<Element>) {
self.eventHandler(event)
}
然后在这个方法中执行了匿名类在subscribe中保存的eventHandler,完整的流程图如下:
总结:Observable的核心流程
从create开始,得到一个匿名观察序列AnonymousObservable,这个匿名类保存了subscribeHandler。
然后通过订阅方法subscribe创建了匿名observer保存了事件eventHandler,并且通过AnonymousObservable的run方法将observer传递,最终执行subscribeHandler将信号传递。
发送信号onNext最终通过核心方法onCore执行了eventHandler将事件传递。