看官方注释:
/**
Merges the specified observable sequences into one observable sequence by using the selector function whenever any of the observable sequences produces an element.
- parameter resultSelector: Function to invoke whenever any of the sources produces an element.
- returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function.
*/
combineLatest
函数是一个由绑定多个序列元素通过一个方法返回的结果的序列。
其中参数可以有2个到8个序列,多于8个的,就使用数组来传递参数,如下:
public static func combineLatest<Collection>(_ collection: Collection, resultSelector: @escaping ([Collection.Element.Element]) throws -> Self.Element) -> RxSwift.Observable<Self.Element> where Collection : Collection, Collection.Element : ObservableType
我们分析原理,只需要看两个参数的就可以了,其他的都是类似的。
分析
combineLatest的原理和上一篇文章中分析的map函数有点类似。我们就具体分析上上一篇文章中combineLatest的例子,如下:
let stringSub = PublishSubject<String>()
let intSub = PublishSubject<Int>()
Observable.combineLatest(stringSub, intSub) { strElement, intElement in
"\(strElement) \(intElement)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
stringSub.onNext("L") // 存一个 L
stringSub.onNext("G") // 存了一个覆盖 - 和zip不一样
intSub.onNext(1) // 发现strOB也有G 响应 G 1
intSub.onNext(2) // 覆盖1 -> 2 发现strOB 有值G 响应 G 2
stringSub.onNext("Cooci") // 覆盖G -> Cooci 发现intOB 有值2 响应 Cooci 2
// combineLatest 比较zip 会覆盖
// 应用非常频繁: 比如账户和密码同时满足->才能登陆. 不关心账户密码怎么变化的只要查看最后有值就可以 loginEnable
/*
输出结果:
G 1
G 2
Cooci 2
*/
我们也还是根据流程来走,条例要比较清晰一点:
1、首先来到combineLatest
的创建方法,如下:
public static func combineLatest<O1: ObservableType, O2: ObservableType>
(_ source1: O1, _ source2: O2, resultSelector: @escaping (O1.Element, O2.Element) throws -> Element)
-> Observable<Element> {
return CombineLatest2(
source1: source1.asObservable(), source2: source2.asObservable(),
resultSelector: resultSelector
)
}
这是有两个序列参数source1
和source2
的combineLatest
函数的创建,参数resultSelector
即是外面那个闭包,使用两个序列发出的元素后返回结果result
的方法。
2、然后来到初始化中的关键代码,这段和map
函数中一样:
final class CombineLatest2<E1, E2, Result> : Producer<Result> {
typealias ResultSelector = (E1, E2) throws -> Result
let _source1: Observable<E1>
let _source2: Observable<E2>
let _resultSelector: ResultSelector
init(source1: Observable<E1>, source2: Observable<E2>, resultSelector: @escaping ResultSelector) {
self._source1 = source1 // 保存传进来的参数序列1 source1
self._source2 = source2 // 保存传进来的参数序列2 source2
self._resultSelector = resultSelector // 保存传进来的闭包
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Result {
let sink = CombineLatestSink2_(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
- 继承自
Producer
- 保存了传进来的三个参数,两个序列参数
source1
和source2
,还有一个闭包参数resultSelector
- 重写了父类的
run
方法,意味着CombineLatest2
函数序列订阅过后,肯定会走到这个run
方法
3、初始化完成过后,就来到最外界的订阅这一行:
.subscribe(onNext: { print($0) })
注意这是CombineLatest2
函数序列的订阅的大括号中的内容,并不是两个源序列订阅的。
4、因为上面订阅是选用的带onNext
的具体订阅方法,所以就来到
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable
这个方法的具体实现中,从前面核心逻辑的文章分析可知,里面创建了一个CombineLatest2
函数序列的观察者AnonymousObserver
。观察者中保存了第3步中的订阅大括号中的打印事件。
然后来到关键代码:
self.asObservable().subscribe(observer),
这里的self
即上面的CombineLatest2
函数序列,因为它继承自Producer
,所以它的subscribe
就来到了Producer
的subscribe
方法,然后来到上面提到的子类它自己CombineLatest2
的run
方法中。
5、CombineLatest2
的run
方法
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Result {
let sink = CombineLatestSink2_(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
这里先要初始化CombineLatestSink2
,然后再走CombineLatestSink2
的run
方法。
6、CombineLatestSink2
的初始化方法
init(parent: Parent, observer: Observer, cancel: Cancelable) {
self._parent = parent
super.init(arity: 2, observer: observer, cancel: cancel) // 调用父类的初始化方法
}
父类初始化:
init(arity: Int, observer: Observer, cancel: Cancelable) {
self._arity = arity
self._hasValue = [Bool](repeating: false, count: arity) // 初始化都为false的数组,即默认参数序列都还没有发送过元素
self._isDone = [Bool](repeating: false, count: arity)
super.init(observer: observer, cancel: cancel)
}
- 这里的
parent
就是上一步传进来的CombineLatest2
- 调用父类的初始方法,确定了参数个数
arity
为2 - 父类中保存了源序列个数
_arity
- 初始化了一个元素个数为
_arity
个,默认都为false的_hasValue
数组。即默认还没有一个源序列发送元素。
7、然后来到关键一步CombineLatestSink2
的run
方法
// 核心逻辑
func run() -> Disposable {
let subscription1 = SingleAssignmentDisposable()
let subscription2 = SingleAssignmentDisposable()
let observer1 = CombineLatestObserver(lock: self._lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1) // 覆盖前面发出的元素,保存最新发出的元素
let observer2 = CombineLatestObserver(lock: self._lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)
// 原来是AnonymousObserver,现在换成了包装过后的 CombineLatestObserver
// 源序列_source1或_source2响应结果就走到observer1.on或observer2.on方法中去了
subscription1.setDisposable(self._parent._source1.subscribe(observer1))
subscription2.setDisposable(self._parent._source2.subscribe(observer2))
return Disposables.create([
subscription1,
subscription2
])
}
和map
函数实现的主要区别也就是在这里。
- 根据传进来的参数序列
source
都分别封装成了CombineLatestObserver
,即有几个参数序列source
就有几个CombineLatestObserver
-
self._parent._source1.subscribe(observer1)
参数序列订阅对应的CombineLatestObserver
,即最后响应结果就走到了CombineLatestObserver
的on
方法中
8、来到CombineLatestObserver
的初始化方法
init(lock: RecursiveLock, parent: CombineLatestProtocol, index: Int, setLatestValue: @escaping ValueSetter, this: Disposable) {
self._lock = lock // 锁
self._parent = parent // CombineLatestSink2
self._index = index // 参数下标
self._this = this // 销毁者
self._setLatestValue = setLatestValue // 始终保存最后一个元素
}
里面两个重要参数:源序列的参数下标:_index
和 始终保存最后一个元素:_setLatestValue
9、源序列发送元素,响应结果,最后来到CombineLatestObserver
的on
方法中
func on(_ event: Event<Element>) {
self.synchronizedOn(event)
}
func _synchronized_on(_ event: Event<Element>) { // _synchronized_on里面加了锁,保证每次只是响应一个由源序列发出的元素
switch event {
case .next(let value):
// 调用CombineLatestSink2中的闭包,保存最新的元素 { (e: E1) -> Void in self._latestElement1 = e }
self._setLatestValue(value)
self._parent.next(self._index) // self._parent : CombineLatestSink2
case .error(let error):
self._this.dispose()
self._parent.fail(error)
case .completed:
self._this.dispose()
self._parent.done(self._index)
}
}
-
_synchronized_on
里面加了锁,保证每次只是会响应一个元素,这样保证了响应顺序,保证了最后保存的元素,一定是源序列最后发送的那个。 -
_setLatestValue
如上面代码注释中所写,调用CombineLatestSink2
中的闭包,始终覆盖前面的元素,保存最新的元素。 - 将
_index
传入CombineLatestSink2
的next
方法中,因为CombineLatestSink2
没有实现next
方法,所以来到它的父类CombineLatestSink
的next
方法中
10、CombineLatestSink
的next
方法中
func next(_ index: Int) {
if !self._hasValue[index] { // 判断索引对应的序列,是否发送过元素
self._hasValue[index] = true // 没有发送过元素,这次发送元素进来就标记为 true
self._numberOfValues += 1 // 并且将已经发送过元素的序列个数保存起来
}
if self._numberOfValues == self._arity { // 当已经发送过元素的序列个数,等于,初始化时参数的序列个数,就可以进行下一步
do {
let result = try self.getResult() // 走到子类CombineLatestSink2_重写的getResult中去,即调用的最外界闭包
self.forwardOn(.next(result)) // 因为是源码,所以就算你把这句注释了,还是能照常打印,所以平时谨记不能删除源码,不然你自己都分析不通了,谨记
}
catch let e { // 发送错误时候的处理
self.forwardOn(.error(e))
self.dispose()
}
}
else {
var allOthersDone = true
for i in 0 ..< self._arity { // 当小于_arity时,即其他的序列还没有完成发送元素
if i != index && !self._isDone[i] {
allOthersDone = false // 标记为否 false
break
}
}
if allOthersDone { //都处理完成过后,就发送完成事件
self.forwardOn(.completed)
self.dispose()
}
}
}
详细分析:
其一:
if !self._hasValue[index] { // 判断索引对应的序列,是否发送过元素
self._hasValue[index] = true // 没有发送过元素,这次发送元素进来就标记为 true
self._numberOfValues += 1 // 并且将已经发送过元素的序列个数保存起来
}
这段代码的意思是:每次有元素发送过来的时候,先看下_hasValue
数组中对应源序列下标的存储值,是否为true
?
如果为ture
, 说明之前这个序列发送过元素,直接跳过这个大括号往下走。
如果为false
, 就将它赋值为true
,表示这个序列已经发送过元素了。并且将已经发送过元素的序列个数_numberOfValues
加1
其二:
if self._numberOfValues == self._arity { // 当已经发送过元素的序列个数,等于,初始化时参数的序列个数,就可以进行下一步
do {
let result = try self.getResult() // 走到子类CombineLatestSink2_重写的getResult中去,即调用的最外界闭包
self.forwardOn(.next(result)) // 因为是源码,所以就算你把这句注释了,还是能照常打印,所以平时谨记不能删除源码,不然你自己都分析不通了,谨记
}
catch let e { // 发送错误时候的处理
self.forwardOn(.error(e))
self.dispose()
}
}
这段代码,上面的注释已经很详细了,意思是:
当已经发送过元素的序列个数,等于参数的序列个数时。即所有参数的序列都发送过响应时。
就调用getResult
这个方法,即外界传进来的闭包resultSelector
,返回由两个源序列的分别发出的最后一个元素,通过resultSelector
返回的一个结果result
。
再然后由self.forwardOn(.next(result))
这句代码,CombineLatestSink
发送上面的result
元素,最后响应在最外界的闭包(onNext: { print($0) })
中,并打印出来。
至此流程结束。
其他combineLatest
多个参数的时候,原理也还是一样,只是初始化的参数个数arity
增加。
总结
-
combineLatest
主要使用了CombineLatestObserver
来包装源序列source
,在CombineLatestObserver
始终保存了最新的那个元素,从而覆盖以前的 - 在
CombineLatestSink
中通过传进的序列参数下标记录了,发送元素的序列个数,当所有序列参数都发送过元素时,才会调用外面的闭包,并发送由那些元素通过闭包返回的结果result
元素 - 一定要小心不要删除源码,不然你会发现,逻辑明明走不通,但是打印还是照旧。切记,切记。