RXSwift-核心逻辑

基本概念


要想充分理解RXSwift核心逻辑,那么首先必须要知道RXSwift里包含哪几个角色,以及它们的职责。

被观察者(Observable)

它主要负责产生事件,实质上就是一个可被监听的序列(Sequence)。

Observable<T>这个类就是Rx框架的基础,我们称它为可观察序列

Observable<T> ==异步产生==> event(element : T)

被观察者(Observable)类继承关系

观察者(Observer)

它主要负责监听事件然后对这个事件做出响应,或者说任何响应事件的行为都是观察者

观察者(Observer)类继承关系

订阅者(Subscriber)

事件的最终处理者

管道(Sink)

Observer 和 Observable 沟通的桥梁:Sink相当与一个加工者,可以将源事件流转换成一个新的事件流,如果将事件流比作水流,事件的传递过程比作水管,那么Sink就相当于水管中的一个转换头。

管道(Sink)类继承关系

代码解析


接下来我们结合以下很简单的代码来分析,逐步揭开RXSwift的神秘面纱。

//1:创建序列
 let ob = Observable<Any>.create { (observer) -> Disposable in
            // 3:发送信号
            obserber.onNext("测试OnNext")
            obserber.onCompleted()
            obserber.onError(NSError.init(domain: "error!", code: 000, userInfo: nil))
            return Disposables.create()
 
//2:订阅信息
 let _ = ob.subscribe(onNext: { (text) in
            print("订阅到:\(text)")    //text从哪里来的?
        }, onError: { (error) in
            print("error: \(error)")    //error从哪里来的?
        }, onCompleted: {
            print("完成")
        }) {
            print("销毁")
        }

在这里我们主要关注下
create 闭包什么时候执行,
subscribe 闭包又是什么时候执行的
也就是3->2这条线

Create方法

 public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
        return AnonymousObservable(subscribe)
    }

我们看到create 函数, 返回了一个AnonymousObservable实例,接着我们进入AnonymousObservable看它里面具体内容

// AnonymousObservable  Class
final fileprivate class AnonymousObservable<Element> : Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        _subscribeHandler = subscribeHandler
    }

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

在这里我们再次回顾下Observable集成体系
(父类)
ObservableConvertibleType(完全的抽象)
|
ObservableType( 处理subscribe)
|
Observable(处理 asObservable)
|
Producer(重载subscribe)
|
AnonymousObservable(处理run)
(子类)

PS:由上面我们能看出,如果想自定义Observable通常只需要继承Producer, 并实现run方法就可以了。

在这里我们看到Run方法里面涉及了类AnonymousObservableSink,它作为Observer 和 Observable的衔接的桥梁我们看到它本身遵守ObseverType协议,与此同时实现了run方法

那也就是说,sink从某种程度来说也是Observable
通过sink就可以完成从Observable到Obsever的转变。

总结下create方法主要工作:

  • 创建AnonymousObservable对象,
  • _subscribeHandler保存了闭包
  • 写了run方法在内部创建了AnonymousObservableSink

Subscribe方法

 public func subscribe(_ on: @escaping (Event<E>) -> Void)
        -> Disposable {
            let observer = AnonymousObserver { e in
                on(e)
            }
            return self.asObservable().subscribe(observer)
    }

上述代码只是入口,下面的才是核心(Producer里面)

// Producer Class
  override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }

这里面我们看到了Producer调用了自己的run方法,而AnonymousObservableSink作为其子类重写了该方法,我们先去看下子类是如何写的。

// AnonymousObservable Class
    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }

之前提到过AnonymousObservableSink注意Sink是持有Observer,从这也可以看出来

Observerablerun方法触发Sinkrun方法

接下来就要关注AnonymousObservableSink方法。

    func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }

在这里我们再一次见到了subscribeHandler,这个subscribeHandler就是之前最开始的闭包!

 Observable<String>.create { observer -> Disposable in
            observer.onNext("测试")
            return Disposables.create()
        }

至此我们知道了create闭包是什么执行的了,接下来我们自然的把目光锁定到实体类AnyObserver,看看它里面究竟是如何实现的。

public struct AnyObserver<Element> : ObserverType {
    /// The type of elements in sequence that observer can observe.
    public typealias E = Element
    
    /// Anonymous event handler type.
    public typealias EventHandler = (Event<Element>) -> Void

    private let observer: EventHandler

    /// Construct an instance whose `on(event)` calls `eventHandler(event)`
    ///
    /// - parameter eventHandler: Event handler that observes sequences events.
    public init(eventHandler: @escaping EventHandler) {
        self.observer = eventHandler
    }
    
    /// Construct an instance whose `on(event)` calls `observer.on(event)`
    ///
    /// - parameter observer: Observer that receives sequence events.
    public init<O : ObserverType>(_ observer: O) where O.E == Element {
        self.observer = observer.on
    }
    
    /// Send `event` to this observer.
    ///
    /// - parameter event: Event instance.
    public func on(_ event: Event<Element>) {
        return self.observer(event)
    }

    /// Erases type of observer and returns canonical observer.
    ///
    /// - returns: type erased observer.
    public func asObserver() -> AnyObserver<E> {
        return self
    }
}

在这里我们看到其内部的Observer其实是一个EventHandler,并且在初始化的时候把外部传过来的AnonymousObservableSink.on赋值给了这个Observer,也就是说observer.onNext("测试")最终会触发AnonymousObservableSink.on事件

接着我们看下AnonymousObservableSinkon的具体实现

// AnonymousObservableSink.on

    func on(_ event: Event<E>) {
        #if DEBUG
            _synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { _synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if _isStopped == 1 {
                return
            }
            forwardOn(event)
        case .error, .completed:
            if AtomicCompareAndSwap(0, 1, &_isStopped) {
                forwardOn(event)
                dispose()
            }
        }
    }

AnonymousObservableSinkon会调用其内部的forwardOn

// Sink.forwardOn
    final func forwardOn(_ event: Event<O.E>) {
        #if DEBUG
            _synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { _synchronizationTracker.unregister() }
        #endif
        if _disposed {
            return
        }
        _observer.on(event)
    }

在这里还记得这个_observer的实体吗,看到下面代码你一定会回忆起来,

  public func subscribe(_ on: @escaping (Event<E>) -> Void)
        -> Disposable {
            let observer = AnonymousObserver { e in
                on(e)
            return self.asObservable().subscribe(observer)
    }

对,没错,这个_observer就是我们最初传进来的subscribe闭包的实体类AnonymousObserver~!

接着我们继续看下这个AnonymousObserveron方法又是如何实现的

继承关系:AnonymousObserver->ObserverBase->ObserverType

// ObserverBase.on
    func on(_ event: Event<E>) {
        switch event {
        case .next:
            if _isStopped == 0 {
                onCore(event)
            }
        case .error, .completed:
            if AtomicCompareAndSwap(0, 1, &_isStopped) {
                onCore(event)
            }
        }
    }

ObserverBase.on会触发onCore方法,看下子类的实现

// AnonymousObserver.onCore
    override func onCore(_ event: Event<Element>) {
        return _eventHandler(event)
    }

这里的_eventHandler还记得吗?它就是最初传进来的订阅闭包即:

            .subscribe { event in
               print(event.element)
        }

总结一下


Observable-Create阶段:

  • 创建AnonymousObservable
  • 保存闭包(subscribeHandler)

Observable-Subscribe阶段:

  • 创建AnonymousObserver
  • 调用自己(AnonymousObservable)的run方法:(AnonymousObserver作为参数)
    AnonymousObservable重写了run,它在方法里面创建了AnonymousObservableSink并在sink里保存了这个刚创建的AnonymousObserver
  • 调用AnonymousObservableSinkrunrun方法里用到AnonymousObservable_subscribeHandler并传入AnyObserver,这里AnonymousObservableSink.on赋值给了AnyObserver内部的EventHandler成员observer

执行阶段:

AnyObserver.on ----> AnonymousObservableSink.on ----> AnonymousObservableSink.forwardOn ----> AnonymousObserver.on---> AnonymousObserver.onCore ----> _eventHandler(event)

PS:

可以看出Sink在不同的阶段有着不同的身份

  • Sink充当Observable
    Obsevable.subscribe ---> Obsevable.run ----> Sink.run
    这个过程通过Sink建立Obsevable和 Observer的联系
  • Sink充当Observer
    AnyObserver.on ----> Sink.on ----> Observer.on ---> Observer.OnCore
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,084评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,623评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,450评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,322评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,370评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,274评论 1 300
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,126评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,980评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,414评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,599评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,773评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,470评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,080评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,713评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,852评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,865评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,689评论 2 354