let netOB = Observable<Any>.create { (observer) -> Disposable in
sleep(2) //模拟网络延迟
print("我开始请求网络了")
observer.onNext("请求到的网络数据")
observer.onNext("请求到的本地")
observer.onCompleted()
return Disposables.create {
print("销毁回调了")
}
}
netOB.subscribe(onNext: { (anything) in
print("订阅1:",anything)
}).disposed(by: disposeBag)
netOB.subscribe(onNext: { (anything) in
print("订阅2:",anything)
}).disposed(by: disposeBag)
我开始请求网络了
订阅1: 请求到的网络数据
订阅1: 请求到的本地
销毁回调了
我开始请求网络了
订阅2: 请求到的网络数据
订阅2: 请求到的本地
销毁回调了
请求两次
加上.publish() 和 _ = netOB.connect(),请求一次
let netOB = Observable<Any>.create { (observer) -> Disposable in
sleep(2) //模拟网络延迟
print("我开始请求网络了")
observer.onNext("请求到的网络数据")
observer.onNext("请求到的本地")
observer.onCompleted()
return Disposables.create {
print("销毁回调了")
}
}.publish()
netOB.subscribe(onNext: { (anything) in
print("订阅1:",anything)
}).disposed(by: disposeBag)
netOB.subscribe(onNext: { (anything) in
print("订阅2:",anything)
}).disposed(by: disposeBag)
_ = netOB.connect()
我开始请求网络了
订阅1: 请求到的网络数据
订阅2: 请求到的网络数据
订阅1: 请求到的本地
订阅2: 请求到的本地
销毁回调了
步骤1:点击publish()
extension ObservableType {
public func publish() -> ConnectableObservable<Element> {
return self.multicast { PublishSubject() }
}
}
步骤2:点击multicast
public func multicast<Subject: SubjectType>(makeSubject: @escaping () -> Subject)
-> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
return ConnectableObservableAdapter(source: self.asObservable(), makeSubject: makeSubject)
}
步骤3:点击ConnectableObservableAdapter
3.1 self.makeSubject = makeSubject 保存闭包
3.1 外界.subscribe 来到 override func subscribe<Observer: ObserverType>( observer: Observer) -> Disposable where Observer.Element == Subject.Element {}方法
3.2 外界.connect() 来到 connect()方法
3.3 self.lazySubject.asObserver() 懒加载,所以走一次
3.4 let subscription = self._source.subscribe(connection) -> subscribe 出去
final private class ConnectableObservableAdapter<Subject: SubjectType>
: ConnectableObservable<Subject.Element> {
typealias ConnectionType = Connection<Subject>
fileprivate let _source: Observable<Subject.Observer.Element>
fileprivate let _makeSubject: () -> Subject
fileprivate let _lock = RecursiveLock()
fileprivate var _subject: Subject?
// state
fileprivate var _connection: ConnectionType?
init(source: Observable<Subject.Observer.Element>, makeSubject: @escaping () -> Subject) {
self._source = source
self._makeSubject = makeSubject
self._subject = nil
self._connection = nil
}
override func connect() -> Disposable {
return self._lock.calculateLocked {
if let connection = self._connection {
return connection
}
let singleAssignmentDisposable = SingleAssignmentDisposable()
let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
self._connection = connection
let subscription = self._source.subscribe(connection)
singleAssignmentDisposable.setDisposable(subscription)
return connection
}
}
fileprivate var lazySubject: Subject {
if let subject = self._subject {
return subject
}
let subject = self._makeSubject()
self._subject = subject
return subject
}
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
return self.lazySubject.subscribe(observer)
}
}