RxSwift(五)操作符的使用

在RxSwift中,有许多实用的操作符,也可以称之为高阶函数,可以帮助我们创建特定功能的序列,或者组合变换原有的序列,生成一个新的序列。这里记录一下,可以作为手册以供快速查看。

组合操作符

  • merge
    将多个序列合并成一个新序列,当这多个序列中某一个序列发出一个元素时,新序列就将这个元素发出,当某一个序列发出error时,新序列也发出error,并终止序列。
        let subject1 = PublishSubject<String>()
        let subject2 = PublishSubject<String>()
        Observable.of(subject1, subject2)
            .merge()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        subject1.onNext("R")
        subject1.onNext("x")
        subject2.onNext("S")
        subject2.onNext("w")
        subject1.onNext("i")
        subject2.onNext("f")
        subject1.onNext("t")

打印:

R
x
S
w
i
f
t
  • zip
    将多个(最多不超过8个) 序列的元素组合压缩,而且是等到每个序列元素事件一一对应地凑齐之后再合并,然后将合并的结果元素发出来。组合时,严格按照序列的索引数进行组合。即返回序列第一个元素,是由每一个源序列的第一个元素组合出来,它的第二个元素 ,是由每一个源序列的第二个元素组合出来的,以此类推。它的元素数量等于源序列中元素数量最少的那个序列。
        let stringSubject = PublishSubject<String>()
        let intSubject = PublishSubject<Int>()

        Observable.zip(stringSubject, intSubject) { stringElement, intElement in
                "\(stringElement) \(intElement)"
            }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        stringSubject.onNext("R")
        stringSubject.onNext("x")   // 到这里存储了 R X, 但是不会响应
        intSubject.onNext(1)    // 与R 组合成一个新序列,元素:R 1
        intSubject.onNext(2)    // 与X 组合成一个新序列,元素:X 2
        stringSubject.onNext("Swift")   // 保存Swift
        intSubject.onNext(3)   // 与Swift 组合成一个新序列,元素:Swift 3
        // 划重点: 只有两个序列同时有值的时候才会响应,否则存值

打印:

R 1
x 2
Swift 3
  • combineLatest
    将多个序列中最新的元素组合起来,然后将这个组合的结果发出来。这些源序列中任何一个发出一个元素,他都会发出一个元素(前提是,这些序列曾经发出过元素)。应用:同时满足条件下触发业务,如登录账号、密码合法性校验。
        let stringSub = PublishSubject<String>()
        let intSub = PublishSubject<Int>()
        Observable.combineLatest(stringSub, intSub) { strElement, intElement in
                "\(strElement) \(intElement)"
            }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        stringSub.onNext("R")   // 保存R
        stringSub.onNext("x")   // 保存x覆盖了R,和zip不一样
        intSub.onNext(1)        // 这时两个序列都有元素,响应 x 1
        intSub.onNext(2)        // 保存1覆盖了2,都有值, 响应 x 2
        stringSub.onNext("Swift") // 保存Swift覆盖了x, 都有值,响应 Swift 2

打印:

x 1
x 2
Swift 2
  • switchLatest
    将序列发出的元素转换成一个新序列,并从新序列发出元素。
        let switchLatestSub1 = BehaviorSubject(value: "R")
        let switchLatestSub2 = BehaviorSubject(value: "1")
        // 选择了 switchLatestSub1 就不会监听 switchLatestSub2
        let switchLatestSub  = BehaviorSubject(value: switchLatestSub1)  
        
        switchLatestSub.asObservable()
            .switchLatest()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        switchLatestSub1.onNext("x")
        switchLatestSub1.onNext("_")
        switchLatestSub2.onNext("2")
        switchLatestSub2.onNext("3")  // 2-3都会不会监听,保存的元素由2覆盖1,3覆盖2
        switchLatestSub.onNext(switchLatestSub2)   // 切换到 switchLatestSub2
        switchLatestSub1.onNext("*")
        switchLatestSub1.onNext("Swift") // 原理同上,最后保存的元素为Swift
        switchLatestSub2.onNext("4")
        switchLatestSub.onNext(switchLatestSub1) // 切换到 switchLatestSub1

打印:

R
x
_
3
4
Swift
  • concat
    将多个序列按顺序串联起来,只有当上一个序列发出了completed事件,才会开始发送下一个序列事件。
        let concatSub1 = BehaviorSubject(value: "1")
        let concatSub2 = BehaviorSubject(value: "A")
        let concatSub = BehaviorSubject(value: concatSub1)
        
        concatSub.asObservable()
            .concat()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        concatSub1.onNext("2")
        concatSub.onNext(concatSub2)    // 切换到 concatSub2
        concatSub2.onNext("B")
        concatSub2.onNext("C")    // B-C都不会监听,但是默认保存由 B覆盖A C覆盖B
        concatSub1.onCompleted()    // 结束concatSub1
        concatSub2.onNext("D")

打印:

1
2
C
D
  • startWith
    在发出序列的事件元素之前,会先发出这些预先插入的事件元素。
        Observable.of("1", "2", "3", "4")
            .startWith("A")
            .startWith("B")
            .startWith("C", "a", "b")
            .subscribe(onNext: { print($0) })
            .disposed(by: DisposeBag())

打印:

C
a
b
B
A
1
2
3
4
  • withLatestFrom
    将两个序列最新的元素组合起来,当第一个序列发出一个元素,就将组合后的元素发送出来。
        let withLatestFromStrSub = PublishSubject<String>()
        let withLatestFromIntSub = PublishSubject<Int>()
        // 当withLatestFromStrSub发出一个元素时,
        // 就立即取出withLatestFromIntSub最新的元素,
        // 将withLatestFromStrSub 中最新的元素strElement和withLatestFromIntSub最新的元素intElement组合,
        //然后把组合结果 strElement+intElement发送出去
        withLatestFromStrSub.withLatestFrom(withLatestFromIntSub) { strElement, intElement in
            "\(strElement) \(intElement)"
            }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        // withLatestFromStrSub发出一个元素时,
        // 立即取出withLatestFromIntSub最新的元素,
        // 然后把withLatestFromIntSub中最新的元素发送出去
//        withLatestFromStrSub.withLatestFrom(withLatestFromIntSub)
//            .subscribe(onNext: { print($0) })
//            .disposed(by: disposeBag)
        withLatestFromStrSub.onNext("R")
        withLatestFromStrSub.onNext("x") // 存了x覆盖R
        withLatestFromIntSub.onNext(1)
        withLatestFromIntSub.onNext(2)      // 存了2覆盖1
        withLatestFromStrSub.onNext("Swift") // 存了Swift覆盖x,withLatestFromIntSub有最新元素2,发出组合序列事件
        withLatestFromStrSub.onNext("Hello") // 存了Hello覆盖Swift,withLatestFromIntSub有最新元素2,发出组合序列事件

打印:

Swift 2
Hello 2
2
2

转换操作符

  • map
    通过传入一个闭包把原序列转变为一个新序列,map函数会将原序列的所有元素进行转换。
        let ob = Observable.of(1,2,3,4)
        ob.map { (number) -> Int in
            return number+2
            }
            .subscribe{
                print("\($0)")
            }
            .disposed(by: disposeBag)

打印:

next(3)
next(4)
next(5)
next(6)
completed
  • flatMap
    升维:flatMap 操作符会对源序列的每一个元素应用一个转换方法,将他们转换成 Observables,降维:然后将这些 Observables 的元素合并成一个序列之后再发送出来。
        let strSub = BehaviorSubject(value: "A")
        let intSub = BehaviorSubject(value: "1")
        let flatSub = BehaviorSubject(value: strSub)
        
        flatSub.asObservable()
            .flatMap { $0 }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        strSub.onNext("B")
        flatSub.onNext(intSub)  // 加入intSub序列,发出intSub和strSub序列元素转化后的序列
        intSub.onNext("2")
        strSub.onNext("C")
        intSub.onNext("3")

打印:

A
B
1
2
C
3
  • flatMapLatest
    将序列元素转换成 Observables,然后取这些 Observables 中最新的一个发送。若转换出一个新的 Observables,就只发出它的元素,旧的 Observables 的元素将被忽略掉。
        let strSub = BehaviorSubject(value: "A")
        let intSub = BehaviorSubject(value: "1")
        let flatSub = BehaviorSubject(value: strSub)
        
        flatSub.asObservable()
            .flatMapLatest { $0 }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        strSub.onNext("B")
        // 加入intSub序列,发出intSub元素转化后的序列,忽略strSub的转化序列
        flatSub.onNext(intSub)  
        intSub.onNext("2")
        strSub.onNext("C")
        intSub.onNext("3")

打印:

A
B
1
2
3

flatMapLatest实际上是mapswitchLatest操作符的组合。
相对的有flatMapFirstflatMapFirst只会接收最初加入序列的元素事件。

  • concatMap
    将源序列的每一个元素转换,转化成一个 Observables。然后将这些Observables 按顺序发出元素。与flatMap 的区别是:当前一个序列的元素发送完毕后,下一个 Observable 才可以开始发出元素。
        let concatMapSubject1 = BehaviorSubject(value: "A")
        let concatMapSubject2 = BehaviorSubject(value: "1")
        let concatMapSubject = BehaviorSubject(value: concatMapSubject1)
        
        concatMapSubject.asObservable()
            .concatMap() { $0 }
            .subscribe { print($0) }
            .disposed(by: disposeBag)
        
        concatMapSubject1.onNext("B")
        concatMapSubject1.onNext("C")
        concatMapSubject.onNext(concatMapSubject2)
        concatMapSubject2.onNext("2")
        concatMapSubject2.onNext("3")   // 保存3覆盖2
        // concatMapSubject1结束,开始订阅concatMapSubject2的元素
        concatMapSubject1.onCompleted() 
        concatMapSubject2.onNext("4")

打印:

next(A)
next(B)
next(C)
next(3)
next(4)
  • scan
    对第一个元素和传入的初始参数使用传入的闭包运算,将结果作为第一个元素发出。然后,将结果作为参数填入到第二个元素的闭包运算中,创建第二个元素。以此类推,直到遍历完全部的元素。也就是scan 先给一个初始化的参数数,然后不断的拿前一个结果和最新的元素进行运算。
        Observable.of(10, 100, 1000)
            .scan(2) { aggregateValue, newValue in
                aggregateValue + newValue // 10 + 2, 100 + 12, 1000 + 112
            }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打印:

12
112
1112
  • reduce
    将对第一个元素使用传入的闭包运算。然后,将结果作为参数填入到第二个元素的闭包运算中。以此类推,直到遍历完全部的元素后发出最终结果。
        Observable.of(10, 100, 1000)
            .reduce(2) { aggregateValue, newValue in
                aggregateValue + newValue // 1000 + 100 + 10 + 2
            }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打印:

1112
  • toArray
    将一个序列转成一个数组,并作为一个单一的事件发送,然后结束。
        Observable.of(1, 2, 3)
            .toArray()
            .subscribe({ print($0) })
            .disposed(by: disposeBag)

打印:

success([1, 2, 3])

过滤操作符

  • filter
    用来过滤掉不符合条件的事件,仅仅发出序列中通过判定的元素。
        Observable.of(1,2,3,4,5,6,7,8,9,0)
            .filter { $0 % 2 == 0 }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打印:

2
4
6
8
0
  • single
    只发出可观察序列发出的第一个元素(或满足条件的第一个元素)。如果可观察序列发出多个元素,将抛出一个错误。
        Observable.of("Rx", "Swift")
            .single()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打印:

Rx
Unhandled error happened: Sequence contains more than one element.
 subscription called from:
        Observable.of("Rx", "Swift")
            .single { $0 == "Swift" }
            .subscribe { print($0) }
            .disposed(by: disposeBag)

打印:

next(Swift)
completed
  • take
    只发出头n个元素,忽略掉后面的元素,直接结束序列。
        print("*****take*****")
        Observable.of("A", "B","C", "D")
            .take(2)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打印:

A
B
  • takeLast
    仅发送序列的后n个元素,忽略前面的元素。
        Observable.of("R", "x","S", "wift")
            .takeLast(3)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打印:

x
S
wift
  • takeWhile
    依次判断序列的每一个值是否满足给定的条件, 当第一个不满足条件的值出现时,序列便自动结束。
        Observable.of(1, 2, 3, 4, 2, 6)
            .takeWhile { $0 < 3 }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打印:

1
2
  • takeUntil
    监听源Observable,同时监听一个条件序列。若条件序列发出一个元素或者产生一个终止事件,那么源Observable 将自动完成,停止发送事件。
        let sourceSequence = PublishSubject<String>()
        let referenceSequence = PublishSubject<String>()
        
        sourceSequence
            .takeUntil(referenceSequence)
            .subscribe { print($0) }
            .disposed(by: disposeBag)
        
        sourceSequence.onNext("A")
        sourceSequence.onNext("B")
        sourceSequence.onNext("C")

        referenceSequence.onNext("0") // 条件序列订阅发出,源序列就结束
        
        sourceSequence.onNext("1")
        sourceSequence.onNext("2")
        sourceSequence.onNext("3")

打印:

next(A)
next(B)
next(C)
completed
  • skip
    用于跳过序列发出的前n 个元素事件。
        Observable.of(1, 2, 3, 4, 5, 6)
            .skip(2)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打印:

3
4
5
6
  • skipWhile
    忽略源序列中头几个满足条件的事件。
        Observable.of(1, 7, 3, 4, 5, 6)
            .skipWhile { $0 < 4 }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打印:

7
3
4
5
6
  • skipUntil
    与takeUntil类似,监听源Observable,同时监听一个条件序列。跳过Observable 中头几个元素,直到条件序列发出一个元素。
        let sourceSeq = PublishSubject<String>()
        let referenceSeq = PublishSubject<String>()
        
        sourceSeq.skipUntil(referenceSeq)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        // 没有条件序列事件,源序列无法订阅
        sourceSeq.onNext("A")
        sourceSeq.onNext("B")
        sourceSeq.onNext("C")
        
        referenceSeq.onNext("0") // 订阅条件序列,源序列就开始订阅
        
        sourceSeq.onNext("1")
        sourceSeq.onNext("2")
        sourceSeq.onNext("3")

打印:

1
2
3
  • elementAt
    只发出出来中的第 n 个元素,即是只处理指定位置的元素事件。
        Observable.of("R", "X", "S", "wift")
            .elementAt(3)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打印:

wift
  • distinctUntilChanged
    用于过滤掉连续重复的事件。如果后一个元素和前一个元素是相同的,那么这个元素将不会被发出来。不连续的相同元素不受影响。
        Observable.of("1", "2", "2", "2", "3", "3", "4")
            .distinctUntilChanged()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打印:

1
2
3
4
  • amb
    amb操作符在多个源 Observables中, 取第一个发出元素或产生事件的 Observable,这个事件可以是一个 next,error 或者 completed事件,然后就只发出这个Observable的元素事件,忽略掉其他的Observables。
        let subject1 = PublishSubject<Int>()
        let subject2 = PublishSubject<Int>()
        let subject3 = PublishSubject<Int>()
        
        subject1
            .amb(subject2)
            .amb(subject3)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        subject2.onNext(2)  // 先发出事件,之后就只发出subject2的事件
        subject1.onNext(1)
        subject2.onNext(2)
        subject1.onNext(1)
        subject3.onNext(3)
        subject2.onNext(2)
        subject1.onNext(1)
        subject3.onNext(3)
        subject3.onNext(3)

打印:

2
2
2

Debug 操作符

  • debug
    打印所有的订阅,事件以及销毁信息。
    打开RxSwift的Debug Mode方式:在 podfile 的末端添加如下代码,重新 pod install 一下。
post_install do |installer|
  installer.pods_project.targets.each do |target|
    if target.name == 'RxSwift'
      target.build_configurations.each do |config|
        if config.name == 'Debug'
          config.build_settings['OTHER_SWIFT_FLAGS'] ||= ['-D', 'TRACE_RESOURCES']
        end
      end
    end
  end
end
        var count = 1
        let error = NSError.init(domain: "com.xxx", code: 10000, userInfo: nil)
        let sequenceThatErrors = Observable<String>.create { observer in
            observer.onNext("A")
            observer.onNext("B")
            observer.onNext("C")
            
            if count < 3 {
                observer.onError(error)
                print("onError")
                count += 1
            }
            
            observer.onNext("D")
            observer.onNext("E")
            observer.onNext("F")
            observer.onCompleted()
            
            return Disposables.create()
        }
        
        sequenceThatErrors
            .debug()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打印:

2019-08-27 17:44:45.792: ViewController.swift:572 (testDebug()) -> subscribed
2019-08-27 17:44:45.840: ViewController.swift:572 (testDebug()) -> Event next(A)
A
2019-08-27 17:44:45.840: ViewController.swift:572 (testDebug()) -> Event next(B)
B
2019-08-27 17:44:45.840: ViewController.swift:572 (testDebug()) -> Event next(C)
C
2019-08-27 17:44:45.842: ViewController.swift:572 (testDebug()) -> Event error(Error Domain=com.xxx Code=10000 "(null)")
Unhandled error happened: Error Domain=com.xxx Code=10000 "(null)"
 subscription called from:

2019-08-27 17:44:45.845: ViewController.swift:572 (testDebug()) -> isDisposed
onError
  • RxSwift.Resources.total
    提供所有Rx资源分配的计数,在开发期间检测泄漏非常有用。
        print(RxSwift.Resources.total)
        let subject = BehaviorSubject(value: "RxSwift")

        let subscription1 = subject.subscribe(onNext: { print($0) })
        print(RxSwift.Resources.total)

        let subscription2 = subject.subscribe(onNext: { print($0) })
        print(RxSwift.Resources.total)

        subscription1.dispose()
        print(RxSwift.Resources.total)

        subscription2.dispose()
        print(RxSwift.Resources.total)

打印:

4
RxSwift
11
RxSwift
14
12
10

连接操作符

  • publish
    将序列转换为可被连接的序列可被连接的序列 在被订阅后不会发出元素,直到 connect操作符被应用为止。这样你就可以控制序列在什么时候开始发出元素。
        let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).publish()
        
        interval.subscribe(onNext: { print("订阅: 1, 事件: \($0)") })
            .disposed(by: disposeBag)
        
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            // 在connect之前的订阅不会发出元素,直到connect发出后
            print("do connect")
            _ = interval.connect()
        }
        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
            interval.subscribe(onNext: { print("订阅: 2, 事件: \($0)") })
                .disposed(by: self.disposeBag)
        }
        DispatchQueue.main.asyncAfter(deadline: .now() + 4) {
            interval.subscribe(onNext: { print("订阅: 3, 事件: \($0)") })
                .disposed(by: self.disposeBag)
        }
        DispatchQueue.main.asyncAfter(deadline: .now() + 6) {
            self.disposeBag = DisposeBag()
        }

打印:

订阅: 1, 事件: 0
订阅: 2, 事件: 0
订阅: 1, 事件: 1
订阅: 2, 事件: 1
订阅: 1, 事件: 2
订阅: 2, 事件: 2
订阅: 3, 事件: 2
订阅: 1, 事件: 3
订阅: 2, 事件: 3
订阅: 3, 事件: 3
  • replay
    将序列转换为可被连接的序列,并且这个可被连接的序列将缓存最新的 n 个元素。当有新的观察者对它进行订阅时,它就把这些被缓存的元素发送给观察者。
        
        let sequenceThatErrors = Observable<String>.create { observer in
            observer.onNext("A")
            observer.onNext("B")
            observer.onNext("C")
            observer.onNext("D")
            observer.onNext("E")
            observer.onNext("F")
            return Disposables.create()
        }
        .replay(5)  // 缓存5个元素
        
        sequenceThatErrors
            .subscribe(onNext: {
                print("before connect subscribe \($0)")
            })
            .disposed(by: disposeBag)

        // 在connect之前的订阅不会发出元素,直到connect发出后
        print("do connect")
        _ = sequenceThatErrors.connect()
        
        sequenceThatErrors
            .subscribe(onNext: {
                print("after connect first subscribe \($0)")
            })
            .disposed(by: disposeBag)
        sequenceThatErrors
            .subscribe(onNext: {
                print("after connect second subscribe \($0)")
                })
            .disposed(by: disposeBag)

打印:

do connect
before connect subscribe A
before connect subscribe B
before connect subscribe C
before connect subscribe D
before connect subscribe E
before connect subscribe F
after connect first subscribe B
after connect first subscribe C
after connect first subscribe D
after connect first subscribe E
after connect first subscribe F
after connect second subscribe B
after connect second subscribe C
after connect second subscribe D
after connect second subscribe E
after connect second subscribe F
  • multicast
    将序列转换为可被连接的序列,同时传入一个 Subject,每当序列发送事件时都会触发这个 Subject的发送。
        let subject = PublishSubject<Any>()
        subject.subscribe{print("subject:\($0)")}
            .disposed(by: disposeBag)
        
        let netOB = Observable<Any>.create { (observer) -> Disposable in
                sleep(2)// 模拟网络延迟
                print("开始请求网络")
                observer.onNext("请求到的网络数据")
                observer.onNext("请求到的本地")
                observer.onCompleted()
                return Disposables.create {
                    print("销毁")
                }
            }.multicast(subject)
        
        netOB.subscribe(onNext: { (anything) in
                print("订阅1:",anything)
            })
            .disposed(by: disposeBag)
        
        netOB.subscribe(onNext: { (anything) in
                print("订阅2:",anything)
            })
            .disposed(by: disposeBag)
        
        _ = netOB.connect()

打印:

subject:next(请求到的网络数据)
订阅1: 请求到的网络数据
订阅2: 请求到的网络数据
subject:next(请求到的本地)
订阅1: 请求到的本地
订阅2: 请求到的本地
subject:completed
销毁

错误恢复操作符

  • timeout
    规定时间内没有产生元素就产生一个超时的 error事件
        let times = [
            [ "value": 1, "time": 1 ],
            [ "value": 2, "time": 2 ],
            [ "value": 3, "time": 4.1 ]
        ]
        let timeOutOb = Observable.from(times)
            .flatMap({ (anyTime) in
                return Observable.of(anyTime["value"]!)
                    .delaySubscription(Double(anyTime["time"]!), scheduler: MainScheduler.instance)
            })
            .timeout(2, scheduler: MainScheduler.instance)
        timeOutOb.subscribe(onNext: { (value) in
            print("onNext:", value)
        }, onError: { (error) in
            print("error:", error)
        })
        .disposed(by: disposeBag)

打印:

onNext: 1.0
onNext: 2.0
error: Sequence timeout.
  • catchErrorJustReturn
    如果产生错误,返回一个可观察序列,该序列发出单个元素,然后终止。
        let error = NSError.init(domain: "com.xxx", code: 10000, userInfo: nil)
        let sequenceThatFails = PublishSubject<String>()
        
        sequenceThatFails
            .catchErrorJustReturn("AA")
            .subscribe { print($0) }
            .disposed(by: disposeBag)
        
        sequenceThatFails.onNext("BB")
        sequenceThatFails.onNext("CC")  // 正常序列发送
        sequenceThatFails.onError(error)   // 发送失败,返回设定的错误的预案

打印:

next(BB)
next(CC)
next(AA)
completed
  • catchError
    拦截一个 error事件,将它替换成其他序列,然后传递给观察者,订阅新的序列。
        let recoverySequence = PublishSubject<String>()
        
        sequenceThatFails
            .catchError {
                print("Error:", $0)
                return recoverySequence  // 截获到了错误序列,,返回一个新序列
            }
            .subscribe { print($0) }
            .disposed(by: disposeBag)
        
        sequenceThatFails.onNext("AA")
        sequenceThatFails.onNext("BB") // 正常序列发送成功的
        sequenceThatFails.onError(error) // 发送失败的序列
        
        recoverySequence.onNext("CC")   // 替换的序列发送信号
        sequenceThatFails.onNext("DD")  // 原序列已结束,不能放松信号
        recoverySequence.onNext("EE")
next(AA)
next(BB)
Error: Error Domain=com.xxx Code=10000 "(null)"
next(CC)
next(EE)
  • retry
    如果源序列产生一个错误事件,重新对它进行订阅,让它有机会不产生 error 事件。即便源序列产生了一个 error事件,retry总是对观察者发出 next 事件,所以这样可能会产生重复的元素。
        var count = 1 // 外界变量控制流程
        let sequenceRetryErrors = Observable<String>.create { observer in
            observer.onNext("AA")
            observer.onNext("BB")
            observer.onNext("CC")
            
            if count == 1 { // 流程进来之后就会过度-这里的条件可以作为出口,失败的次数
                observer.onError(error)  // 接收到了错误序列,重试序列发生
                print("onError")
                count += 1
            }
            
            observer.onNext("DD")
            observer.onNext("EE")
            observer.onCompleted()
            
            return Disposables.create()
        }
        
        sequenceRetryErrors
            .retry()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打印:

AA
BB
CC
onError
AA
BB
CC
DD
EE

设置重新订阅次数(retry(_:))

        let sequenceThatErrors = Observable<String>.create { observer in
            observer.onNext("AA")
            observer.onNext("BB")
            observer.onNext("CC")
            
            observer.onError(error)
            observer.onNext("DD")
            observer.onNext("EE")
            observer.onCompleted()
            
            return Disposables.create()
        }
        
        sequenceThatErrors
            .retry(3)  // 重新订阅3次
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
    }

打印:

AA
BB
CC
AA
BB
CC
AA
BB
CC
Unhandled error happened: Error Domain=com.xxx Code=10000 "(null)"
 subscription called from:

总结:
篇幅较长,想快速了解请直接看这里。

  1. 组合操作符

    • merge
      将多个序列合并成一个新序列,当这多个序列中某一个序列发出一个元素时,新序列就将这个元素发出,当某一个序列发出error时,新序列也发出error,并终止序列。
    • zip
      将多个(最多不超过8个) 序列的元素组合压缩,而且是等到每个序列元素事件一一对应地凑齐之后再合并,然后将合并的结果元素发出来。
    • combineLatest
      将多个序列中最新的元素组合起来,然后将这个组合的结果发出来。
    • switchLatest
      将序列发出的元素转换成一个新序列,并从新序列发出元素。
    • concat
      将多个序列按顺序串联起来,只有当上一个序列发出了completed事件,才会开始发送下一个序列事件。
    • startWith
      在发出序列的事件元素之前,会先发出这些预先插入的事件元素。
    • withLatestFrom
      将两个序列最新的元素组合起来,当第一个序列发出一个元素,就将组合后的元素发送出来。
  2. 转换操作符

    • map
      通过传入的闭包把原序列转变为一个新序列,map函数会将原序列的所有元素进行转换。
    • flatMap
      升维:对源序列的每一个元素应用转换方法,将他们转换成 Observables,降维:然后将这些 Observables 的元素合并成一个序列之后再发送出来。
    • flatMapLatest
      将序列元素转换成 Observables,然后取这些 Observables 中最新的一个发送。若转换出一个新的 Observables,就只发出它的元素,旧的 Observables 的元素将被忽略掉。
    • concatMap
      将源序列的每一个元素转换,转化成一个 Observables。然后将这些Observables 按顺序发出元素。与flatMap 的区别是:当前一个序列的元素发送完毕后,下一个 Observable 才可以开始发出元素。
    • scan
      遍历全部元素,对第一个元素和传入的初始参数使用传入的闭包运算,将结果作为第一个元素发出。然后不断的拿前一个结果和最新的元素进行运算,并发出结果。
    • reduce
      将对第一个元素使用传入的闭包运算。然后,将结果作为参数填入到第二个元素的闭包运算中。以此类推,直到遍历完全部的元素后发出最终结果
    • toArray
      将一个序列转成一个数组,并作为一个单一的事件发送,然后结束。
  3. 过滤操作符

    • filter
      用来过滤掉不符合条件的事件,仅仅发出序列中通过判定的元素。
    • single
      只发出可观察序列发出的第一个元素(或满足条件的第一个元素)。如果可观察序列发出多个元素,将抛出一个错误。
    • take
      只发出头n个元素,忽略掉后面的元素,直接结束序列。
    • takeLast
      仅发送序列的后n个元素,忽略前面的元素。
    • takeWhile
      依次判断序列的每一个值是否满足给定的条件, 当第一个不满足条件的值出现时,序列便自动结束。
    • takeUntil
      监听源Observable,同时监听一个条件序列。若条件序列发出一个元素或者产生一个终止事件,那么源Observable 将自动完成,停止发送事件。
    • skip
      用于跳过序列发出的前n 个元素事件。
    • skipWhile
      忽略源序列中头几个满足条件的事件。
    • skipUntil
      监听源Observable,同时监听一个条件序列。跳过Observable 中头几个元素,直到条件序列发出一个元素。
    • elementAt
      只发出出来中的第 n 个元素,即是只处理指定位置的元素事件。
    • distinctUntilChanged
      用于过滤掉连续重复的事件。如果后一个元素和前一个元素是相同的,那么这个元素将不会被发出来。不连续的相同元素不受影响。
    • amb
      在多个源 Observables中, 取第一个产生事件的 Observable,忽略掉其他的Observables。
  4. Debug 操作符

    • debug
      打印所有的订阅,事件以及销毁信息。
    • RxSwift.Resources.total
      提供所有Rx资源分配的计数,在开发期间检测泄漏非常有用
  5. 连接操作符。

    • publish
      将序列转换为可被连接的序列可被连接的序列 在被订阅后不会发出元素,直到 connect 操作符被应用为止。
    • replay
      将序列转换为可被连接的序列,并且这个可被连接的序列将缓存最新的 n 个元素。当有新的观察者对它进行订阅时,它就把这些被缓存的元素发送给观察者。
    • multicast
      将序列转换为可被连接的序列,同时传入一个 Subject,每当序列发送事件时都会触发这个 Subject 的发送。
  6. 错误恢复操作符

    • timeout
      规定时间内没有产生元素就产生一个超时的 error 事件。
    • catchErrorJustReturn
      如果产生错误,返回一个可观察序列,该序列发出单个元素,然后终止。
    • catchError
      拦截 error 事件,将它替换成其他序列,然后传递给观察者,订阅新的序列。
    • retry
      如果源序列产生一个错误事件,重新对它进行订阅,让它有机会不产生 error 事件。可设置重新订阅次数,默认遇到一次错误事件重新订阅一次。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351

推荐阅读更多精彩内容