上文RxSwift-deallocating,deallocated源码解析,我们提到deallocating
序列和takeUntil
序列经常结合起来使用,本文将分析takeUntil
,探索一下takeUntil
内部是如何接收deallocating
发送的响应非常有必要。
takeUntil源码解析
public func takeUntil<Source: ObservableType>(_ other: Source)
-> Observable<Element> {
return TakeUntil(source: self.asObservable(), other: other.asObservable())
}
返回的是一个TakeUntil
序列,TakeUntil
对象依然继承自我们熟悉的Producer
。
final private class TakeUntil<Element, Other>: Producer<Element> {
fileprivate let _source: Observable<Element>
fileprivate let _other: Observable<Other>
init(source: Observable<Element>, other: Observable<Other>) {
self._source = source
self._other = other
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = TakeUntilSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
TakeUntil
序列保存了原始序列_source
(上文的例子中,_source
保存的是vc.publicOB
序列)以及序列_other
(上文的例子中,_other
保存的就是vc.rx.deallocating
序列)
外界的subscribe
的是TakeUntil
序列。TakeUntil
序列被订阅时,会执行TakeUntil.run
,TakeUntil.run
调用TakeUntilSink.run
init(parent: Parent, observer: Observer, cancel: Cancelable) {
self._parent = parent
super.init(observer: observer, cancel: cancel)
}
TakeUntilSink
保存TakeUntil
到_parent
属性.
func run() -> Disposable {
let otherObserver = TakeUntilSinkOther(parent: self)
let otherSubscription = self._parent._other.subscribe(otherObserver)
otherObserver._subscription.setDisposable(otherSubscription)
let sourceSubscription = self._parent._source.subscribe(self)
return Disposables.create(sourceSubscription, otherObserver._subscription)
}
TakeUntilSink.run
中,
1.创建观察者TakeUntilSinkOther
, TakeUntilSinkOther
的_parent
保存TakeUntilSink
2.self._parent._other.subscribe(otherObserver)
订阅序列,这个序列就是上文的例子的vc.rx.deallocating
序列。
RxSwift-deallocating,deallocated源码解析中,我们提到当对象在dealloc
时,会向DeallocatingProxy
中存储ReplaySubject
序列(就是vc.rx.deallocating
序列)发送响应,会来到这里的观察者TakeUntilSinkOther.on(.next())
func on(_ event: Event<Element>) {
self.synchronizedOn(event)
}
func _synchronized_on(_ event: Event<Element>) {
switch event {
case .next:
self._parent.forwardOn(.completed)
self._parent.dispose()
case .error(let e):
self._parent.forwardOn(.error(e))
self._parent.dispose()
case .completed:
self._subscription.dispose()
}
}
TakeUntilSinkOther.on(.next())
----->Sink.forwardOn(.completed)
----->TakeUntilSink.observer.(.completed)
.
最终会给TakeUntil
序列的observer
发送.completed
信号,序列完成并销毁。
总结
1.takeUntil
是一个中间层,在takeUntil
被订阅的流程中,中间层takeUntil
会订阅rx.deallocating()
序列.
2.当对象被销毁,对rx.deallocating()
发送响应时,会调用到观察者的TakeUntilSinkOther.on(.next())
,最终会给TakeUntil
序列的observer
发送.completed
信号,序列完成并销毁。