- 当序列发出
error
或者completed
事件,观察者就不能继续响应,内部做了什么? - 观察者返回的
Disposable
,手动调用,又做了什么?
这一切的一切,都是RxSwift
的销毁者dispose
在作怪
接下来lldb
调试对源码进行分析,运行以下demo
// 创建序列
let ob = Observable<Any>.create { (observer) -> Disposable in
observer.onNext("销毁者")
return Disposables.create { print("销毁释放了")} // dispose.dispose()
}
// 序列订阅
let dispose = ob.subscribe(onNext: { (anything) in
print("订阅到了:\(anything)")
}, onError: { (error) in
print("订阅到了:\(error)")
}, onCompleted: {
print("完成了")
}) {
print("销毁回调")
}
// 对象 是无法销毁的
print("执行完毕")
dispose.dispose()
输出日志
订阅到了:销毁者
执行完毕
销毁释放了
销毁回调
- 点击
create
,创建了一个匿名销毁者对象
public static func create(with dispose: @escaping () -> Void) -> Cancelable {
return AnonymousDisposable(disposeAction: dispose)
}
- 点击
AnonymousObservable
,初始化保存了响应回调闭包
fileprivate final class AnonymousDisposable : DisposeBase, Cancelable {
public typealias DisposeAction = () -> Void
private let _isDisposed = AtomicInt(0)
private var _disposeAction: DisposeAction?
public var isDisposed: Bool {
return isFlagSet(self._isDisposed, 1)
}
fileprivate init(_ disposeAction: @escaping DisposeAction) {
self._disposeAction = disposeAction
super.init()
}
fileprivate init(disposeAction: @escaping DisposeAction) {
self._disposeAction = disposeAction
super.init()
}
fileprivate func dispose() {
if fetchOr(self._isDisposed, 1) == 0 {
if let action = self._disposeAction {
self._disposeAction = nil
action()
}
}
}
}
- 通过上面代码可以看出,
self._disposeAction
是在调用dispose()
的时候销毁的,然后执行开始保存的action()
闭包,那么问题来了,什么时候开始调用的?
销毁调用
- 订阅序列时通过临时变量
disposable
保存外界的销毁闭包,当收到error
和completed
信号时,调用disposable.dispose()
,销毁临时销毁者变量,具体分析看注释
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
======保存外界销毁闭包======
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
======实际上保存的就是 print("销毁回调")======
#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
let observer = AnonymousObserver<Element> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
//当收到序列发送的.error和.completed信号时,会触发disposable.dispose()
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
- 通过上面代码可知,调用
subscribe
返回return Disposables.create( self.asObservable().subscribe(observer), disposable )
,继续跟踪得知,实际上返回一个BinaryDisposable
实例,入参为self.asObservable().subscribe(observer)
和临时销毁者disposable
,如下:
extension Disposables {
/// Creates a disposable with the given disposables.
public static func create(_ disposable1: Disposable, _ disposable2: Disposable) -> Cancelable {
return BinaryDisposable(disposable1, disposable2)
}
}
- 跟踪进入
BinaryDisposable
源码如下:实际上就是用两个销毁者构建一个二元销毁者
private final class BinaryDisposable : DisposeBase, Cancelable {
private let _isDisposed = AtomicInt(0)
private var _disposable1: Disposable?
private var _disposable2: Disposable?
/// - returns: Was resource disposed.
var isDisposed: Bool {
return isFlagSet(self._isDisposed, 1)
}
//通过两个销毁者得到一个元销毁者
init(_ disposable1: Disposable, _ disposable2: Disposable) {
self._disposable1 = disposable1
self._disposable2 = disposable2
super.init()
}
/// Calls the disposal action if and only if the current instance hasn't been disposed yet.
///
/// After invoking disposal action, disposal action will be dereferenced.
func dispose() {
if fetchOr(self._isDisposed, 1) == 0 {
self._disposable1?.dispose()
self._disposable2?.dispose()
self._disposable1 = nil
self._disposable2 = nil
}
}
}
- 通过上面代码可以知道,在
subscribe
中返回的变量,具有直接调用dispose()
功能,也就是在BinaryDisposable
中的dispose()
,而他的功能,既销毁了订阅中的临时销毁者,也销毁了。。。。。。到底销毁了啥? - 关键点就是
self.asObservable().subscribe(observer)
做了什么,LLDB
调试一下,分析进入Producer
的subscribe
源码如下:
class Producer<Element> : Observable<Element> {
override init() {
super.init()
}
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
//返回一个销毁者
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
rxAbstractMethod()
}
}
-
subscribe
就是返回了一个销毁者,这也验证了,BinaryDisposable(disposable1, disposable2)
传入是的两个销毁者,一个是SinkDisposer()
,另一个是保存外界销毁闭包的局部销毁者let disposable: Disposable
- 首先看看入参都是什么
disposer.setSinkAndSubscription
传入了一个元祖,一个AnonymousObservableSink
和parent._subscribeHandler(AnyObserver(self))
即(外界创建序列的闭包的返回销毁者)
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
- 接下来看看
SinkDisposer
做了什么,当状态满足的时候,就直接调用
sink.dispose()
subscription.dispose()
self._sink = nil
self._subscription = nil
执行销毁和释放,源码如下:
func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
self._sink = sink
self._subscription = subscription
//获取状态
let previousState = fetchOr(self._state, DisposeState.sinkAndSubscriptionSet.rawValue)
if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
rxFatalError("Sink and subscription were already set")
}
//如果状态满足就销毁
if (previousState & DisposeState.disposed.rawValue) != 0 {
sink.dispose()
subscription.dispose()
self._sink = nil
self._subscription = nil
}
}
- 当收到完成或者错误信号,传入的
AnonymousObservableSink
:Sink
:Disposable
就直接调用dispose()
销毁
func on(_ event: Event<Element>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
- 进入
dispose()
查看如下:
init(observer: Observer, cancel: Cancelable) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
self._observer = observer
self._cancel = cancel
}
func dispose() {
fetchOr(self._disposed, 1)
self._cancel.dispose()
}
-
self._cancel
就是当时初始化传进来的销毁者SinkDisposer
,
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
- 因此可以得知,如果序列发出
complete
或error
信号,以后就无法再接收响应。
整个流程分析下来,可知:
-
sink
将序列、观察者、调度者、销毁者关联了起来,一旦将sink = nil
,他们之间就失去联系,内部创建的临时序列和观察者,也会随着对外的观察者和序列的生命周期而销毁释放,因此就无法继续响应; - 而外部创建的
let dispose = ob.subscribe()
局部变量,即使不主动调用dispose.dispose()
,也会随着作用域空间释放而销毁。