RxSwift-01:Observable序列实现原理

1. Observable的简单使用

    // Build a sequence whose subscribe closure emits a single element.
    // Note: the closure parameter is the observer that `create` hands in
    // (an `AnyObserver`), even though it is named `observable` here.
    let observable = Observable<Any>.create { observable in
        observable.onNext(10)
        return Disposables.create()
    }
    // Subscribe with an event handler, then dispose the returned subscription.
    let subscription = observable.subscribe { event in
        print(event)
    }
    subscription.dispose()

2. Observable的实现原理

2.1 Observable<Any>.create

  /// Creates an observable sequence from the given subscribe closure.
  ///
  /// - parameter subscribe: Closure that takes an `AnyObserver` and returns a `Disposable`.
  /// - returns: An `AnonymousObservable` built from `subscribe`.
  public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
      return AnonymousObservable(subscribe)
  }

2.2 observable.subscribe

observable.subscribe实质是调用extension ObservableType中的subscribe

    /// Subscribes an event handler closure to this sequence.
    ///
    /// - parameter on: Closure invoked with each `Event` received by the wrapping observer.
    /// - returns: The `Disposable` produced by subscribing the wrapping observer.
    public func subscribe(_ on: @escaping (Event<E>) -> Void) -> Disposable {
        // Wrap the closure in an AnonymousObserver so it can be passed to the
        // observer-based `subscribe` overload.
        let handlerObserver = AnonymousObserver { event in
            on(event)
        }
        let source = self.asObservable()
        return source.subscribe(handlerObserver)
    }

//或者
 /// Abbreviated excerpt of the closure-based `subscribe` overload.
 ///
 /// Only the signature and the final `return` are shown; the statements that
 /// build `disposable` and `observer` are elided here (the complete listing of
 /// this method appears later in this document).
 ///
 /// - returns: A disposable combining the source subscription with `disposable`.
 public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
     -> Disposable {
     // ... construction of `disposable` and `observer` omitted ...
     return Disposables.create(
         self.asObservable().subscribe(observer),
         disposable
     )
 }

最后的subscribe(observer)调用的是AnonymousObservable的subscribe。
AnonymousObservable继承自Producer,而AnonymousObservable类中并没有实现subscribe函数,所以实际调用的是父类Producer的subscribe。
通过断点可以看到,observable的subscribe走的是下面这个方法:

 /// Subscribes `observer` by scheduling the subscription work on
 /// `CurrentThreadScheduler.instance`.
 ///
 /// NOTE(review): the original excerpt carried one extra closing brace, which
 /// suggests an enclosing branch was trimmed from the library source — confirm
 /// against RxSwift's Producer.swift. Only the scheduled path is shown here.
 ///
 /// - parameter observer: Observer that `run` connects to this producer.
 /// - returns: The `Disposable` returned by the scheduler's `schedule` call.
 override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
     return CurrentThreadScheduler.instance.schedule(()) { _ in
         let disposer = SinkDisposer()
         // `run` is overridden by the concrete producer (e.g. AnonymousObservable).
         let sinkAndSubscription = self.run(observer, cancel: disposer)
         disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

         return disposer
     }
 }

关键代码是self.run,在AnonymousObservable中有实现

2.3. AnonymousObservable的run

    /// Creates the sink for `observer` and runs it against this observable.
    ///
    /// - parameter observer: Observer the sink is created with.
    /// - parameter cancel: Cancelable passed to the sink.
    /// - returns: The sink together with the disposable returned by the sink's `run`.
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let observableSink = AnonymousObservableSink(observer: observer, cancel: cancel)
        return (sink: observableSink, subscription: observableSink.run(self))
    }

2.4. AnonymousObservableSink.run()

   /// Calls the parent's `subscribeHandler`, passing this sink wrapped in an `AnyObserver`.
   ///
   /// - parameter parent: Object whose `subscribeHandler` is invoked.
   /// - returns: The `Disposable` returned by `subscribeHandler`.
   func run(_ parent: Parent) -> Disposable {
       let erasedObserver = AnyObserver(self)
       return parent.subscribeHandler(erasedObserver)
   }

parent.subscribeHandler就是我们在create时传入的闭包。
闭包的参数为AnyObserver(self)。也就是说,示例中闭包的参数虽然命名为observable,它实际上是一个AnyObserver(观察者);observable.onNext(10)就是通过这个AnyObserver发送onNext事件。

2.5 onNext

/// Convenience methods on `ObserverType`: each one wraps its argument in the
/// matching `Event` case and passes it to `on(_:)`.
extension ObserverType {
    
    /// Convenience method equivalent to `on(.next(element: Element))`
    ///
    /// - parameter element: Next element to send to observer(s)
    public func onNext(_ element: Element) {
        self.on(.next(element))
    }
    
    /// Convenience method equivalent to `on(.completed)`
    public func onCompleted() {
        self.on(.completed)
    }
    
    /// Convenience method equivalent to `on(.error(Swift.Error))`
    /// - parameter error: Swift.Error to send to observer(s)
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}

self.on(.next(element))调用的是AnyObserver中的on方法:

 /// Passes `event` to the closure stored in `observer`.
 ///
 /// - parameter event: Event to deliver.
 public func on(_ event: Event<Element>) {
        self.observer(event)
    }

那么self.observer是什么呢?
我们通过打断点可以知道,AnyObserver(self)实质调用的是下面这个初始化方法,它把传入的observer的on方法保存到了self.observer中:

   /// Builds this observer from another observer by storing that observer's
   /// `on` method in the `observer` property.
   ///
   /// - parameter observer: Observer whose `on` method is stored.
   public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        self.observer = observer.on
    }

所以在AnyObserver中的self.observer(event),就是AnonymousObservableSink中的on函数

    /// Handles an incoming event for this sink, forwarding it via `forwardOn`
    /// depending on the `isStopped` flag.
    ///
    /// - parameter event: Event to process.
    func on(_ event: Event<Element>) {
        #if DEBUG
            // Debug builds only: register with the synchronization tracker for this
            // call and unregister on every exit path via `defer`.
            self.synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self.synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            // Drop the element when isStopped reads as 1.
            if load(self.isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            // Forward the terminal event and dispose only when fetchOr reports 0.
            // Presumably fetchOr sets the flag and returns its previous value, so
            // only the first terminal event gets through — TODO confirm.
            if fetchOr(self.isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }

我们本次主要探究.next事件,它在AnonymousObservableSink的on函数中对应的分支如下:

        // Partial excerpt: only the `.next` branch of the sink's `on(_:)` is shown;
        // the remaining cases are omitted here.
        switch event {
        case .next:
            // Return without forwarding when isStopped reads as 1.
            if load(self.isStopped) == 1 {
                return
            }
            self.forwardOn(event)
}

forwardOn方法在AnonymousObservableSink的父类Sink中实现:

    /// Passes `event` on to the wrapped observer unless the disposed flag is set.
    ///
    /// - parameter event: Event to forward.
    final func forwardOn(_ event: Event<Observer.Element>) {
        #if DEBUG
            self.synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self.synchronizationTracker.unregister() }
        #endif
        // Nothing is forwarded once the disposed flag is set.
        guard !isFlagSet(self.disposed, 1) else {
            return
        }
        self._observer.on(event)
    }

self._observer就是观察者,即订阅时创建的AnonymousObserver对象。
self._observer.on(event)在AnonymousObserver的父类ObserverBase中实现:

/// Base observer that gates events on a stop flag before handing them to `onCore`.
/// Partial excerpt: `onCore` and the `Disposable` conformance are not shown here.
class ObserverBase<ElementType> : Disposable, ObserverType {
    typealias E = ElementType

    // Stop flag, initialised to 0.
    private let _isStopped = AtomicInt(0)

    /// Routes `event` to `onCore` depending on the stop flag.
    ///
    /// - parameter event: Event to process.
    func on(_ event: Event<E>) {
        switch event {
        case .next:
            // Elements are passed on only while the flag reads 0.
            if load(self._isStopped) == 0 {
                self.onCore(event)
            }
        case .error, .completed:
            // Presumably fetchOr sets the flag to 1 and returns the previous value,
            // so only the first terminal event reaches onCore — TODO confirm.
            if fetchOr(self._isStopped, 1) == 0 {
                self.onCore(event)
            }
        }
    }
}

onCore方法由AnonymousObserver自己重写实现:

  /// Delivers `event` to the stored `_eventHandler` closure.
  ///
  /// - parameter event: Event to hand to the handler.
  override func onCore(_ event: Event<Element>) {
      self._eventHandler(event)
  }

_eventHandler就是订阅时创建AnonymousObserver所传入的闭包,也就是下面subscribe方法中的 { event in ... }:

 /// Subscribes optional per-event closures to this sequence.
 ///
 /// - parameter onNext: Called with the element of each `.next` event.
 /// - parameter onError: Called with the error of an `.error` event; when nil,
 ///   `Hooks.defaultErrorHandler` is called instead.
 /// - parameter onCompleted: Called on a `.completed` event.
 /// - parameter onDisposed: When non-nil, wrapped in the disposable that is disposed
 ///   after `.error`/`.completed` and that forms part of the returned disposable.
 /// - returns: A disposable combining the source subscription and the `onDisposed` disposable.
 public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
     -> Disposable {
     let disposable: Disposable
     if let disposeAction = onDisposed {
         disposable = Disposables.create(with: disposeAction)
     } else {
         disposable = Disposables.create()
     }

     #if DEBUG
         let synchronizationTracker = SynchronizationTracker()
     #endif

     let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []

     // This closure is what the observer invokes for every event it receives.
     let observer = AnonymousObserver<E> { event in
         switch event {
         case .next(let element):
             onNext?(element)
         case .error(let failure):
             if let errorHandler = onError {
                 errorHandler(failure)
             } else {
                 Hooks.defaultErrorHandler(callStack, failure)
             }
             disposable.dispose()
         case .completed:
             onCompleted?()
             disposable.dispose()
         }
     }

     let sourceSubscription = self.asObservable().subscribe(observer)
     return Disposables.create(sourceSubscription, disposable)
 }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容