新手读的懂的RxSwift源码解析(一)-- 事件的产生与监听(create&subscribe)

上一篇中,我们了解了RxSwift的核心概念与核心逻辑,有兴趣的读者可以了解一下:
新手读的懂的RxSwift源码解析(零)-- 核心概念与核心逻辑
本篇文章,笔者将与各位读者一起学习一下下一个课题:RxSwift中事件的产生与传递。
我们通过一个最常用的操作符:create来说明。

        Observable<Int>.create { observer in
            //订阅闭包
            observer.onNext(1)
            observer.onNext(2)
            observer.onNext(3)
            observer.onCompleted()
            return Disposables.create()
        }
        .subscribe { value in
            //监听闭包1
            print("Next:\(value)")
        } onError: { error in
            //监听闭包2
            print("Error:\(error)")
        } onCompleted: {
            //监听闭包3
            print("Cpmpleted")
        } onDisposed: {
            //监听闭包
            print("Disposed")
        }
        .disposed(by: disposeBag)

为了便于理解,我们把第一个闭包称之为订阅闭包,后面几个闭包称之为监听闭包。

一、事件的产生

上面的代码中,我们调用了create,传入一个订阅闭包。这个闭包接受一个AnyObserver参数observer,这个AnyObserver是一个遵循了ObserverType协议的struct。在这个订阅闭包闭包中,我们调用了这个observer的onNext方法和onCompleted方法,于是也就产生了一个由三个next事件和一个完成事件的序列。那这个闭包是在什么时候被调用的呢?

先看一下这个create的实现:

extension ObservableType {
    public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
        AnonymousObservable(subscribe)
    }
}

create接受一个订阅闭包,并且返回一个AnonymousObservable。还是没有看到这个订阅闭包是在什么时候调用的,也就不清楚事件序列是什么时候产生的。那我们只能继续往下看,看看事件的监听过程。

二、事件的监听

通过上一篇,我们已经知道了,我们可以通过调用ObservableType的subscribe方法对事件进行监听。下面我们就来看看这个subscribe方法干了些什么。

根据上面的分析,我们知道,我们通过create创建了一个AnonymousObservable,这正是一个ObservableType。之后我们调用了它的subscribe方法, 但并不是协议定义的:

func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element

而是协议扩展中提供的一个方法:

    public func subscribe(
        onNext: ((Element) -> Void)? = nil,
        onError: ((Swift.Error) -> Void)? = nil,
        onCompleted: (() -> Void)? = nil,
        onDisposed: (() -> Void)? = nil
    ) -> Disposable {
            let disposable: Disposable
            
            if let disposed = onDisposed {
                disposable = Disposables.create(with: disposed)
            }
            else {
                disposable = Disposables.create()
            }
            
            #if DEBUG
                let synchronizationTracker = SynchronizationTracker()
            #endif
            
            let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
            
            let observer = AnonymousObserver<Element> { event in
                
                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif
                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }

这个方法主要就是创建了一个AnonymousObserver,并通过self.asObservable().subscribe(observer)订阅了这个AnonymousObserver。由于这里的self是一个AnonymousObservable,而它是继承于Observable的,所以这个asObservable()实际是返回了self。

所以我们大致了解了这个调用链条的第二步干了些什么:通过传入的订阅闭包,创建了一个AnonymousObserver并传给了第一步创建的AnonymousObservable的subscribe方法。那我们接下来就要看一下这个subscrib方法又干了些什么。

为了搞清楚,我们就得进入到这个AnonymousObservable的源码。它是继承Producer类的,而Producer类又是继承于Observable类的,也就是这样:

AnonymousObservable => Producer => Observable

为了搞清楚这个AnonymousObservable是个什么,我们就得先简单介绍一下它的父类Producer。

Producer

Producer 是Observabled的一个子类,它是一个抽象类。从名称来看,我们大概可以猜测,它是负责生产事件序列的类。

class Producer<Element>: Observable<Element> {
    override init() {
        super.init()
    }

    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }

    func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        rxAbstractMethod()
    }
}

Producer继承Observable,并重写了subscribe方法。我们主要关注这么几行:

    let disposer = SinkDisposer()
    let sinkAndSubscription = self.run(observer, cancel: disposer)
    disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

这里首先创建了一个SinkDisposer对象,调用自己的run方法,将这个SinkDisposer以及subscribe方法的参数observer传给run方法,这个SinkDisposer以及run方法的返回值主要是用于资源释放相关的,不属于这篇文章要讨论的重点。
看到这里,我们知道了,通过调用subscribe,我们会调用到AnonymousObservable的run方法,run方法就是核心逻辑的起点,那下面我们就深入到AnonymousObservable的源码里面看一下。

AnonymousObservable

final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self.subscribeHandler = subscribeHandler
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

首先我们看到AnonymousObservable持有了第一步create中创建的订阅闭包。

其次,AnonymousObservable作为Producer的子类,重写了run方法,在这个run方法中,创建了一个AnonymousObservableSink,并调用了这个AnonymousObservableSink的run方法。

这里又涉及到了另一个概念,叫做Sink。这个Sink其实也是RxSwift里一个比较重要的概念,它主要是作为一个桥梁,连接了Observable和Observer,每个Observable类型,都会通过一个Sink来与它的Observer连接并传递事件。

AnonymousObservableSink

final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
  
    ......

    func run(_ parent: Parent) -> Disposable {
        parent.subscribeHandler(AnyObserver(self))
    }
}

我们看到,这个run方法主要干了两件事。1,使用self作为参数,创建了一个AnyObserver,2.调用了parent(即AnonymousObservable)的subscribeHandler。至此,我们就知道了,第一步里面创建的订阅闭包是何时调用的了:

在AnonymousObservable.subscribe方法中通过调用self.run,最终调用AnonymousObservableSink.run方法,在run方法中调用了订阅闭包,而在这个订阅闭包中,我们便产生了一些列的事件。

其实,RxSwift中的其他一些Observable,也大部分都是类似的流程,这个我们会在后续文章中一一分析。

接下来还有一个问题,事件产生了,也被监听了,那事件是如何传递到我们的第二步提供的监听闭包的呢?我们之前说了,Observable和Observer通过Sink进行连接,也就是通过Sink进行了事件的传递,那我们就来看一下这个Sink。

Sink

class Sink<Observer: ObserverType>: Disposable {
    fileprivate let observer: Observer
    fileprivate let cancel: Cancelable
    private let disposed = AtomicInt(0)

    #if DEBUG
        private let synchronizationTracker = SynchronizationTracker()
    #endif

    init(observer: Observer, cancel: Cancelable) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
        self.observer = observer
        self.cancel = cancel
    }

    final func forwardOn(_ event: Event<Observer.Element>) {
        #if DEBUG
            self.synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self.synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self.disposed, 1) {
            return
        }
        self.observer.on(event)
    }

    final func forwarder() -> SinkForward<Observer> {
        SinkForward(forward: self)
    }

    final var isDisposed: Bool {
        isFlagSet(self.disposed, 1)
    }

    func dispose() {
        fetchOr(self.disposed, 1)
        self.cancel.dispose()
    }

    deinit {
#if TRACE_RESOURCES
       _ =  Resources.decrementTotal()
#endif
    }
}

我们看到,Sink持有了一个Observer,并在自己的核心方法forwardOn中会将事件传递给所持有的这个observer,这个forwardOn是个final方法,会在不同的子类中被调用。下面我们就再来看一下,前述代码中所产生的AnonymousObservableSink

AnonymousObservableSink

final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    typealias Element = Observer.Element 
    typealias Parent = AnonymousObservable<Element>

    // state
    private let isStopped = AtomicInt(0)

    #if DEBUG
        private let synchronizationTracker = SynchronizationTracker()
    #endif

    override init(observer: Observer, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }

    func on(_ event: Event<Element>) {
        #if DEBUG
            self.synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self.synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if load(self.isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self.isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }

    func run(_ parent: Parent) -> Disposable {
        parent.subscribeHandler(AnyObserver(self))
    }
}

之前我们已经分析过,这个AnonymousObservableSink的run方法中使用self作为参数,创建了一个AnyObserver,并调用了订阅闭包,将这个AnyObserver传给了订阅闭包。所以当订阅闭包中调用observer.onNext和observer.onComplete的时候,实际会调用的这个AnyObserver的on方法,那这个on方法又是怎么传递到后面的监听闭包的呢?

这个得看看AnyObserver的源码:

public struct AnyObserver<Element> : ObserverType {
    /// Anonymous event handler type.
    public typealias EventHandler = (Event<Element>) -> Void

    private let observer: EventHandler

    /// Construct an instance whose `on(event)` calls `eventHandler(event)`
    ///
    /// - parameter eventHandler: Event handler that observes sequences events.
    public init(eventHandler: @escaping EventHandler) {
        self.observer = eventHandler
    }
    
    /// Construct an instance whose `on(event)` calls `observer.on(event)`
    ///
    /// - parameter observer: Observer that receives sequence events.
    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        self.observer = observer.on
    }
    
    /// Send `event` to this observer.
    ///
    /// - parameter event: Event instance.
    public func on(_ event: Event<Element>) {
        self.observer(event)
    }

    /// Erases type of observer and returns canonical observer.
    ///
    /// - returns: type erased observer.
    public func asObserver() -> AnyObserver<Element> {
        self
    }
}

我们看到,前面的AnyObserver(self)(注:这里的self也就是前述的AnonymousObservableSink),实际上会把参数self.on传给这个AnyObserver的observer闭包,而在AnyObserver的on方法中,直接调用了这个observer闭包,于是就将事件传递到了AnonymousObservableSink的on方法中。而
AnonymousObservableSink的on方法会调用forwardOn方法,forwardOn方法又会调用AnonymousObservableSink所持有的observer的on方法:

final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    ...
    func on(_ event: Event<Element>) {
        #if DEBUG
            self.synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self.synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if load(self.isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self.isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }
    ...
}

class Sink<Observer: ObserverType>: Disposable {
    ...
    final func forwardOn(_ event: Event<Observer.Element>) {
        #if DEBUG
            self.synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self.synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self.disposed, 1) {
            return
        }
        self.observer.on(event)
    }
    ...
}

而这个self.observer,正是subscribe方法中生成的AnonymousObserver,它的on方法便会把事件一一传递到监听闭包中。

三、总结

至此,我们已经彻底搞清楚了事件的产生、传递以及监听。
1.Observable.create:创建AnonymousObservable
2.Observable.subscribe(onNext:onError:onCompleted:onDisposed:): 创建AnonymousObserver,并调用AnonymousObservable.subscribe(_ observer:)
3.AnonymousObservable.subscribe(_ observer:):调用Producer.subscribe(_ observer:)
4.Producer.subscribe(_ observer:): 调用AnonymousObservable.run
5.AnonymousObservable.run: 创建AnonymousObservableSink, AnonymousObservableSink.observer = AnonymousObserver, 并调用AnonymousObservableSink.run
6.AnonymousObservableSink.run:创建AnyObserver,调用AnonymousObserver.subscribeHandler,传入AnyObserver
7.AnonymousObserver.subscribeHandler:调用AnyObserver.on
8.AnyObserver.on:调用self.observer,即AnonymousObservableSink.on
9.AnonymousObservableSink.on: 调用Sink.forwardOn
10.Sink.forwardOn: 调用AnonymousObservableSink.observer.on,即AnonymousObserver.on
11.AnonymousObserver.on: 调用监听闭包

码字不易,若有错漏,欢迎指正。若有助益,烦请点赞。^ _ ^

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,036评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,046评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,411评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,622评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,661评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,521评论 1 304
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,288评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,200评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,644评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,837评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,953评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,673评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,281评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,889评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,011评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,119评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,901评论 2 355

推荐阅读更多精彩内容