这篇文章主要还是要通过 map{} 和 mapsink 来简单剖析 sink链的实现逻辑,来初步感受 RxSwift通过这种机制把Observable的创建和订阅时Observable的行为进行了拆分。
_ = Observable.just("1").map{ return $0 + "zp" }.subscribe(onNext: { (str) in
print(str)
})
先从 Observable.just("1")
开始
如果想系统了解 just 的调用流程 请点击这里。
通过的 Just 类里面的 extension ObservableType 扩展方法 just , 我们进入了 Just 类中,进行初始化操作。
Just 类
init
, 保存 just 参数。self._element = element
。
init(element: Element) {
self._element = element
}
- 接着会调用 Producer Observable 的初始化方法。
// Producer
override init() {
super.init()
}
// Observable
init() {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
}
紧接着进入正题 map{}
, 这里是重点。
operator
的初始化套路,结合just
和map
的初始化我们先小小的总结一下。
首先 - 我们直接调用的
just()
和map{}
并不是Just
和Map
类的初始化方法,而是和它们在一个类的extension ObservableType
扩展方法。
其次 -Just
和Map
的初始化方法把相关的参数都会保存成属性,Just
的_element
以及Map
的_source
_transform
。
最后 - 就是把Producer
和Observable
初始化一遍。
- 通过 extension Observable 的
map<Result>(_ transform:
方法 调用 Map 的 init 方法。
public func map<Result>(_ transform: @escaping (Element) throws -> Result)
-> Observable<Result> {
return Map(source: self.asObservable(), transform: transform)
}
一、我们先来初始化
- 通过
init
方法把_source
和_transform
保存起来。当然Producer
和Observable
初始化一遍。 -
_source
是map Operator
要处理的 observable, 就是咱们的just("1")
。 -
_transform
就是我们要进行操作的逻辑,也就是{ return $0 + "zp" }
这部分代码。
typealias Transform = (SourceType) throws -> ResultType
private let _source: Observable<SourceType>
private let _transform: Transform
init(source: Observable<SourceType>, transform: @escaping Transform) {
self._source = source
self._transform = transform
}
二、 我们进入 subscribe(onNext: ...
这个承上启下的方法。
- 首先是
AnonymousObserver
初始化,并保存_eventHandler
闭包。
// subscribe AnonymousObserver 调用
let observer = AnonymousObserver<Element> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
// AnonymousObserver 初始化
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
self._eventHandler = eventHandler
}
- 其次
return Disposables.create( self.asObservable().subscribe(observer), disposable )
方法 -
self.asObservable()
是 map,subscribe
直接进入的是Producer
的方法,因为Map
没重写直接走的父类。
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
三、sink 调用入口就在这。
- 先不管
scheduler
相关的判断, 直接先看SinkDisposer
和run
方法。
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
- 为了方便理解 ,我们先看
run
方法。因为Map
重写了run
方法所以必然进入Map
的run
。而run
就是 sink 链的 入口
问:
run
两个入参Observer
和disposer
都是谁
答:disposer
毋庸置疑就是SinkDisposer
对象,Observer
是 AnonymousObserver<String>。
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == ResultType {
let sink = MapSink(transform: self._transform, observer: observer, cancel: cancel)
let subscription = self._source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
-
MapSink
初始化是把_transform
存在自己的属性中,observer
、cancel
是保存在父类Sink
中的。
// MapSink
typealias Transform = (SourceType) throws -> ResultType
typealias ResultType = Observer.Element
typealias Element = SourceType
private let _transform: Transform
init(transform: @escaping Transform, observer: Observer, cancel: Cancelable) {
self._transform = transform
super.init(observer: observer, cancel: cancel)
}
// Sink
fileprivate let _observer: Observer
fileprivate let _cancel: Cancelable
private let _disposed = AtomicInt(0)
#if DEBUG
private let _synchronizationTracker = SynchronizationTracker()
#endif
init(observer: Observer, cancel: Cancelable) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
self._observer = observer
self._cancel = cancel
}
四、睁大你布灵布灵的大眼睛,花了呼哨的操作要开始了
- 随着
MapSink
初始化结束, 我们走到了let subscription = self._source.subscribe(sink)
这个方法
问:
self._source
是谁?subscribe
是谁的方法?
答:self._source
是初始化时就存下的 just("1"), 因为Just
override
了subscribe
,所以subscribe
就是Just
的无疑了。
- 我们进入
Just
的subscribe
方法,并执行第一个方法observer.on(.next(self._element))
问:
observer
是谁?,self._element
是谁?. next
又是什么玩意?on
调用谁的?
答:
observer
是在Map
调用run
方法是传进来的MapSink
对象。
self._element
就是在Just
初始化时存下的 “1”。
.next
就是enum Event<Element>
。
on
方法自然就是MapSink
的方法喽。
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
observer.on(.next(self._element)) // self._element 也就是 “1”
observer.on(.completed)
return Disposables.create()
}
- 下面我们进入
MapSink
的on
方法,进入case .next(let element)
方法,self._transform
就是Map
执行的逻辑部分,element
是在Just
类的subscribe
的on
穿过来的 “1”。
func on(_ event: Event<SourceType>) {
switch event {
case .next(let element):
do {
let mappedElement = try self._transform(element)
self.forwardOn(.next(mappedElement))
}
catch let e {
self.forwardOn(.error(e))
self.dispose()
}
case .error(let error):
self.forwardOn(.error(error))
self.dispose()
case .completed:
self.forwardOn(.completed)
self.dispose()
}
}
- 进入
Sink
的forwardOn
。forwardOn
是不能重写的,所以直接进入父类的。这里self._observer.on(event)
的self._observer
是AnonymousObserver
, 这个on
方法则是AnonymousObserver
的父类ObserverBase
。
这里需要捋一下
self._observer
和on
是谁? 怎么来的?
self._observer
首先我们要追溯到很久的extension ObservableType
的subscribe(onNext
方法我们调用了self.asObservable().subscribe(observer)
, 这个方法可以理解成Map().subscribe(AnonymousObserver())
就是调用map
的subscribe
方法但是map
没有重写,直接进入了它的父类Producer
的subscribe
,在这个方法里面调用了self.run(observer, cancel: disposer)
方法,它可以理解成map.run(AnonymousObserver(),SinkDisposer())
,就是Map
的run
方法,在这个方法我们初始化了MapSink
而入参Observable
就是 AnonymousObserver对象。self._observer
简图
subscribe(onNext
(extension ObservableType) -> subscribe (Producer) -> run(map) -> init(MapSink)on
既然self._observer
是AnonymousObserver
,on
理应是AnonymousObserver
的,但是它没重写on
, 所以实际 调用的事它的父类ObserverBase
的on
方法。
final func forwardOn(_ event: Event<Observer.Element>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
if isFlagSet(self._disposed, 1) {
return
}
self._observer.on(event)
}
五、从这里开始就接近尾声了
-
ObserverBase
的on
方法,调用self.onCore(event)
, 所以进入AnonymousObserver
的onCore
。onCore
又调用了self._eventHandler(event)
闭包,又回调到extension ObservableType
的subscribe(onNext: ((Element) ...
的 57行,然后就是通过onNext
onError
onCompleted
onDisposed
回到到最初的地方。
// ObserverBase
func on(_ event: Event<Element>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
// AnonymousObserver
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
// extension ObservableType
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
let observer = AnonymousObserver<Element> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
六、 SinkDisposer
是干嘛的?
我们可以把
disposer
就是回收资源了,run
笼统理解成运行。
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
- 在
SinkDisposer
的setSinkAndSubscription
中我们保存了,sinkAndSubscription.sink
和sinkAndSubscription.subscription
。 -
sinkAndSubscription.sink
就是MapSink
-
sinkAndSubscription.subscription
就是Just
的Disposables.create()
SinkDisposer可以一边调用_sink.dispose()释放掉Sink派生类占据的资源,另一边调用_subscription.dispose()释放掉订阅上游事件时占据的资源;完成“双边释放”之后,它会把这两个optionals设置为nil
-
dispose
方法调用的时候 ,会把它持有的相关资源释放,并把自己置为nil
func dispose() {
let previousState = fetchOr(self._state, DisposeState.disposed.rawValue)
if (previousState & DisposeState.disposed.rawValue) != 0 {
return
}
if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
guard let sink = self._sink else {
rxFatalError("Sink not set")
}
guard let subscription = self._subscription else {
rxFatalError("Subscription not set")
}
sink.dispose()
subscription.dispose()
self._sink = nil
self._subscription = nil
}
}