RxSwift中的combineLatest函数

看官方注释:

/**
        Merges the specified observable sequences into one observable sequence by using the selector function whenever any of the observable sequences produces an element.
        - parameter resultSelector: Function to invoke whenever any of the sources produces an element.
        - returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function.
        */

combineLatest函数是一个由绑定多个序列元素通过一个方法返回的结果的序列。
其中参数可以有2个到8个序列,多于8个的,就使用数组来传递参数,如下:

public static func combineLatest<Collection>(_ collection: Collection, resultSelector: @escaping ([Collection.Element.Element]) throws -> Self.Element) -> RxSwift.Observable<Self.Element> where Collection : Collection, Collection.Element : ObservableType

我们分析原理,只需要看两个参数的就可以了,其他的都是类似的。

分析

combineLatest的原理和上一篇文章中分析的map函数有点类似。我们就具体分析上上一篇文章中combineLatest的例子,如下:

        let stringSub = PublishSubject<String>()
        let intSub = PublishSubject<Int>()
        Observable.combineLatest(stringSub, intSub) { strElement, intElement in
            "\(strElement) \(intElement)"
            }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        stringSub.onNext("L") // 存一个 L
        stringSub.onNext("G") // 存了一个覆盖 - 和zip不一样
        intSub.onNext(1)      // 发现strOB也有G 响应 G 1
        intSub.onNext(2)      // 覆盖1 -> 2 发现strOB 有值G 响应 G 2
        stringSub.onNext("Cooci") // 覆盖G -> Cooci 发现intOB 有值2 响应 Cooci 2
        // combineLatest 比较zip 会覆盖
        // 应用非常频繁: 比如账户和密码同时满足->才能登陆. 不关心账户密码怎么变化的只要查看最后有值就可以 loginEnable
        /*
         输出结果:
         G 1
         G 2
         Cooci 2
         */

我们也还是根据流程来走,条例要比较清晰一点:

1、首先来到combineLatest的创建方法,如下:

    public static func combineLatest<O1: ObservableType, O2: ObservableType>
        (_ source1: O1, _ source2: O2, resultSelector: @escaping (O1.Element, O2.Element) throws -> Element)
            -> Observable<Element> {
        return CombineLatest2(
            source1: source1.asObservable(), source2: source2.asObservable(),
            resultSelector: resultSelector
        )
    }

这是有两个序列参数source1source2combineLatest函数的创建,参数resultSelector即是外面那个闭包,使用两个序列发出的元素后返回结果result的方法。

2、然后来到初始化中的关键代码,这段和map函数中一样:

final class CombineLatest2<E1, E2, Result> : Producer<Result> {
    typealias ResultSelector = (E1, E2) throws -> Result

    let _source1: Observable<E1>
    let _source2: Observable<E2>

    let _resultSelector: ResultSelector

    init(source1: Observable<E1>, source2: Observable<E2>, resultSelector: @escaping ResultSelector) {
        self._source1 = source1      // 保存传进来的参数序列1 source1
        self._source2 = source2      // 保存传进来的参数序列2 source2

        self._resultSelector = resultSelector  // 保存传进来的闭包
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Result {
        let sink = CombineLatestSink2_(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}
  • 继承自Producer
  • 保存了传进来的三个参数,两个序列参数source1source2,还有一个闭包参数resultSelector
  • 重写了父类的run方法,意味着CombineLatest2函数序列订阅过后,肯定会走到这个run方法

3、初始化完成过后,就来到最外界的订阅这一行:

            .subscribe(onNext: { print($0) })

注意这是CombineLatest2函数序列的订阅的大括号中的内容,并不是两个源序列订阅的。

4、因为上面订阅是选用的带onNext的具体订阅方法,所以就来到

public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable

这个方法的具体实现中,从前面核心逻辑的文章分析可知,里面创建了一个CombineLatest2函数序列的观察者AnonymousObserver。观察者中保存了第3步中的订阅大括号中的打印事件。

然后来到关键代码:

          self.asObservable().subscribe(observer),

这里的self即上面的CombineLatest2函数序列,因为它继承自Producer,所以它的subscribe就来到了Producersubscribe方法,然后来到上面提到的子类它自己CombineLatest2run方法中。

5、CombineLatest2run方法

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Result {
        let sink = CombineLatestSink2_(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }

这里先要初始化CombineLatestSink2,然后再走CombineLatestSink2run方法。

6、CombineLatestSink2的初始化方法

    init(parent: Parent, observer: Observer, cancel: Cancelable) {
        self._parent = parent
        super.init(arity: 2, observer: observer, cancel: cancel)  // 调用父类的初始化方法
    }

父类初始化:

init(arity: Int, observer: Observer, cancel: Cancelable) {
        self._arity = arity
        self._hasValue = [Bool](repeating: false, count: arity) // 初始化都为false的数组,即默认参数序列都还没有发送过元素
        self._isDone = [Bool](repeating: false, count: arity)
        
        super.init(observer: observer, cancel: cancel)
    }
  • 这里的parent就是上一步传进来的CombineLatest2
  • 调用父类的初始方法,确定了参数个数arity为2
  • 父类中保存了源序列个数_arity
  • 初始化了一个元素个数为_arity个,默认都为false的_hasValue数组。即默认还没有一个源序列发送元素。

7、然后来到关键一步CombineLatestSink2run方法

    // 核心逻辑
   func run() -> Disposable {
        let subscription1 = SingleAssignmentDisposable()
        let subscription2 = SingleAssignmentDisposable()

        let observer1 = CombineLatestObserver(lock: self._lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)  // 覆盖前面发出的元素,保存最新发出的元素
        let observer2 = CombineLatestObserver(lock: self._lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)

        // 原来是AnonymousObserver,现在换成了包装过后的 CombineLatestObserver
        // 源序列_source1或_source2响应结果就走到observer1.on或observer2.on方法中去了
         subscription1.setDisposable(self._parent._source1.subscribe(observer1))
         subscription2.setDisposable(self._parent._source2.subscribe(observer2))

        return Disposables.create([
                subscription1,
                subscription2
        ])
    }

map函数实现的主要区别也就是在这里。

  • 根据传进来的参数序列source都分别封装成了CombineLatestObserver,即有几个参数序列source就有几个CombineLatestObserver
  • self._parent._source1.subscribe(observer1)参数序列订阅对应的CombineLatestObserver,即最后响应结果就走到了CombineLatestObserveron方法中

8、来到CombineLatestObserver的初始化方法

    init(lock: RecursiveLock, parent: CombineLatestProtocol, index: Int, setLatestValue: @escaping ValueSetter, this: Disposable) {
        self._lock = lock      // 锁
        self._parent = parent  // CombineLatestSink2
        self._index = index    // 参数下标
        self._this = this      // 销毁者
        self._setLatestValue = setLatestValue  // 始终保存最后一个元素
    }

里面两个重要参数:源序列的参数下标:_index 和 始终保存最后一个元素:_setLatestValue

9、源序列发送元素,响应结果,最后来到CombineLatestObserveron方法中

    func on(_ event: Event<Element>) {
        self.synchronizedOn(event)
    }

    func _synchronized_on(_ event: Event<Element>) {  // _synchronized_on里面加了锁,保证每次只是响应一个由源序列发出的元素
        switch event {
        case .next(let value):
            // 调用CombineLatestSink2中的闭包,保存最新的元素 { (e: E1) -> Void in self._latestElement1 = e }
            self._setLatestValue(value)
            self._parent.next(self._index)  // self._parent : CombineLatestSink2
        case .error(let error):
            self._this.dispose()
            self._parent.fail(error)
        case .completed:
            self._this.dispose()
            self._parent.done(self._index)
        }
    }
  • _synchronized_on里面加了锁,保证每次只是会响应一个元素,这样保证了响应顺序,保证了最后保存的元素,一定是源序列最后发送的那个。
  • _setLatestValue如上面代码注释中所写,调用CombineLatestSink2中的闭包,始终覆盖前面的元素,保存最新的元素。
  • _index传入CombineLatestSink2next方法中,因为CombineLatestSink2没有实现next方法,所以来到它的父类CombineLatestSinknext方法中

10、CombineLatestSinknext方法中

    func next(_ index: Int) {
        if !self._hasValue[index] {  // 判断索引对应的序列,是否发送过元素
            self._hasValue[index] = true // 没有发送过元素,这次发送元素进来就标记为 true
            self._numberOfValues += 1 // 并且将已经发送过元素的序列个数保存起来
        }

        if self._numberOfValues == self._arity {  // 当已经发送过元素的序列个数,等于,初始化时参数的序列个数,就可以进行下一步
            do {
                let result = try self.getResult()  // 走到子类CombineLatestSink2_重写的getResult中去,即调用的最外界闭包
                self.forwardOn(.next(result))  // 因为是源码,所以就算你把这句注释了,还是能照常打印,所以平时谨记不能删除源码,不然你自己都分析不通了,谨记
            }
            catch let e {         // 发送错误时候的处理
                self.forwardOn(.error(e))
                self.dispose()
            }
        }
        else {
            var allOthersDone = true

            for i in 0 ..< self._arity {  // 当小于_arity时,即其他的序列还没有完成发送元素
                if i != index && !self._isDone[i] {
                    allOthersDone = false // 标记为否 false
                    break
                }
            }
            
            if allOthersDone {  //都处理完成过后,就发送完成事件
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }

详细分析:
其一:

        if !self._hasValue[index] {  // 判断索引对应的序列,是否发送过元素
            self._hasValue[index] = true // 没有发送过元素,这次发送元素进来就标记为 true
            self._numberOfValues += 1 // 并且将已经发送过元素的序列个数保存起来
        }

这段代码的意思是:每次有元素发送过来的时候,先看下_hasValue数组中对应源序列下标的存储值,是否为true
如果为ture, 说明之前这个序列发送过元素,直接跳过这个大括号往下走。
如果为false, 就将它赋值为true,表示这个序列已经发送过元素了。并且将已经发送过元素的序列个数_numberOfValues加1
其二:

        if self._numberOfValues == self._arity {  // 当已经发送过元素的序列个数,等于,初始化时参数的序列个数,就可以进行下一步
            do {
                let result = try self.getResult()  // 走到子类CombineLatestSink2_重写的getResult中去,即调用的最外界闭包
                self.forwardOn(.next(result))  // 因为是源码,所以就算你把这句注释了,还是能照常打印,所以平时谨记不能删除源码,不然你自己都分析不通了,谨记
            }
            catch let e {         // 发送错误时候的处理
                self.forwardOn(.error(e))
                self.dispose()
            }
        }

这段代码,上面的注释已经很详细了,意思是:
当已经发送过元素的序列个数,等于参数的序列个数时。即所有参数的序列都发送过响应时。
就调用getResult这个方法,即外界传进来的闭包resultSelector,返回由两个源序列的分别发出的最后一个元素,通过resultSelector返回的一个结果result
再然后由self.forwardOn(.next(result))这句代码,CombineLatestSink发送上面的result元素,最后响应在最外界的闭包(onNext: { print($0) })中,并打印出来。

至此流程结束。

其他combineLatest多个参数的时候,原理也还是一样,只是初始化的参数个数arity增加。

总结

  • combineLatest主要使用了CombineLatestObserver来包装源序列source,在CombineLatestObserver始终保存了最新的那个元素,从而覆盖以前的
  • CombineLatestSink中通过传进的序列参数下标记录了,发送元素的序列个数,当所有序列参数都发送过元素时,才会调用外面的闭包,并发送由那些元素通过闭包返回的结果result元素
  • 一定要小心不要删除源码,不然你会发现,逻辑明明走不通,但是打印还是照旧。切记,切记。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,874评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,102评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,676评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,911评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,937评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,935评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,860评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,660评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,113评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,363评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,506评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,238评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,861评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,486评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,674评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,513评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,426评论 2 352

推荐阅读更多精彩内容