RXSwift源码阅读笔记

RXSwift_Core_Opinion

是什么?

ReactiveX is a library for composing asynchronous(异步) and event-based programs(基于事件) by using observable sequences(可观察序列)

RXSwiftReactiveXSwift 版本,那么我们可以理解成:基于事件和异步组成的可观察序列

RXSwift核心

本文主要对这几个核心通过源码进行展开讲解,描述它们是什么,做了什么事。

  • Observable - 可观察序列
  • Observer - 观察者
  • Observable & Observer 既是可观察序列也是观察者
  • Operator - 操作符,创建变化组合事件
  • Disposable - 管理绑定(订阅)的生命周期
  • Schedulers - 线程队列调配

1. Observable

Observable和ObservableType 最重要的就是提供func subscribe<Observer: ObserverType>订阅观察者Observer

先来看看Observable的源码

public class Observable<Element> : ObservableType {
    public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        rxAbstractMethod()
    }
}

ObservableType源码

public protocol ObservableType: ObservableConvertibleType {
    /** Subscribes `observer` to receive events for this sequence. */
    func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
}

Producer源码

class Producer<Element>: Observable<Element> {

    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        ... 
        let sinkAndSubscription = self.run(observer, cancel: disposer)
        disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
        return disposer
    }

    func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        rxAbstractMethod()
    }
}

Observable.of()构建一个可观察序列的实现

let observable = Observable.of("A", "B", "C")

extension ObservableType {
    public static func of(_ elements: Element ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Element> {
        ObservableSequence(elements: elements, scheduler: scheduler)
    }
}

final private class ObservableSequenceSink<Sequence: Swift.Sequence, Observer: ObserverType>: Sink<Observer> where Sequence.Element == Observer.Element {
    ...
    func run() -> Disposable {
        return self.parent.scheduler.scheduleRecursive(self.parent.elements.makeIterator()) { iterator, recurse in
                ...                                                                                                             observer.on(event)
        }
    }
}

final private class ObservableSequence<Sequence: Swift.Sequence>: Producer<Sequence.Element> {
    ...
    init(elements: Sequence, scheduler: ImmediateSchedulerType) {
        self.elements = elements
        ...
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

可以看出来:

  1. Observable继承至ObservableType并提供了func subscribe(Observer)抽象函数
  2. Producer继承至Observable,实现了func subscribe(Observer)(主要是调用func run()),并提供了func run<Observer: ObserverType>()抽象函数
  3. RXSwift中提供了多种Observable的初始化方式,比如上面的Observable.of(),而这些初始化方法都继承至Producer并提供了func run<Observer: ObserverType>()的实现(主要调用observer.on(event)发出事件)

总体流程如下:

Observablefunc subscribe(Observer)订阅观察者 -> Producerfunc run()运行 -> 初始化器继承Producer实现抽象函数func run()调用observer.on(event)发出事件

总结:

Observable提供了订阅观察者的功能,并在内部提供了供订阅时调用的run方法来发出事件

1.1 普通序列

这些普通序列和of类似,都是提供了对func run<Observer: ObserverType>()的实现

// 1.0 empty() 方法 创建一个空内容的 Observable 序列。
    let observable = Observable<Int>.empty()

// 1.1 just()  该方法通过传入一个默认值来初始化。
    let observable = Observable<Int>.just(5)

// 1.2 of() 方法 可变数量的参数
    let observable = Observable.of("A", "B", "C")

// 1.3 from() 方法 需要一个数组参数
    let observable = Observable.from(["A", "B", "C"])

// 1.4 range() 方法 指定起始和结束数值
    let observable = Observable.range(start: 1, count: 5)

// 1.5 generate() 方法 提供判断条件
    let observable = Observable.generate(
     initialState: 0,
        condition: { $0 <= 10 },
        iterate: { $0 + 2 }
    )

// 1.6 create() 方法 接受一个 block 形式的参数
    let observable = Observable<String>.create{observer in
        observer.onNext("hangge.com")
         return Disposables.create()
    }

// 1.7 deferred() 方法
    相当于是创建一个 Observable 工厂,通过传入一个 block 来执行延迟 Observable序列创建的行为,而这个 block 里就是真正的实例化序列对象的地方。
let factory : Observable<Int> = Observable.deferred {
    isOdd = !isOdd
    if isOdd {
        return Observable.of(1, 3, 5 ,7)
    }else {
        return Observable.of(2, 4, 6, 8)
    }
}

// 1.8 interval() 方法 每隔一段设定的时间,会发出一个索引数的元素
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable.subscribe { event in
    print(event)
}

// 1.9 timer() 方法 延迟执行
    let observable = Observable<Int>.timer(5, scheduler: MainScheduler.instance)

// 延迟定时执行
    let observable = Observable<Int>.timer(5, period: 1, scheduler: MainScheduler.instance)

1.2 特征序列

特征序列的目的是为了我们更方便使用序列,在普通序列上封装的一层序列,在不同场景选用不同的特征序列

1.2.1 PrimitiveSequence特征序列

single:(常用在网络请求结果包装

  • 发出一个元素,或一个 error 事件

Completable:

  • 只会发出一个 completed 事件或者一个 error 事件

Maybe:

  • 发出一个元素、或者一个 completed 事件、或者一个 error 事件
public typealias Single<Element> = PrimitiveSequence<SingleTrait, Element>
public typealias Completable = PrimitiveSequence<CompletableTrait, Swift.Never>
public typealias Maybe<Element> = PrimitiveSequence<MaybeTrait, Element>
核心代码如下:    
extension PrimitiveSequenceType where Trait == XXXTrait {
    public static func create(subscribe: @escaping (@escaping SingleObserver) -> Disposable) -> Single<Element> {
    let source = Observable<Element>.create { observer in
        return subscribe { event in
            switch event {
              /**  三个特征序列区别就在此
              single只处理.next()、.error()
              completed只处理.completed()、.error()
              Maybe只处理其中一种就结束
              */
                observer.on(xxx)
            }
        }
    }
    return PrimitiveSequence(raw: source)
    }

    public func subscribe(_ observer: @escaping (SingleEvent<Element>) -> Void) -> Disposable {
        ...
    }
}

总结:

  1. PrimitiveSequence 初始化方法为internal级别,Single、Completable、Maybe只能调用func create().asSingle()asCompletable().asMaybe()生成对象的特征序列
  2. func create()内部通过封装Observable<Element>.create()来创建一个序列
1.2.2 SharedSequence特征序列
    public typealias Driver<Element> = SharedSequence<DriverSharingStrategy, Element>
    public typealias Signal<Element> = SharedSequence<SignalSharingStrategy, Element>

Driver:

  • 主线程监听
  • 共享状态变化

Signal:

  • 和Driver唯一不同就是不会发送上一次的事件

主要代码:

public struct SignalSharingStrategy: SharingStrategyProtocol {
    public static var scheduler: SchedulerType { SharingScheduler.make() }
    
    public static func share<Element>(_ source: Observable<Element>) -> Observable<Element> {
        source.share(scope: .whileConnected)
    }
}
public struct DriverSharingStrategy: SharingStrategyProtocol {
    public static var scheduler: SchedulerType { SharingScheduler.make() }
    public static func share<Element>(_ source: Observable<Element>) -> Observable<Element> {
        source.share(replay: 1, scope: .whileConnected)
    }
}
public enum SharingScheduler {
    /// Default scheduler used in SharedSequence based traits.
    public private(set) static var make: () -> SchedulerType = { MainScheduler() }
}

总结:

  1. Driver和Signal的实现很简单,就是普通序列调用share()实现
  2. SharingScheduler.make() = MainScheduler()达到了主线程监听的功能
  3. Driver和Signal没有提供生成实例的方法,只能通过func asDriver(),func asSignal转化生成
  4. 它们通常用来描述需要共享状态的UI层序列事件
1.2.3 Control特征序列

通常用来描述UI控件产生的事件(ControlEvent)和由事件产生的数据变化(ControlProperty)

ControlProperty:

  • 主线程监听
  • 订阅时发送上一次的事件
  • 不会产生 error 事件

ControlEvent:

  • 主线程监听
  • 订阅时不会发送上一次的事件
  • 不会产生 error 事件

主要代码:

public struct ControlProperty<PropertyType> : ControlPropertyType {
    let values: Observable<PropertyType>
    let valueSink: AnyObserver<PropertyType>

    public init<Values: ObservableType, Sink: ObserverType>(values: Values, valueSink: Sink) where Element == Values.Element, Element == Sink.Element {
        self.values = values.subscribe(on: ConcurrentMainScheduler.instance)
        self.valueSink = valueSink.asObserver()
    }
}

public struct ControlEvent<PropertyType> : ControlEventType {
    public init<Ev: ObservableType>(events: Ev) where Ev.Element == Element {
        self.events = events.subscribe(on: ConcurrentMainScheduler.instance)
    }
}
extension Reactive where Base: UIControl {
    public func controlEvent(_ controlEvents: UIControl.Event) -> ControlEvent<()> {
        let source: Observable<Void> = Observable.create { [weak control = self.base] observer in
                MainScheduler.ensureRunningOnMainThread()

                guard let control = control else {
                    observer.on(.completed)
                    return Disposables.create()
                }

                let controlTarget = ControlTarget(control: control, controlEvents: controlEvents) { _ in
                    observer.on(.next(()))
                }

                return Disposables.create(with: controlTarget.dispose)
            }
            .take(until: deallocated)

        return ControlEvent(events: source)
    }

    public func controlProperty<T>(
        editingEvents: UIControl.Event,
        getter: @escaping (Base) -> T,
        setter: @escaping (Base, T) -> Void
    ) -> ControlProperty<T> {
        let source: Observable<T> = Observable.create { [weak weakControl = base] observer in
                guard let control = weakControl else {
                    observer.on(.completed)
                    return Disposables.create()
                }

                observer.on(.next(getter(control)))

                let controlTarget = ControlTarget(control: control, controlEvents: editingEvents) { _ in
                    if let control = weakControl {
                        observer.on(.next(getter(control)))
                    }
                }
                
                return Disposables.create(with: controlTarget.dispose)
            }
            .take(until: deallocated)

        let bindingObserver = Binder(base, binding: setter)

        return ControlProperty<T>(values: source, valueSink: bindingObserver)
    }

    internal func controlPropertyWithDefaultEvents<T>(
        editingEvents: UIControl.Event = [.allEditingEvents, .valueChanged],
        getter: @escaping (Base) -> T,
        setter: @escaping (Base, T) -> Void
        ) -> ControlProperty<T> {
        return controlProperty(
            editingEvents: editingEvents,
            getter: getter,
            setter: setter
        )
    }
}

常见的一些使用:

extension Reactive where Base: UITextField {
    /// 描述事件和事件产生的数据
    public var text: ControlProperty<String?> {
        return base.rx.controlPropertyWithDefaultEvents(
            getter: { textField in
                textField.text
            },
            setter: { textField, value in
                if textField.text != value {
                    textField.text = value
                }
            }
        )
    }
}

extension Reactive where Base: UIButton {
        /// 描述事件
    public var tap: ControlEvent<Void> {
        controlEvent(.touchUpInside)
    }
}

extension Reactive where Base: UIApplication {
    /// 描述进入后台的通知事件
    public static var didEnterBackground: ControlEvent<Void> {
        let source = NotificationCenter.default.rx.notification(UIApplication.didEnterBackgroundNotification).map { _ in }
        return ControlEvent(events: source)
    }
}

2. Observer

ObserverType 最重要的功能就是提供了发送事件的功能func on(_ event: Event<Element>)

ObserverType源码

public protocol ObserverType {
    /// The type of elements in sequence that observer can observe.
    associatedtype Element

    /// Notify observer about sequence event.
    ///
    /// - parameter event: Event that occurred.
    func on(_ event: Event<Element>)
}

/// Convenience API extensions to provide alternate next, error, completed events
extension ObserverType {
    
    /// Convenience method equivalent to `on(.next(element: Element))`
    ///
    /// - parameter element: Next element to send to observer(s)
    public func onNext(_ element: Element) {
        self.on(.next(element))
    }
    
    /// Convenience method equivalent to `on(.completed)`
    public func onCompleted() {
        self.on(.completed)
    }
    
    /// Convenience method equivalent to `on(.error(Swift.Error))`
    /// - parameter error: Swift.Error to send to observer(s)
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}

2.1普通观察者

2.1.1 AnyObserver
public struct AnyObserver<Element> : ObserverType {
    public init(eventHandler: @escaping EventHandler) {
        self.observer = eventHandler
    }
    
    public func on(_ event: Event<Element>) {
        self.observer(event)
    }
}
2.1.2 AnonymousObserver
final class AnonymousObserver<Element>: ObserverBase<Element> {
    init(_ eventHandler: @escaping EventHandler) {
        self.eventHandler = eventHandler
    }

    override func onCore(_ event: Event<Element>) {
        self.eventHandler(event)
    }
}
2.1.3 subscribe
extension ObservableType {
    
    public func subscribe(_ on: @escaping (Event<Element>) -> Void) -> Disposable {
        let observer = AnonymousObserver { e in
            on(e)
        }
        return self.asObservable().subscribe(observer)
    }
    
    public func subscribe(
        onNext: ((Element) -> Void)? = nil,
        onError: ((Swift.Error) -> Void)? = nil,
        onCompleted: (() -> Void)? = nil,
        onDisposed: (() -> Void)? = nil
    ) -> Disposable {
            let disposable: Disposable
            ...
            let observer = AnonymousObserver<Element> { event in
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    ...
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
}

总结:

  1. AnyObserverAnonymousObserver 的实现没有太大区别
  2. AnyObserverpublic,外部可以使用。AnonymousObserverinner,只能内部使用。
  3. ObservableType中提供的public func subscribe(onNext:...)实际上就是封装的AnonymousObserver.

2.2 特征观察者

和特征序列一样,特征观察者的目的是为了更方便的使用它,在普通观察者的基础上封装。

Binder

  • 不会处理错误事件
  • 在主线程上观察

源码:

public struct Binder<Value>: ObserverType {
    public typealias Element = Value
    
    private let binding: (Event<Value>) -> Void
    public init<Target: AnyObject>(_ target: Target, scheduler: ImmediateSchedulerType = MainScheduler(), binding: @escaping (Target, Value) -> Void) {
        weak var weakTarget = target

        self.binding = { event in
            switch event {
            case .next(let element):
                _ = scheduler.schedule(element) { element in
                    if let target = weakTarget {
                        binding(target, element)
                    }
                    return Disposables.create()
                }
            case .error(let error):
                rxFatalErrorInDebug("Binding error: \(error)")
            case .completed:
                break
            }
        }
    }

    public func on(_ event: Event<Value>) {
        self.binding(event)
    }

    public func asObserver() -> AnyObserver<Value> {
        AnyObserver(eventHandler: self.on)
    }
}

总结:

  1. Binder 内部封装了事件处理,并且只处理.next(), 如果发出一个.error()事件则会报错
  2. Binder通过AnyObserver包装成一个Observer
  3. 特别好用的语法躺

常见的使用:

let observer: Binder<Bool> = Binder(view) { (view, isHidden) in
    view.isHidden = isHidden
}
extension Reactive where Base: UIView {
  public var isHidden: Binder<Bool> {
      return Binder(self.base) { view, hidden in
          view.isHidden = hidden
      }
  }
}

3. Observer & Observable

ObserverType: 发出事件, ObservableType: 订阅观察者

RXSwift和RXRelay中提供了一些特征类型,它们既是Observer也是Observable

3.1 Subject

**AsyncSubject**            只发送最后一个信号,并且只在.onCompleted()之后才能接受到

**PublishSubject**      订阅之后才接收元素

**ReplaySubject**        无论什么时候订阅,发送存储的信号 ,bufferSize确定存储数量

**BehaviorSubject**   存储上一次的信号、初始化时附带一个默认元素(常用)

源码:

public final class PublishSubject<Element>
    : Observable<Element>
    , SubjectType
    , Cancelable
    , ObserverType
    , SynchronizedUnsubscribeType {
    
    typealias Observers = AnyObserver<Element>.s
    
    /// Creates a subject.
    public override init() {
        super.init()
    }
    
    public func on(_ event: Event<Element>) {
        dispatch(self.synchronized_on(event), event)
    }

    func synchronized_on(_ event: Event<Element>) -> Observers {
          ...
        switch event {
        case .next:
            return self.observers
        case .completed, .error:
            ...
            return Observers()
        }
    }
    
    public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        self.lock.performLocked { self.synchronized_subscribe(observer) }
    }

    func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        ...
        let key = self.observers.insert(observer.on)
    }
}

总结:

  1. 继承至Observable,并遵循了ObserverType协议,所以Subject既是Observable,也是Observer.
  2. 实现了Observable的fucn subscribe(Observer)来订阅观察者,实现了ObserverType的func on(event)来发出事件
  3. Subject与普通观序列不同,它不会继承Producer,它自己实现了subscribe来处理订阅逻辑
  4. subscribe订阅时保存了所有的observer,on发出事件时取出observers, 分发事件。与ShareReplay不同,Subject中的所有observer都被订阅,而ShareReplay只订阅了第一个观察者,所以ShareReplay是共享状态。

3.2 Relay

对Subject类型包装,唯一不同的是只发出onNext()事件

PublishRelay is a wrapper for PublishSubject

BehaviorRelay is a wrapper for BehaviorSubject

ReplayRelay is a wrapper for ReplaySubject

最后ControlProperty通用也是一个Observer & Observable, 它既可以最为观察者,也可以作为序列

4. 操作符

处理得到想要的序列

如何选择操作符讲的比较详细

5. Disposable

订阅后的生命周期管理

管理谁的生命周期?

消耗资源统计

需要在Debug环境中定义TRACE_RESOURCES,只需要在Podfile中添加下面的代码

public class DisposeBase {
    init() {
#if TRACE_RESOURCES
    _ = Resources.incrementTotal()
#endif
    }
    
    deinit {
#if TRACE_RESOURCES
    _ = Resources.decrementTotal()
#endif
    }
}

/// Podfile中添加如下实现
post_install do |installer|
    # Enable tracing resources
    installer.pods_project.targets.each do |target|
      if target.name == 'RxSwift'
        target.build_configurations.each do |config|
          if config.name == 'Debug'
            config.build_settings['OTHER_SWIFT_FLAGS'] ||= ['-D', 'TRACE_RESOURCES']
          end
        end
      end
    end
end

生命周期的管理出现在订阅后

/// 订阅后 -> Disposable
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable

5.1 手动管理

Disposable.dispose()销毁

5.2 加入销毁包

Disposable.disposed(by bag: DisposeBag)通过DisposeBag管理, DisposeBag销毁时会调用Disposable.dispose()

extension Disposable {
    /// Adds `self` to `bag`
    ///
    /// - parameter bag: `DisposeBag` to add `self` to.
    public func disposed(by bag: DisposeBag) {
        bag.insert(self)
    }
}

public final class DisposeBag: DisposeBase {
    
    /// Constructs new empty dispose bag.
    public override init() {
        super.init()
    }

    public func insert(_ disposable: Disposable) {
        self._insert(disposable)?.dispose()
    }
    

    /// This is internal on purpose, take a look at `CompositeDisposable` instead.
    private func dispose() {
        let oldDisposables = self._dispose()

        for disposable in oldDisposables {
            disposable.dispose()
        }
    }
    
    deinit {
        self.dispose()
    }
}

5.3 dispose销毁了什么?

5.3.1 Producer类型的序列销毁

来看看Producer

class Producer<Element>: Observable<Element> {
    override init() {
        super.init()
    }

    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        return CurrentThreadScheduler.instance.schedule(()) { _ in
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
            return disposer
        }
    }
}


private final class SinkDisposer: Cancelable {
    func dispose() {
        let previousState = fetchOr(self.state, DisposeState.disposed.rawValue)

        if (previousState & DisposeState.disposed.rawValue) != 0 {
            return
        }

        if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
            guard let sink = self.sink else {
                rxFatalError("Sink not set")
            }
            guard let subscription = self.subscription else {
                rxFatalError("Subscription not set")
            }

            sink.dispose()
            subscription.dispose()

            self.sink = nil
            self.subscription = nil
        }
    }
}

我们最终拿到的disposerSinkDisposer, 而我们管理生命周期调用的dispose就是将SinkDisposer

sink.dispose()
subscription.dispose()
self.sink = nil
self.subscription = nil

sinksubscription来源于let sinkAndSubscription = self.run(observer, cancel: disposer)

这时候又回到了我们在Observable中提到的func run(),它的实现取决于它的使用者:

run

我们还是选取AnonymousObservable来看看

final private class AnonymousObservable<Element>: Producer<Element> {
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    typealias Element = Observer.Element 
    typealias Parent = AnonymousObservable<Element>

    // state
    private let isStopped = AtomicInt(0)

    #if DEBUG
        private let synchronizationTracker = SynchronizationTracker()
    #endif

    override init(observer: Observer, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }

    func on(_ event: Event<Element>) {
        #if DEBUG
            self.synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self.synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if load(self.isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self.isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }

    func run(_ parent: Parent) -> Disposable {
        parent.subscribeHandler(AnyObserver(self))
    }
}

class Sink<Observer: ObserverType>: Disposable {
    func dispose() {
        fetchOr(self.disposed, 1)
        self.cancel.dispose()
    }

    deinit {
#if TRACE_RESOURCES
       _ =  Resources.decrementTotal()
#endif
    }
}

结论

我们调用

override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable

得到的Disposable就是AnonymousObservableSink, 它保存了我们的Observer, 我们销毁时就丢弃了AnonymousObservableSink信息并执行了Sinkdispose

5.3.2 Subject 类型的序列销毁

Producer不同,Subject订阅重写了订阅方法

 public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
      self.lock.performLocked { self.synchronized_subscribe(observer) }
  }

  func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
      if let stoppedEvent = self.stoppedEvent {
          observer.on(stoppedEvent)
          return Disposables.create()
      }

      if self.isDisposed {
          observer.on(.error(RxError.disposed(object: self)))
          return Disposables.create()
      }

      let key = self.observers.insert(observer.on)
      return SubscriptionDisposable(owner: self, key: key)
  }

    func synchronized_unsubscribe(_ disposeKey: DisposeKey) {
        _ = self.observers.removeKey(disposeKey)
  }

可以看出来我们订阅观察者后得到了SubscriptionDisposable(owner: self, key: key)

struct SubscriptionDisposable<T: SynchronizedUnsubscribeType> : Disposable {
    private let key: T.DisposeKey
    private weak var owner: T?

    init(owner: T, key: T.DisposeKey) {
        self.owner = owner
        self.key = key
    }

    func dispose() {
        self.owner?.synchronizedUnsubscribe(self.key)
    }
}

结论:

最终我们要调用的就是SubscriptionDisposable.dispose(), 而self.owner就是我们保持的Subject,最终调用了上面的

func synchronized_unsubscribe(_ disposeKey: DisposeKey) {
   _ = self.observers.removeKey(disposeKey)
}

移除了我们订阅时保存的观察者

6. Schedulers

描述任务执行在哪个线程,主要用在subscribeOnobserveOn执行线程的切换,那么我们需要知道:

  1. 哪些任务在subscribeOn切换的线程上执行?
  2. 哪些任务在observeOn切换的线程上执行?

6.1 subscribeOn

看看它的源码

final private class SubscribeOn<Ob: ObservableType>: Producer<Ob.Element> {
    let source: Ob
    let scheduler: ImmediateSchedulerType
    
    init(source: Ob, scheduler: ImmediateSchedulerType) {
        self.source = source
        self.scheduler = scheduler
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Ob.Element {
        let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

final private class SubscribeOnSink<Ob: ObservableType, Observer: ObserverType>: Sink<Observer>, ObserverType where Ob.Element == Observer.Element {

      ...
    func run() -> Disposable {
        ...
        let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
            let subscription = self.parent.source.subscribe(self)
            disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
            return Disposables.create()
        }

        cancelSchedule.setDisposable(disposeSchedule)
        return disposeEverything
    }
}
class Producer<Element>: Observable<Element> {
 

    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        ...
        return CurrentThreadScheduler.instance.schedule(()) { _ in
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
}

总结:

  1. SubscribeOn继承至Producer,它把序列重新包装成新的Observable

  2. 在订阅后执行func run(), 然后调用我们指定的scheduler

    self.parent.scheduler.schedule(()) {
     let subscription = self.parent.source.subscribe(self)
    }
    

    执行后又回到了source.subscribe(self), 这时候就开始执行Producersubscribe,而这个订阅就发生在我们指定的scheduler

    CurrentThreadScheduler.instance.schedule(()) { _ in
        let disposer = SinkDisposer()
        let sinkAndSubscription = self.run(observer, cancel: disposer)
        disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
        return disposer
    }
    

哪些任务在subscribeOn切换的线程上执行?

答案:取决于继承Producer后,func run(observer, cancel: disposer)做的事情

6.2 observeOn

看看源码

final private class ObserveOnSerialDispatchQueue<Element>: Producer<Element> {
 
        ...
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
        let subscription = self.source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }

}


final private class ObserveOnSerialDispatchQueueSink<Observer: ObserverType>: ObserverBase<Observer.Element> {
    override func onCore(_ event: Event<Element>) {
        _ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
    }
}

总结:

  1. ObserveOnSerialDispatchQueue继承至Producer,它把序列重新包装成新的Observable

  2. 在订阅后执行func run()

    let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
    let subscription = self.source.subscribe(sink)
    

    而这个时候订阅的就是ObserveOnSerialDispatchQueueSink,内部重写onCore(), 在产生事件就会调用它

    override func onCore(_ event: Event<Element>) {
            _ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
        }
    

    事件在指定的scheduler发送

哪些任务在observeOn切换的线程上执行?

答案:发送事件和处理事件在切换后的线程执行

6.3 内置的Scheduler

RXSwfit中内置了如下几种 Scheduler:

  1. CurrentThreadScheduler:表示当前线程 Scheduler。(默认使用这个)
  2. MainScheduler:表示主线程。如果我们需要执行一些和 UI 相关的任务,就需要切换到该 Scheduler运行。
  3. SerialDispatchQueueScheduler:封装了 GCD 的串行队列。如果我们需要执行一些串行任务,可以切换到这个 Scheduler 运行。
  4. ConcurrentDispatchQueueScheduler:封装了 GCD 的并行队列。如果我们需要执行一些并发任务,可以切换到这个 Scheduler 运行。
  5. OperationQueueScheduler:封装了 NSOperationQueue。

示例:

let serialQueue =  SerialDispatchQueueScheduler(internalSerialQueueName: "Serial")
let main = MainScheduler()
        
DispatchQueue.global().async {
  Observable<Int>.create { ob in
      print("线程 --- \(Thread.current)")
      ob.onNext(1)
      return Disposables.create {}
    }
    .subscribe(on: main)
    .observe(on: serialQueue)
    .subscribe(onNext: { value in
        print("线程 --- \(Thread.current)")
    }).disposed(by: self.bag)
}

log

线程 --- <_NSMainThread: 0x60000277c000>{number = 1, name = main}
线程 --- <NSThread: 0x600002761400>{number = 3, name = (null)}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,039评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,223评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,916评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,009评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,030评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,011评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,934评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,754评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,202评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,433评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,590评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,321评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,917评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,568评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,738评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,583评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,482评论 2 352

推荐阅读更多精彩内容