RxSwift高阶函数skipUntil解读
skipUntil
的作用:抑制从源可观察序列发出元素,直到参考可观察序列发出元素
示例
let sourceSeq = PublishSubject<String>()
let referenceSeq = PublishSubject<String>()
sourceSeq
.skipUntil(referenceSeq)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 没有条件命令 下面走不了
sourceSeq.onNext("🍆")
sourceSeq.onNext("🥒")
sourceSeq.onNext("🥕")
referenceSeq.onNext("✅") // 条件一出来,下面就可以走了
sourceSeq.onNext("🐥")
sourceSeq.onNext("🦋")
sourceSeq.onNext("🐝")
输出:🐥 🦋 🐝
PublishSubject 是 Observable 的子类。
PublishSubject 将对观察者发送订阅后产生的元素,而在订阅前发出的元素将不会发送给观察者。
如果源 Observable 因为产生了一个 error 事件而中止, PublishSubject 就不会发出任何元素,而是将这个 error 事件发送出来。
设置参考序列
示例代码中,首先,我们创建了一个源序列 sourceSeq 和 参考序列 referenceSeq。然后给源序列设置了参考序列并订阅信号。
先看看skipUntil
是怎么设置参考序列的:
public func skipUntil<Source: ObservableType>(_ other: Source) -> Observable<Element> {
return SkipUntil(source: self.asObservable(), other: other.asObservable())
}
新建了个SkipUntil
序列,并保存了源序列和参考序列。
final private class SkipUntil<Element, Other>: Producer<Element> {
fileprivate let _source: Observable<Element>
fileprivate let _other: Observable<Other>
init(source: Observable<Element>, other: Observable<Other>) {
self._source = source
self._other = other
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = SkipUntilSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
订阅
接着我们订阅信号的时候,创建了匿名观察者,最终调用了SkipUntil
的run
。这里又创建了意料之中的SkipUntilSink
,并调用run
函数:
final private class SkipUntilSink<Other, Observer: ObserverType>
: Sink<Observer>
, ObserverType
, LockOwnerType
, SynchronizedOnType {
typealias Parent = SkipUntil<Element, Other>
fileprivate let _parent: Parent
fileprivate var _forwardElements = false
init(parent: Parent, observer: Observer, cancel: Cancelable) {
self._parent = parent
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<Element>) {
self.synchronizedOn(event)
}
func _synchronized_on(_ event: Event<Element>) {
switch event {
case .next:
if self._forwardElements {
self.forwardOn(event)
}
case .error:
self.forwardOn(event)
self.dispose()
case .completed:
if self._forwardElements {
self.forwardOn(event)
}
self.dispose()
}
}
func run() -> Disposable {
let sourceSubscription = self._parent._source.subscribe(self)
let otherObserver = SkipUntilSinkOther(parent: self)
let otherSubscription = self._parent._other.subscribe(otherObserver)
self._sourceSubscription.setDisposable(sourceSubscription)
otherObserver._subscription.setDisposable(otherSubscription)
return Disposables.create(_sourceSubscription, otherObserver._subscription)
}
}
SkipUntil
的run
里面主要做了两个动作:
- 源序列订阅信号,observer 参数为:SkipUntilSink
- 参考序列订阅信号,observer 参数为:SkipUntilSinkOther
这两个序列都是PublishSubject
类的,
public final class PublishSubject<Element>
: Observable<Element>
, SubjectType
, Cancelable
, ObserverType
, SynchronizedUnsubscribeType {
typealias Observers = AnyObserver<Element>.s
private var _observers = Observers()
public func on(_ event: Event<Element>) {
......
dispatch(self._synchronized_on(event), event)
}
func _synchronized_on(_ event: Event<Element>) -> Observers {
self._lock.lock(); defer { self._lock.unlock() }
switch event {
case .next:
if self._isDisposed || self._stopped {
return Observers()
}
return self._observers
case .completed, .error:
if self._stoppedEvent == nil {
self._stoppedEvent = event
self._stopped = true
let observers = self._observers
self._observers.removeAll()
return observers
}
return Observers()
}
}
public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
......
return self._synchronized_subscribe(observer)
}
func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
......
let key = self._observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
}
PublishSubject
实例的subscribe
就直接调用了_synchronized_subscribe
,这里正常情况下都是直接把observer.on
装进袋子(_observers
)里了。_observers
其实就是结构体Bag
,AnyObserver 的集合。
extension AnyObserver {
/// Collection of `AnyObserver`s
typealias s = Bag<(Event<Element>) -> Void>
}
这个袋子的insert
函数中,对observer.on
的保存做了些优化,如果只有一个的话,就直接赋值给属性_key0
和_value0
快速访问,多的话才会放在容器中。
mutating func insert(_ element: T) -> BagKey {
......
if _key0 == nil {
_key0 = key
_value0 = element
return key
}
......
return key
}
到此为止,完成了信号的订阅。
发出信号
现在我们可以开始探索skipUntil
是怎么跳过参考序列发出信号之前的元素了。
PublishSubject
序列onNext
后必然会走到on(event:)
中(可以回看下上面PublishSubject
的代码)。然后先通过_synchronized_on
来决定给哪些观察者派发响应事件,其实就是去翻那个装着observer.on
的袋子Bag
。然后再回头调用
dispatch
:
func dispatch<Element>(_ bag: Bag<(Event<Element>) -> Void>, _ event: Event<Element>) {
bag._value0?(event)
if bag._onlyFastPath {
return
}
....
}
因为只有一个函数,直接通过_value0
访问调用了。_onlyFastPath
一直都是默认值true
。_value0
就是之前订阅时装进袋子里的observer.on
。两个PublishSubject
序列的on
也不同:
- 源序列的 observer 参数为:SkipUntilSink,
observer.on
就是SkipUntilSink
的on
- 参考序列的 observer 参数为:SkipUntilSinkOther,
observer.on
就是SkipUntilSinkOther
的on
两个 sink 对象的on
都会直接调用_synchronized_on
函数,我们来对比一下:
final private class SkipUntilSinkOther<Other, Observer: ObserverType>
: ObserverType
, LockOwnerType
, SynchronizedOnType {
func _synchronized_on(_ event: Event<Element>) {
switch event {
case .next:
self._parent._forwardElements = true
self._subscription.dispose()
case .error(let e):
self._parent.forwardOn(.error(e))
self._parent.dispose()
case .completed:
self._subscription.dispose()
}
}
}
final private class SkipUntilSink<Other, Observer: ObserverType>
: Sink<Observer>
, ObserverType
, LockOwnerType
, SynchronizedOnType {
func _synchronized_on(_ event: Event<Element>) {
switch event {
case .next:
if self._forwardElements {
self.forwardOn(event)
}
case .error:
self.forwardOn(event)
self.dispose()
case .completed:
if self._forwardElements {
self.forwardOn(event)
}
self.dispose()
}
}
}
源序列管道SkipUntilSink
的.next
中,信号发送是由_forwardElements
属性的值来控制的。而参考序列管道SkipUntilSinkOther
在初始化的时候就用_parent
引用了源序列管道,所以在参考序列管道的.next
中,可以设置源序列管道的_forwardElements
属性值,这样就管控了源序列的信号发送。
在参考序列的onNext
调用之前,源序列管道的_forwardElements
开关一直都是关闭的,直到参考序列管道发出一次信号后,才打开_forwardElements
开关。