首先,Subject
是一个代理,它既是Observer
,也是Observable
.下面以PublishSubject为例讲解下:
// 1:初始化序列
let publishSub = PublishSubject<Int>() //初始化一个PublishSubject 装着Int类型的序列
// 2:发送响应序列
publishSub.onNext(1)
// 3:订阅序列
publishSub.subscribe { print("订阅到了:",$0)}
.disposed(by: disposbag)
// 再次发送响应
publishSub.onNext(2)
publishSub.onNext(3)
正常情况下:创建序列
--->序列订阅
---->观察者发送响应
--->sink.on
-->subscribe闭包调用
--->订阅到了XXX
;这里所有的操作都是publishSub
自己完成的。当然,上面先发送了一次响应才订阅的,因为PublishSubject
只接受订阅之后的内容,so没有作用。
源码解析到底PublishSubject
到底是如何实现的:
首先看到PublishSubject<Int>()
内部初始化只是重写了几个方法,先不去管它,我们主要先看订阅流程publishSub.subscribe { print("订阅到了:",$0)}
,这里老规矩,创建观察者observer
——>asObserverable.subscribe
——>进入到序列内重写的subscribe
(说先不管这一下就进来这个重写的方法了0.0):
public final class PublishSubject<Element>
: Observable<Element>
, SubjectType
, Cancelable
, ObserverType
, SynchronizedUnsubscribeType {
…
public override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
self._lock.lock()
let subscription = self._synchronized_subscribe(observer)
self._lock.unlock()
return subscription
}
func _synchronized_subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
if let stoppedEvent = self._stoppedEvent {
observer.on(stoppedEvent)
return Disposables.create()
}
if self._isDisposed {
observer.on(.error(RxError.disposed(object: self)))
return Disposables.create()
}
let key = self._observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
…
}
这里首先看下
self._lock.lock()
let subscription = self._synchronized_subscribe(observer)
self._lock.unlock()
.lock()
.unlock()
保证了我们发送的响应能够顺序的输出被订阅(上面的例子来说也就是为什么打印的是2,3
,不会出现3,2
的情况);let key = self._observers.insert(observer.on)
是对观察者回调的收集
订阅先到这,发送响应publishSub.onNext(2)
:
public func onNext(_ element: E) {
self.on(.next(element))
}
publishSub的on函数的实现:
public final class PublishSubject<Element>
: Observable<Element>
, SubjectType
, Cancelable
, ObserverType
, SynchronizedUnsubscribeType {
。。。
/// Notifies all subscribed observers about next event.
///
/// - parameter event: Event to send to the observers.
public func on(_ event: Event<Element>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
dispatch(self._synchronized_on(event), event)
}
func _synchronized_on(_ event: Event<E>) -> Observers {
self._lock.lock(); defer { self._lock.unlock() }
switch event {
case .next:
if self._isDisposed || self._stopped {
return Observers()
}
return self._observers
case .completed, .error:
if self._stoppedEvent == nil {
self._stoppedEvent = event
self._stopped = true
let observers = self._observers
self._observers.removeAll()
return observers
}
return Observers()
}
}
。
public override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
self._lock.lock()
let subscription = self._synchronized_subscribe(observer)
self._lock.unlock()
return subscription
}
。。。
}
主要研究dispatch(self._synchronized_on(event), event)
方法:
可以看到self._synchronized_on(event)
函数返回了 之前保存的观察者回调的收集,也就是下面函数的bag
:
func dispatch<E>(_ bag: Bag<(Event<E>) -> Void>, _ event: Event<E>) {
bag._value0?(event)
if bag._onlyFastPath {
return
}
//——之前观察者回调的收集,现在取出来用了,做了一个处理——
let pairs = bag._pairs
for i in 0 ..< pairs.count {
pairs[i].value(event)
}
//这里是真正的对闭包的调用了,element(event)调用订阅是观察者闭包
if let dictionary = bag._dictionary {
for element in dictionary.values {
element(event)
}
}
}
pairs[i].value(event)
对观察者回调处理了一下---->然后element(event)
调用订阅时创建的观察者闭包---->事件响应打印输出----->完.
总结:
PublishSubject
遵从了ObservableType
,ObserverType
,可以是观察者也可以是序列,它重写了subscribe
方法;- 订阅时创建观察者,并对观察者回调的收集;
- 发送信号
onNext
,通过on
方法----dispatch
方法取出观察者回调---element(event)
执行观察者闭包---事件响应---->完(这里subscribe
方法后面并没有通过创建sink
去管理,这是一个与核心逻辑小小的区别)