【RXSwift】系列四 :变换操作符

变换操作符

对原始的Observable序列进行一些转换

1. map

通过使用一个闭包将原来的Observable序列,转成一个新的Observable

img
    func mapOperate() {
        let ofSequence = Observable.of(1,2,3)
        ofSequence.map { $0 * 2 }
            .subscribe { print($0) }
            .disposed(by: bag)
    }

运行结果:

next(2)
next(4)
next(6)
completed

2. flatMap

将 Observable 的元素转换成其他的 Observable,然后将这些 Observables 合并。

Observable 的元素本身拥有其他的 Observable 时,你可以将所有 Observables 的元素合并之后在发出来,即将其 “拍扁”(降维)成一个 Observable。

img
    func flatMapOperate() {
        let subject1 = BehaviorRelay(value: "1")
        let subject2 = BehaviorRelay(value: "A")

        let subject = BehaviorRelay(value: subject1)
        subject.flatMap { $0 }
            .subscribe { print($0) }
            .disposed(by: bag)
        
        subject1.accept("2")
        subject.accept(subject2)
        subject1.accept("3")
        subject2.accept("B")
    }

运行结果:

next(1)
next(2)
next(A)
next(3)
next(B)

3. flatMapLatest

flatMapLatestflatMap 的区别在于:flatMapLatest 只会从最近的序列中发出事件。

flatMapFirstflatMapLatest 正好相反,flatMapLatest 只会从最初的序列中发出事件。下面的例子换成 flatMapFirst,只会接收subject1发出的事件

    func flatMapLatestOperate() {
        let subject1 = BehaviorRelay(value: "1")
        let subject2 = BehaviorRelay(value: "A")
        
        let subject = BehaviorRelay(value: subject1)
        subject.flatMapLatest { $0 }
            .subscribe { print($0) }
            .disposed(by: bag)
        
        subject1.accept("2")
        subject.accept(subject2)
        subject1.accept("3")     // 不会收到
        subject2.accept("B")
    }

运行结果:

next(1)
next(2)
next(A)
next(B)

4. concatMap

concatMapflatMap 的区别:只有当前一个序列发出 completed事件 (如果发出error事件,所以序列都会终止),才对后一个序列进行订阅。

    func concatMapOperate() {
        let subject1 = BehaviorSubject(value: "1")
        let subject2 = BehaviorSubject(value: "A")
        
        let subject = BehaviorRelay(value: subject1)
        subject.concatMap { $0 }
            .subscribe { print($0) }
            .disposed(by: bag)
        
        subject1.onNext("2")
        subject.accept(subject2)
        subject1.onNext("3")
        subject2.onNext("B")
        
        subject1.onCompleted()    //只有subject1结束,才能接收subject2发出的数据
    }

运行结果:

next(1)
next(2)
next(3)
next(B)

5. scan

scan 提供了一个初始化值,然后使用闭包不断将前一个元素和后一个元素进行处理,并将处理结果作为单个元素的Observable序列返回。

img
    func scanOperate() {
        let ofSequence = Observable.of(0, 1, 2, 3, 4, 5)
        ofSequence
            .scan(0, accumulator: { acum, elem in
                acum + elem
            })
            .subscribe {
                print($0)
            }.disposed(by: bag)
    }

运行结果:

next(0)
next(1)
next(3)
next(6)
next(10)
next(15)
completed

过滤操作符

从源 Observable 中选择特定的数据发送

1. filter

从源 Observable 中过滤出只符合要求的事件

img
    func filterOperate() {
        let ofSequence = Observable.of(1,2,3,4,5)
        ofSequence.filter { $0 > 3 }
            .subscribe { event in
                print(event)
            }
            .disposed(by: bag)
    }

运行结果:

next(4)
next(5)
completed

2. take

仅发送 Observable 的前 n 个事件,在满足数量后会自动发送 completed

    func takeOperate() {
        let ofSequence = Observable.of(1,2,3,4,5)
        ofSequence.take(2)
            .subscribe { event in
                print(event)
            }
            .disposed(by: bag)
    }

运行结果:

next(1)
next(2)
completed

takeLatesttake用法类似,仅发送 Observable 的后 n 个事件

skip,跳过源Observable 发出的前 n 个事件

3. distinctUntilChanged

过滤掉连续 重复的事件

img
    func distinctdOperate() {
        let ofSequence = Observable.of(1,2,3,3,4,1,5)
        ofSequence.distinctUntilChanged()
            .subscribe { event in
                print(event)
            }
            .disposed(by: bag)
    }

运行结果:

next(1)
next(2)
next(3)
next(4)
next(1)
next(5)
completed

4. single

限制只发出一个事件。如果存在多个事件或没有事件,会发出一个error事件。如果只存在一个事件,不会发出 error 事件。

    func singleOperate() {
        let ofSequence = Observable.of(1,2,3,4,5)
        ofSequence.single()
            .subscribe { event in
                print(event)
            }
            .disposed(by: bag)
    }

运行结果:

next(1)
error(Sequence contains more than one element.)

结合操作符

将多个Observable序列进行组合,拼接成一个新的Observable序列

1. zip

将多个Observable序列压缩成一个Observable序列,它会等到每个Observable事件一一对齐后再合并

img
    func zipOperate() {
        let subject1 = PublishSubject<Int>()
        let subject2 = PublishSubject<String>()
        
        Observable.zip(subject1, subject2) {
            "\($0) \($1)"
            }
            .subscribe { event in
                print(event)
            }
            .disposed(by: bag)
        subject1.onNext(1)
        subject1.onNext(2)
        subject2.onNext("A")
        subject1.onNext(3)
        subject2.onNext("B")
        subject1.onNext(4)
        subject2.onNext("C")
    }

运行结果:

next(1 A)
next(2 B)
next(3 C)
  1. 为了能够产生结果,两个序列中都必须保证至少有一个元素
  2. zip经常用于整合多个网络请求上,比如同时发送两个网络请求,只有当两个请求成功后才往下进行处理。

2. combineLatest

将多个Observable序列压缩成一个Observable序列,与zip区别在于:任意一个Observable发出新的事件,会将两个Observable最新的事件进行合并。

img
    func combineLatestOperate() {
        let subject1 = PublishSubject<Int>()
        let subject2 = PublishSubject<String>()
        
        Observable.combineLatest(subject1, subject2) {
            "\($0) \($1)"
            }
            .subscribe { event in
                print(event)
            }
            .disposed(by: bag)
        subject1.onNext(1)
        subject1.onNext(2)
        subject2.onNext("A")
        subject1.onNext(3)
        subject2.onNext("B")
        subject1.onNext(4)
        subject2.onNext("C")
    }

运行结果:

next(2 A)
next(3 A)
next(3 B)
next(4 B)
next(4 C)
    //超过2个参数版本
    func combineLatest3Operate() {
        let intOb1 = Observable.just(2)
        let intOb2 = Observable.of(0, 1, 2, 3)
        let intOb3 = Observable.of(0, 1, 2, 3, 4)
        
        Observable.combineLatest(intOb1, intOb2, intOb3) {
            return ($0 + $1) * $2
            }
            .subscribe {
                print($0)
            }.disposed(by: bag)
    }

运行结果:

next(0)
next(0)
next(3)
next(4)
next(8)
next(10)
next(15)
next(20)
completed

3. startWith

在Observable序列的开头增加一些数据,即在序列发出事件之前,先发出预先插入的事件消息

    func startWithOperate() {
        let ofSequence = Observable.of(3,4,5)
        ofSequence.startWith(2)
            .startWith(1)
            .startWith(11,12,13)
            .subscribe { event in
                print(event)
            }
            .disposed(by: bag)
    }

运行结果:

next(11)
next(12)
next(13)
next(1)
next(2)
next(3)
next(4)
next(5)
completed

4. withLatestFrom

将两个 Observable 序列合并为一个。每当self 队列发出一个事件时,从第二个序列中取出最新的一个值。

    func withLatestFromOperate() {
        let subject1 = PublishSubject<String>()
        let subject2 = PublishSubject<String>()
        
        subject1
            .withLatestFrom(subject2)
            .subscribe { event in
                print(event)
            }.disposed(by: bag)
        
        subject1.onNext("A")
        subject2.onNext("1")
        subject1.onNext("B")
        subject1.onNext("C")
        subject2.onNext("2")
        subject1.onNext("D")
    }

运行结果:

next(1)
next(1)
next(2)

5. switchLatest

可以更改订阅新的Observable序列,同时取消对之前Observable序列订阅

img
    func switchLatestOperate() {
        let subject1 = BehaviorSubject(value: "1")
        let subject2 = BehaviorSubject(value: "A")
        
        let subject = BehaviorRelay(value: subject1)
        subject.switchLatest()
            .subscribe { print($0) }
            .disposed(by: bag)
        
        subject1.onNext("2")
        
        //改变事件源
        subject.accept(subject2)
        subject1.onNext("3")
        subject2.onNext("B")
    }

运行结果:

next(1)
next(2)
next(A)
next(B)

6. merge

将多个 Observable 合并成一个Observable,按顺序依次发出事件。

img
    func mergeOperate() {
        let subject1 = PublishSubject<Int>()
        let subject2 = PublishSubject<Int>()
        
        Observable.of(subject1, subject2)
            .merge()
            .subscribe { event in
                print(event)
            }.disposed(by: bag)
        
        subject1.onNext(10)
        subject2.onNext(20)
        subject1.onNext(30)
        subject1.onNext(40)
        subject2.onNext(50)
        subject1.onNext(60)
    }

运行结果:

next(10)
next(20)
next(30)
next(40)
next(50)
next(60)

条件和布尔操作符

1 .amb

当传入多个Observable到amb操作符时,它将取出第一个发出事件的Observable,然后只发出它的事件,并忽略其他事件。

    func ambOperate() {
        let subject1 = PublishSubject<Int>()
        let subject2 = PublishSubject<Int>()
        let subject3 = PublishSubject<Int>()
        
        subject1.amb(subject2)
            .amb(subject3)
            .subscribe { event in
                print(event)
            }.disposed(by: bag)
        
        subject3.onNext(30)
        subject2.onNext(20)
        subject1.onNext(10)
        subject1.onNext(11)
        subject1.onNext(12)
        subject2.onNext(21)
        subject3.onNext(31)
    }

运行结果:

next(30)
next(31)

2 .takeWhile

判断Observable发出的事件中的元素是否满足条件,当出现一个不满足条件的事件时,便发出 completed 事件

img
    func takeWhileOperate() {
        let ofSequence = Observable.of(1,2,3,4,5)
        ofSequence.takeWhile {$0 < 3}
            .subscribe { event in
                print(event)
            }.disposed(by: bag)
    }

运行结果:

next(1)
next(2)
completed

3 .takeUntil

当监听到第二个Observable发出 next 事件时,第一个Observable便会发出 completed 事件。

当监听到第二个Observable发出 completed 事件时,第一个Observable不受影响,正常发送事件。

当监听到第二个Observable发出 error 事件时,Observable序列终止。

    func takeUntilOperate() {
        let subject1 = PublishSubject<Int>()
        let subject2 = PublishSubject<Int>()
        
        subject1.takeUntil(subject2)
            .subscribe { event in
                print(event)
            }.disposed(by: bag)
        
        subject1.onNext(1)
        subject1.onNext(2)
        subject2.onNext(20)
        subject1.onNext(3)
    }

运行结果:

next(1)
next(2)
completed
    func takeUntilOperate() {
        let subject1 = PublishSubject<Int>()
        let subject2 = PublishSubject<Int>()
        
        subject1.takeUntil(subject2)
            .subscribe { event in
                print(event)
            }.disposed(by: bag)
        
        subject1.onNext(1)
        subject1.onNext(2)
        subject2.onCompleted()
        subject2.onNext(20)
        subject1.onNext(3)
        subject1.onNext(4)
    }

运行结果:

next(1)
next(2)
next(3)
next(4)
    func takeUntilOperate() {
        let subject1 = PublishSubject<Int>()
        let subject2 = PublishSubject<Int>()
        
        subject1.takeUntil(subject2)
            .subscribe { event in
                print(event)
            }.disposed(by: bag)
        
        subject1.onNext(1)
        subject1.onNext(2)
        subject2.onError(BaseError.ABC)
        subject2.onNext(20)
        subject1.onNext(3)
        subject1.onNext(4)
    }

运行结果:

next(1)
next(2)
error(ABC)

4 .skipWhile

判断Observable发出的事件中的元素是否满足条件,跳过满足条件直到出现一个不满足条件的事件时,开始接收事件

    func skipWhileOperate() {
        let ofSequence = Observable.of(1,2,3,4,5)
        ofSequence.skipWhile {$0 < 3}
            .subscribe { event in
                print(event)
            }.disposed(by: bag)
    }

运行结果:

next(3)
next(4)
next(5)
completed

5 .skipUntil

当监听到第二个Observable发出 next 事件时,第一个Observable开始接收事件。

当监听到第二个Observable发出 completed 事件时,第一个Observable保持原状态。

当监听到第二个Observable发出 error 事件时,只接收到第二个Observable的 error 事件。

    func skipUntilOperate() {
        let subject1 = PublishSubject<Int>()
        let subject2 = PublishSubject<Int>()
        
        subject1.skipUntil(subject2)
            .subscribe { event in
                print(event)
            }.disposed(by: bag)
        
        subject1.onNext(1)
        subject1.onNext(2)
        subject2.onNext(20)
        subject1.onNext(3)
        subject1.onNext(4)
    }

运行结果:

next(3)
next(4)
    func skipUntilOperate() {
        let subject1 = PublishSubject<Int>()
        let subject2 = PublishSubject<Int>()
        
        subject1.skipUntil(subject2)
            .subscribe { event in
                print(event)
            }.disposed(by: bag)
        
        subject1.onNext(1)
        subject1.onNext(2)
        subject2.onCompleted()
        subject1.onNext(3)
        subject1.onNext(4)
    }

运行结果:


    func skipUntilOperate() {
        let subject1 = PublishSubject<Int>()
        let subject2 = PublishSubject<Int>()
        
        subject1.skipUntil(subject2)
            .subscribe { event in
                print(event)
            }.disposed(by: bag)
        
        subject1.onNext(1)
        subject1.onNext(2)
        subject2.onError(BaseError.ABC)
        subject1.onNext(3)
        subject1.onNext(4)
    }

运行结果:

error(ABC)

算数、聚合操作符

1. toArray

Observable里面的值转成一个数组,并作为单一的事件发送出去。

    func toArrayOperate() {
        let ofSequence = Observable.of(1,2,3)
        ofSequence.toArray()
            .subscribe { event in
                print(event)
            }.disposed(by: bag)
    }

运行结果:

next([1, 2, 3])
completed

2. reduce

将给的初始值和序列里面的每个值进行累计运算,得到最终的结果,并将结果发送出去。

img
    func reduceOperate() {
        let ofSequence = Observable.of(1,2,3)
        ofSequence.reduce(12, accumulator: +)
            .subscribe { event in
                print(event)
            }.disposed(by: bag)
    }

运行结果:

next(18)
completed

3. concat

合并两个或者以上的Observable的消息,只有当前面的Observable发出completed事件,才会开始发送下一个Observable序列事件。

img
    func concatOperate() {
        let subject1 = PublishSubject<Int>()
        let subject2 = PublishSubject<Int>()

        subject1.concat(subject2)
            .subscribe { event in
                print(event)
            }.disposed(by: bag)
        subject1.onNext(1)
        subject2.onNext(20)
        subject1.onNext(2)
        subject1.onNext(3)
        subject1.onCompleted()
        subject2.onNext(21)
    }

运行结果:

next(1)
next(2)
next(3)
next(21)

错误处理操作符

错误处理操作符可以用来帮助我们从 Observable 发出的 error 事件做出响应,或者从错误中恢复。

1 .catchErrorJustReturn

当遇到错误事件时,返回指定的值,然后结束,发出completed事件。

    func catchErrorJustReturnOperate() {
        let subject = PublishSubject<Int>()
        
        subject.catchErrorJustReturn(0)
            .subscribe { event in
                print(event)
            }.disposed(by: bag)
        subject.onNext(1)
        subject.onNext(2)
        subject.onError(BaseError.BCD)
        subject.onNext(3)
    }

运行结果:

next(1)
next(2)
next(0)
completed

2 .catchError

当遇到错误事件时,可以返回另一个Observable序列进行订阅

img
    func catchErrorOperate() {
        let subject = PublishSubject<Int>()
        let sequence = Observable.of(100,101,102)

        subject.catchError {_ in
            return sequence
            }
            .subscribe { event in
                print(event)
            }.disposed(by: bag)
        subject.onNext(1)
        subject.onNext(2)
        subject.onError(BaseError.BCD)
        subject.onNext(3)
    }

运行结果:

next(1)
next(2)
next(100)
next(101)
next(102)
completed

3 .retry

当遇到错误事件时,重新订阅该序列。retry()方法传入数字表示重复次数,不传只重复一次

    func retryOperate() {
        var count = 1
        let sequence = Observable<Int>.create { observer in
            let error = NSError(domain: "Test", code: 0, userInfo: nil)
            observer.onNext(0)
            observer.onNext(1)
            observer.onNext(2)
            if count < 2 {
                observer.onError(error)
                count += 1
            }
            observer.onNext(3)
            observer.onNext(4)
            observer.onNext(5)
            observer.onCompleted()
            
            return Disposables.create()
        }
        
        sequence
            .retry()
            .subscribe {
                print($0)
            }.disposed(by: bag)
    }

运行结果:

next(0)
next(1)
next(2)
next(0)
next(1)
next(2)
next(3)
next(4)
next(5)
completed

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,904评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,581评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,527评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,463评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,546评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,572评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,582评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,330评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,776评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,087评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,257评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,923评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,571评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,192评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,436评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,145评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容