调度环境
CurrentThreadScheduler:当前线程的Scheduler(默认)
MainScheduler:主线程,继承自SerialDispatchQueueScheduler
SerialDispatchQueueScheduler:封装了GCD串行队列
ConcurrentDispatchQueueScheduler:封装GCD的并行队列
ConcurrentMainScheduler:在主线程上调度,针对subscribeOn做了优化(并非GCD并行队列的封装)
OperationQueueScheduler:封装NSOperationQueue
调度流程
1.observeOn 串行队列:ObserveOnSerialDispatchQueue
创建一个订阅,并让订阅回调在串行队列上执行
// Example: deliver the events of a three-element sequence on a custom serial queue.
Observable.of(0,1,2) // source sequence (an ObservableSequence, per these notes)
// observeOn with a SerialDispatchQueueScheduler yields an ObserveOnSerialDispatchQueue
.observeOn(SerialDispatchQueueScheduler.init(internalSerialQueueName: "serial"))
.subscribe { (value) in
// Print the event's element together with the thread the callback runs on.
print("observeOn", value.element, Thread.current)
}
.disposed(by: disposebag)
SerialDispatchQueueScheduler.init 创建了一个DispatchQueue
observeOn 创建了ObserveOnSerialDispatchQueue,保存了源序列 ObservableSequence和调度者scheduler
if let scheduler = scheduler as? SerialDispatchQueueScheduler {
return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
}
let observer = AnonymousObserver { e in
on(e)
}
return self.asObservable().subscribe(observer)
创建 AnonymousObserver 实例,保存原始的订阅闭包
ObserveOnSerialDispatchQueue继承自Producer
执行Producer的subscribe方法
首先判断CurrentThreadScheduler.isScheduleRequired是否为true
若为true,则把CurrentThreadScheduler.isScheduleRequired置为false
并通过CurrentThreadScheduler调度执行以下闭包
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
执行子类ObserveOnSerialDispatchQueue.run
let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
let subscription = self.source.subscribe(sink)
return (sink: sink, subscription: subscription)
创建ObserveOnSerialDispatchQueueSink
self.source 即ObservableSequence:Producer
调用Producer.subscribe(sink)
由于CurrentThreadScheduler.isScheduleRequired = false,执行以下代码
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
调用子类ObservableSequence.run
let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
创建 ObservableSequenceSink 执行run()
/// Emits every element of the parent's `_elements` to the observer, then completes.
///
/// The iteration is driven by the parent's scheduler via `scheduleRecursive`:
/// each scheduled step emits at most one element and then asks for the next step.
/// - Returns: The disposable returned by `scheduleRecursive`.
func run() -> Disposable {
// The iterator is the recursion state; it starts at the beginning of `_elements`.
return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
// The closure parameter is immutable, so copy it before advancing.
var mutableIterator = iterator
if let next = mutableIterator.next() {
// Forward one element, then schedule the next step with the advanced iterator.
self.forwardOn(.next(next))
recurse(mutableIterator)
}
else {
// No elements left: signal completion and dispose this sink.
self.forwardOn(.completed)
self.dispose()
}
}
}
当前调度者调度执行
let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
recursiveScheduler.schedule(state)
forwardOn 回到ObserveOnSerialDispatchQueueSink.onCore
手动创建的串行队列开始调度
/// Handles an incoming event by scheduling it on this sink's scheduler instead of
/// processing it inline.
/// - Parameter event: The event received from the source.
override func onCore(_ event: Event<Element>) {
// The scheduled state is the (sink, event) pair; the action is the cached lambda,
// which is force-unwrapped here. The returned disposable is intentionally discarded.
_ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
}
最终在该串行队列上异步执行action(state)
/// Schedules `action` to run asynchronously on `self.queue`.
/// - Parameters:
///   - state: Value passed to `action` when it runs.
///   - action: Work to perform; the disposable it returns is attached to the result.
/// - Returns: A disposable; if it is disposed before the queued closure runs,
///   `action` is not invoked.
func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
// Handed back to the caller immediately; filled in once the action has run.
let cancel = SingleAssignmentDisposable()
self.queue.async {
// Skip the work if it was cancelled while waiting in the queue.
if cancel.isDisposed {
return
}
cancel.setDisposable(action(state))
}
return cancel
}