RxSwift(八)调度者Schedulers 核心逻辑解析

前言

调度者(Schedulers)是 RxSwift 实现多线程的核心模块,它主要用于控制任务在哪个线程或队列运行。本文将探索Schedulers是如何控制线程或队列的过程的实现原理,篇幅较长,感兴趣的同学可以一步步跟着探索下去。

提出疑问

如下代码,在异步队列子线程执行rx的点击监听,然后打印当前线程,竟然打印出来的是点击了按钮 --- <NSThread: 0x600000c2d980>{number = 1, name = main}当前线程是主线程,这是怎么做到的呢?

DispatchQueue.global().async {
    self.actionBtn.rx.tap
        .subscribe(onNext: { () in
            print("点击了按钮 --- \(Thread.current)")
        })
        .disposed(by: self.bag)
}

开始探索

一样的按住cmd点击进入tap里,可以看到返回的是controlEvent(.touchUpInside),再进去看controlEvent

public func controlEvent(_ controlEvents: UIControl.Event) -> ControlEvent<()> {
    let source: Observable<Void> = Observable.create { [weak control = self.base] observer in
            MainScheduler.ensureRunningOnMainThread()
            //省略
}
            
  public class func ensureRunningOnMainThread(errorMessage: String? = nil) {
        #if !os(Linux) // isMainThread is not implemented in Linux Foundation
            guard Thread.isMainThread else {
                rxFatalError(errorMessage ?? "Running on background thread.")
            }
        #endif
    }

可以发现这里有判断必须要在主线程执行,但是还没看到调度的代码,然后发现上面的第一个方法里返回的是一个ControlEvent,进入ControlEvent看一下

 public init<Ev: ObservableType>(events: Ev) where Ev.Element == Element {
        self._events = events.subscribeOn(ConcurrentMainScheduler.instance)
    }

可以看到subscribeOn订阅在ConcurrentMainScheduler.instance主线程上,这时可以清楚的明白是切换到了主线程,不过还是需要明白subscribeOn到底干了些什么呢?

final private class SubscribeOn<Ob: ObservableType>: Producer<Ob.Element> {
    let source: Ob
    let scheduler: ImmediateSchedulerType
    
    init(source: Ob, scheduler: ImmediateSchedulerType) {
        self.source = source
        self.scheduler = scheduler
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Ob.Element {
        let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

到这里很明确了,它是继承ObservableTypeProducer的,在前几篇文章分析过的RxSwift的核心逻辑,其实这里切换了一下,整个的过程是:
订阅序列->创建observer->Producer里的SubscribeOnrun->SubscribeOnSink.runon->调用observer.on执行sinkon->sink里保存的eventHandler响应event事件->最后调用外界的subscriber的闭包

对于上面的流程不了解的可以看之前的文章,可以断点每个步骤,了解核心流程。

调度者Schedulers核心源码

写如下代码,使用observeOn(SerialDispatchQueueScheduler.init(qos: .background)),这里一般是执行串行队列的需要使用到的,里面是封装了GCD的队列 ,我们点击observeOn进去看看里面的代码

    Observable.of(1,2,3,4,5)
            .observeOn(SerialDispatchQueueScheduler.init(internalSerialQueueName: "observeOnSerial"))
            .subscribe{print("observeOn",$0,Thread.current)}
            .disposed(by: self.bag)
  1. 很明显,我们在外面调用的是SerialDispatchQueueScheduler,所以这里会返回一个ObserveOnSerialDispatchQueue的新序列,当然是要点进去看看
 public func observeOn(_ scheduler: ImmediateSchedulerType)
        -> Observable<Element> {
            if let scheduler = scheduler as? SerialDispatchQueueScheduler {
                return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
            }
            else {
                return ObserveOn(source: self.asObservable(), scheduler: scheduler)
            }
    }
  1. 这里把外界传递过来的SerialDispatchQueueSchedulersource源序列都保存起来了,同时还继承了Producer,实现了run方法
final private class ObserveOnSerialDispatchQueue<Element>: Producer<Element> {
    let scheduler: SerialDispatchQueueScheduler
    let source: Observable<Element>

    init(source: Observable<Element>, scheduler: SerialDispatchQueueScheduler) {
        self.scheduler = scheduler
        self.source = source

        #if TRACE_RESOURCES
            _ = Resources.incrementTotal()
            _ = increment(_numberOfSerialDispatchQueueObservables)
        #endif
    }

    //省略
}
  1. 外界调用subscribe时,就会走这里的run方法(我这里省略了走到这里的原因,因为在之前探索核心逻辑已经探索的很清楚了)
    我们看到,self.source.subscribe(sink)是订阅源序列,然后传递一个ObserveOnSerialDispatchQueueSink的管子进去

     override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
        let subscription = self.source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }
    
  2. cmd然后点击self.source.subscribe,弹出一堆可进入的方法,这里肯定是会执行Producersubscribe,所以选择它进入,可以在这处打个断点确认一下。

  1. 看到这里CurrentThreadScheduler.isScheduleRequired,根据这个标识符CurrentThreadScheduler.isScheduleRequired去找执行的方法
    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
    if !CurrentThreadScheduler.isScheduleRequired {   // 后续走这里
        let disposer = SinkDisposer()
        let sinkAndSubscription = self.run(observer, cancel: disposer)
        disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
        return disposer
    }
    else {    // 第一次走这里
        return CurrentThreadScheduler.instance.schedule(()) { _ in
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
            return disposer
        }
    }
}
  1. 我们点进CurrentThreadScheduler.instance.schedule,可以看到其实这里只是多了一个标记isScheduleRequiredfalse的步骤
    public func schedule<StateType>(_ state: action: ) -> Disposable {

    if CurrentThreadScheduler.isScheduleRequired {
      // 已经标记,就置false
        CurrentThreadScheduler.isScheduleRequired = false
     // 外界闭包调用执行
        let disposable = action(state)
      // 延迟销毁 
        defer {
            CurrentThreadScheduler.isScheduleRequired = true
            CurrentThreadScheduler.queue = nil
        }
      // 先省略。。。
        return disposable
    }
     // 先省略。。。
    return scheduledItem
}
  1. 再看到self.run(observer, cancel: disposer),点击进去,选择ObservableSequence,又会回到第三步的run方法
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
       let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
       let subscription = sink.run()
       return (sink: sink, subscription: subscription)
   }

点击进入到 ObservableSequenceSink->run

  func run() -> Disposable {
       return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
           var mutableIterator = iterator
           if let next = mutableIterator.next() {
               self.forwardOn(.next(next))
               recurse(mutableIterator)
           }
           else {
               self.forwardOn(.completed)
               self.dispose()
           }
       }
   }

继续点击进入ImmediateSchedulerType里的scheduleRecursive,在这里可以看到,创建了RecursiveImmediateScheduler,来保存外界传递过来的闭包,而且调用了recursiveScheduler.schedule(state)并执行闭包

public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> Void) -> Void) -> Disposable {
       let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
       
       recursiveScheduler.schedule(state)
       
       return Disposables.create(with: recursiveScheduler.dispose)
   }
  1. 点击进入recursiveScheduler.schedule(state),在这个递归调度者里
  • 这里因为在递归环境,加了一把锁递归锁,保障安全
  • 通过保护,获取action执行,也就是外界传给递归调度者的闭包任务
  • 因为加了一把递归锁,所以保证了RxSwift的调度是有顺序的
func schedule(_ state: State) {
   var scheduleState: ScheduleState = .initial
   let d = self._scheduler.schedule(state) { state -> Disposable in     
       // 这里因为在递归环境,加了一把锁递归锁,保障安全   
       let action = self._lock.calculateLocked { () -> Action? in
                return self._action
       }
       
       if let action = action {
           action(state, self.schedule)
       }
       
       return Disposables.create()
   }
 // 篇幅,先省略,大家自行查阅  
}
  1. 点击进入self._scheduler.schedule(state),之前我们在外界使用的是SerialDispatchQueueScheduler,所以进入它里面的schedule
 public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
       return self.scheduleInternal(state, action: action)
   }

self.scheduleInternal(state, action: action)-> scheduleInternal->self.configuration.schedule,可以看到如下代码,这里异步执行了cancel.setDisposable(action(state))

func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
       let cancel = SingleAssignmentDisposable()

       self.queue.async {
           if cancel.isDisposed {
               return
           }
           cancel.setDisposable(action(state))
       }

       return cancel
   }

其实这里执行的就是第8步这里的RecursiveImmediateScheduler里的

 if let action = action {
           action(state, self.schedule)
       }
  1. 再回到第7步ObservableSequenceSink里的run,上面的action调用尾随闭包,即调用了self.forwardOn,点进去发现其实调用的就是self._observer.on. 这里在之前的核心逻辑分析过
     func run() -> Disposable {
        return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
            var mutableIterator = iterator
            if let next = mutableIterator.next() {
                self.forwardOn(.next(next))
                recurse(mutableIterator)
            }
            else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }
  1. 然后会调用到第三步里创建的ObserveOnSerialDispatchQueueSinkonCore

        override func onCore(_ event: Event<Element>) {
        _ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
    }
    

    看到这里发现很熟悉,继续点进去

      public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        return self.scheduleInternal(state, action: action)
    }
    
    func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        return self.configuration.schedule(state, action: action)
    }
    

    最终又来到了这里,在当前队列异步执行任务,调用action(state),
    所以,我们在最外界调用on(Event)响应的时候,就会在外界指定的schedule里执行闭包了

    func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        let cancel = SingleAssignmentDisposable()
    
        self.queue.async {
            if cancel.isDisposed {
                return
            }
            cancel.setDisposable(action(state))
        }
    
        return cancel
    }
    

    总结

    其实就是创建了一个中间序列ObserveOnSerialDispatchQueueSink,继承自Producer,实现了run方法,保存了源序列传递进来的observerschedule。在外界订阅的时候会执行一套producer的run,还会执行sink.run,然后由传递进来的observer响应on(event)

所以其实就是两层序列订阅响应,第二层sink就相当于源序列的observer观察者,其实和RxSwift核心逻辑那里非常相似,只是多了个中间的序列来处理线程的调度。掌握了我之前写过的核心逻辑,其实后面都是类似的,乍看有难度,上手探索之后发现很好玩,很简单,很有意思!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,794评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,050评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,587评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,861评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,901评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,898评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,832评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,617评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,077评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,349评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,483评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,199评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,824评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,442评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,632评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,474评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,393评论 2 352

推荐阅读更多精彩内容