Observable 的流程
// 1: Create the sequence.
let sequence = Observable<String>.create { observer in
    // 3: Emit a signal to whoever subscribed.
    observer.onNext("Cooci - 框架班级")
    // The returned disposable plays no part in this walkthrough.
    return Disposables.create()
}
// 2: Subscribe to the sequence.
_ = sequence.subscribe(onNext: { text in
    print("订阅到:\(text)")
})
- 创建序列 -
Observable<Any>.create
- 订阅序列 -
ob.subscribe
- 发送信号 -
obserber.onNext()
obserber.onCompleted()
obserber.onError()
核心逻辑源码分析
几个主要类的继承结构,方便大家更好地理解:
1.Observable序列的创建
extension ObservableType {
// MARK: create
/**
Creates an observable sequence from a specified subscribe method implementation.

The `subscribe` closure is not invoked here; it is only handed to a new `AnonymousObservable`, which is returned.

- seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)
- parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
- returns: The observable sequence with the specified implementation for the `subscribe` method.
*/
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
// Wrap the closure in an `AnonymousObservable` and return it typed as `Observable<E>`.
return AnonymousObservable(subscribe)
}
}
/// Observable whose subscription logic is the closure supplied at initialization.
final private class AnonymousObservable<Element>: Producer<Element> {
/// Shape of the stored closure: takes an `AnyObserver<Element>` and returns a `Disposable`.
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
/// The closure received by `init`; it is read later through the `parent` argument of the sink's `run`.
let _subscribeHandler: SubscribeHandler
/// Stores `subscribeHandler` without calling it.
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
/// Creates an `AnonymousObservableSink` for `observer` and `cancel`, runs it against `self`,
/// and returns the sink together with the disposable produced by that run.
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
// Passing `self` gives the sink access to `_subscribeHandler`.
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
-
create
方法被调用的时候,创建并返回了一个内部对象ob
:AnonymousObservable
-
AnonymousObservable
内部保存了_subscribeHandler
闭包 -
AnonymousObservable
继承了Producer
,获得了subscribe
功能
2.Observable序列的订阅
extension ObservableType {
    /**
     Subscribes per-event handler closures to this observable sequence.

     - parameter onNext: Invoked with the element of every `.next` event.
     - parameter onError: Invoked with the error of an `.error` event. When `nil`, the error is passed to `Hooks.defaultErrorHandler` together with the captured call stack.
     - parameter onCompleted: Invoked on a `.completed` event.
     - parameter onDisposed: When non-nil, wrapped in the disposable that is disposed after `.error` / `.completed` and that is also part of the returned disposable.
     - returns: A disposable combining the subscription to the underlying sequence with the `onDisposed` disposable.
     */
    public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            // Disposable that carries the optional `onDisposed` callback.
            let disposable: Disposable

            if let disposed = onDisposed {
                disposable = Disposables.create(with: disposed)
            }
            else {
                disposable = Disposables.create()
            }

            #if DEBUG
                let synchronizationTracker = SynchronizationTracker()
            #endif

            // Captured up front so the default error handler can receive it later.
            let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []

            // Internal observer: its event handler dispatches each event to the caller's closures.
            let observer = AnonymousObserver<E> { event in

                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif

                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    // Terminal event: dispose the `onDisposed` disposable.
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    // Terminal event: dispose the `onDisposed` disposable.
                    disposable.dispose()
                }
            }
            // Subscribe the internal observer to the underlying sequence and hand back
            // both disposables as one.
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
}
-
ObservableType
拓展实现了subscribe
- 创建了一个内部观察者
AnonymousObserver
对象,它这里的初始化是闭包参数,保存了外界的 onNext, onError , onCompleted , onDisposed 的处理回调闭包的调用 - 调用
self.asObservable()
返回一个原序列(AnonymousObservable
)对象:ob
- 最后
ob
调用父类Producer
的subscribe()
方法,并把这个内部观察者observer
带了过去
进入Producer
的subscribe()
方法:参数(observer
)
/// Subscribes `observer` by running this producer and returns the `SinkDisposer` that holds the resulting sink and subscription.
/// When `CurrentThreadScheduler.isScheduleRequired` is true, the same steps are performed inside an action passed to
/// `CurrentThreadScheduler.instance.schedule`, and the result of that call is returned instead.
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
// `run` is overridden by subclasses (e.g. `AnonymousObservable`); the disposer is passed as its `cancel` argument.
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
// Same sequence of steps as above, executed from within the scheduled action.
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
- 这里主要关心
self.run
的调用,其他无关订阅流程的先不管 -
Producer
调用run
最终会定位到子类AnonymousObservable.run
回到AnonymousObservable.run
:
/// Creates an `AnonymousObservableSink` for `observer` and `cancel`, runs it with `self` as the parent,
/// and returns the sink together with the disposable produced by that run.
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
// The sink is given the observer and the cancelable.
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
// `sink.run` receives this observable as its parent argument.
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
- 创建了中介者
AnonymousObservableSink
对象 - 中介者对象调用
run
,并传入参数:self
(AnonymousObservable
本身),观察者observer
进入AnonymousObservableSink
:
/// Sink placed between an `AnonymousObservable` and the observer `O`: it is itself an `ObserverType`
/// whose `on` filters events through a stop flag before forwarding them with `forwardOn`.
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
typealias E = O.E
/// The observable type whose `_subscribeHandler` is invoked by `run`.
typealias Parent = AnonymousObservable<E>
// state
// Starts at 0; set to 1 by the first `.error` / `.completed` event handled in `on`.
private let _isStopped = AtomicInt(0)
#if DEBUG
fileprivate let _synchronizationTracker = SynchronizationTracker()
#endif
/// Passes `observer` and `cancel` straight to the `Sink` initializer.
override init(observer: O, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
/// Handles one event: `.next` is forwarded unless the stop flag reads 1; `.error` / `.completed`
/// are forwarded only when the flag was still 0, after which the sink disposes itself.
func on(_ event: Event<E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
// Drop elements once a terminal event has been seen.
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
// NOTE(review): `fetchOr` presumably sets the flag and returns its previous value, so only
// the first terminal event passes this check — confirm against the atomics helper.
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
/// Invokes the parent's stored subscribe closure, passing this sink wrapped in an `AnyObserver`,
/// and returns the disposable that closure produces.
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
- 给
AnonymousObservable
别名Parent
- 初始化调用了父类
Sink
的初始化:super.init
,并保存观察者_observer
- 实现了
on
, 就是next,error,completed
的事件闭包 -
on
内调用了forwardOn
-
run
方法里的parent
就是AnonymousObservable
,这里就相当于AnonymousObservable._subscribeHandler
,参数AnyObserver
对象 -
AnyObserver
通过传入self
(AnonymousObservableSink
本身)初始化发放信号对象
进入AnyObserver
内:
/// Observer wrapper that stores another observer's `on` method as a function value and calls it for every event.
/// (`...` marks declarations left out of this excerpt.)
public struct AnyObserver<Element> : ObserverType {
...
/// Captures `observer.on` — the method itself, not the observer — in `self.observer`.
public init<O : ObserverType>(_ observer: O) where O.E == Element {
self.observer = observer.on
}
...
/// Passes `event` to the stored function.
public func on(_ event: Event<Element>) {
return self.observer(event)
}
}
- 将
sink
的on
保存为observer
(observer这里就是保存了一个函数,并不是之前的观察者),即:AnonymousObservableSink.on
-
on
方法内调用self.observer
,即调用了AnonymousObservableSink.on
- 到了这里最后执行了
AnonymousObservableSink.on
内部的forwardOn
这里需要注意:AnonymousObservableSink.run
内部通过AnonymousObservable._subscribeHandler
传入一个消息发送者AnyObserver
,这里AnyObserver
即我们create
闭包中的observer
3.Observable序列发送信号
// 1. Create the sequence.
let ob = Observable<Any>.create { observer in
    // 3. Emit signals: one element, followed by completion.
    observer.onNext("RxSwift核心逻辑")
    // To emit a failure instead, call `observer.onError(_:)` with a value conforming to `Error`.
    observer.onCompleted()
    let disposable = Disposables.create()
    return disposable
}
- 这里闭包中的
observer
本质就是AnyObserver
- 当我们发送信息
onNext, onCompleted, onError
的时候,会直接来到ObserverType
的拓展 - 继承关系请看前边的继承图
/// Protocol for types that can receive events of a sequence.
public protocol ObserverType {
/// The element type carried by the events this observer accepts.
associatedtype E
/// Receives a single event; the only requirement conforming types must implement.
func on(_ event: Event<E>)
}
/// Convenience methods that each build one event and pass it to `on(_:)`.
extension ObserverType {
/// Sends `element` as a `.next` event.
public func onNext(_ element: E) {
self.on(.next(element))
}
/// Sends a `.completed` event.
public func onCompleted() {
self.on(.completed)
}
/// Sends `error` as an `.error` event.
public func onError(_ error: Swift.Error) {
self.on(.error(error))
}
}
- 内部调用
self.on
,本质就是调用(类的继承关系请看继承图)
AnyObserver.on
->AnonymousObservableSink.on
->AnonymousObservableSink.forwardOn
-> 通过父类调用
/// Handles one event: `.next` is forwarded unless the stop flag reads 1; `.error` / `.completed`
/// are forwarded only when the flag was still 0, after which `dispose()` is called.
func on(_ event: Event<E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
// Drop elements once the stop flag has been set.
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
// NOTE(review): `fetchOr` presumably sets the flag and returns its previous value — confirm.
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
Sink.forwardOn
->Sink._observer.on
->AnonymousObserver.on
-> 通过父类调用
/// Delivers `event` to the stored `_observer`, unless the `_disposed` flag is already set.
final func forwardOn(_ event: Event<O.E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
// Nothing is forwarded when the disposed flag is set.
if isFlagSet(self._disposed, 1) {
return
}
self._observer.on(event)
}
3.
ObserverBase.on
->AnonymousObserver.onCore
->AnonymousObserver._eventHandler
/// Gates events through the stop flag before handing them to `onCore`: `.next` passes while the
/// flag reads 0; `.error` / `.completed` pass only when the flag was still 0 at the time of the check.
func on(_ event: Event<E>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
// NOTE(review): `fetchOr` presumably sets the flag and returns its previous value — confirm.
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
/// Passes `event` to the stored `_eventHandler` closure.
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
4.最后就通过AnonymousObserver创建时候的闭包,把
onNext,onError,onCompleted
回调出去了
总结关系图: