变换操作符
对原始的Observable序列进行一些转换
1. map
通过使用一个闭包将原来的Observable序列,转成一个新的Observable
func mapOperate() {
let ofSequence = Observable.of(1,2,3)
ofSequence.map { $0 * 2 }
.subscribe { print($0) }
.disposed(by: bag)
}
运行结果:
next(2)
next(4)
next(6)
completed
2. flatMap
将 Observable 的元素转换成其他的 Observable,然后将这些 Observables 合并。
当 Observable
的元素本身拥有其他的 Observable
时,你可以将所有子 Observables
的元素合并之后在发出来,即将其 “拍扁”(降维)成一个 Observable。
func flatMapOperate() {
let subject1 = BehaviorRelay(value: "1")
let subject2 = BehaviorRelay(value: "A")
let subject = BehaviorRelay(value: subject1)
subject.flatMap { $0 }
.subscribe { print($0) }
.disposed(by: bag)
subject1.accept("2")
subject.accept(subject2)
subject1.accept("3")
subject2.accept("B")
}
运行结果:
next(1)
next(2)
next(A)
next(3)
next(B)
3. flatMapLatest
flatMapLatest
和 flatMap
的区别在于:flatMapLatest
只会从最近的序列中发出事件。
flatMapFirst
与 flatMapLatest
正好相反,flatMapLatest
只会从最初的序列中发出事件。下面的例子换成 flatMapFirst
,只会接收subject1发出的事件
func flatMapLatestOperate() {
let subject1 = BehaviorRelay(value: "1")
let subject2 = BehaviorRelay(value: "A")
let subject = BehaviorRelay(value: subject1)
subject.flatMapLatest { $0 }
.subscribe { print($0) }
.disposed(by: bag)
subject1.accept("2")
subject.accept(subject2)
subject1.accept("3") // 不会收到
subject2.accept("B")
}
运行结果:
next(1)
next(2)
next(A)
next(B)
4. concatMap
concatMap
和 flatMap
的区别:只有当前一个序列发出 completed
事件 (如果发出error
事件,所以序列都会终止),才对后一个序列进行订阅。
func concatMapOperate() {
let subject1 = BehaviorSubject(value: "1")
let subject2 = BehaviorSubject(value: "A")
let subject = BehaviorRelay(value: subject1)
subject.concatMap { $0 }
.subscribe { print($0) }
.disposed(by: bag)
subject1.onNext("2")
subject.accept(subject2)
subject1.onNext("3")
subject2.onNext("B")
subject1.onCompleted() //只有subject1结束,才能接收subject2发出的数据
}
运行结果:
next(1)
next(2)
next(3)
next(B)
5. scan
scan
提供了一个初始化值,然后使用闭包不断将前一个元素和后一个元素进行处理,并将处理结果作为单个元素的Observable序列返回。
func scanOperate() {
let ofSequence = Observable.of(0, 1, 2, 3, 4, 5)
ofSequence
.scan(0, accumulator: { acum, elem in
acum + elem
})
.subscribe {
print($0)
}.disposed(by: bag)
}
运行结果:
next(0)
next(1)
next(3)
next(6)
next(10)
next(15)
completed
过滤操作符
从源 Observable 中选择特定的数据发送
1. filter
从源 Observable 中过滤出只符合要求的事件
func filterOperate() {
let ofSequence = Observable.of(1,2,3,4,5)
ofSequence.filter { $0 > 3 }
.subscribe { event in
print(event)
}
.disposed(by: bag)
}
运行结果:
next(4)
next(5)
completed
2. take
仅发送 Observable 的前 n 个事件,在满足数量后会自动发送 completed
func takeOperate() {
let ofSequence = Observable.of(1,2,3,4,5)
ofSequence.take(2)
.subscribe { event in
print(event)
}
.disposed(by: bag)
}
运行结果:
next(1)
next(2)
completed
takeLatest
和take
用法类似,仅发送 Observable 的后 n 个事件
skip
,跳过源Observable 发出的前 n 个事件
3. distinctUntilChanged
过滤掉连续
重复的事件
func distinctdOperate() {
let ofSequence = Observable.of(1,2,3,3,4,1,5)
ofSequence.distinctUntilChanged()
.subscribe { event in
print(event)
}
.disposed(by: bag)
}
运行结果:
next(1)
next(2)
next(3)
next(4)
next(1)
next(5)
completed
4. single
限制只发出一个事件。如果存在多个事件或没有事件,会发出一个error
事件。如果只存在一个事件,不会发出 error
事件。
func singleOperate() {
let ofSequence = Observable.of(1,2,3,4,5)
ofSequence.single()
.subscribe { event in
print(event)
}
.disposed(by: bag)
}
运行结果:
next(1)
error(Sequence contains more than one element.)
结合操作符
将多个Observable序列进行组合,拼接成一个新的Observable序列
1. zip
将多个Observable序列压缩成一个Observable序列,它会等到每个Observable事件一一对齐后再合并
func zipOperate() {
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<String>()
Observable.zip(subject1, subject2) {
"\($0) \($1)"
}
.subscribe { event in
print(event)
}
.disposed(by: bag)
subject1.onNext(1)
subject1.onNext(2)
subject2.onNext("A")
subject1.onNext(3)
subject2.onNext("B")
subject1.onNext(4)
subject2.onNext("C")
}
运行结果:
next(1 A)
next(2 B)
next(3 C)
- 为了能够产生结果,两个序列中都必须保证至少有一个元素
- zip经常用于整合多个网络请求上,比如同时发送两个网络请求,只有当两个请求成功后才往下进行处理。
2. combineLatest
将多个Observable序列压缩成一个Observable序列,与zip区别在于:任意一个Observable发出新的事件,会将两个Observable最新的事件进行合并。
func combineLatestOperate() {
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<String>()
Observable.combineLatest(subject1, subject2) {
"\($0) \($1)"
}
.subscribe { event in
print(event)
}
.disposed(by: bag)
subject1.onNext(1)
subject1.onNext(2)
subject2.onNext("A")
subject1.onNext(3)
subject2.onNext("B")
subject1.onNext(4)
subject2.onNext("C")
}
运行结果:
next(2 A)
next(3 A)
next(3 B)
next(4 B)
next(4 C)
//超过2个参数版本
func combineLatest3Operate() {
let intOb1 = Observable.just(2)
let intOb2 = Observable.of(0, 1, 2, 3)
let intOb3 = Observable.of(0, 1, 2, 3, 4)
Observable.combineLatest(intOb1, intOb2, intOb3) {
return ($0 + $1) * $2
}
.subscribe {
print($0)
}.disposed(by: bag)
}
运行结果:
next(0)
next(0)
next(3)
next(4)
next(8)
next(10)
next(15)
next(20)
completed
3. startWith
在Observable序列的开头增加一些数据,即在序列发出事件之前,先发出预先插入的事件消息
func startWithOperate() {
let ofSequence = Observable.of(3,4,5)
ofSequence.startWith(2)
.startWith(1)
.startWith(11,12,13)
.subscribe { event in
print(event)
}
.disposed(by: bag)
}
运行结果:
next(11)
next(12)
next(13)
next(1)
next(2)
next(3)
next(4)
next(5)
completed
4. withLatestFrom
将两个 Observable 序列合并为一个。每当self 队列发出一个事件时,从第二个序列中取出最新的一个值。
func withLatestFromOperate() {
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
subject1
.withLatestFrom(subject2)
.subscribe { event in
print(event)
}.disposed(by: bag)
subject1.onNext("A")
subject2.onNext("1")
subject1.onNext("B")
subject1.onNext("C")
subject2.onNext("2")
subject1.onNext("D")
}
运行结果:
next(1)
next(1)
next(2)
5. switchLatest
可以更改订阅新的Observable序列,同时取消对之前Observable序列订阅
func switchLatestOperate() {
let subject1 = BehaviorSubject(value: "1")
let subject2 = BehaviorSubject(value: "A")
let subject = BehaviorRelay(value: subject1)
subject.switchLatest()
.subscribe { print($0) }
.disposed(by: bag)
subject1.onNext("2")
//改变事件源
subject.accept(subject2)
subject1.onNext("3")
subject2.onNext("B")
}
运行结果:
next(1)
next(2)
next(A)
next(B)
6. merge
将多个 Observable 合并成一个Observable,按顺序依次发出事件。
func mergeOperate() {
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
Observable.of(subject1, subject2)
.merge()
.subscribe { event in
print(event)
}.disposed(by: bag)
subject1.onNext(10)
subject2.onNext(20)
subject1.onNext(30)
subject1.onNext(40)
subject2.onNext(50)
subject1.onNext(60)
}
运行结果:
next(10)
next(20)
next(30)
next(40)
next(50)
next(60)
条件和布尔操作符
1 .amb
当传入多个Observable到amb
操作符时,它将取出第一个发出事件的Observable,然后只发出它的事件,并忽略其他事件。
func ambOperate() {
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
let subject3 = PublishSubject<Int>()
subject1.amb(subject2)
.amb(subject3)
.subscribe { event in
print(event)
}.disposed(by: bag)
subject3.onNext(30)
subject2.onNext(20)
subject1.onNext(10)
subject1.onNext(11)
subject1.onNext(12)
subject2.onNext(21)
subject3.onNext(31)
}
运行结果:
next(30)
next(31)
2 .takeWhile
判断Observable
发出的事件中的元素是否满足条件,当出现一个不满足条件的事件时,便发出 completed
事件
func takeWhileOperate() {
let ofSequence = Observable.of(1,2,3,4,5)
ofSequence.takeWhile {$0 < 3}
.subscribe { event in
print(event)
}.disposed(by: bag)
}
运行结果:
next(1)
next(2)
completed
3 .takeUntil
当监听到第二个Observable发出 next
事件时,第一个Observable便会发出 completed
事件。
当监听到第二个Observable发出 completed
事件时,第一个Observable不受影响,正常发送事件。
当监听到第二个Observable发出 error
事件时,Observable序列终止。
func takeUntilOperate() {
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
subject1.takeUntil(subject2)
.subscribe { event in
print(event)
}.disposed(by: bag)
subject1.onNext(1)
subject1.onNext(2)
subject2.onNext(20)
subject1.onNext(3)
}
运行结果:
next(1)
next(2)
completed
func takeUntilOperate() {
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
subject1.takeUntil(subject2)
.subscribe { event in
print(event)
}.disposed(by: bag)
subject1.onNext(1)
subject1.onNext(2)
subject2.onCompleted()
subject2.onNext(20)
subject1.onNext(3)
subject1.onNext(4)
}
运行结果:
next(1)
next(2)
next(3)
next(4)
func takeUntilOperate() {
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
subject1.takeUntil(subject2)
.subscribe { event in
print(event)
}.disposed(by: bag)
subject1.onNext(1)
subject1.onNext(2)
subject2.onError(BaseError.ABC)
subject2.onNext(20)
subject1.onNext(3)
subject1.onNext(4)
}
运行结果:
next(1)
next(2)
error(ABC)
4 .skipWhile
判断Observable
发出的事件中的元素是否满足条件,跳过满足条件直到出现一个不满足条件的事件时,开始接收事件
func skipWhileOperate() {
let ofSequence = Observable.of(1,2,3,4,5)
ofSequence.skipWhile {$0 < 3}
.subscribe { event in
print(event)
}.disposed(by: bag)
}
运行结果:
next(3)
next(4)
next(5)
completed
5 .skipUntil
当监听到第二个Observable发出 next
事件时,第一个Observable开始接收事件。
当监听到第二个Observable发出 completed
事件时,第一个Observable保持原状态。
当监听到第二个Observable发出 error
事件时,只接收到第二个Observable的 error
事件。
func skipUntilOperate() {
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
subject1.skipUntil(subject2)
.subscribe { event in
print(event)
}.disposed(by: bag)
subject1.onNext(1)
subject1.onNext(2)
subject2.onNext(20)
subject1.onNext(3)
subject1.onNext(4)
}
运行结果:
next(3)
next(4)
func skipUntilOperate() {
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
subject1.skipUntil(subject2)
.subscribe { event in
print(event)
}.disposed(by: bag)
subject1.onNext(1)
subject1.onNext(2)
subject2.onCompleted()
subject1.onNext(3)
subject1.onNext(4)
}
运行结果:
func skipUntilOperate() {
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
subject1.skipUntil(subject2)
.subscribe { event in
print(event)
}.disposed(by: bag)
subject1.onNext(1)
subject1.onNext(2)
subject2.onError(BaseError.ABC)
subject1.onNext(3)
subject1.onNext(4)
}
运行结果:
error(ABC)
算数、聚合操作符
1. toArray
将Observable
里面的值转成一个数组,并作为单一的事件发送出去。
func toArrayOperate() {
let ofSequence = Observable.of(1,2,3)
ofSequence.toArray()
.subscribe { event in
print(event)
}.disposed(by: bag)
}
运行结果:
next([1, 2, 3])
completed
2. reduce
将给的初始值和序列里面的每个值进行累计运算,得到最终的结果,并将结果发送出去。
func reduceOperate() {
let ofSequence = Observable.of(1,2,3)
ofSequence.reduce(12, accumulator: +)
.subscribe { event in
print(event)
}.disposed(by: bag)
}
运行结果:
next(18)
completed
3. concat
合并两个或者以上的Observable
的消息,只有当前面的Observable
发出completed
事件,才会开始发送下一个Observable
序列事件。
func concatOperate() {
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
subject1.concat(subject2)
.subscribe { event in
print(event)
}.disposed(by: bag)
subject1.onNext(1)
subject2.onNext(20)
subject1.onNext(2)
subject1.onNext(3)
subject1.onCompleted()
subject2.onNext(21)
}
运行结果:
next(1)
next(2)
next(3)
next(21)
错误处理操作符
错误处理操作符可以用来帮助我们从 Observable 发出的 error 事件做出响应,或者从错误中恢复。
1 .catchErrorJustReturn
当遇到错误事件时,返回指定的值,然后结束,发出completed
事件。
func catchErrorJustReturnOperate() {
let subject = PublishSubject<Int>()
subject.catchErrorJustReturn(0)
.subscribe { event in
print(event)
}.disposed(by: bag)
subject.onNext(1)
subject.onNext(2)
subject.onError(BaseError.BCD)
subject.onNext(3)
}
运行结果:
next(1)
next(2)
next(0)
completed
2 .catchError
当遇到错误事件时,可以返回另一个Observable
序列进行订阅
func catchErrorOperate() {
let subject = PublishSubject<Int>()
let sequence = Observable.of(100,101,102)
subject.catchError {_ in
return sequence
}
.subscribe { event in
print(event)
}.disposed(by: bag)
subject.onNext(1)
subject.onNext(2)
subject.onError(BaseError.BCD)
subject.onNext(3)
}
运行结果:
next(1)
next(2)
next(100)
next(101)
next(102)
completed
3 .retry
当遇到错误事件时,重新订阅该序列。retry()
方法传入数字表示重复次数,不传只重复一次
func retryOperate() {
var count = 1
let sequence = Observable<Int>.create { observer in
let error = NSError(domain: "Test", code: 0, userInfo: nil)
observer.onNext(0)
observer.onNext(1)
observer.onNext(2)
if count < 2 {
observer.onError(error)
count += 1
}
observer.onNext(3)
observer.onNext(4)
observer.onNext(5)
observer.onCompleted()
return Disposables.create()
}
sequence
.retry()
.subscribe {
print($0)
}.disposed(by: bag)
}
运行结果:
next(0)
next(1)
next(2)
next(0)
next(1)
next(2)
next(3)
next(4)
next(5)
completed