/**
Returns an observable sequence that shares a single subscription
to the underlying sequence, and immediately upon subscription
replays maximum number of elements in buffer
*/
public func shareReplay(bufferSize: Int)->Observable {
if bufferSize == 1 {
return ShareReplay1(source: self.asObservable())
else {
return self.replay(bufferSize).refCount()
}
}
// optimized version of share replay for most common case
final class ShareReplay1<Element>: Observable<Element>, ObserverType, SynchronizedUnsubscribeType {
typealias DisposeKey = Bag<AnyObserverElement>>.KeyType
private let _source: Observable<Element>
private var _lock = NSRecursiveLock()
private var _connection: SingleAssignmentDisposable?
private var _element: Element?
private var _stopped = false
private var _stopEvent = nil as Event<Element>?
private var _observers = Bag<AnyObserver<Element>>()
init(source: Observable<Element>) {
self._source = source
}
override func subscribe<O: ObserverType where O.E == E>(observer: O)->Disposable {
_lock.lock(); defer { _lock.unlock() }
return _synchronized_subscribe(observer)
}
func _synchronized_subscribe<O: ObserverType where O.E == E>(observer: O)->Disposable {
if let element = self._element {
observer.on(.Next(element))
}
if let stopEvent = self._stopEvent {
observer.on(stopEvent)
return NopDisposable.instance
}
let initialCount = self._observers.count
let disposeKey = self._observers.insert(AnyObserver(observer))
if initialCount == 0 {
let connection = SingleAssignmentDisposable()
_connection = connection
connection.disposable = self._source.subscribe(self)
}
return SubscriptionDisposable(owner: self, key: disposeKey)
}
func synchronizedUnsubscribe(disposeKey: DisposeKey) {
_lock.lock(); defer { _lock.unlock() }
_synchronized_unsubscribe(disposeKey)
}
func _synchronized_unsubscribe(disposeKey: DisposeKey) {
// if already unsubscribed, just return
if self._observers.removeKey(disposeKey) == nil {
return
}
if _observers.count == 0 {
_connection?.dispose()
_connection = nil
}
}
func on(event: Event<E>) {
_lock.lock(); defer { _lock.unlock() }
_synchronized_on(event)
}
func _synchronized_on(event: Event<E>) {
if _stopped {
return
}
switch event {
case .Next(let element):
_element = element
case .Error, .Completed:
_stopEvent = event
_stopped = true
_connection?.dispose()
connection = nil
}
observers.on(event)
}
}
shareReplay运算
最后编辑于 :
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
推荐阅读更多精彩内容
- 1. 赋值运算符 "=" Swift赋值语句不可作为条件判断语句 2.基础运算符 "+ - * / %" 3.单目...
- 一元运算符 delete delete 运算符删除对以前定义的对象属性或方法的引用。例如: delete运算符不能...
- 关系运算符 关系运算符小于、大于、小于等于和大于等于执行的是两个数的比较运算,比较方式与算术比较运算相同。每个关系...