03. RxSwift源码解读:Sink 和 Queue Scheduler

今天要解读的源码是队列调度,同时探讨下Sink的设计思想,我们可以指定订阅和序列发送操作在哪个队列上执行,以observe(on:)subscribe(on:)两个操作符为例,探究一下内部原理。
observe指定在哪个队列接受序列,而subscribe是指定创建序列的闭包在哪个队列执行。下面给出一个例子:

示例

        Observable<Int>.create { (anyObserver) -> Disposable in
            print("Subscribe Thread:", Thread.current)
            anyObserver.onNext(1)
            anyObserver.onCompleted()
            return Disposables.create()
        }
        .observe(on: MainScheduler.instance)
        .subscribe(on: SerialDispatchQueueScheduler(qos: .background))
        .subscribe(onNext: { ele in
            print("Observe Thread:", Thread.current)
            print(ele)
        }, onDisposed: {
            print("disposed2")
        })
        .disposed(by: bag)

打印结果:

Subscribe Thread: <NSThread: 0x600003c45380>{number = 5, name = (null)}
Observe Thread: <NSThread: 0x600003c047c0>{number = 1, name = main}
1
disposed2

可见因为指定subscribeSerialDispatchQueueScheduler(串行队列执行),所以第一条打印的线程不是主线程,而observe指定在主队列执行,所以第二条打印的线程是主线程。

源码解读

我们看一下.observe(on: MainScheduler.instance)的内部实现,在ObservableType的extension中可以找到代码:

    public func observe(on scheduler: ImmediateSchedulerType)
        -> Observable<Element> {
        guard let serialScheduler = scheduler as? SerialDispatchQueueScheduler else {
            return ObserveOn(source: self.asObservable(), scheduler: scheduler)
        }

        return ObserveOnSerialDispatchQueue(source: self.asObservable(),
                                            scheduler: serialScheduler)
    }

如果是同步队列调度者,则返回ObserveOnSerialDispatchQueue对象,将当前Observablescheduler传入,否则返回ObserveOn对象,这两个类都是Observable,它们都继承了Producer,通过这种方式实现链式调用,可以继续调用其他操作符,每个操作符都内部都有对应的ObservableType实现类,它们一般会重写run方法,而且还有对应的Sink类,用来实现操作符的功能,比如ObserveOn类有一个对应的ObserveOnSink类,ObserveOnSink有自己的run方法,同样ObserveOnSerialDispatchQueue类有对应的ObserveOnSerialDispatchQueueSink类,这些Sink类继承自ObserverBase类,最终都实现了ObserverType,能发送序列。
在看看subscribe(on:)

    public func subscribe(on scheduler: ImmediateSchedulerType)
        -> Observable<Element> {
        SubscribeOn(source: self, scheduler: scheduler)
    }

同理返回一个SubscribeOn对象,将当前对象和scheduler传入。SubscribeOn继承自Producer,对应有一个SubscribeOnSink类,它继承自Sink类;因为它需要使用SinkforwardOn方法。

当我们调用.subscribe(onNext方法时,程序依然会走到Producersubscribe方法,这个流程没有变,然后调用当前对象run方法,因为当前对象已经再是AnonymousObservable对象了,而是 SubscribeOn, 而且SubscribeOn重写了run方法,所以会调用SubscribeOnrun方法,这是面向对象的多态,然后我们进入SubscribeOnrun方法看看:

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Ob.Element {
        let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }

这里依然会创建sink对象,但是不再是AnonymousObservableSink,而是SubscribeOnSink,它来完成这个操作符的功能;这个Sink保存了SubscribeOn对象,observercancelobserver依然还是刚开始创建的AnonymousObserver对象,然后调用了SubscribeOnSinkrun方法:

     func run() -> Disposable {
        let disposeEverything = SerialDisposable()
        let cancelSchedule = SingleAssignmentDisposable()
        
        disposeEverything.disposable = cancelSchedule
        
        let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
            let subscription = self.parent.source.subscribe(self)
            disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
            return Disposables.create()
        }

        cancelSchedule.setDisposable(disposeSchedule)
    
        return disposeEverything
    }

SerialDisposable: 表示一个可释放资源,其底层可释放资源可被另一个可释放资源替换,从而导致前一个底层可释放资源的自动释放。
SingleAssignmentDisposable: 表示只允许对其底层可释放资源进行一次赋值的可释放资源。如果已经设置了底层可释放资源,那么将来尝试设置底层可释放资源将抛出异常。
ScheduledDisposable:释放资源时会在对应的队列中调度执行。
关键代码:self.parent.scheduler.schedule, 这里parent是SubscribeOn对象,scheduler是调度者SerialDispatchQueueScheduler,然后调用它的schedule方法,去看看:

    public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        self.scheduleInternal(state, action: action)
    }

    func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        self.configuration.schedule(state, action: action)
    }

调到self.configuration.schedule:

    func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        let cancel = SingleAssignmentDisposable()

        self.queue.async {
            if cancel.isDisposed {
                return
            }


            cancel.setDisposable(action(state))
        }

        return cancel
    }

终于看到队列调用了self.queue.async 将action异步派发到quene中执行,而queue是在创建SerialDispatchQueueScheduler时创建的, SerialDispatchQueueScheduler明显会创建串行队列。这里有个小细节,如果资源已经被释放了则不执行。
cancel.setDisposable(action(state)) 设置释放资源对象,如果已经设置则抛出异常。
action 在哪里, 回到action定义的地方?:

        let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
            let subscription = self.parent.source.subscribe(self)
            disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
            return Disposables.create()
        }

接着执行self.parent.source.subscribe(self), 这个source是subscibe(:on,的调用者,即在链式调用序列中,先调用observe(:on)再调用subscibe(:on) 所以sourceObserveOnSerialDispatchQueue对象。

很多observable都会保存它的上一个observable,即source, 以此实现链式调用
ObserveOnSerialDispatchQueue对象现在要执行subscribe方法,进入看看:

    return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }

又到这里来了,然又进入run,这里会进入ObserveOnSerialDispatchQueue的run方法:

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
        let subscription = self.source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }

这个地方代码不太一样了,不再是调用sinkrun,因为ObserveOnSerialDispatchQueueSink只能处理observe而不能处理subscribe,只能转发给source处理subscribe,所以调用self.source.subscribe, 这里的source是最初创建的未变形的AnonymousObservable对象,这相当于绕了一圈又回到了原来的流程,调用subscribe(sink),但是这里的sink是ObserveOnSerialDispatchQueueSink对象,它是Observer,看看这个会怎么影响后面的流程:

if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }

这里调到了前面的if分支:因为isScheduleRequired用来标示在当前线程是否正在通过schedule执行action,如果action内部又在当前线程执行了subscribe,则无需再调度到当前线程执行,即CurrentThreadScheduler.instance.schedule不会也没必要嵌套调用。

继续调用AnonymousObservablerun方法,然后调用创建AnonymousObservableSink,调用run,最后执行subscribeHandler(AnyObserver(self)),这里回到了熟悉的流程,不过因为ObserverObserveOnSerialDispatchQueueSink对象,所以发送序列时会调用ObserveOnSerialDispatchQueueSinkonCore方法:

    override func onCore(_ event: Event<Element>) {
        _ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
    }

这里会做队列调度,在对应的队列(主队列)中执行action :

        self.cachedScheduleLambda = { pair in
            guard !cancel.isDisposed else { return Disposables.create() }

            pair.sink.observer.on(pair.event)

            if pair.event.isStopEvent {
                pair.sink.dispose()
            }

            return Disposables.create()
        }

接着调用observer.on(event),这里的sink就是当前这个ObserveOnSerialDispatchQueueSink对象,而observer是在创建ObserveOnSerialDispatchQueueSink对象时传入的SubscribeOnSinkSubscribeOnSinkobserver对象是AnonymousObserver,因为它们都遵循了ObserverType,所以都可以作为ObserverType被其他Sink持有。

各类Sink遵循了ObserverType又持有了ObserverType,这样Sink之间可以相互持有,调用协议方法时又可以调用observer的相同协议方法,这样可以一直调用下去,跟装饰器模式很像,当我需要在现有操作符基础上再增加操作,无需修改原有操作符的逻辑代码,通过扩展方式增加新的功能,不过这里的设计更复杂。

我们继续看代码,SubscribeOnSink实现了on方法,进去看看:

    func on(_ event: Event<Element>) {
        self.forwardOn(event)
        
        if event.isStopEvent {
            self.dispose()
        }
    }

然后进入Sink类的forwardOn:在forwardOn又又调用了observer.on(event), 这里observerAnonymousObserver,最终调用AnonymousObserveronCore,完成最后一击:
调用self.eventHandler(event)

如果将observe(:on)subscribe(:on) 互换位置,subscribe(:on)先调用,observe(:on)后调用结果会怎么样? 结果依然不变,但是内部调用流程会不一样,大家可以自己试试!

Scheduler

上面的例子中已经见到了两个Scheduler:

  • SerialDispatchQueueScheduler : 串行队列调度者,
  • MainScheduler: 主队列调度者,它继承自SerialDispatchQueueScheduler
    还有一个并行队列:
  • ConcurrentDispatchQueueScheduler: 并行队列调度者
    每个调度者都会维护自己的队列,MainScheduler维护一个主队列DispatchQueue.mainSerialDispatchQueueScheduler维护一个串行队列,ConcurrentDispatchQueueScheduler维护一个并行队列; 当需要执行action时,调度对应的队列执行action即可。
    这三个调度者都遵循了SchedulerType协议,SchedulerType继承ImmediateSchedulerType, ImmediateSchedulerType有一个协议方法:
    func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable用来执行队列调度。

Scheduler包含一个DispatchQueueConfiguration对象,DispatchQueueConfiguration对象持有队列,当执行调度方法时,会转发到这个类的schedule进行实际的调度。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,657评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,889评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,057评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,509评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,562评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,443评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,251评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,129评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,561评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,779评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,902评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,621评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,220评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,838评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,971评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,025评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,843评论 2 354

推荐阅读更多精彩内容