shareReplay运算

/**
Returns an observable sequence that shares a single subscription
 to the underlying sequence, and immediately upon subscription 
replays maximum number of elements in buffer
*/
public func shareReplay(bufferSize: Int)->Observable {
    if bufferSize == 1 {
        return ShareReplay1(source: self.asObservable())
    else {
        return self.replay(bufferSize).refCount()
    }
}

// optimized version of share replay for most common case
final class ShareReplay1<Element>: Observable<Element>, ObserverType, SynchronizedUnsubscribeType {
    typealias DisposeKey = Bag<AnyObserverElement>>.KeyType

    private let _source: Observable<Element>

    private var _lock = NSRecursiveLock()

    private var _connection: SingleAssignmentDisposable?
    private var _element: Element?
    private var _stopped = false
    private var _stopEvent = nil as Event<Element>?
    private var _observers = Bag<AnyObserver<Element>>()

    init(source: Observable<Element>) {
        self._source = source
    }

    override func subscribe<O: ObserverType where O.E == E>(observer: O)->Disposable {
        _lock.lock(); defer { _lock.unlock() }
        return _synchronized_subscribe(observer)
    }

    func _synchronized_subscribe<O: ObserverType where O.E == E>(observer: O)->Disposable {
        if let element = self._element {
            observer.on(.Next(element))
        }

        if let stopEvent = self._stopEvent {
            observer.on(stopEvent)
            return NopDisposable.instance
        }

        let initialCount = self._observers.count

        let disposeKey = self._observers.insert(AnyObserver(observer))

        if initialCount == 0 {
            let connection = SingleAssignmentDisposable()
            _connection = connection

            connection.disposable = self._source.subscribe(self)
        }

        return SubscriptionDisposable(owner: self, key: disposeKey)
    }

    func synchronizedUnsubscribe(disposeKey: DisposeKey) {
        _lock.lock(); defer { _lock.unlock() }
        _synchronized_unsubscribe(disposeKey)
    }

    func _synchronized_unsubscribe(disposeKey: DisposeKey) {
        // if already unsubscribed, just return
        if self._observers.removeKey(disposeKey) == nil {
            return
        }

        if _observers.count == 0 {
            _connection?.dispose()
            _connection = nil
        }
    }

    func on(event: Event<E>) {
        _lock.lock(); defer { _lock.unlock() }
        _synchronized_on(event)
    }

    func _synchronized_on(event: Event<E>) {
        if _stopped {
            return
        }

        switch event {
        case .Next(let element):
            _element = element
        case .Error, .Completed:
            _stopEvent = event
            _stopped = true
            _connection?.dispose()
            connection = nil
        }

        observers.on(event)
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容