前言
RxSwift 的创建序列的流程调用比较复杂,我看了好久才真正的理解,希望我这篇文章能保住对这个流程的源码还没有理解的同学。我还不知道如何用一种直观的方式将流程清晰的表达出来,不过关键的代码本文已经都放了出来,比较关键的方法也都有注释说明。
创建自定义队列
Rxswift 创建序列的代码如下:
///创建序列
let seq = Observable<String>.create {
(observer) -> Disposable in
///发送元素
observer.onNext("RxSwift Hello World")
return Disposables.create()
}
///订阅
dispose = seq.subscribe(onNext: {
e in
debugPrint(e)
})
///销毁
dispose.dispose()
从代码可以看到大体上可以分为三步走: 创建序列,订阅序列,销毁。
创建序列
///创建序列
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
RxSwift创建的自定义序列实际上是 AnonymousObservable,来看看AnonymousObservable 的实现
AnonymousObservable
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
///调用创建时传递的 create 闭包
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
AnonymousObservable 的结构比较简单 ,继承了Producer<Element>,内部持有了初始化创建时传递的闭包。
订阅序列
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
///这个Disposeable是为调用 onDisposed回调的
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
///创建的观察者
let observer = AnonymousObserver<E> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
///这里创建的是BinaryDisposable
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
如果没有理解代码中的注释没有关系,源码的调用过程也比较复杂,请继续向下看。
订阅序列的流程看起来比较长,不过总结起来做了三件事:
- 根据是否传递了onDisposed, 创建了一个Disposeable(销毁器)
- 创建了一个AnonymousObserver
- 调用 Disposables.creat 又创建了 Disposeable(销毁器) 然后返回,并且调用了另一个 subscribe方法。
Producer 的 subscribe 方法
class Producer<Element> : Observable<Element> {
override init() {
super.init()
}
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
///会调用子类的run方法
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
rxAbstractMethod()
}
}
上面比较关键的是run方法,看上面AnonymousObservable的注释。run方法的作用是调用 创建 调用create方法时传递的闭包。
run方法
/// AnonymousObservableSink<O: ObserverType>
func run(_ parent: Parent) -> Disposable {
///这里的 parent 就是AnonymousObservable
///AnyObserver是对外界隐藏 观察者具体类型的包装
return parent._subscribeHandler(AnyObserver(self))
}
订阅过程总的来说就创建观察者,调用<创建序列时传递的闭包>将观察者传递给调用方,调用方拿到观察者就可发送事件了。然后返回销毁器,给调用者用来销毁资源。
相关对象的引用关系
下图是整个过程涉及到的类型的引用图,箭头指向持有者:
结语
如果我的文章对你有帮助,那就请点个赞吧,让我知道本篇文章确实起到了作用。Thanks♪(・ω・)ノ