前言
Schedulers是Rx实现多线程的核心模块,它主要用于控制任务在哪个线程或队列运行。
在这里,我们就来探索下RxSwift
中。Schedulers
是什么?是如何实现的呢?又是如何实现Schedulers
的切换的呢?
Schedulers是什么
根据文档:
Schedulers
分为以下几种:
- MainScheduler: 代表主线程。如果你需要执行一些和 UI 相关的任务,就需要切换到该 Scheduler 运行。
- SerialDispatchQueueScheduler: 抽象了串行 DispatchQueue。如果你需要执行一些串行任务,可以切换到这个 Scheduler 运行。
- ConcurrentDispatchQueueScheduler 抽象了并行 DispatchQueue。如果你需要执行一些并发任务,可以切换到这个 Scheduler 运行。
- OperationQueueScheduler :抽象了 NSOperationQueue。它具备 NSOperationQueue 的一些特点,例如,你可以通过设置 maxConcurrentOperationCount,来控制同时执行并发任务的最大数量。
- ConcurrentMainScheduler:看源码时发现的。代表主线程,他对
subscribeOn
操作符做了优化,因此。在主线程subscribeOn
应该使用ConcurrentMainScheduler
接下来我们就深入到源码当中,看这些Scheduler
到底是如何实现的。
Schedulers的实现
MainScheduler
看定义,MainScheduler
是SerialDispatchQueueScheduler
的子类。
源代码如下:
public final class MainScheduler : SerialDispatchQueueScheduler {
private let _mainQueue: DispatchQueue
/// 计数(相当于引用计数)
let numberEnqueued = AtomicInt(0)
public init() {
self._mainQueue = DispatchQueue.main
super.init(serialQueue: self._mainQueue)
}
public static let instance = MainScheduler()
public static let asyncInstance = SerialDispatchQueueScheduler(serialQueue: DispatchQueue.main)
public class func ensureExecutingOnScheduler(errorMessage: String? = nil) {
if !DispatchQueue.isMain {
rxFatalError(errorMessage ?? "Executing on background thread. Please use `MainScheduler.instance.schedule` to schedule work on main thread.")
}
}
public class func ensureRunningOnMainThread(errorMessage: String? = nil) {
#if !os(Linux) // isMainThread is not implemented in Linux Foundation
guard Thread.isMainThread else {
rxFatalError(errorMessage ?? "Running on background thread.")
}
#endif
}
override func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
let previousNumberEnqueued = increment(self.numberEnqueued)
if DispatchQueue.isMain && previousNumberEnqueued == 0 {
let disposable = action(state)
decrement(self.numberEnqueued)
return disposable
}
let cancel = SingleAssignmentDisposable()
self._mainQueue.async {
if !cancel.isDisposed {
_ = action(state)
}
decrement(self.numberEnqueued)
}
return cancel
}
}
从源代码很容易看出,MainScheduler
封装了DispatchQueue.main
。并提供了一系列访问方法,包括init
、instance
、asyncInstance
。并实现了自己的调度方法scheduleInternal
。
scheduleInternal
的主要逻辑为:
- 1、 将事件的引用计数加一,当此时的引用计数为0时,执行一次事件回调。事件回调完成后引用计数减一。
- 2、当引用计数不为0时,也就是说当前主队列中还有Event事件没有执行完,继续在主线程中执行未完成的Event事件,完成一件后,相应的引用计数减一
ConcurrentDispatchQueueScheduler
源码如下:
public class ConcurrentDispatchQueueScheduler: SchedulerType {
public typealias TimeInterval = Foundation.TimeInterval
public typealias Time = Date
public var now : Date {
return Date()
}
let configuration: DispatchQueueConfiguration
public init(queue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
self.configuration = DispatchQueueConfiguration(queue: queue, leeway: leeway)
}
public convenience init(qos: DispatchQoS, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
self.init(queue: DispatchQueue(
label: "rxswift.queue.\(qos)",
qos: qos,
attributes: [DispatchQueue.Attributes.concurrent],
target: nil),
leeway: leeway
)
}
/// 正常调度
public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.configuration.schedule(state, action: action)
}
/// 延迟调度,dueTime为延迟时间
public final func scheduleRelative<StateType>(_ state: StateType, dueTime: RxTimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.configuration.scheduleRelative(state, dueTime: dueTime, action: action)
}
/// 延迟并重复调度
/// startAfter: 延迟时长。
/// period: 重复时间间隔。
public func schedulePeriodic<StateType>(_ state: StateType, startAfter: RxTimeInterval, period: RxTimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
return self.configuration.schedulePeriodic(state, startAfter: startAfter, period: period, action: action)
}
}
说明:
- 1、提供了一些初始化接口,主要是初始化
configuration
。 - 2、提供了调度接口:
schedule
正常调度、scheduleRelative
延迟调度、schedulePeriodic
延迟重复调度。这些调度接口都是通过configuration
去实现的。
DispatchQueueConfiguration
的源码这里就不贴了,源码主要是通过DispatchSource
实现了上面提到的几种调度。
SerialDispatchQueueScheduler
SerialDispatchQueueScheduler
源码如下:
public class SerialDispatchQueueScheduler : SchedulerType {
public typealias TimeInterval = Foundation.TimeInterval
public typealias Time = Date
/// - returns: Current time.
public var now : Date {
return Date()
}
let configuration: DispatchQueueConfiguration
init(serialQueue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
self.configuration = DispatchQueueConfiguration(queue: serialQueue, leeway: leeway)
}
public convenience init(internalSerialQueueName: String, serialQueueConfiguration: ((DispatchQueue) -> Void)? = nil, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
let queue = DispatchQueue(label: internalSerialQueueName, attributes: [])
serialQueueConfiguration?(queue)
self.init(serialQueue: queue, leeway: leeway)
}
public convenience init(queue: DispatchQueue, internalSerialQueueName: String, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
// Swift 3.0 IUO
let serialQueue = DispatchQueue(label: internalSerialQueueName,
attributes: [],
target: queue)
self.init(serialQueue: serialQueue, leeway: leeway)
}
@available(iOS 8, OSX 10.10, *)
public convenience init(qos: DispatchQoS, internalSerialQueueName: String = "rx.global_dispatch_queue.serial", leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
self.init(queue: DispatchQueue.global(qos: qos.qosClass), internalSerialQueueName: internalSerialQueueName, leeway: leeway)
}
/// 调度方法,在当前线程中实现action回调
public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.scheduleInternal(state, action: action)
}
/// 调度方法,在当前线程中实现action回调,可供子类重写
func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.configuration.schedule(state, action: action)
}
/// 延迟调度,dueTime为延迟时间
public final func scheduleRelative<StateType>(_ state: StateType, dueTime: RxTimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.configuration.scheduleRelative(state, dueTime: dueTime, action: action)
}
/// 延迟并重复调度
/// startAfter: 延迟时长。
/// period: 重复时间间隔。
public func schedulePeriodic<StateType>(_ state: StateType, startAfter: RxTimeInterval, period: RxTimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
return self.configuration.schedulePeriodic(state, startAfter: startAfter, period: period, action: action)
}
}
由源码可以看出SerialDispatchQueueScheduler
的实现和ConcurrentDispatchQueueScheduler
大体相同。只不过一个抽象了串行队列,一个抽象了并发队列。
OperationQueueScheduler
OperationQueueScheduler
抽象了OperationQueue
。因此具有OperationQueue
的一些特性,比如设置最大并发数等。
源码如下:
public class OperationQueueScheduler: ImmediateSchedulerType {
public let operationQueue: OperationQueue
public let queuePriority: Operation.QueuePriority
public init(operationQueue: OperationQueue, queuePriority: Operation.QueuePriority = .normal) {
self.operationQueue = operationQueue
self.queuePriority = queuePriority
}
public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
let cancel = SingleAssignmentDisposable()
let operation = BlockOperation {
if cancel.isDisposed {
return
}
cancel.setDisposable(action(state))
}
operation.queuePriority = self.queuePriority
self.operationQueue.addOperation(operation)
return cancel
}
}
ConcurrentMainScheduler(看源码时发现的)
/**
Abstracts work that needs to be performed on `MainThread`. In case `schedule` methods are called from main thread, it will perform action immediately without scheduling.
This scheduler is optimized for `subscribeOn` operator. If you want to observe observable sequence elements on main thread using `observeOn` operator,
`MainScheduler` is more suitable for that purpose.
*/
public final class ConcurrentMainScheduler : SchedulerType {
public typealias TimeInterval = Foundation.TimeInterval
public typealias Time = Date
private let _mainScheduler: MainScheduler
private let _mainQueue: DispatchQueue
/// - returns: Current time.
public var now: Date {
return self._mainScheduler.now as Date
}
private init(mainScheduler: MainScheduler) {
self._mainQueue = DispatchQueue.main
self._mainScheduler = mainScheduler
}
/// Singleton instance of `ConcurrentMainScheduler`
public static let instance = ConcurrentMainScheduler(mainScheduler: MainScheduler.instance)
public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
if DispatchQueue.isMain {
return action(state)
}
let cancel = SingleAssignmentDisposable()
self._mainQueue.async {
if cancel.isDisposed {
return
}
cancel.setDisposable(action(state))
}
return cancel
}
public final func scheduleRelative<StateType>(_ state: StateType, dueTime: RxTimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
return self._mainScheduler.scheduleRelative(state, dueTime: dueTime, action: action)
}
public func schedulePeriodic<StateType>(_ state: StateType, startAfter: RxTimeInterval, period: RxTimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
return self._mainScheduler.schedulePeriodic(state, startAfter: startAfter, period: period, action: action)
}
}
源码的大概实现和其他Scheduler
差不多。
这里我们要注意这点
ConcurrentMainScheduler
抽象了MainScheduler
并针对subscribeOn
操作符做了相应的优化。而MainScheduler
针对observeOn
作了优化。具体请看schedule
方法的分别实现。
因此:
- 1、在主线程
subscribeOn
应该使用ConcurrentMainScheduler
- 2、在主线程
observeOn
应该使用MainScheduler
如何切换Scheduler
subscribeOn & observeOn
observeOn
和subscribeOn
属于Rx
中调度器调度的操作符。
如上图:
图上有两个Schedulers
.
subscribeOn
用来决定数据序列的构建函数在哪个Scheduler
上运行.
observeOn
用来决定在哪个 Scheduler
监听这个数据序列。
实现逻辑
为了说明捋清楚这个逻辑,我们先看一个简单的demo:
demo的大概逻辑:在子线程创建序列,在主线程订阅(监听)这个序列
/// 创建一个String类型的序列。
let observable = Observable<String>.create { (observe) -> Disposable in
/// 发送一个事件
observe.onNext("1")
/// 发送一个事件
observe.onNext("2")
/// 发送完成事件
observe.onCompleted()
return Disposables.create()
}
/// 在子线程构建该序列,在主线程订阅。
observable
.subscribeOn(ConcurrentDispatchQueueScheduler.init(queue: DispatchQueue.global()))
.observeOn(MainScheduler.instance)
.subscribe(onNext: { (str) in
/// 监听到next事件
print(str)
}, onError: { ( error) in
/// 监听到error事件
print(error)
}, onCompleted: {
/// 监听到完成事件
print("订阅完成")
}).disposed(by: disposeBag)
subscribeOn
结合一开始的例子,当我们调用subscribeOn
,RxSwift会如何处理呢?
源码如下:
extension ObservableType {
public func subscribeOn(_ scheduler: ImmediateSchedulerType)
-> Observable<Element> {
return SubscribeOn(source: self, scheduler: scheduler)
}
}
由此可以看出,调用subscribeOn
方法。初始化了SubscribeOn
的实例,并保存了源序列和调度器scheduler
。
SubscribeOn
的实现如下
final private class SubscribeOn<Ob: ObservableType>: Producer<Ob.Element> {
let source: Ob
let scheduler: ImmediateSchedulerType
init(source: Ob, scheduler: ImmediateSchedulerType) {
self.source = source
self.scheduler = scheduler
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Ob.Element {
let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
SubscribeOn
是继承Producer
,所以也是一个可监听序列。
observeOn
extension ObservableType {
public func observeOn(_ scheduler: ImmediateSchedulerType)
-> Observable<Element> {
if let scheduler = scheduler as? SerialDispatchQueueScheduler {
return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
}
else {
return ObserveOn(source: self.asObservable(), scheduler: scheduler)
}
}
}
说明:
- 1、如果当前的
scheduler
是SerialDispatchQueueScheduler
,则初始化一个ObserveOnSerialDispatchQueue
并返回。 - 2、否则初始化一个
ObserveOn
,并返回。
结合一开始给的例子observeOn(MainScheduler.instance)
:所以这里初始化了一个SerialDispatchQueueScheduler
并保存了源序列和调度器。
SerialDispatchQueueScheduler
的实现如下:
final private class ObserveOnSerialDispatchQueue<Element>: Producer<Element> {
let scheduler: SerialDispatchQueueScheduler
let source: Observable<Element>
init(source: Observable<Element>, scheduler: SerialDispatchQueueScheduler) {
self.scheduler = scheduler
self.source = source
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
_ = increment(_numberOfSerialDispatchQueueObservables)
#endif
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
let subscription = self.source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
#if TRACE_RESOURCES
deinit {
_ = Resources.decrementTotal()
_ = decrement(_numberOfSerialDispatchQueueObservables)
}
#endif
}
同样ObserveOnSerialDispatchQueue
也继承Producer
,所以也是可以可监听序列。
到这里我们可以发现subscribeOn
、observeOn
分别创建了一个可监听序列。并保存的源序列和调度器。
添加scheduler后,序列的产生和订阅流程
- 1、调用
subscribe
之后,回来到Producer
的subscribe
方法。其中有一个判断if !CurrentThreadScheduler.isScheduleRequired
为false。 - 2、那么就走
CurrentThreadScheduler.instance.schedule
方法。派发action
。
CurrentThreadScheduler
源码如下:
public class CurrentThreadScheduler : ImmediateSchedulerType {
/// 调度队列,队列里放的是ScheduledItemType元素
typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>
/// CurrentThreadScheduler的单例
public static let instance = CurrentThreadScheduler()
/// 在保存线程特有数据(TSD)之前,需要获取线程特有数据的 key。
private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
defer { key.deallocate() }
guard pthread_key_create(key, nil) == 0 else {
rxFatalError("isScheduleRequired key creation failed")
}
return key.pointee
}()
/// 开辟一块内存空间,并返回指向该内存的指针,这里主要用于判断CurrentThreadScheduler是否被调度过。
private static var scheduleInProgressSentinel: UnsafeRawPointer = { () -> UnsafeRawPointer in
return UnsafeRawPointer(UnsafeMutablePointer<Int>.allocate(capacity: 1))
}()
/// 调度队列
static var queue : ScheduleQueue? {
get {
return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
}
set {
Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
}
}
/// Gets a value that indicates whether the caller must call a `schedule` method.
/// 判断CurrentThreadScheduler是否需要被调度,默认true
public static fileprivate(set) var isScheduleRequired: Bool {
get {
// 获取线程特有信息
return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
}
set(isScheduleRequired) {
// 设置线程特有信息
if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
rxFatalError("pthread_setspecific failed")
}
}
}
/// CurrentThreadScheduler的调度
public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
// 本次调用 schedule 是否需要派发 action
// 也就是当前线程之前有没有调用过 schedule,或者没有执行完。
if CurrentThreadScheduler.isScheduleRequired {
CurrentThreadScheduler.isScheduleRequired = false
let disposable = action(state)
/// CurrentThreadScheduler调度完成后,将isScheduleRequired重置为true
defer {
CurrentThreadScheduler.isScheduleRequired = true
CurrentThreadScheduler.queue = nil
}
/// 查看和当前线程关联的队列 queue 中是否有未派发的 action,如果有则执行
guard let queue = CurrentThreadScheduler.queue else {
return disposable
}
while let latest = queue.value.dequeue() {
if latest.isDisposed {
continue
}
/// 派发action,并释放
latest.invoke()
}
return disposable
}
/// 如果当前线程有被调用过(还有任务在执行),
/// 先将 action 先保存到和当前线程关联的队列 queue 中
let existingQueue = CurrentThreadScheduler.queue
let queue: RxMutableBox<Queue<ScheduledItemType>>
if let existingQueue = existingQueue {
queue = existingQueue
}
else {
queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
CurrentThreadScheduler.queue = queue
}
/// 初始化调度对象ScheduledItem,并将action(调度的具体事件)和state保存到ScheduledItem实例中。
let scheduledItem = ScheduledItem(action: action, state: state)
/// 将ScheduledItem添加到队列中
queue.value.enqueue(scheduledItem)
/// 返回一个调度对象
return scheduledItem
}
}
- 派发
action
,就会来到ObserveOnSerialDispatchQueue
的run
方法。run
方法里初始化了ObserveOnSerialDispatchQueueSink
,并将当前的scheduler
和observer
以及cancel
保存了起来。 - 调用源序列(SubscribeOn)的
subscribe
并将ObserveOnSerialDispatchQueueSink
实例当参数传递过去。 - 接下来走到了
SubscribeOn
的run
方法。run
方法里初始化了SubscribeOnSink
,SubscribeOnSink
持有了SubscribeOn
实例、订阅者observer
(这里的订阅者是ObserveOnSerialDispatchQueueSink
实例)以及销毁者Cancelable
(Cancelable
暂不做分析)。 - 接着调用了
SubscribeOnSink
的run
方法。
SubscribeOnSink
的实现如下:
final private class SubscribeOnSink<Ob: ObservableType, Observer: ObserverType>: Sink<Observer>, ObserverType where Ob.Element == Observer.Element {
typealias Element = Observer.Element
typealias Parent = SubscribeOn<Ob>
let parent: Parent
init(parent: Parent, observer: Observer, cancel: Cancelable) {
self.parent = parent
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<Element>) {
self.forwardOn(event)
if event.isStopEvent {
self.dispose()
}
}
func run() -> Disposable {
let disposeEverything = SerialDisposable()
let cancelSchedule = SingleAssignmentDisposable()
disposeEverything.disposable = cancelSchedule
let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
let subscription = self.parent.source.subscribe(self)
disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
return Disposables.create()
}
cancelSchedule.setDisposable(disposeSchedule)
return disposeEverything
}
}
对照源码,可以看出SubscribeOnSink
的run
主要逻辑如下:
- 1、在
subscribeOn
时设置好的scheduler
(ConcurrentDispatchQueueScheduler)下进行schedule
(调度),接着来到DispatchQueueConfiguration
的schedule
方法。该方法实现如下:
func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
let cancel = SingleAssignmentDisposable()
self.queue.async {
if cancel.isDisposed {
return
}
cancel.setDisposable(action(state))
}
return cancel
}
可以看出,action
会在之前设置好的queue(也就是ConcurrentDispatchQueueScheduler)内去执行。action
也就是我们源序列的订阅流程,这就充分说明了。RxSwift是如何在指定scheduler
下执行生成序列的函数。
接下来是源序列的订阅流程。
- 1、源序列(AnonymousObservable)调用
subscribe
并将SubscribeOnSink
作为订阅者传递过去。 - 2、接下来会来到
AnonymousObservable
(源序列)的run
方法。接下来的调用流程和RxSwift核心逻辑(一)-序列的产生以及订阅的流程很像。只不过这里中间多了几个订阅者。简单说明一下。 - 3、
AnonymousObservableSink
的run
方法,进行Event事件发送
AnonymousObservableSink
的on
-> SubscribeOnSink
的on
-> ObserveOnSerialDispatchQueueSink
的onCore
ObserveOnSerialDispatchQueueSink
的onCore
实现如下
override func onCore(_ event: Event<Element>) {
_ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
}
也就是调用当前的scheduler
(observeOn是设置的,根据demo是MainScheduler)的schedule
方法。
接下来的调用MainScheduler
的scheduleInternal
MainScheduler
的scheduleInternal
的实现如下
override func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
let previousNumberEnqueued = increment(self.numberEnqueued)
if DispatchQueue.isMain && previousNumberEnqueued == 0 {
let disposable = action(state)
decrement(self.numberEnqueued)
return disposable
}
let cancel = SingleAssignmentDisposable()
self._mainQueue.async {
if !cancel.isDisposed {
_ = action(state)
}
decrement(self.numberEnqueued)
}
return cancel
}
可以看到是在主线程调用action
。action
也就是ObserveOnSerialDispatchQueueSink
的cachedScheduleLambda
闭包。
接下来回来到ObserveOnSerialDispatchQueueSink
的observer
的on
方法。而ObserveOnSerialDispatchQueueSink
的observer
就是AnonymousObserver
。也就是说接下来调用AnonymousObserver
的onCore
方法去执行事件的回调。
至此,添加Schedulers
后,序列的产生和订阅流程已基本捋清。
文字描述可能不是太直观,后续会补一个流程图。
如有误,欢迎指正。