RxSwift-scheduler调度者

调度环境

CurrentThreadScheduler:当前线程的Scheduler.(默认)
MainScheduler:主线程,继承自SerialDispatchQueueScheduler
SerialDispatchQueueScheduler:封装了GCD串行队列
ConcurrentMainScheduler:封装GCD的并行队列
OperationQueueScheduler:封装NSOperationQueue

调度流程

1.observeOn 串行队列:ObserveOnSerialDispatchQueue

创建一个订阅到串行队列的订阅

 Observable.of(0,1,2) //ObservableSequence
//ObserveOnSerialDispatchQueue
.observeOn(SerialDispatchQueueScheduler.init(internalSerialQueueName: "serial"))
.subscribe { (value) in
                print("observeOn", value.element, Thread.current)
        }
.disposed(by: disposebag)

SerialDispatchQueueScheduler.init 创建了一个DispatchQueue
observeOn 创建了ObserveOnSerialDispatchQueue,保存了元序列 ObservableSequence和调度者scheduler

 if let scheduler = scheduler as? SerialDispatchQueueScheduler {
     return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
 }

  let observer = AnonymousObserver { e in
                on(e)
            }
            return self.asObservable().subscribe(observer)

创建 AnonymousObserver类,保存元订阅闭包
ObserveOnSerialDispatchQueue继承自Producer
执行Producer的subscribe方法
首先判断CurrentThreadScheduler.isScheduleRequired == true
则把CurrentThreadScheduler.isScheduleRequired = false
并执行以下闭包

 return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer

执行子类ObserveOnSerialDispatchQueue.run

 let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
        let subscription = self.source.subscribe(sink)
        return (sink: sink, subscription: subscription)

创建ObserveOnSerialDispatchQueueSink
self.source 即ObservableSequence:Producer
调用Producer.subscribe(sink)
由于CurrentThreadScheduler.isScheduleRequired = false,执行以下代码

if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }

调用子类ObservableSequence.run

 let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)

创建 ObservableSequenceSink 执行run()

 func run() -> Disposable {
        return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
            var mutableIterator = iterator
            if let next = mutableIterator.next() {
                self.forwardOn(.next(next))
                recurse(mutableIterator)
            }
            else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }

当前调度者调度执行

  let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
        
        recursiveScheduler.schedule(state)

forwardOn 回到ObserveOnSerialDispatchQueueSink.onCore
手动创建的串行队列开始调度

 override func onCore(_ event: Event<Element>) {
        _ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
    }

最终回到异步执行action(state)

func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        let cancel = SingleAssignmentDisposable()

        self.queue.async {
            if cancel.isDisposed {
                return
            }
            cancel.setDisposable(action(state))
        }
        return cancel
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容