RxSwift核心源码探索

响应式编程是RxSwift的核心思想,统一思想快速开发。同样在ReactiveX系列中其他语言也都使用了这一思想,当我们掌握了RxSwift运用,那么RxJavaRxPHPRxJs等等都能够快速上手。那么RxSwift是如何响应的呢?下面就来看一下源码都做了哪些事情。

Rx.png

RxSwift核心流程

1、创建序列
2、订阅序列
3、发送信号

先看一下是如何使用的,代码如下:

//1、创建序列
let obs = Observable<Any>.create { (observer) -> Disposable in
    //3、发送信号
    observer.onNext("我是一条消息")
    return Disposables.create()
}
//2、订阅序列
obs.subscribe(onNext: { (val) in
    //4、序列监听
    print("onNext:\(val)")
}).disposed(by: disposeBag)//5、打包待销毁
  • 通过Observablecreate创建序列,在create闭包内调用onNext方法实现信号发送
  • 调用subscribe方法订阅序列,并实现subscribe的参数闭包onNext,在闭包内监听信号
  • 最后通过disposed对序列打包等待销毁

看到代码可能会疑惑,消息是如何发给订阅者的。按正常逻辑,订阅后才能收到信息,那么可以猜测,在成为订阅者并布置好监听后,订阅者向序列发送了一条消息,通知可观察序列可以发信号了。大致可整理为如下流程:

flow.png

以上只是猜测,下面来看具体的代码实现。在RxSwift中同名的方法有很多,很难做到直接定位代码位置,我们可以通过command+点击配合断点一步步找到对应方法的底层实现。

1、创建序列

public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
    return AnonymousObservable(subscribe)
}

该方法是对ObservableType协议的扩展,最外层实现的闭包subscribe则作为参数传入AnonymousObservable,并返回AnonymousObservable对象,继续执行追踪到AnonymousObservable类,如下:

final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
    let _subscribeHandler: SubscribeHandler
    //初始化时保存闭包
    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }
    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

首先看一下AnonymousObservable的继承链如下:

  • AnonymousObservable -> Product -> ObservableType -> ObservableConvertible

在父类Product中好像有我没需要的方法:

override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
      
}

像是订阅方法,但不是,但也有着千丝万缕的联系。到此为止,序列创建部分已完成,可总结如下:

  • create方法实现参数闭包,内部创建AnonymousObservable对象
  • AnonymousObservable对象保存了外界实现的闭包
  • Producer中的subscribe方法,应该是订阅后将要调用的方法

记住以上三点,后序逐一对比。

2、订阅序列

根据方法名找到subscribe的实现,command+点击直接进入实现,源码如下(取部分源码):

public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
    -> Disposable {
        //此处省略若干行代码
        let observer = AnonymousObserver<E> { event in
            switch event {
            case .next(let value):
                onNext?(value)
            case .error(let error):
                if let onError = onError {
                    onError(error)
                }
                else {
                    Hooks.defaultErrorHandler(callStack, error)
                }
                disposable.dispose()
            case .completed:
                onCompleted?()
                disposable.dispose()
            }
        }
        return Disposables.create(
            self.asObservable().subscribe(observer),
            disposable
        )
}

该方法是对ObservableType的拓展。在方法内部已经出现对观察者的定义,AnonymousObserver类型的闭包observer

源码分析:

2.1、observer内部调用的外部(应用层)实现的闭包,由此看出所有信号是由此发出,eventobserver的参数,不难看出,observer闭包也是在其他地方调用,传入带有信号值的event参数

2.2、observer被当做参数传入到subscribe中,而observer的调用必然是在subscribe中实现的

self.asObservable().subscribe(observer)

2.3、self.asObservable()该方法返回本身,保证协议的一致性,方法如下:

public class Observable<Element> : ObservableType {
    // 省去代码若干
    public func asObservable() -> Observable<E> {
        return self
    }
}

2.4、继续断点执行找到subscribe方法,正是上面所提到的Producer中的方法,方法如下:

override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
    // 省去代码若干
    return CurrentThreadScheduler.instance.schedule(()) { _ in
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
}

2.5、紧接着我的observer观察者被传入到run中,上面说到该观察者一定会被调用,继续深入

let sinkAndSubscription = self.run(observer, cancel: disposer)

2.6、继续断点执行,发现self.run的调用,调用的是AnonymousObservable中的run方法,代码如下:

final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

2.7、此处就是创建序列时的AnonymousObservable类。在run方法类创建了sink对象,在初始化时传入了我们上面所说的观察者,记住sink保存了观察者observer闭包,并且调用了sink.run(self)方法,传入的是创建时产生的可观察序列observable闭包对象,深入run

final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
    typealias E = O.E
    typealias Parent = AnonymousObservable<E>
          // 省去代码若干
    // 此处向父类Sink初始化了observer对象
    override init(observer: O, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }
    func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }
}

2.8、拨开云雾见天日,此处parentlet subscription = sink.run(self)传入,self即为创建序列create方法返回的observable对象,而_subscribeHandler是创建序列所保存的闭包,此时我们的闭包就被调用了,被调用闭包如下:

   let obs = Observable<Any>.create { (observer) -> Disposable in
    //3、发送消息
    observer.onNext("我是一条消息")
    return Disposables.create()
}

发送信号的闭包被调用,接下来就是信号发送了。

3、发送信号

根据上面步骤继续探索,代码已经执行到我们的业务层。
在信号发送闭包中通常调用一下三种方法,用来发送信号。如下:

  • observer.onNext("我是一条消息") 信号发送
  • observer.onCompleted() 序列完成,完成后序列将被释放
  • observer.onError(error) 序列出错中断,序列不可继续使用,被释放

以上三个方法为ObserverType的拓展方法

extension ObserverType {
    /// Convenience method equivalent to `on(.next(element: E))`
    ///
    /// - parameter element: Next element to send to observer(s)
    public func onNext(_ element: E) {
        self.on(.next(element))
    }
    /// Convenience method equivalent to `on(.completed)`
    public func onCompleted() {
        self.on(.completed)
    }
    /// Convenience method equivalent to `on(.error(Swift.Error))`
    /// - parameter error: Swift.Error to send to observer(s)
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}
  • E表示了一个泛型信号量,可表示任意类型的信号
  • .next(element)是一个带泛型参数的枚举,管理了三种类型事件的消息传递。如下:
public enum Event<Element> {
    /// Next element is produced.
    case next(Element)
    /// Sequence terminated with an error.
    case error(Swift.Error)
    /// Sequence completed successfully.
    case completed
}

on这是AnonymousObservableSink中的方法,代码如下:

final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
    typealias E = O.E
    typealias Parent = AnonymousObservable<E>
    // 代码省略若干行
    func on(_ event: Event<E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if load(&self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(&self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }
}

内部根据Event枚举不同成员变量做不同的信号发送,信号发送调用了forwardOn方法。方法实现如下:

class Sink<O : ObserverType> : Disposable {
    init(observer: O, cancel: Cancelable) {
        self._observer = observer
        self._cancel = cancel
    }

    final func forwardOn(_ event: Event<O.E>) {
        if isFlagSet(&self._disposed, 1) {
            return
        }
        self._observer.on(event)
    }
}

代码有些长只保留了核心部分,SinkAnonymousObservableSink的父类,见上文2.7处描述_observer即是订阅中在内部产生的AnonymousObserver对象,而该对象调用了on方法并传递了信号。on方法所在位置如下:

  • AnonymousObserver -> ObserverBase -> on()
class ObserverBase<ElementType> : Disposable, ObserverType {
    typealias E = ElementType
    func on(_ event: Event<E>) {
        switch event {
        case .next:
            if load(&self._isStopped) == 0 {
                self.onCore(event)
            }
        case .error, .completed:
            if fetchOr(&self._isStopped, 1) == 0 {
                self.onCore(event)
            }
        }
    }
}

在方法内部又掉用了self.onCore(event),此时该方法在AnonymousObserver中实现,代码如下:

final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
    typealias Element = ElementType
    typealias EventHandler = (Event<Element>) -> Void
    private let _eventHandler : EventHandler
    init(_ eventHandler: @escaping EventHandler) {
        self._eventHandler = eventHandler
    }

    override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }
}

源码分析

  • 此处通过_eventHandler来发送信号,_eventHandler是从哪来的呢?逆推onCore调用者是observer,而observer是订阅时在内部创建的,被一层层传入到此
  • 而在observer初始化时即被保存为
    _eventHandler_eventHandler调用即调用了订阅时创建的observer闭包,进而信号又通过闭包内的闭包传出到业务层
//2、订阅序列
obs.subscribe(onNext: { (val) in
    print("onNext:\(val)")
}).disposed(by: disposeBag)

天呢!容我喘口气~~

至此我们响应式编程的创建、订阅、发送、接收等流程就已完成。整个流程会觉得很复杂,但它统一了所有事件的创建与监听,统一思想快速开发,今后的开发流程就是:

  • 创建序列 -> 订阅序列 -> 发送序列 -> 响应序列

sink在Rx中充当管理者,管理序列,观察者和销毁者,将序列发送至观察者,并管理销毁者适时消耗序列,回收资源。

最后附上两张总结图:

Observable.png
Observable.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,047评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,807评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,501评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,839评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,951评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,117评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,188评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,929评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,372评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,679评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,837评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,536评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,168评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,886评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,129评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,665评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,739评论 2 351

推荐阅读更多精彩内容

  • 在正文开始之前的最后,放上GitHub链接和引入依赖的gradle代码: Github: https://gith...
    苏苏说zz阅读 677评论 0 2
  • 在正文开始之前的最后,放上 GitHub 链接和引入依赖的 gradle 代码: Github: https://...
    松江野人阅读 5,885评论 0 1
  • 发现 关注 消息 RxSwift入坑解读-你所需要知道的各种概念 沸沸腾关注 2016.11.27 19:11*字...
    枫叶1234阅读 2,788评论 0 2
  • 今天母亲节,醒来后刷屏,满屏都是母亲节祝福。我不想发朋友圈,母亲不会微信,只有一部能通话的手机。她那一代人,含蓄内...
    浮莲阅读 567评论 3 7
  • 欧阳文修是一名汽车司机,八十年代,五一节前一天,车队开了一次大干红五月的动员大会。 主要讲安全生产,多拉快跑,完成...
    讲诚信的人阅读 874评论 10 10