上一篇文章我们提到了subscribeOn
和observeOn
两个重要函数:
- observeOn:观察者回调在某个特定的线程上
- subscribeOn: 订阅或者取消订阅在某个特定的线程上
下面我们先来探究observeOn
的具体调度流程,来探究调度者的原理。
observeOn的实现流程
我们先来看下面一个简单的测试代码:
print("当前测试方法中的线程:\(Thread.current)")
Observable.of(1,2,3,4,5)
.observeOn(SerialDispatchQueueScheduler.init(internalSerialQueueName: "observeOnSerial"))
.subscribe{print("observeOn",$0,Thread.current)}
.disposed(by: self.bag)
/*
输出结果:
当前测试方法中的线程:<NSThread: 0x600001486800>{number = 1, name = main}
observeOn next(1) <NSThread: 0x6000014d9a40>{number = 3, name = (null)}
observeOn next(2) <NSThread: 0x6000014d9a40>{number = 3, name = (null)}
observeOn next(3) <NSThread: 0x6000014d9a40>{number = 3, name = (null)}
observeOn next(4) <NSThread: 0x6000014d9a40>{number = 3, name = (null)}
observeOn next(5) <NSThread: 0x6000014d9a40>{number = 3, name = (null)}
observeOn completed <NSThread: 0x6000014d9a40>{number = 3, name = (null)}
*/
上面代码表示:一个of
函数创建的源序列,订阅了一个打印元素和当前线程的任务,并且要求观察者回调,也就是说打印回来,必须要在名字为observeOnSerial
的串行队列里面
代码运行流程:
1、来到Observable.of(1,2,3,4,5)
序列的创建方法:
public static func of(_ elements: Element ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Element> {
return ObservableSequence(elements: elements, scheduler: scheduler)
}
- 传进来去了序列的所有元素:1,2,3,4,5
- 外面没有传调度者,默认调度者为
CurrentThreadScheduler
当前线程调度者,从上面打印的线程可知,也就是主线程 - 返回的是一个
ObservableSequence
序列
2、ObservableSequence
序列的初始化:
final private class ObservableSequence<Sequence: Swift.Sequence>: Producer<Sequence.Element> {
fileprivate let _elements: Sequence
fileprivate let _scheduler: ImmediateSchedulerType
init(elements: Sequence, scheduler: ImmediateSchedulerType) {
self._elements = elements // 元素 1 2 3 4 5 6 7 8 9 10
self._scheduler = scheduler // 主队列 初始化时的默认值ImmediateSchedulerType = CurrentThreadScheduler.instance
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
- 保存了外面传进来的所有元素
- 保存了外面传进来的调度者,这里是当前线程调度者,也就是主线程
- 继承自
Producer
3、序列创建完成过后,就来到最外层的observeOn
方法:
.observeOn(SerialDispatchQueueScheduler.init(internalSerialQueueName: "observeOnSerial"))
- 传进去了一个
SerialDispatchQueueScheduler
串行调度者
4、然后来到参数中SerialDispatchQueueScheduler
的初始化
init(serialQueue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
// 初始化的时候,保存了一个串行队列
self.configuration = DispatchQueueConfiguration(queue: serialQueue, leeway: leeway)
}
public convenience init(internalSerialQueueName: String, serialQueueConfiguration: ((DispatchQueue) -> Void)? = nil, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
let queue = DispatchQueue(label: internalSerialQueueName, attributes: [])
serialQueueConfiguration?(queue)
self.init(serialQueue: queue, leeway: leeway)
}
- 使用传进来的
internalSerialQueueName
创建了一个串行队列 - 初始化的时候,进行了配置,并保存在
DispatchQueueConfiguration
中 -
DispatchQueueConfiguration
保存了上面创建的串行队列 -
DispatchQueueConfiguration
保存了leeway
延迟执行时间
5、点击observeOn
函数进去:
public func observeOn(_ scheduler: ImmediateSchedulerType)
-> Observable<Element> {
// 判断是串行队列,还是并行队列
if let scheduler = scheduler as? SerialDispatchQueueScheduler {
// 传进去两个参数 : 源序列和队列
return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
}
else {
return ObserveOn(source: self.asObservable(), scheduler: scheduler)
}
}
- 判断调度者是否是串行调度者,如果是就返回的是:专门针对串行队列的,观察者回调在指定的串行队列上的,序列
ObserveOnSerialDispatchQueue
- 如果不是串行队列,就返回的是一个
ObserveOn
序列 - 我们这里第3步,传进来的是一个串行队列,所以返回的是
ObserveOnSerialDispatchQueue
6、点击ObserveOnSerialDispatchQueue
进去:
final private class ObserveOnSerialDispatchQueue<Element>: Producer<Element> {
let scheduler: SerialDispatchQueueScheduler
let source: Observable<Element>
init(source: Observable<Element>, scheduler: SerialDispatchQueueScheduler) {
self.scheduler = scheduler // 保存传进来的规定串行队列
self.source = source // 保存源序列 ObservableSequence
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
let subscription = self.source.subscribe(sink) // 源序列订阅 ObserveOnSerialDispatchQueueSink
return (sink: sink, subscription: subscription)
}
}
- 继承自
Producer
- 保存了外面传进来的特定队列(这里是串行队列
SerialDispatchQueueScheduler
) - 保存了传进来的源序列(由第2步知:这里是
ObservableSequence
)
7、然后就来到了最外层的订阅:
.subscribe{print("observeOn",$0,Thread.current)}
- 注意这里的订阅,不是源序列进行订阅,而是源序列调用
observeOn
方法后返回的ObserveOnSerialDispatchQueue
序列进行订阅
8、来到了ObservableType
的subscribe
方法的实现:
public func subscribe(_ on: @escaping (Event<Element>) -> Void)
-> Disposable {
let observer = AnonymousObserver { e in
on(e)
}
return self.asObservable().subscribe(observer)
}
- 在
AnonymousObserver
初始化的时候,保存了事件响应的闭包eventHandler
,里面有on
函数 - 来到这句关键代码:
self.asObservable().subscribe(observer)
。这里的self
就是第7步中订阅的ObserveOnSerialDispatchQueue
序列
9、由第6步知道:ObserveOnSerialDispatchQueue
继承自Producer
,所以来到Producer
的订阅方法:
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
// 当前线程被调度过,也就是被赋过值,isScheduleRequired属性里面的key不为nil
// 所以就在当前调度环境,也就是当前线程中执行括号中的代码
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
// 没有被调度过,就初始化当前线程,并在这里面调度
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
- 这里会先判断
CurrentThreadScheduler
是否被调度过 - 由上一篇文章知道,也就是判断isScheduleRequired属性里面的
key
是否为nil
,如果为nil
,就是没有被调度过 - 如果
CurrentThreadScheduler
没有被调度过,就初始化它,并让它进行调度schedule
10、CurrentThreadScheduler
的schedule
方法
/**
Schedules an action to be executed as soon as possible on current thread.
If this method is called on some thread that doesn't have `CurrentThreadScheduler` installed, scheduler will be
automatically installed and uninstalled after all work is performed.
*/
public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
// 没有被调度过,也就是没有被赋值过
if CurrentThreadScheduler.isScheduleRequired {
// 这次被调度过了,所以赋值为false,在Producer中就不需要走CurrentThreadScheduler的初始化了
CurrentThreadScheduler.isScheduleRequired = false
// 走前面传进来的闭包
let disposable = action(state)
defer {
// 延迟执行 也就是说在 return scheduledItem 返回事物过后,即所有工作都完成过后,才会走这里,清空当前调度环境,因为下次任务的调度环境可能会不同
CurrentThreadScheduler.isScheduleRequired = true
CurrentThreadScheduler.queue = nil
}
guard let queue = CurrentThreadScheduler.queue else {
return disposable
}
// 事物排队出队列
while let latest = queue.value.dequeue() {
if latest.isDisposed {
continue
}
latest.invoke() // 销毁
}
return disposable
}
let existingQueue = CurrentThreadScheduler.queue
let queue: RxMutableBox<Queue<ScheduledItemType>>
if let existingQueue = existingQueue { // 判断 CurrentThreadScheduler 有队列,就赋值给queue
queue = existingQueue
}
else {
// 要是CurrentThreadScheduler 没有队列,就将赋值
queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
CurrentThreadScheduler.queue = queue
}
let scheduledItem = ScheduledItem(action: action, state: state)
queue.value.enqueue(scheduledItem) // 事物排队进入队列
return scheduledItem
}
}
- 先翻译一下上面的两句注释:调度
action
闭包尽快在当前线程上执行 - 如果这个方法在没有初始化
CurrentThreadScheduler
的线程上调用,CurrentThreadScheduler
会自动初始化,并在所有工作完成过后清空当前线程调度者 -
CurrentThreadScheduler.isScheduleRequired = false
,做一个记号,下次就不走CurrentThreadScheduler
的schedule
方法了 -
action(state)
执行外界闭包,即第9步中传过来的闭包 -
defer
延迟执行,当所有任务都执行完成过后,清空当前线程调度环境,因为下次任务的调度环境可能会不同
11、执行action(state)
闭包
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
- 即这个括号中的内容
- 来到
Producer
的run
方法,也就来到子类ObserveOnSerialDispatchQueue
的run
方法
12、ObserveOnSerialDispatchQueue
的run
方法
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
let subscription = self.source.subscribe(sink) // 源序列订阅 ObserveOnSerialDispatchQueueSink
return (sink: sink, subscription: subscription)
}
-
self.source.subscribe(sink)
关键代码,源序列订阅ObserveOnSerialDispatchQueueSink
13、由第2步知道,源序列ObservableSequence
也继承自Producer
,所以又来到Producer
的订阅方法
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
- 因为
CurrentThreadScheduler
已经初始化,并在第10步中当前线程已经调度过了,所以这里走这个括号中的代码。因为CurrentThreadScheduler
只需要被初始化一次。 - 来到
Producer
的run
方法,也就来到子类ObservableSequence
的run
方法
14、ObservableSequence
的run
方法
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
- 来到
ObservableSequenceSink
的run
方法
15、ObservableSequenceSink
的run
方法
func run() -> Disposable {
// scheduleRecursive 调度递归 self._parent._elements.makeIterator() 用iterator来装这些元素
return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
var mutableIterator = iterator // 迭代器,里面包含了源序列的所以元素
if let next = mutableIterator.next() { // 迭代器中还有下一个元素就递归
// 源序列发送响应 源序列发送出来的时候,都是为主队列
// 源序列发送响应 最后肯定去到ObserveOnSerialDispatchQueueSink的on方法
self.forwardOn(.next(next))
recurse(mutableIterator) // 将迭代器传进去,再重新走这个闭包
}
else {
self.forwardOn(.completed)
self.dispose()
}
}
}
-
self._parent._scheduler
这里的调度者是源序列保存的调度者,即当前线程调度者(主线程) -
self._parent._scheduler.scheduleRecursive
当前线程调度者递归调度执行后面的闭包 - 闭包里面将源序列的所有元素生成迭代器,递归发送元素,一直到所有元素发送完
- 发送元素过后,肯定去到源序列订阅的
ObserveOnSerialDispatchQueueSink
的on
方法中
16、先进入scheduleRecursive
方法
public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> Void) -> Void) -> Disposable {
let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
recursiveScheduler.schedule(state) // 里面肯定有闭包action的执行,然后action闭包中源序列才能发送事件
return Disposables.create(with: recursiveScheduler.dispose)
}
- 将外面传进来的
action
和源序列的调度者封装成RecursiveImmediateScheduler
递归调度者 -
RecursiveImmediateScheduler
调度执行state
,state
里面包含有源序列所有元素和当前递归执行的位置
17、RecursiveImmediateScheduler
的schedule
if let action = action {
action(state, self.schedule) // 执行外界递归调度那里的闭包
}
- 里面有这么一句关键代码,所有就回到了第15步闭包中的发送元素
18、来到ObserveOnSerialDispatchQueueSink
的on
方法,也就来到onCore
方法
override func onCore(_ event: Event<Element>) {
// self.scheduler 规定的串行队列 用这个队列去调度这个事件,就保证了observeOn的观察者回调是在特定线程
_ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
}
- 这里的
self.scheduler
是第3步创建的串行队列 - 用
self.scheduler
去调度这个事件,就保证了observeOn的观察者回调是在特定线程,也就是源序列的事件回调是在特定的线程 -
self.cachedScheduleLambda
作为一个逃逸闭包参数传给action
- 因为
self.scheduler
是一个串行队列,所以接下来进入的是SerialDispatchQueueScheduler
的schedule
方法
19、SerialDispatchQueueScheduler
的schedule
方法
public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.scheduleInternal(state, action: action)
}
func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.configuration.schedule(state, action: action)
}
-
schedule
方法实际上是执行的第4步中SerialDispatchQueueScheduler
初始化时保存的self.configuration
的schedule
方法 - 将
state
和action
作为参数传给self.configuration
的schedule
方法 - 所以接下来,来到
self.configuration
的schedule
方法
20、self.configuration
的schedule
方法
extension DispatchQueueConfiguration {
func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
let cancel = SingleAssignmentDisposable()
// 传进来队列(可以是串行队列,也可以是并行队列)的异步执行
// 将传进来的闭包即action异步执行
// 就是说在特定的队列中执行闭包操作,也就是:observeOn的观察者回调,以及subscribeOn的订阅
self.queue.async {
if cancel.isDisposed {
return
}
// action(state) 调用外面传进来的闭包
// action 是ObserveOnSerialDispatchQueueSink中的cachedScheduleLambda
cancel.setDisposable(action(state))
}
return cancel
}
}
- 这是整个流程中可以说是最核心的一段代码
-
self.queue.async {}
在传进来的队列中异步执行外面传进来的闭包action
- 传进来的队列,可以是串行队列,也可以是并发队列。达到观察者回调或者订阅在特定线程,都是这里进行具体实现的,用特定线程异步执行任务
- 所以接下来来到
action
闭包,也就是第18步的闭包cachedScheduleLambda
中
21、action
闭包执行,也就是cachedScheduleLambda
闭包执行
self.cachedScheduleLambda = { pair in
guard !cancel.isDisposed else { return Disposables.create() }
pair.sink.observer.on(pair.event) // 调用ObserveOnSerialDispatchQueueSink中的observer.on方法,即最外层的打印
if pair.event.isStopEvent {
pair.sink.dispose()
}
return Disposables.create()
}
- 核心方法
pair.sink.observer.on(pair.event)
- 调用ObserveOnSerialDispatchQueueSink中的observer.on方法,即最外层的打印
- 后面就是一步步逐层回收方法,一直回收到源序列发送事件那里
22、方法回收到源序列发送事件
return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
var mutableIterator = iterator // 迭代器,里面包含了源序列的所以元素
if let next = mutableIterator.next() { // 迭代器中还有下一个元素就递归
// 源序列发送响应 源序列发送出来的时候,都是为主队列
// 源序列发送响应 最后肯定去到ObserveOnSerialDispatchQueueSink的on方法
self.forwardOn(.next(next))
recurse(mutableIterator) // 将迭代器传进去,再重新走这个闭包
}
else {
self.forwardOn(.completed)
self.dispose()
}
}
- 回收到
self.forwardOn(.next(next))
的时候,next(1) <NSThread: 0x6000024fa640>{number = 3, name = (null)}
才打印出来 -
recurse(mutableIterator)
然后执行这句,递归执行,下次源序列发送元素2 - 最后打印
next(2) <NSThread: 0x6000024fa640>{number = 3, name = (null)}
- 元素发送完了,递归结束过后,源序列就发送一个
completed
事件。
至此流程分析结束
observeOn和subscribeOn的区别
上面我们知道observeOn
有两种情况:
传进来的是串行队列:
- 包装了一个序列
ObserveOnSerialDispatchQueue
,并且在ObserveOnSerialDispatchQueue
的run
中,self.source.subscribe(sink)
源序列订阅ObserveOnSerialDispatchQueueSink
。 - 在
ObserveOnSerialDispatchQueueSink
的onCore
方法中用特定队列去调度源序列返回的事件。从而就保证了观察者回调在特定线程。self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
传进来的是并发队列:
- 包装了一个序列
ObserveOn
,并且在ObserveOn
的run
中,self.source.subscribe(sink)
源序列订阅ObserveOnSink
。 - 在
ObserveOnSink
的onCore
方法中用特定队列去递归调度源序列返回的事件。从而就保证了观察者回调在特定线程:self._scheduler.scheduleRecursive((), action: self.run)
闭包self.run
中有观察者的on
方法:
observer.on(nextEvent)`
subscribeOn
subscribeOn
的流程和observeOn
基本上一样,只是它并没有区分传进来的队列是串行的还是并发的,都统一只是创建了一个中间序列subscribeOn
。
然后来到关键地方:subscribeOn
并没有在中间序列subscribeOn
的run
放中进行源序列订阅SubscribeOnSink
,而是在SubscribeOnSink
的run
方法中订阅,代码如下:
func run() -> Disposable {
let disposeEverything = SerialDisposable()
let cancelSchedule = SingleAssignmentDisposable()
disposeEverything.disposable = cancelSchedule
let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
// 在这个闭包中进行 源序列订阅sink
// SubscribeOn这样就保证了订阅在特定线程
let subscription = self.parent.source.subscribe(self)
disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
return Disposables.create()
}
cancelSchedule.setDisposable(disposeSchedule)
return disposeEverything
}
- 在
self.parent.scheduler.schedule
调度的闭包中执行,也就是是说在:
self.queue.async {
if cancel.isDisposed {
return
}
cancel.setDisposable(action(state))
}
特定队列下异步执行的闭包中执行:
self.parent.source.subscribe(self)
源序列订阅SubscribeOnSink
操作,就保证了订阅在特定线程。
总结:
调度者Scheduler
就是对线程进行的一次封装。
subscribeOn
,observeOn
和前面的map
函数类似,不是源序列直接订阅,而是源序列先订阅的中间层sink
,在这层sink
里面进行了在特定的队列中,用源序列响应给sink
的元素来执行事件的响应event
。
即订阅里面又套了一个订阅,在套的那个订阅里面进行特定队列的处理