前言
调度者(Schedulers)是 RxSwift 实现多线程的核心模块,它主要用于控制任务在哪个线程或队列运行。本文将探索Schedulers是如何控制线程或队列的过程的实现原理,篇幅较长,感兴趣的同学可以一步步跟着探索下去。
提出疑问
如下代码,在异步队列子线程执行rx的点击监听,然后打印当前线程,竟然打印出来的是点击了按钮 --- <NSThread: 0x600000c2d980>{number = 1, name = main}
当前线程是主线程,这是怎么做到的呢?
DispatchQueue.global().async {
self.actionBtn.rx.tap
.subscribe(onNext: { () in
print("点击了按钮 --- \(Thread.current)")
})
.disposed(by: self.bag)
}
开始探索
一样的按住cmd点击进入tap里,可以看到返回的是controlEvent(.touchUpInside)
,再进去看controlEvent
,
public func controlEvent(_ controlEvents: UIControl.Event) -> ControlEvent<()> {
let source: Observable<Void> = Observable.create { [weak control = self.base] observer in
MainScheduler.ensureRunningOnMainThread()
//省略
}
public class func ensureRunningOnMainThread(errorMessage: String? = nil) {
#if !os(Linux) // isMainThread is not implemented in Linux Foundation
guard Thread.isMainThread else {
rxFatalError(errorMessage ?? "Running on background thread.")
}
#endif
}
可以发现这里有判断必须要在主线程执行,但是还没看到调度的代码,然后发现上面的第一个方法里返回的是一个ControlEvent
,进入ControlEvent
看一下
public init<Ev: ObservableType>(events: Ev) where Ev.Element == Element {
self._events = events.subscribeOn(ConcurrentMainScheduler.instance)
}
可以看到subscribeOn
订阅在ConcurrentMainScheduler.instance
主线程上,这时可以清楚的明白是切换到了主线程,不过还是需要明白subscribeOn
到底干了些什么呢?
final private class SubscribeOn<Ob: ObservableType>: Producer<Ob.Element> {
let source: Ob
let scheduler: ImmediateSchedulerType
init(source: Ob, scheduler: ImmediateSchedulerType) {
self.source = source
self.scheduler = scheduler
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Ob.Element {
let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
到这里很明确了,它是继承ObservableType
和Producer
的,在前几篇文章分析过的RxSwift的核心逻辑,其实这里切换了一下,整个的过程是:
订阅序列->创建observer
->Producer
里的SubscribeOn
的run
->SubscribeOnSink.run
的on
->调用observer.on
执行sink
的on
->sink
里保存的eventHandler
响应event
事件->最后调用外界的subscriber的闭包
对于上面的流程不了解的可以看之前的文章,可以断点每个步骤,了解核心流程。
调度者Schedulers核心源码
写如下代码,使用observeOn(SerialDispatchQueueScheduler.init(qos: .background))
,这里一般是执行串行队列的需要使用到的,里面是封装了GCD的队列 ,我们点击observeOn
进去看看里面的代码
Observable.of(1,2,3,4,5)
.observeOn(SerialDispatchQueueScheduler.init(internalSerialQueueName: "observeOnSerial"))
.subscribe{print("observeOn",$0,Thread.current)}
.disposed(by: self.bag)
- 很明显,我们在外面调用的是
SerialDispatchQueueScheduler
,所以这里会返回一个ObserveOnSerialDispatchQueue
的新序列,当然是要点进去看看
public func observeOn(_ scheduler: ImmediateSchedulerType)
-> Observable<Element> {
if let scheduler = scheduler as? SerialDispatchQueueScheduler {
return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
}
else {
return ObserveOn(source: self.asObservable(), scheduler: scheduler)
}
}
- 这里把外界传递过来的
SerialDispatchQueueScheduler
和source
源序列都保存起来了,同时还继承了Producer
,实现了run
方法
final private class ObserveOnSerialDispatchQueue<Element>: Producer<Element> {
let scheduler: SerialDispatchQueueScheduler
let source: Observable<Element>
init(source: Observable<Element>, scheduler: SerialDispatchQueueScheduler) {
self.scheduler = scheduler
self.source = source
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
_ = increment(_numberOfSerialDispatchQueueObservables)
#endif
}
//省略
}
-
外界调用
subscribe
时,就会走这里的run
方法(我这里省略了走到这里的原因,因为在之前探索核心逻辑已经探索的很清楚了)
我们看到,self.source.subscribe(sink)
是订阅源序列,然后传递一个ObserveOnSerialDispatchQueueSink
的管子进去override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element { let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel) let subscription = self.source.subscribe(sink) return (sink: sink, subscription: subscription) }
-
cmd
然后点击self.source.subscribe
,弹出一堆可进入的方法,这里肯定是会执行Producer
的subscribe
,所以选择它进入,可以在这处打个断点确认一下。
- 看到这里
CurrentThreadScheduler.isScheduleRequired
,根据这个标识符CurrentThreadScheduler.isScheduleRequired
去找执行的方法
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired { // 后续走这里
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else { // 第一次走这里
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
- 我们点进
CurrentThreadScheduler.instance.schedule
,可以看到其实这里只是多了一个标记isScheduleRequired
为false
的步骤
public func schedule<StateType>(_ state: action: ) -> Disposable {
if CurrentThreadScheduler.isScheduleRequired {
// 已经标记,就置false
CurrentThreadScheduler.isScheduleRequired = false
// 外界闭包调用执行
let disposable = action(state)
// 延迟销毁
defer {
CurrentThreadScheduler.isScheduleRequired = true
CurrentThreadScheduler.queue = nil
}
// 先省略。。。
return disposable
}
// 先省略。。。
return scheduledItem
}
- 再看到
self.run(observer, cancel: disposer)
,点击进去,选择ObservableSequence
,又会回到第三步的run
方法
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
点击进入到 ObservableSequenceSink->run
func run() -> Disposable {
return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
var mutableIterator = iterator
if let next = mutableIterator.next() {
self.forwardOn(.next(next))
recurse(mutableIterator)
}
else {
self.forwardOn(.completed)
self.dispose()
}
}
}
继续点击进入ImmediateSchedulerType里的scheduleRecursive
,在这里可以看到,创建了RecursiveImmediateScheduler
,来保存外界传递过来的闭包,而且调用了recursiveScheduler.schedule(state)
并执行闭包
public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> Void) -> Void) -> Disposable {
let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
recursiveScheduler.schedule(state)
return Disposables.create(with: recursiveScheduler.dispose)
}
- 点击进入
recursiveScheduler.schedule(state)
,在这个递归调度者里
- 这里因为在递归环境,加了一把锁递归锁,保障安全
- 通过保护,获取
action
执行,也就是外界传给递归调度者的闭包任务 - 因为加了一把递归锁,所以保证了RxSwift的调度是有顺序的
func schedule(_ state: State) {
var scheduleState: ScheduleState = .initial
let d = self._scheduler.schedule(state) { state -> Disposable in
// 这里因为在递归环境,加了一把锁递归锁,保障安全
let action = self._lock.calculateLocked { () -> Action? in
return self._action
}
if let action = action {
action(state, self.schedule)
}
return Disposables.create()
}
// 篇幅,先省略,大家自行查阅
}
- 点击进入
self._scheduler.schedule(state)
,之前我们在外界使用的是SerialDispatchQueueScheduler
,所以进入它里面的schedule
public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.scheduleInternal(state, action: action)
}
self.scheduleInternal(state, action: action)
-> scheduleInternal
->self.configuration.schedule
,可以看到如下代码,这里异步执行了cancel.setDisposable(action(state))
func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
let cancel = SingleAssignmentDisposable()
self.queue.async {
if cancel.isDisposed {
return
}
cancel.setDisposable(action(state))
}
return cancel
}
其实这里执行的就是第8步这里的RecursiveImmediateScheduler
里的
if let action = action {
action(state, self.schedule)
}
- 再回到第7步
ObservableSequenceSink
里的run
,上面的action调用尾随闭包,即调用了self.forwardOn
,点进去发现其实调用的就是self._observer.on
. 这里在之前的核心逻辑分析过
func run() -> Disposable {
return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
var mutableIterator = iterator
if let next = mutableIterator.next() {
self.forwardOn(.next(next))
recurse(mutableIterator)
}
else {
self.forwardOn(.completed)
self.dispose()
}
}
}
-
然后会调用到第三步里创建的
ObserveOnSerialDispatchQueueSink
的onCore
override func onCore(_ event: Event<Element>) { _ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!) }
看到这里发现很熟悉,继续点进去
public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable { return self.scheduleInternal(state, action: action) } func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable { return self.configuration.schedule(state, action: action) }
最终又来到了这里,在当前队列异步执行任务,调用
action(state)
,
所以,我们在最外界调用on(Event)
响应的时候,就会在外界指定的schedule
里执行闭包了func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable { let cancel = SingleAssignmentDisposable() self.queue.async { if cancel.isDisposed { return } cancel.setDisposable(action(state)) } return cancel }
总结
其实就是创建了一个中间序列
ObserveOnSerialDispatchQueueSink
,继承自Producer
,实现了run
方法,保存了源序列传递进来的observer
和schedule
。在外界订阅的时候会执行一套producer
的run,还会执行sink.run
,然后由传递进来的observer
响应on(event)
所以其实就是两层序列订阅响应,第二层
sink
就相当于源序列的observer
观察者,其实和RxSwift核心逻辑那里非常相似,只是多了个中间的序列来处理线程的调度。掌握了我之前写过的核心逻辑,其实后面都是类似的,乍看有难度,上手探索之后发现很好玩,很简单,很有意思!