使用示例
example("zip") {
let disposeBag = DisposeBag()
let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()
Observable.zip(stringSubject, intSubject) { stringElement, intElement in
"\(stringElement) \(intElement)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
stringSubject.onNext("🅰️")
stringSubject.onNext("🅱️")
intSubject.onNext(1)
intSubject.onNext(2)
stringSubject.onNext("🆎")
intSubject.onNext(3)
}
// out put log
--- zip example ---
🅰️ 1
🅱️ 2
🆎 3
实现原理
Zip
有着一系列类簇,从Zip2 - Zip8
,实现原理都是一样的区别在于Observable
数量。所以这里只重点关注下Zip2
的实现
final class Zip2<E1, E2, R> : Producer<R> {
typealias ResultSelector = (E1, E2) throws -> R
let source1: Observable<E1>
let source2: Observable<E2>
let _resultSelector: ResultSelector
init(source1: Observable<E1>, source2: Observable<E2>, resultSelector: @escaping ResultSelector) {
self.source1 = source1
self.source2 = source2
_resultSelector = resultSelector
}
override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == R {
let sink = ZipSink2_(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
和其他操作符一样,仍然通过Sink
实现核心功能
// ZipSink2_.run
var _values1: Queue<E1> = Queue(capacity: 2)
var _values2: Queue<E2> = Queue(capacity: 2)
func run() -> Disposable {
let subscription1 = SingleAssignmentDisposable()
let subscription2 = SingleAssignmentDisposable()
let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2)
subscription1.setDisposable(_parent.source1.subscribe(observer1))
subscription2.setDisposable(_parent.source2.subscribe(observer2))
return Disposables.create([
subscription1,
subscription2
])
}
ZipSink2_
创建两个ZipObserver
, 分别订阅observerable1
, observerable2
。
// ZipObserver._synchronized_on
func _synchronized_on(_ event: Event<E>) {
if let _ = _parent {
switch event {
case .next(_):
break
case .error(_):
_this.dispose()
case .completed:
_this.dispose()
}
}
if let parent = _parent {
switch event {
case .next(let value):
_setNextValue(value)
parent.next(_index)
case .error(let error):
parent.fail(error)
case .completed:
parent.done(_index)
}
}
}
和Merge
操作符原理一样, ZipObserver
在接收事件后,会将事件传递给ZipSink2_
。在next
事件的时候会调用_setNextValue
, 这会触发self._values1.enqueue($0)
操作,即数据入列操作
// ZipSink.next
func next(_ index: Int) {
var hasValueAll = true
for i in 0 ..< _arity {
if !hasElements(i) {
hasValueAll = false
break
}
}
if hasValueAll {
do {
let result = try getResult()
self.forwardOn(.next(result))
}
catch let e {
self.forwardOn(.error(e))
dispose()
}
}
else {
var allOthersDone = true
let arity = _isDone.count
for i in 0 ..< arity {
if i != index && !_isDone[i] {
allOthersDone = false
break
}
}
if allOthersDone {
forwardOn(.completed)
self.dispose()
}
}
}
函数分为3个逻辑块:
- 检测是否都有值,
_arity
是被Zip
的Observable
数量,在本例子中等于2
// ZipSink2_. hasElements
override func hasElements(_ index: Int) -> Bool {
switch (index) {
case 0: return _values1.count > 0
case 1: return _values2.count > 0
default:
rxFatalError("Unhandled case (Function)")
}
return false
}
- 如果都有值便发送
forwardOn next
// ZipSink2_.getResult
override func getResult() throws -> R {
return try _parent._resultSelector(_values1.dequeue()!, _values2.dequeue()!)
}
这里要注意的是getResult
, 会触发数据的出列操作,也就是说getResult
前
_values1.count = 1, _values2.count = 1
之后
_values1.count = 0, _values2.count = 0
- 检测是否有
Observable
已经completed
了,如果是的话,发送forwardOn completed
。
简单验证一下
example("zip") {
let disposeBag = DisposeBag()
let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()
Observable.zip(stringSubject, intSubject) { stringElement, intElement in
"\(stringElement) \(intElement)"
}
.debug()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
stringSubject.onNext("🅰️")
stringSubject.onNext("🅱️")
// stringSubject.onNext("🆎")
stringSubject.onCompleted()
intSubject.onNext(1)
intSubject.onNext(2)
intSubject.onNext(3)
}
// out put log
--- zip example ---
2018-10-13 11:27:37.532: Rx.playground:73 (__lldb_expr_7) -> subscribed
2018-10-13 11:27:37.536: Rx.playground:73 (__lldb_expr_7) -> Event next(🅰️ 1)
🅰️ 1
2018-10-13 11:27:37.537: Rx.playground:73 (__lldb_expr_7) -> Event next(🅱️ 2)
🅱️ 2
2018-10-13 11:27:37.539: Rx.playground:73 (__lldb_expr_7) -> Event completed
2018-10-13 11:27:37.539: Rx.playground:73 (__lldb_expr_7) -> isDisposed
这里 如果去掉 stringSubject.onCompleted()
或者 intSubject.onNext(3)
的话,都不会收到completed
事件
注: ZipSink
是 ZipSink2_
的父类