publish使用
let subject = PublishSubject<Any>()
subject.subscribe{print("00:\($0)")}
.disposed(by: disposeBag)
let netOB = Observable<Any>.create { (observer) -> Disposable in
sleep(2)// 模拟网络延迟
print("我开始请求网络了")
observer.onNext("请求到的网络数据")
observer.onNext("请求到的本地")
observer.onCompleted()
return Disposables.create {
print("销毁回调了")
}
}.publish()
netOB.subscribe(onNext: { (anything) in
print("订阅1:",anything)
})
.disposed(by: disposeBag)
// 我们有时候不止一次网络订阅,因为有时候我们的数据可能用在不同的额地方
// 所以在订阅一次 会出现什么问题?
netOB.subscribe(onNext: { (anything) in
print("订阅2:",anything)
})
.disposed(by: disposeBag)
_ = netOB.connect()
/*打印结果:
我开始请求网络了
订阅1: 请求到的网络数据
订阅2: 请求到的网络数据
订阅1: 请求到的本地
订阅2: 请求到的本地
销毁回调了
*/
我们看到网络只会请求一次。这种请求一次,订阅到多个不同的地方的场景很多。所以我们有必要了解一下publish是怎么实现的。
探索publish的源码,还是从RxSwift的流程:创建序列,订阅序列,发送响应入手分析。
publish的序列创建
public func publish() -> ConnectableObservable<Element> {
return self.multicast { PublishSubject() }
}
我们看到publish实际返回的是multicast
public func multicast<Subject: SubjectType>(_ subject: Subject)
-> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
return ConnectableObservableAdapter(source: self.asObservable(), makeSubject: { subject })
}
返回类型为ConnectableObservableAdapter
init(source: Observable<Subject.Observer.Element>, makeSubject: @escaping () -> Subject) {
self._source = source
self._makeSubject = makeSubject
self._subject = nil
self._connection = nil
}
1.保存源序列_source
2.保存初始化序列PublishSubject()至_makeSubject
序列的订阅
当序列订阅时,调用ConnectableObservableAdapter的subscribe(observer)
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
return self.lazySubject.subscribe(observer)
}
这里订阅lazySubject序列,这是一个懒加载方式,每次订阅的都是序列_subject,即创建中保存的_makeSubject。
fileprivate var lazySubject: Subject {
if let subject = self._subject {
return subject
}
let subject = self._makeSubject()
self._subject = subject
return subject
}
订阅_subject时,将会调用PublishSubject.subscribe(observer)
let subscription = self._synchronized_subscribe(observer)
调用_synchronized_subscribe(observer)
func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if let stoppedEvent = self._stoppedEvent {
observer.on(stoppedEvent)
return Disposables.create()
}
if self._isDisposed {
observer.on(.error(RxError.disposed(object: self)))
return Disposables.create()
}
let key = self._observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
调用self._observers.insert(observer.on)
mutating func insert(_ element: T) -> BagKey {
let key = _nextKey
_nextKey = BagKey(rawValue: _nextKey.rawValue &+ 1)
if _key0 == nil {
_key0 = key
_value0 = element
return key
}
_onlyFastPath = false
if _dictionary != nil {
_dictionary![key] = element
return key
}
if _pairs.count < arrayDictionaryMaxSize {
_pairs.append((key: key, value: element))
return key
}
_dictionary = [key: element]
return key
}
publish订阅时,将observer.on的回调方法保存起来,如果是首次订阅将key保存在_key0,observer.on保存在_value0,不是首次保存订阅,那么就保存在_dictionary中。
key是BagKey类型:
struct BagKey {
fileprivate let rawValue: UInt64
}
publish将所有订阅的观察者回调方法保存起来,以备后续发起响应时,调用所有观察者的回调方法。
发送响应
使用connect发送响应
override func connect() -> Disposable {
return self._lock.calculateLocked {
if let connection = self._connection {
return connection
}
let singleAssignmentDisposable = SingleAssignmentDisposable()
let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
self._connection = connection
let subscription = self._source.subscribe(connection)
singleAssignmentDisposable.setDisposable(subscription)
return connection
}
}
self._lock.calculateLocked加了递归锁
final func calculateLocked<T>(_ action: () -> T) -> T {
self.lock(); defer { self.unlock() }
return action()
}
然后执行action(),就是下列代码:
if let connection = self._connection {
return connection
}
let singleAssignmentDisposable = SingleAssignmentDisposable()
let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
self._connection = connection
let subscription = self._source.subscribe(connection)
singleAssignmentDisposable.setDisposable(subscription)
return connection
1.保证只有一个_connection,当_connection存在,直接返回_connection。保证下列的原序列_source永远只被订阅一次,那么外界网络请求的闭包就只会执行一次。
2.当_connection为nil时,会创建
let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
self._connection = connection
let subscription = self._source.subscribe(connection)
(1)创建Connection
init(parent: ConnectableObservableAdapter<Subject>, subjectObserver: Subject.Observer, lock: RecursiveLock, subscription: Disposable) {
self._parent = parent
self._subscription = subscription
self._lock = lock
self._subjectObserver = subjectObserver
}
保存ConnectableObservableAdapter到_parent,self.lazySubject.asObserver()保存至_subscription
(2)保存Connection至_connection
(3)订阅源序列_source,connection是观察者. 将会来到AnonymousObservable.run,然后再到sink.run,最终会调用到_subscribeHandler执行到外界的闭包中。
在外界闭包中,observer.onNext("请求到的网络数据")发起响应,原source的观察者是Connection,Connection.on
func on(_ event: Event<Subject.Observer.Element>) {
if isFlagSet(self._disposed, 1) {
return
}
if event.isStopEvent {
self.dispose()
}
self._subjectObserver.on(event)
}
Connection.on调用self._subjectObserver.on(event)
self._subjectObserver就是publish创建时保存的那个PublishSubject()
self._subjectObserver.on(event)等同于PublishSubject.on(event)
public func on(_ event: Event<Element>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
dispatch(self._synchronized_on(event), event)
}
1.调用_synchronized_on
func _synchronized_on(_ event: Event<Element>) -> Observers {
self._lock.lock(); defer { self._lock.unlock() }
switch event {
case .next:
if self._isDisposed || self._stopped {
return Observers()
}
return self._observers
case .completed, .error:
if self._stoppedEvent == nil {
self._stoppedEvent = event
self._stopped = true
let observers = self._observers
self._observers.removeAll()
return observers
}
return Observers()
}
}
_synchronized_on中返回self._observers,即订阅时保存的所有的observer.on
2.dispatch(self._synchronized_on(event), event)调用dispatch
func dispatch<Element>(_ bag: Bag<(Event<Element>) -> Void>, _ event: Event<Element>) {
bag._value0?(event)
if bag._onlyFastPath {
return
}
let pairs = bag._pairs
for i in 0 ..< pairs.count {
pairs[i].value(event)
}
if let dictionary = bag._dictionary {
for element in dictionary.values {
element(event)
}
}
}
bag._value0?(event)首先调用第一个订阅者的回调方法。
然后循环执行_pairs、_dictionary中所有保存的回调方法。
以上就是publish所有流程的源码解析。
总结:
1.publish创建,实际返回是ConnectableObservableAdapter序列,保存了源序列,并且创建保存PublishSubject()序列。
2.publish订阅时,订阅的是中间层ConnectableObservableAdapter的PublishSubject()序列,并保存所有的回调方法observer.on
3.connect时,实际上是订阅了源序列,观察者为我们自己创建的Connection对象,这个时候将会来到创建源序列时的闭包,我们就是在这个闭包中请求网络.
4.给源序列发送响应时,实际上会来到观察者Connection.on方法,Connection中将会对订阅时保存的_observers一一执行。
以上就是publish序列的所有流程。