RxSwift体系中,四种角色不可获取:
- 可观察序列 Observable
- 观察者 Observer
- 调度者 Scheduler
- 销毁者 Dispose
今天来说说RxSwift的一重要角色,调度者scheduler
RxSwift 针对于GCD进行了一套scheduler的封装。
- CurrentThreadScheduler:表示当前线程 Scheduler。(默认使用这个)
- 外部获取判断当前队列是否被关联:isScheduleRequired
- MainScheduler:表示主线程,如果需要执行一些UI方面的相关任务,需要切换至主线程Scheduler运行,上源码:
public final class MainScheduler : SerialDispatchQueueScheduler {
private let _mainQueue: DispatchQueue
let numberEnqueued = AtomicInt(0)
public init() {
self._mainQueue = DispatchQueue.main
super.init(serialQueue: self._mainQueue)
}
public static let instance = MainScheduler()
}
- 同是这里还有继承了SerialDispatchQueueScheduler就是串行调度者,其实我们也是可以理解的,主队列其实就是一种串行队列。
public class SerialDispatchQueueScheduler : SchedulerType {
let configuration: DispatchQueueConfiguration
init(serialQueue: DispatchQueue, leeway:) {
self.configuration = DispatchQueueConfiguration(queue: leeway:)
}
public convenience init(internalSerialQueueName: serialQueueConfiguration: leeway: ) {
let queue = DispatchQueue(label: internalSerialQueueName, attributes: [])
serialQueueConfiguration?(queue)
self.init(serialQueue: queue, leeway: leeway)
}
}
调度执行
调度器(Schedulers)是 RxSwift 实现多线程的核心模块,它主要用于控制任务在哪个线程或队列运行。
observeOn&subscribeOn
看一下这段代码:
DispatchQueue.global().async {
self.actionBtn.rx.tap
.subscribe(onNext: { () in
print("点击了按钮 --- \(Thread.current)")
})
.disposed(by: self.bag)
}
线程打印情况:
点击了按钮 --- <NSThread: 0x600000c2d980>{number = 1, name = main}
WHY????
看这里:
public func controlEvent(_ controlEvents: UIControl.Event) -> ControlEvent<()> {
let source: Observable<Void> = Observable.create { [weak control = self.base] observer in
// 调度主线程判断
MainScheduler.ensureRunningOnMainThread()
}
return ControlEvent(events: source)
}
- controlEvent,发现我们调度执行必须要在主队列!
public init<Ev: ObservableType>(events: Ev) where Ev.Element == Element {
self._events = events.subscribeOn(ConcurrentMainScheduler.instance)
}
public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable {
return self._events.subscribe(observer)
}
- 很明显我们的 ControlEvent 的序列 subscribe 是调用了一个函数就是:subscribeOn.
- 其中ConcurrentMainScheduler.instance 内部封装了 主队列!那么所有事情也就清晰了!
- 其实还有一个地方还是不明确的就是:subscribeOn 到底内部的逻辑又是什么?请看下面的分析
public func subscribeOn(_ scheduler: ImmediateSchedulerType)
-> Observable<Element> {
return SubscribeOn(source: self, scheduler: scheduler)
}
- 看到返回值的类型就知道,原来的序列是被subscribeOn进行处理了,封装了中间层:SubscribeOn 的序列
final private class SubscribeOn<Ob: ObservableType>: Producer<Ob.Element> {
let source: Ob
let scheduler: ImmediateSchedulerType
init(source: Ob, scheduler: ImmediateSchedulerType) {
self.source = source
self.scheduler = scheduler
}
override func run(_ observer: cancel:) -> (sink:subscription:) {
let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
- 看到 SubscribeOn 的继承关系(Producer),我们对他也就放心了。
- 序列订阅的时候,会创建一个observer的观察者
- 经过Producer 流回SubscribeOn的run
- 在经过 SubscribeOnSink.run 到观察者的回调(或者内部源序列的订阅,传sink作为观察者回调,后面的流程只是重复走了一次)
- 由观察者的发送响应,回到 sink 的 on
- 由 sink的属性观察者(也就是中间封装保存的)响应event事件
- 最后调用外界的subscriber的闭包
- 下面我们直接跳过上面的流程,进入关于调度相关代码
总结
整个流程是比较复杂,其实如果你这个时候,整体看源码,不难得出:
- 源序列包装
- 内部序列创建
- 调度环境&观察者传递准备
- 源序列订阅 - 根据调度环境调度 - 流程流到观察者就是我们中间内部序列的Sink
- Sink 调度执行 响应发给观察者
- 由观察者响应 订阅事件event
就是两层序列订阅响应,我的第二层的 sink 就是源序列的观察者