1. ReactiveCompatible
比如当我们输入label.rx时,实际上是因为NSObject遵从了ReactiveCompatible协议添加了命名空间:
extension NSObject: ReactiveCompatible {}
public protocol ReactiveCompatible {
associatedtype CompatibleType
/// 元类型,此处为Reactive泛型并且用关联类型进行约束
static var rx: Reactive<CompatibleType>.Type { get set }
var rx: Reactive<CompatibleType> { get set }
}
extension ReactiveCompatible {
//接口中使用的类型就是实现这个接口本身的类型的话,需要使用 Self 进行指代
public static var rx: Reactive<Self>.Type {
get {
return Reactive<Self>.self
}
set {
// this enables using Reactive to "mutate" base type
}
}
/// Reactive extensions.
public var rx: Reactive<Self> {
get {
return Reactive(self)
}
set {
// this enables using Reactive to "mutate" base object
}
}
}
//上述调用即label会传入base
public struct Reactive<Base> {
public let base: Base
public init(_ base: Base) {
self.base = base
}
}
2. ObservableConvertibleType
定义了一个ObservableConvertibleType协议,表示可以转换为可观察序列类型。其中E表示序列元素的别名,asObservable方法是将self转换为Observable 序列。
public protocol ObservableConvertibleType {
//声明关联类型
associatedtype E
func asObservable() -> Observable<E>
}
说明:在定义协议时,可以用associatedtype声明一个或多个类型作为协议定义的一部分,叫关联类型。关联类型为协议中的某个类型提供了一个占位名(或者说别名),其代表的实际类型在协议被采纳时才会被指定。
例如:
protocol TableViewCell {
associatedtype T
func updateCell(_ data: T)
}
class MyTableViewCell: UITableViewCell, TableViewCell {
typealias T = Model
func updateCell(_ data: Model) {
// do something ...
}
}
3. Observable
Observable为遵从ObservableType协议的类,它表示一个push样式序列,可以让订阅“观察者”接收此序列的事件。而在ObservableType协议中,默认实现了asObservable()方法,以及定义了subscribe方法。该方法中参数需要是遵从ObserverType协议类型。
public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
rxAbstractMethod()
}
// MARK: 该subscribe方法的形参observer必须遵从ObserverType协议,返回类型是满足Disposable协议类型,并且需要满足参数.E 类型和当前Observable类中的E泛型为同一类型
//Disposable也是声明的一个协议,用来表示资源释放
public protocol Disposable {
func dispose()
}
4. ObserverType
ObserverType协议的作用是支持可观察序列上的推式迭代。我理解的意思是可以按顺序观察。
public protocol ObserverType {
/// 观察者可以观察到的按顺序排列的元素的类型。
associatedtype E
// 将序列事件通知给观察者,Event是枚举类型
func on(_ event: Event<E>)
}
其中定义了观察元素以及观察方法,并且默认实现了三种事件方法:onNext,onCompleted,onError。
5. Event
Event表示序列事件,是一个泛型枚举。并且遵从了系统的CustomDebugStringConvertible协议,重写了debugDescription属性让其可以在debug模式下打印。
public enum Event<Element> {
/// Next element is produced.
case next(Element)
/// Sequence terminated with an error.
case error(Swift.Error)
/// Sequence completed successfully.
case completed
}
其在extension中还新增了很多属性方便获取状态和值,比如:
//是否是`completed` 或 `error`事件
public var isStopEvent: Bool {
switch self {
case .next: return false
case .error, .completed: return true
}
}
/// 返回`next`事件的元素
public var element: Element? {
if case .next(let value) = self {
return value
}
return nil
}
6. ObservableType
ObservableType是继承自ObservableConvertibleType协议,它在extension中实现了从可观察序列订阅事件处理方法:
public func subscribe(_ on: @escaping (Event<E>) -> Void)
-> Disposable {
let observer = AnonymousObserver { e in
on(e)
}
return self.asObservable().subscribe(observer)
}
其在另一个subscribe方法中有使用enum Hooks的customCaptureSubscriptionCallstack扩展方法来获取自定义callstack信息。
extension Hooks {
public typealias CustomCaptureSubscriptionCallstack = () -> [String]
fileprivate static let _lock = RecursiveLock()
//此处重写了属性的set get方法用到 NSRecursiveLock 递归锁🔐 搭配defer保证线程安全
public static var customCaptureSubscriptionCallstack: CustomCaptureSubscriptionCallstack {
get {
_lock.lock(); defer { _lock.unlock() }
return _customCaptureSubscriptionCallstack
}
set {
_lock.lock(); defer { _lock.unlock() }
_customCaptureSubscriptionCallstack = newValue
}
}
}
在Rx中,打开TRACE_RESOURCES调试模式,可以使用public struct Resources查看内部Rx资源分配(可观察对象、观察者、可处理对象等),他提供了一种在开发过程中检测泄漏的简单方法。
/// 继承自NSLock保证原子性
final class AtomicInt: NSLock {
fileprivate var value: Int32
public init(_ value: Int32 = 0) {
self.value = value
}
}
/// 忽略返回值警告
@discardableResult
@inline(__always)
func add(_ this: AtomicInt, _ value: Int32) -> Int32 {
this.lock()
let oldValue = this.value
this.value += value
this.unlock()
return oldValue
}
/// 函数内联是一种编译器优化技术,它通过使用方法的内容替换直接调用该方法,就相当于假装该方法并不存在一样,这种做法在很大程度上优化了性能
@inline(__always)
func load(_ this: AtomicInt) -> Int32 {
this.lock()
let oldValue = this.value
this.unlock()
return oldValue
}
7. SchedulerType
表示调度工作单元的对象,继承自ImmediateSchedulerType,内部包含立即执行的调度和周期调用的调度。
///使用递归调度模拟周期性任务
public func schedulePeriodic<StateType>(_ state: StateType, startAfter: RxTimeInterval, period: RxTimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
let schedule = SchedulePeriodicRecursive(scheduler: self, startAfter: startAfter, period: period, action: action, state: state)
return schedule.start()
}
///SchedulePeriodicRecursive内部会调用到如下方法
func scheduleRecursive<State>(_ state: State, dueTime: RxTimeInterval, action: @escaping (State, AnyRecursiveScheduler<State>) -> Void) -> Disposable {
let scheduler = AnyRecursiveScheduler(scheduler: self, action: action)
scheduler.schedule(state, dueTime: dueTime)
return Disposables.create(with: scheduler.dispose)
}
8. DisposeBag
线程安全袋:在deinit方法中将添加的disposes释放。其内部持有一个Disposable数组,当调用disposed(by bag: DisposeBag)添加到一个bag中,实际上是insert( disposable: Disposable)到该数组中,对应的在deinit方法中会remove元素。
public final class DisposeBag: DisposeBase {
private var _lock = SpinLock()
fileprivate var _disposables = [Disposable]()
fileprivate var _isDisposed = false
private func _insert(_ disposable: Disposable) -> Disposable? {
self._lock.lock(); defer { self._lock.unlock() }
if self._isDisposed {
return disposable
}
self._disposables.append(disposable)
return nil
}
}
此处以PublishSubject为例:
当其subscribe时,会调用self._synchronized_subscribe(observer)方法,该方法会往bag中insert该AnyObserver<Element>对象并生成一个对应的BagKey,BagKey实际上内部持有一个UInt64类型的rawValue作为唯一标识。拿到key之后和当前PublishSubject对象用于创建一个SubscriptionDisposable对象,这样 _key 和 _owner就绑定了。
紧接着是释放,系统会调用了DisposeBag的deinit方法,除了释放数组元素,self._owner?.synchronizedUnsubscribe(self._key) 会根据key移除相应的观察者:
func _synchronized_unsubscribe(_ disposeKey: DisposeKey) {
_ = self._observers.removeKey(disposeKey)
}
8. PrimitiveSequence
其为遵从ObservableConvertibleType和PrimitiveSequenceType协议的泛型结构体,内部持有一个Observable<Element>属性,作为基类内部在extension中实现了deferred,delay,observeOn等方法。
public struct PrimitiveSequence<Trait, Element> {
let source: Observable<Element>
init(raw: Observable<Element>) {
self.source = raw
}
}
extension PrimitiveSequence: ObservableConvertibleType {
//asObservable就是返回source
public func asObservable() -> Observable<E> {
return self.source
}
}
9. Single
Single 是 Observable
的另外一个版本。不像 Observable
可以发出多个元素,它要么只能发出一个元素,要么产生一个 error
事件。
public typealias Single<Element> = PrimitiveSequence<SingleTrait, Element>
//对应到SingleEvent中
public enum SingleEvent<Element> {
///只生成一个序列元素。(底层可观察序列发出:' .next(Element) ', ' .completed ') case success(Element)
///序列以错误结束。(底层可观察序列发出:' .error(Error) ')
case error(Swift.Error)
}
//从指定的订阅方法实现创建可观察序列
//(SingleEvent<ElementType>) -> Void作为SingleObserver参数,实际上Maybe和Single以及Completable在此处的区别就是处理不同的case
public static func create(subscribe: @escaping (@escaping SingleObserver) -> Disposable) -> Single<ElementType> {
let source = Observable<ElementType>.create { observer in
return subscribe { event in
switch event {
case .success(let element):
observer.on(.next(element))
observer.on(.completed)
case .error(let error):
observer.on(.error(error))
}
}
}
return PrimitiveSequence(raw: source)
}
//订阅“观察者”接收此序列的事件
public func subscribe(_ observer: @escaping (SingleEvent<ElementType>) -> Void) -> Disposable {
var stopped = false
//拿到source中当前的SingleEvent
return self.primitiveSequence.asObservable().subscribe { event in
if stopped { return }
stopped = true
switch event {
case .next(let element):
observer(.success(element))
case .error(let error):
observer(.error(error))
case .completed:
rxFatalErrorInDebug("Singles can't emit a completion event")
}
}
}
10. BehaviorSubject
既是可观察序列又是观察者的对象,每项通知会广播予所有已订阅的观察者。
Observable
先创建一个 AnonymousObserver
,将事件处理方法设置给它的 eventHandler
属性。所有的 Observable 订阅,都会进行这样的方法。在 create
中,由于继承关系调用的是 Producer
的 subscribe
;而 BehaviorSubject 中也实现了自己的 subscribe
方法,BehaviorSubject 的订阅私有部分做的是将刚创建的 AnonymousObserver
保存起来,然后以当前 value
值作为事件值,发出一个事件。
public final class PublishSubject<Element>
//发送
public func on(_ event: Event<Element>) {
dispatch(self._synchronized_on(event), event)
}
func _synchronized_on(_ event: Event<E>) -> Observers {
self._lock.lock(); defer { self._lock.unlock() }
switch event {
case .next:
if self._isDisposed || self._stopped {
return Observers()
}
return self._observers
case .completed, .error:
if self._stoppedEvent == nil {
self._stoppedEvent = event
self._stopped = true
let observers = self._observers
self._observers.removeAll()
return observers
}
return Observers()
}
}
//订阅
public override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
self._lock.lock()
let subscription = self._synchronized_subscribe(observer)
self._lock.unlock()
return subscription
}
func _synchronized_subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
if let stoppedEvent = self._stoppedEvent {
observer.on(stoppedEvent)
return Disposables.create()
}
if self._isDisposed {
observer.on(.error(RxError.disposed(object: self)))
return Disposables.create()
}
//如果保存成功,则将该观察者以及对应的BagKey生成SubscriptionDisposable以便后续释放
let key = self._observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
//调用DisposeBag的deinit
func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
self._lock.lock()
self._synchronized_unsubscribe(disposeKey)
self._lock.unlock()
}
func _synchronized_unsubscribe(_ disposeKey: DisposeKey) {
_ = self._observers.removeKey(disposeKey)
}
}
11. MainScheduler
MainScheduler 代表主线程。如果你需要执行一些和 UI 相关的任务,就需要切换到该 Scheduler 运行。
其继承自SerialDispatchQueueScheduler,SerialDispatchQueueScheduler 抽象了串行 DispatchQueue
。如果你需要执行一些串行任务,可以切换到这个 Scheduler 运行。
public class SerialDispatchQueueScheduler : SchedulerType {
//包含DispatchQueue和DispatchTimeInterval延时
let configuration: DispatchQueueConfiguration
init(serialQueue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
self.configuration = DispatchQueueConfiguration(queue: serialQueue, leeway: leeway)
}
//调用self.queue.async {cancel.setDisposable(action(state))}
func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.configuration.schedule(state, action: action)
}
}
public final class MainScheduler : SerialDispatchQueueScheduler {
public static let instance = MainScheduler()
//调度要立即执行的操作
override func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
//原子属性+1 相当于信号量
let previousNumberEnqueued = increment(self.numberEnqueued)
//DispatchQueue.getSpecific(key: token) != nil 设置标识判断是否在主线程
if DispatchQueue.isMain && previousNumberEnqueued == 0 {
let disposable = action(state)
//原子属性-1
decrement(self.numberEnqueued)
return disposable
}
let cancel = SingleAssignmentDisposable()
self._mainQueue.async {
if !cancel.isDisposed {
_ = action(state)
}
decrement(self.numberEnqueued)
}
return cancel
}
}
12. Amb
在多个源 Observables 中, 取第一个发出元素或产生事件的 Observable,然后只发出它的元素
//比如如下序列
Observable<Int>.amb([a, b]).subscribe{}.disposed(by: disposeBag)
public static func amb<S: Sequence>(_ sequence: S) -> Observable<E>
where S.Iterator.Element == Observable<E> {
//使用reduce作用一个不终止的可观察序列,可用于表示无限持续时间
return sequence.reduce(Observable<S.Iterator.Element.E>.never()) { a, o in
return a.amb(o.asObservable())
}
}
//该方法会生成一个Amb<Element>: Producer<Element>类,内部持有当前这两个Observable,当调用subscribe方法时调用run<O : ObserverType>(_ observer: O, cancel: Cancelable)
public func amb<O2: ObservableType>
(_ right: O2)
-> Observable<E> where O2.E == E {
return Amb(left: self.asObservable(), right: right.asObservable())
}
//该方法生成AmbSink<O: ObserverType>: Sink<O>对象并执行run方法,内部在状态为self._choice == me时执行self.forwardOn(event)
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AmbSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
13. buffer
buffer 操作符将缓存 Observable
中发出的新元素,当元素达到某个数量,或者经过了特定的时间,它就会将这个元素集合发送出来。
let subject = PublishSubject<String>()
subject
.buffer(timeSpan: 1, count: 3, scheduler: MainScheduler.instance)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject.onNext("a")
subject.onNext("b")
subject.onNext("c")
subject.onNext("d")
//调用buffer方法会生成一个BufferTimeCount对象,把对应的缓存时间,缓存个数,调度以及当前Observable保存;当subscribe时,调用run方法生成BufferTimeCountSink
final private class BufferTimeCount<Element>: Producer<[Element]> {
fileprivate let _timeSpan: RxTimeInterval
fileprivate let _count: Int
fileprivate let _scheduler: SchedulerType
fileprivate let _source: Observable<Element>
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == [Element] {
let sink = BufferTimeCountSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
//当调用onNext发送元素时调用
func on(_ event: Event<E>) {
self.synchronizedOn(event)
}
func _synchronized_on(_ event: Event<E>) {
switch event {
case .next(let element):
//元素添加到buffer数组中,并且当满足缓存个数时,发送
self._buffer.append(element)
if self._buffer.count == self._parent._count {
self.startNewWindowAndSendCurrentOne()
}
case .error(let error):
self._buffer = []
self.forwardOn(.error(error))
self.dispose()
case .completed:
self.forwardOn(.next(self._buffer))
self.forwardOn(.completed)
self.dispose()
}
}
//当执行run方法时调用
func createTimer(_ windowID: Int) {
//DisposeBase子类
let nextTimer = SingleAssignmentDisposable()
self._timerD.disposable = nextTimer
//调度之后定时器执行
let disposable = self._parent._scheduler.scheduleRelative(windowID, dueTime: self._parent._timeSpan) { previousWindowID in
self._lock.performLocked {
//当前窗口与回调的滑动窗口id不同则返回
if previousWindowID != self._windowID {
return
}
//窗口id+1,调用self.forwardOn(.next(buffer))
self.startNewWindowAndSendCurrentOne()
}
return Disposables.create()
}
nextTimer.setDisposable(disposable)
}
14. RxCocoa — UINavigationController+Rx
extension Reactive where Base: UINavigationController {
public typealias ShowEvent = (viewController: UIViewController, animated: Bool)
/// Reactive wrapper for `delegate`.
///
/// For more information take a look at `DelegateProxyType` protocol documentation.
public var delegate: DelegateProxy<UINavigationController, UINavigationControllerDelegate> {
//调用DelegateProxyType的proxy(for object: ParentObject)方法,如下
return RxNavigationControllerDelegateProxy.proxy(for: base)
}
/// Reactive wrapper for delegate method `navigationController(:willShow:animated:)`.
public var willShow: ControlEvent<ShowEvent> {
let source: Observable<ShowEvent> = delegate
.methodInvoked(#selector(UINavigationControllerDelegate.navigationController(_:willShow:animated:)))
.map { arg in
//arg为[Any]
let viewController = try castOrThrow(UIViewController.self, arg[1])
let animated = try castOrThrow(Bool.self, arg[2])
return (viewController, animated)
}
return ControlEvent(events: source)
}
}
public static func proxy(for object: ParentObject) -> Self {
//是否Thread.isMainThread
MainScheduler.ensureRunningOnMainThread()
//通过objc_getAssociatedObject获取指定控件(object)的 DelegateProxy 的实例
let maybeProxy = self.assignedProxy(for: object)
let proxy: AnyObject
if let existingProxy = maybeProxy {
proxy = existingProxy
}
else {
//不存在即创建
proxy = castOrFatalError(self.createProxy(for: object))
//通过objc_setAssociatedObject绑定
self.assignProxy(proxy, toObject: object)
assert(self.assignedProxy(for: object) === proxy)
}
//拿到自身的object.delegate
let currentDelegate = self._currentDelegate(for: object)
let delegateProxy: Self = castOrFatalError(proxy)
//是否已经设置
if currentDelegate !== delegateProxy {
//设置接收所有转发消息的delegate的引用
delegateProxy._setForwardToDelegate(currentDelegate, retainDelegate: false)
assert(delegateProxy._forwardToDelegate() === currentDelegate)
//将 proxy 设置为代理对象
self._setCurrentDelegate(proxy, to: object)
assert(self._currentDelegate(for: object) === proxy)
assert(delegateProxy._forwardToDelegate() === currentDelegate)
}
return delegateProxy
}
//返回调用的delegate方法的可观察序列。方法被调用后,元素才被发送。
open func methodInvoked(_ selector: Selector) -> Observable<[Any]> {
MainScheduler.ensureRunningOnMainThread()
//这个属性是一个字典,以selector为键,value为MessageDispatcher对象,其内部持有PublishSubject和Observable
let subject = self._methodInvokedForSelector[selector]
//如果存在 subject 就说明,已经创建了 selector 对应的 Observable 对象了,直接返回即可
if let subject = subject {
return subject.asObservable()
}
else {
//如果没有,那么就创建一个 PublishSubject,并且将其存入 methodInvokedForSelecotr 字典中去,最后返回这个 subject
let subject = MessageDispatcher(selector: selector, delegateProxy: self)
self._methodInvokedForSelector[selector] = subject
return subject.asObservable()
}
}
//_RXDelegateProxy.m
//我们将 DelegateProxy 设置为代理类,但是实现代理方法。所以系统会执行消息转发的方法
-(void)forwardInvocation:(NSInvocation *)anInvocation {
//判断当前方法是否有返回值,因为订阅 Observable 的处理方法不会有返回值
BOOL isVoid = RX_is_method_signature_void(anInvocation.methodSignature);
NSArray *arguments = nil;
if (isVoid) {
//真正的代理方法执行前调用_sentMessageForSelector[selector]
arguments = RX_extract_arguments(anInvocation);
[self _sentMessage:anInvocation.selector withArguments:arguments];
}
//检查原本的代理对象有没有实现这个 selector,如果有,那么执行,保证原本代理方法的执行
if (self._forwardToDelegate && [self._forwardToDelegate respondsToSelector:anInvocation.selector]) {
[anInvocation invokeWithTarget:self._forwardToDelegate];
}
if (isVoid) {
//真正的代理方法执行后调用_sentMessageForSelector[selector]
[self _methodInvoked:anInvocation.selector withArguments:arguments];
}
}