RxSwift - 高阶函数

前言

对于RxSwift这种函数式编程的框架,熟练运用高阶函数能便捷地实现响应的功能,在不同的应用场景,都有响应的高阶函数,学好高阶函数的使用在以后项目中是很重要的。

1、startWith

在源序列元素前插入序列元素

Observable.of("1","2","3","4")
      .startWith("A")
      .startWith("B")
      .startWith("C","a","b")
      .subscribe(onNext: {print($0)})
      .disposed(by: disposeBag)
//输出:C a b B  A 1 2 3 4
2、merge

将源可观察序列中的元素组合成一个新的可观察序列,被合并的序列内部元素须相同类型

let sub1 = PublishSubject<String>()
let sub2 = PublishSubject<String>()
        
Observable.of(sub1,sub2)
     .merge()
     .subscribe(onNext: {print($0)})
     .disposed(by: disposeBag)
        
sub1.onNext("L")
sub1.onNext("c")
sub2.onNext("r")
sub2.onNext("s")
sub1.onNext("q")
//输出 L c r s q 
3、zip

将多个源可观测序列组合成一个新的可观测序列,但只有被观测的序列的相同位置上都有数据时候才会响应。否则暂时保存相应位置的数据,等待其余序列相同位置的数据有了再一起响应。

let stringSub = PublishSubject<String>()
        let intSub = PublishSubject<Int>()
        let intSub1 = PublishSubject<Int>()
        
        Observable.zip(stringSub, intSub,intSub1) { strElement, intElement,intE in
            "\(strElement) \(intElement) \(intE)"
        }
        .subscribe(onNext: {print($0)})
        .disposed(by: disposeBag)
        
        stringSub.onNext("A")
        // 到这里存储了 AB 但是不会响应,除非另一个响应
        stringSub.onNext("B")
        //
        intSub.onNext(1)
        intSub.onNext(2)
        //此时三个序列的第一个位置都有数据才会响应
        intSub1.onNext(4)
        stringSub.onNext("C")
        intSub.onNext(3)
//输出: A 1 4
4、combineLatest

在zip的基础上取每个序列里最新的一个元素,即被观测序列里只保留最新一个元素。前提也是满足被观测序列都有元素时才会响应。

let stringSub = PublishSubject<String>()
        let intSub = PublishSubject<Int>()
        
        Observable.combineLatest(stringSub, intSub) { strElement, intElement in
            "\(strElement) \(intElement)"
        }
        .subscribe(onNext: {print($0)})
        .disposed(by: disposeBag)
        
        stringSub.onNext("A") // intSub没有元素,不响应 只保存 A 
        stringSub.onNext("B") //覆盖 A   intSub没有元素,不响应 
        
        intSub.onNext(1)  // 两个序列都有 会响应  B  1 
        intSub.onNext(2) //覆盖 1  =>  B  2 
        stringSub.onNext("C") //覆盖 B  =>  C  2  
        intSub.onNext(3) //覆盖 2  =>  C   3
//输出: B  1
// B  2
// C  2 
// C  3

注意:应用非常频繁, 比如账户和密码同时满足相应条件->才能登录,不关心账户密码怎么变化,只要查看最后有值就可以 loginEnable

5、switchLatest

将可观察序列发出的元素转换为可观察序列,并从最近的内部可观察序列发出元素

        let swiSub1 = BehaviorSubject(value: "L")
        let swiSub2 = BehaviorSubject(value: "1")
        let swiSub = BehaviorSubject(value: swiSub1)
        //选择了 swiSub1 就不会监听 swiSub2  
        swiSub.asObservable()
            .switchLatest()
            .subscribe(onNext: {print($0)})
            .disposed(by: disposeBag)
        //先响应 L
        swiSub1.onNext("c") // 覆盖 L
        swiSub1.onNext("r") // 覆盖 c
        swiSub2.onNext("2") // 暂时不监听  保存 2 覆盖 1
        swiSub2.onNext("3") // 暂时不监听  保存 3 覆盖 2
        swiSub.onNext(swiSub2) // 切换 监听 swiSub2  响应最新 3
        swiSub1.onNext("s") // 暂时不监听  保存 s 覆盖 r
        swiSub1.onNext("q") // 暂时不监听  保存 q 覆盖 s
        swiSub2.onNext("4") //覆盖 3 
        swiSub.onNext(swiSub1)  // 响应 最新 q
//输出:L
// c
// r
// 3
// 4
// q

同一时间只响应最新观测的序列最新的发出元素。

6、map

转换闭包应用于可观察序列发出的元素,并返回转换后的元素的新可观察序列。

let ob = Observable.of(1,2,3,4)
        ob.map { num -> Int in
            return num + 2
        }
        .subscribe{ print($0.element as Any)}
        .disposed(by: disposeBag)
7、flatMap、flatMapLatest

将可观测序列发射的元素转换为可观测序列,并将两个可观测序列的发射合并为一个可观测序列。
这也很有用,例如,当你有一个可观察的序列,它本身发出可观察的序列,你想能够对任何一个可观察序列的新发射做出反应(序列中序列:比如网络序列中还有模型序列)

//类声明
struct LcrPlayer {
    init(score: Int) {
        self.score = BehaviorSubject(value: score)
    }
    let score: BehaviorSubject<Int>
}
//
        let boy = LcrPlayer(score: 100)
        let girl = LcrPlayer(score: 90)
        let player = BehaviorSubject(value: boy)
        
        player.asObservable()
            .flatMap {
                $0.score.asObservable()
            }
            .subscribe(onNext: {print($0)})
            .disposed(by: disposeBag)
        boy.score.onNext(70)
        player.onNext(girl)
        boy.score.onNext(50)
        boy.score.onNext(40)
//  如果flatMap切换到 flatMapLatest 50 和 40 就不会打印
        girl.score.onNext(10)
        girl.score.onNext(0)

//输出:100 70 90 50 40 10 0

flatMap和flatMapLatest不同点就是flatMapLatest只会打印被观测在观测期间最新一个,flatMapLatest实际上是map和switchLatest操作符的组合。

8、filter

仅从满足指定条件的可观察序列中发出元素

Observable.of(1,2,3,4,5,6,7,8,9,0)
            .filter { $0 % 2 == 0 }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
//输出:2 4 6 8
9、distinctUntilChanged

抑制可观察序列发出的顺序重复元素,相邻元素相同的话就忽略。

Observable.of("1","1","2","3","2","3","3","4","5","6")
            .distinctUntilChanged()
            .subscribe(onNext: {print($0)})
            .disposed(by: disposeBag)
//输出:1 2 3 2 3 4 5 6
10、element

仅在可观察序列发出的所有元素的指定索引处发出元素

Observable.of("L", "c", "r", "s", "q")
            .element(at: 3)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
11、single

只发出可观察序列发出的第一个元素(或满足条件的第一个元素)。如果可观察序列发出多个元素,将抛出一个错误。
如果指定某一个,就会返回某一个元素,但如果没有指定的元素的话,会响应.error(Sequence doesn't contain any elements.)

Observable.of("Lcr", "sq")
            .single()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
//输出:Lcr
//Unhandled error happened: Sequence contains more than one element.

Observable.of("Lcr", "sq")
            .single { $0 == "sq" }
            .subscribe { print($0) }
            .disposed(by: disposeBag)
//输出:next(sq)  
//completed

Observable.of("Lcr", "sq")
            .single { $0 == "s" }
            .subscribe { print($0) }
            .disposed(by: disposeBag)
//输出:error(Sequence doesn't contain any elements.)
12、take、takeLast、take(while:)

如果single只发出一个,那take可以指定发出的个数,take从前往后发出多少个,takeLast从左往右发出后面多少个,take(while:)发出满足条件的元素。

Observable.of("Tom", "Jack", "Jim", "Lily")
            .take(2)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
//输出:Tom
// Jack
Observable.of("Tom", "Jack", "Jim", "Lily")
            .takeLast(2)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
//输出:Jim 
//Lily
Observable.of(1,2,3,4,5,6,7)
            .take(while: {$0 < 5})
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
//输出:1 2 3 4
13、take(until:)

响应有效期截至后面接的序列响应为止。

let sourceSequence = PublishSubject<String>()
        let referenceSequence = PublishSubject<String>()
        
        sourceSequence
            .take(until: referenceSequence)
            .subscribe { print($0) }
            .disposed(by: disposeBag)
        
        sourceSequence.onNext("A")
        sourceSequence.onNext("B")
        sourceSequence.onNext("C")
        referenceSequence.onNext("D") // 条件一出来,下面就走不了
        sourceSequence.onNext("1")
        sourceSequence.onNext("2")
        sourceSequence.onNext("3")
//输出:next(A)
//next(B)
//next(C)
//completed
14、skip

从源可观察序列发出元素,直到指定位置参考可观察序列发出元素。
略过前面指定多少次响应,订阅忽略的下一次响应开始。这个要重点,应用非常频繁 不用解释 textfiled 都会有默认序列产生

Observable.of(1, 2, 3, 4, 5, 6)
            .skip(2)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
//输出: 3 4 5 6

抑制从源可观察序列发出元素,直到参考可观察序列发出元素。

Observable.of(1, 2, 3, 4, 5, 6)
            .skip { $0 < 4 }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
//输出:1 2 3 
sourceSeq
            .skip(until: referenceSeq)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
//作用是和.take(until: referenceSequence)恰恰相反。
15、reduce

给定一个初始值,然后逐个增加发出的元素。

Observable.of(10, 100, 1000)
            .reduce(1, accumulator: +) // 1 + 10 + 100 + 1000 = 1111
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
//输出: 1111
16、concat

以顺序方式连接来自一个可观察序列的内部可观察序列的元素,在从下一个序列发出元素之前,等待每个序列成功终止。
即前一个序列为完成时,后一个序列即使订阅了也响应不了。
平时开发可用来控制顺序。

let subject1 = BehaviorSubject(value: "A")
        let subject2 = BehaviorSubject(value: "1")
        
        let subjectsSubject = BehaviorSubject(value: subject1)
        
        subjectsSubject.asObservable()
            .concat()
            .subscribe { print($0) }
            .disposed(by: disposeBag)
        
        subject1.onNext("B")
        subject1.onNext("C")
        
        subjectsSubject.onNext(subject2)
        
        subject2.onNext("打印不出来")
        subject2.onNext("2")
//
        //subject1.onCompleted() // 必须要等subject1 完成了才能订阅到! 用来控制顺序 网络数据的异步
        subject2.onNext("3")
        subject2.onNext("4")
//输出:next(A)
//next(B)
//next(C)

放开subject1.onCompleted()注释,才能相继响应输出2 3 4 ,只响应subject2中最新的。

17、catchAndReturn

从错误事件中恢复,方法是返回一个可观察到的序列,该序列发出单个元素,然后终止

let sequenceThatFails = PublishSubject<String>()
        
sequenceThatFails
     .catchAndReturn("2")
     .subscribe { print($0) }
     .disposed(by: disposeBag)
        
sequenceThatFails.onNext("A")
sequenceThatFails.onNext("B") // 正常序列发送成功的
sequenceThatFails.onError(self.lgError) //发送失败的序列,一旦订阅到位 返回我们之前设定的错误的预案
//输出:next(Hank)
//next(Kody)
//next(Cooci)
//completed
18、retry

通过无限地重新订阅可观察序列来恢复重复的错误事件。

var count = 1 // 外界变量控制流程
        let sequenceRetryErrors = Observable<String>.create { observer in
            observer.onNext("A")
            observer.onNext("B")
            observer.onNext("C")
            
            if count == 1 { // 流程进来之后就会过度-这里的条件可以作为出口,失败的次数
                observer.onError(self.lgError)  // 接收到了错误序列,重试序列发生
                print("错误序列来了")
                count += 1
            }
            
            observer.onNext("D")
            observer.onNext("E")
            observer.onNext("F")
            observer.onCompleted()
            
            return Disposables.create()
        }
        
        sequenceRetryErrors
            .retry()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
//输出:A B C 错误序列来了 A B C D E F

也可设置retry(3),来设置重试的次数。

19、multicast

将源可观察序列转换为可连接序列,并通过指定的主题广播其发射。

//
let netOB = Observable<Any>.create { observer -> Disposable in
            sleep(2)// 模拟网络延迟
            print("我开始请求网络了")
            observer.onNext("请求到的网络数据")
            observer.onNext("请求到的本地")
            observer.onCompleted()
            return Disposables.create {
                print("销毁回调了")
            }
        }.publish()
//      
        netOB.subscribe(onNext: { anything in
            print("订阅1:", anything)
        })
        .disposed(by: disposeBag) 
// 
// 我们有时候不止一次网络订阅,因为有时候我们的数据可能用在不同的地方
// 所以再订阅一次 会出现什么问题?
        netOB.subscribe(onNext: { anything in
            print("订阅2:", anything)
        })
        .disposed(by: disposeBag)
        
        _ = netOB.connect()
//输出:我开始请求网络了
//订阅1: 请求到的网络数据
//订阅2: 请求到的网络数据
//订阅1: 请求到的本地
//订阅2: 请求到的本地
//销毁回调了

当我们注释掉publish以及netOB.connect()两行代码。运行结果如下:

非publish

可见网络请求来了两次,即订阅了两次网络请求来了两次。
那为什么利用publish以及connect能达到避免多次请求的问题呢?
publish内部封装了multicast
publish

multicast

就来到了ConnectableObservableAdapter的初始化方法:
ConnectableObservableAdapter

source为外界源序列,makeSubject为publish里面的{PublishSubject()},即也是一个序列。
查看ConnectableObservableAdapter的方法中有自己实现的scbscribe订阅函数,因为ConnectableObservableAdapter并没有继承自Producer,就自己实现了subscribe函数。不然外面序列无法用到.subscribe函数,也就无法订阅了。

以及变量lazySubject和connect()函数的实现:



自定义subscribe

lazySubject这么写是保证只会产生一个,避免外界不同订阅会产生不同的lazySubject。
由ConnectableObservableAdapter初始化可知,makeSubject就是.publish方法中的生成PublishSubject实例的闭包。那么lazySubject.subscribe就需要去PublishSubject中寻找实现了。

PublishSubject.subscribe

observer为外界源序列订阅时候产生的·AnonymousObserver·序列,所以源序列每订阅一次,就会产生一个新的observer,所以两次订阅就会来到PublishSubject()的两次subscribe函数的调用。

PublishSubject().subscribe函数中也就是将·observer.on·保存在observers的属性_pairs数组及属性字典_dictionary中。
此处的observer.on即为ObserverBase基类中的.on函数。

ObserverBase.on

接下来我们把目光放到connect函数里面,生成的connection作为外界序列订阅闭包的参数,那么connection必定会响应.on函数,不然响应不到外界序列的订阅。


connection

所以除了初始化方法,我们果然看到.on函数,里面说的也就是上面生成的lazySubject调用on函数,即


lazySubject.on

所以就来到dispatch()方法中,最终响应观察者.on方法也就是在这里:


dispacth

依次在订阅时候保存observer.on的字典及数组中带入外界的.onNext()事件(断点调试发现是在pairs数组里面调用了相关的事件函数),最终就会来到下面的调用:

总结: publish封装的multicast就是将多个订阅的序列合并为一个序列,并按序响应外界的订阅事件。将所有事物保存在一起,虽然生成不一样的观察者,但响应订阅事件是在一块处理的,所以就达到了一次请求网络,多处使用的目的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容

  • 组合操作符 startWith 在开始从可观察源发出元素之前,发出指定的元素序列。可以理解为“+”号,并且后加的先...
    简_爱SimpleLove阅读 500评论 0 2
  • RxSwift高阶函数skipUntil解读 skipUntil的作用:抑制从源可观察序列发出元素,直到参考可观察...
    silasjs阅读 459评论 1 6
  • RxSwift高阶函数map解读 1.map 通过一个转换函数,将 Observable 的每个元素转换一遍。 d...
    silasjs阅读 1,047评论 0 4
  • 组合操作符 一、startWith 在开始从可观察源发出元素之前,发出指定的元素序列 二、merge 将源可观察序...
    king_jensen阅读 618评论 0 1
  • 4. 集合控制操作符 4.1 toArray 将一个可观察序列转换为一个数组,将该数组作为一个新的单元素可观察序列...
    smart_M阅读 345评论 0 0