在
Rx
的世界里,充斥着函数响应式,如果想在项目中游刃有余的玩转RxSwift
那么RxSwift
的高阶函数是不我们绕不开的内容
下面就一起在RxSwift
高阶函数的世界中畅游吧
RxSwift
高阶函数分为七类
- 1.组合操作符
- 2.映射操作符
- 3.过滤条件操作符
- 4.集合控制操作符
- 5.从可观察对象的错误通知中恢复的操作符
- 6.debug Rx流程操作符
- 7.链接操作符
一.组合操作符
1.1 startWith
在开始从可观察源发出元素之前,发出指定的元素序列
// *** startWith : 在开始从可观察源发出元素之前,发出指定的元素序列
print("***** startWith *****")
Observable.of(1,2,3,4)
.startWith(5)
.startWith(6)
.startWith(7,8,9)
.subscribe {
print($0)
}
.disposed(by: disposeBag)
打印结果:7,8,9,5,6,1,2,3,4
1.2 merge
将源可观察序列中的元素组合成一个新的可观察序列,并将像每个源可观察序列发出元素一样发出每个元素
// **** merge : 将源可观察序列中的元素组合成一个新的可观察序列,并将像每个源可观察序列发出元素一样发出每个元素
print("***** merge *****")
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
Observable.of(subject1, subject2)
.merge()
.subscribe {
print($0)
}
.disposed(by: disposeBag)
subject1.onNext("M")
subject1.onNext("G")
subject2.onNext("L")
subject2.onNext("O")
subject1.onNext("V")
subject2.onNext("E")
//任何一个响应都会勾起新序列响应
打印结果: MGLOVE
1.3 zip
将多达8个源可观察序列组合成一个新的可观察序列,并将从组合的可观察序列中发射出对应索引处每个源可观察序列的元素
// *** zip: 将多达8个源可观测序列组合成一个新的可观测序列,并将从组合的可观测序列中发射出对应索引处每个源可观测序列的元素
print("***** zip *****")
let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()
Observable.zip(stringSubject, intSubject) { stringElement, intElement in
"\(stringElement) \(intElement)"
}.subscribe {
print($0)
}
.disposed(by: disposeBag)
stringSubject.onNext("A")
stringSubject.onNext("B") // 到这里存储了 AB 但是不会响应,除非另一个响应
intSubject.onNext(1) //响应第一个
intSubject.onNext(2) //响应第二个
stringSubject.onNext("C") //存C
intSubject.onNext(3) //响应第三个
//stringSubject,intSubject 必须一一对应,成对出现
打印结果: (A,1) (B,2) (C,3)
1.4 combineLatest
将8个源可观察序列组合成一个新的观察序列,并将开始发出联合观察序列的每个源的最新元素,可观察序列一旦所有排放源序列至少有一个元素,并且当源可观察序列发出的任何一个新元素
// ***** combineLatest:将8源可观测序列组合成一个新的观测序列,并将开始发出联合观测序列的每个源的最新元素可观测序列一旦所有排放源序列至少有一个元素,并且当源可观测序列发出的任何一个新元素
print("***** combineLatest *****")
let stringSub = PublishSubject<String>()
let intSub = PublishSubject<Int>()
Observable.combineLatest(stringSub, intSub) {strElement, intElement in
"\(strElement) \(intElement)"
}.subscribe {
print($0)
}
.disposed(by: disposeBag)
stringSub.onNext("M") //和zip类似 存一个
stringSub.onNext("O") //这里和zip不一样了 O覆盖了M 只会存储一个
intSub.onNext(1) //发现stringSub有值 响应(O,1)
intSub.onNext(2) //由于M被覆盖了 响应(O,2)
stringSub.onNext("T") //响应 (T,2)
// combineLatest 比较zip 会覆盖
// 应用非常频繁: 比如账户和密码同时满足->才能登录,不关心账户密码怎么变化,只要查看最后有值就可以 loginEnable
打印结果:(O,1) (O,2)(T,2)
1.5 switchLatest
将可观察序列发出的元素转换为可观察序列,并从最近的内部可观察序列发出元素
// switchLatest : 将可观察序列发出的元素转换为可观察序列,并从最近的内部可观察序列发出元素
print("***** switchLatest *****")
let switchLatestSub1 = BehaviorSubject(value: "M")
let switchLatestSub2 = BehaviorSubject(value: "1")
let switchLatestSub = BehaviorSubject(value: switchLatestSub1)// 选择了 switchLatestSub1 就不会监听 switchLatestSub2
switchLatestSub.asObserver()
.switchLatest()
.subscribe {
print($0)
}
.disposed(by: disposeBag)
switchLatestSub1.onNext("G")
switchLatestSub1.onNext("L")
switchLatestSub2.onNext("2")
switchLatestSub2.onNext("3") //2-3都不会监听,但是默认保存由 2覆盖1 3覆盖2
switchLatestSub.onNext(switchLatestSub2) // 切换到 switchLatestSub2
switchLatestSub1.onNext("E")
switchLatestSub1.onNext("MO") // 原理同上面 下面如果再次切换到 switchLatestSub1 会打印出 TO
switchLatestSub2.onNext("TO")
二.映射操作符
2.1 map
转换闭包应用于可观察序列发出的元素,并返回转换后的元素的新可观察序列
// ***** map: 转换闭包应用于可观察序列发出的元素,并返回转换后的元素的新可观察序列。
print("***** map *****")
let ob = Observable.of(1,2,3,4)
ob.map { number in
return number + 1 //将上面序列每个元素都加1 变成一个新的序列
}
.subscribe {
print($0)
}
.disposed(by: disposeBag)
打印结果:2,3,4,5
2.2 flatMap
and flatMapLatest
将可观察序列发射的元素转换为可观察序列,并将两个可观察序列的发射合并为一个可观察序列
这也很有用,例如,当你有一个可观察的序列,它本身发出可观察的序列,你想能够对任何一个可观察序列的新发射作出反应(序列中的序列:比如网络序列中还有模型序列)
flatMap和flatMapLatest的区别是,flatMapLatest只会从最近的内部可观察序列发射元素
// *** flatMap and flatMapLatest
// 将可观测序列发射的元素转换为可观测序列,并将两个可观测序列的发射合并为一个可观测序列。
// 这也很有用,例如,当你有一个可观察的序列,它本身发出可观察的序列,你想能够对任何一个可观察序列的新发射做出反应(序列中序列:比如网络序列中还有模型序列)
// flatMap和flatMapLatest的区别是,flatMapLatest只会从最近的内部可观测序列发射元素
print("***** flatMap *****")
let boy = BehaviorSubject(value: 100)
let girl = BehaviorSubject(value: 90)
let player = BehaviorSubject(value: boy)
player.asObserver()
.flatMap { $0.asObserver() }
.subscribe {
print($0)
}
.disposed(by: disposeBag)
boy.onNext(60)
player.onNext(girl)
boy.onNext(50)
boy.onNext(40) // 如果切换到 flatMapLatest 就不会打印
girl.onNext(10)
girl.onNext(0)
// flatMapLatest实际上是map和switchLatest操作符的组合。
2.3 scan
从初识就带有一个默认值,然后对可观察序列发出的每个元素应用累加器闭包,并以单个元素可观察序列的形式返回每个中间结果
// ** scan: 从初始就带有一个默认值开始,然后对可观察序列发出的每个元素应用累加器闭包,并以单个元素可观察序列的形式返回每个中间结果
print("***** scan *****")
Observable.of(10, 100, 1000)
.scan(2) { aggregateValue, newValue in
aggregateValue + newValue // 10 + 2 , 100 + 10 + 2 , 1000 + 100 + 2
}
.subscribe {
print($0)
}
.disposed(by: disposeBag)
// 这里主要强调序列值之间的关系
//打印结果 12,112,1112
三.过滤条件操作符
3.1 filter
仅从满足指定条件的可观察序列中发出那些元素
// **** filter : 仅从满足指定条件的可观察序列中发出那些元素
print("***** filter *****")
Observable.of(1,2,3,4,5,6,7,8,9,0)
.filter {$0 % 2 == 0} //挑选出其中的偶数
.subscribe {
print($0)
}
.disposed(by: disposeBag)
3.2 distinctUntilChanged
抑制可观察序列发出的顺序重复元素
// ***** distinctUntilChanged: 抑制可观察序列发出的顺序重复元素
print("***** distinctUntilChanged *****")
Observable.of("1", "2", "2", "2", "3", "3", "4")
.distinctUntilChanged()
.subscribe {
print($0)
}
.disposed(by: disposeBag)
//打印结果 1,2,3,4
3.3 elementAt
仅在可观察序列发出的所有元素的指定索引处发出元素
// **** elementAt: 仅在可观察序列发出的所有元素的指定索引处发出元素
print("***** elementAt *****")
Observable.of("1", "2", "3", "4", "5")
.element(at: 0) //下标
.subscribe {
print($0)
}
.disposed(by: disposeBag)
//打印结果 1
3.4 single
只发出可观察序列发出的第一个元素(或满足条件的第一个元素),如果可观察序列发出多个元素,将抛出一个错误
// *** single: 只发出可观察序列发出的第一个元素(或满足条件的第一个元素)。如果可观察序列发出多个元素,将抛出一个错误。
print("***** single *****")
Observable.of("1", "2")
.single()
.subscribe {
print($0)
}
.disposed(by: disposeBag)
Observable.of("A", "B")
.single {
$0 == "B"
}
.subscribe {
print($0)
}
.disposed(by: disposeBag)
//打印结果 1,错误,B
3.5 take
只从一个可观察序列的开始发出指定数量的元素,上面single只有一个序列,在实际开发会受到局限,这里引出take想几个就几个
// **** take: 只从一个可观察序列的开始发出指定数量的元素。 上面signal只有一个序列 在实际开发会受到局限 这里引出 take 想几个就几个
print("***** take *****")
Observable.of("A","B","C","D")
.take(2) //打印的数量
.subscribe {
print($0)
}
.disposed(by: disposeBag)
3.6 takeLast
仅从可观察序列的末尾发出指定数量的元素
// *** takeLast: 仅从可观察序列的末尾发出指定数量的元素
print("***** takeLast *****")
Observable.of("A", "B", "C", "D")
.takeLast(1) //打印的数量
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//打印结果 D
3.7 takeWhile
只要指定条件的值为true,就从可观察序列的开始发出元素
// **** takeWhile: 只要指定条件的值为true,就从可观察序列的开始发出元素
print("***** takeWhile *****")
Observable.of(1, 2, 3, 4, 5, 6)
.take(while: { $0 < 3 })
.subscribe {
print($0)
}
.disposed(by: disposeBag)
//打印结果 1,2
3.8 takeUntil
从源可观察序列发出元素,直到参考可观察序列发出元素,非常常用,比如我的页面销毁了,就不能获取值了(cell重用运用)
// ***** takeUntil: 从源可观察序列发出元素,直到参考可观察序列发出元素
// 这个要重点,应用非常频繁 比如我页面销毁了,就不能获取值了(cell重用运用)
print("***** takeUntil *****")
let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()
sourceSequence
.take(until: referenceSequence)
.subscribe {
print($0)
}
.disposed(by: disposeBag)
sourceSequence.onNext("A")
sourceSequence.onNext("B")
sourceSequence.onNext("C")
referenceSequence.onNext("1") //条件出来。下面就不走了
sourceSequence.onNext("D")
sourceSequence.onNext("E")
sourceSequence.onNext("F")
//打印结果 A,B,C
3.8 skip
从源可观察序列发出元素,直到参考可观察序列发出元素,非常常用,比如textField会有默认序列的产生
// ***** skip: 从源可观察序列发出元素,直到参考可观察序列发出元素
// 这个要重点,应用非常频繁 不用解释 textfiled 都会有默认序列产生
print("***** skip *****")
Observable.of(1, 2, 3, 4, 5, 6)
.skip(2) //跳过2次,1,2不执行
.subscribe {
print($0)
}
.disposed(by: disposeBag)
//打印结果 3,4,5,6
3.9 skipUntil
抑制从源可观察序列发出元素,直到参考可观察序列发出元素
// *** skipUntil: 抑制从源可观察序列发出元素,直到参考可观察序列发出元素
print("***** skipUntil *****")
let sourceSeq = PublishSubject<String>()
let referenceSeq = PublishSubject<String>()
sourceSeq
.skip(until: referenceSeq)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 没有条件命令 下面走不了
sourceSeq.onNext("A")
sourceSeq.onNext("B")
sourceSeq.onNext("C")
referenceSeq.onNext("1") // 条件一出来,下面就可以走了
sourceSeq.onNext("D")
sourceSeq.onNext("E")
sourceSeq.onNext("F")
//打印结果 D,E,F
四.集合控制操作符
4.1 toArray
将一个可观察序列转换为一个数组,将该数组作为一个新的单元素可观察序列发出,然后终止
// *** toArray: 将一个可观察序列转换为一个数组,将该数组作为一个新的单元素可观察序列发出,然后终止
print("***** toArray *****")
Observable.range(start: 1, count: 10)
.toArray()
.subscribe {
print($0)
}
.disposed(by: disposeBag)
//打印结果 1-10
4.2 reduce
从一个设置的初始化值开始,然后对一个可观察序列发出的所有元素应用累加器闭包,并以单个元素可观察序列的形式返回聚合结果 - 类似scan
// *** reduce: 从一个设置的初始化值开始,然后对一个可观察序列发出的所有元素应用累加器闭包,并以单个元素可观察序列的形式返回聚合结果 - 类似scan
print("***** reduce *****")
Observable.of(10, 100, 1000)
.reduce(1, accumulator: +) // 1 + 10 + 100 + 1000 = 1111
.subscribe {
print($0)
}
.disposed(by: disposeBag)
//打印结果 1111
4.3 concat
以顺序方式连接来自一个可观察序列的内部可观察序列的元素,在从下一个序列发出元素之前,等待每个序列成功终止,用来控制顺序
// *** concat: 以顺序方式连接来自一个可观察序列的内部可观察序列的元素,在从下一个序列发出元素之前,等待每个序列成功终止
// 用来控制顺序
print("***** concat *****")
let subject1 = BehaviorSubject(value: "A")
let subject2 = BehaviorSubject(value: "1")
let subjectsSubject = BehaviorSubject(value: subject1)
subjectsSubject.asObservable()
.concat()
.subscribe { print($0) }
.disposed(by: disposeBag)
subject1.onNext("B")
subject1.onNext("C")
subjectsSubject.onNext(subject2)
subject2.onNext("打印不出来")
subject2.onNext("2")
subject2.onNext("3") //会保存最后一个
subject1.onCompleted() // 必须要等subject1 完成了才能订阅到! 用来控制顺序 网络数据的异步
subject2.onNext("4")
//打印结果:A,B,C,3,4
五. 从可观察对象的错误通知中恢复的操作符。
5.1 catchErrorJustReturn
从错误事件中恢复,方式是返回一个可观察的序列,该序列发出单个元素,然后终止
// **** catchErrorJustReturn
// 从错误事件中恢复,方法是返回一个可观察到的序列,该序列发出单个元素,然后终止
print("***** catchErrorJustReturn *****")
let sequenceThatFails = PublishSubject<String>()
sequenceThatFails
.catchAndReturn("A")
.subscribe { print($0) }
.disposed(by: disposeBag)
sequenceThatFails.onNext("B")
sequenceThatFails.onNext("C") // 正常序列发送成功的
sequenceThatFails.onError(self.mgError) //发送失败的序列,一旦订阅到位 返回我们之前设定的错误的预案
//打印结果 A,B,C,完成
5.2 catchError
通过切换到提供的恢复可观察序列,从错误事件中恢复
// **** catchError
// 通过切换到提供的恢复可观察序列,从错误事件中恢复
print("***** catchError *****")
let sequenceThatFails = PublishSubject<String>()
let recoverySequence = PublishSubject<String>()
sequenceThatFails.catch {
print("Error:", $0)
return recoverySequence // 获取到了错误序列-我们在中间的闭包操作处理完毕,返回给用户需要的序列(showAlert)
}
.subscribe {
print($0)
}
.disposed(by: disposeBag)
sequenceThatFails.onNext("A")
sequenceThatFails.onNext("B") // 正常序列发送成功的
sequenceThatFails.onError(mgError) // 发送失败的序列
recoverySequence.onNext("C")
//打印结果: 错误, C
5.3 retry
通过无限的重新订阅可观察序列来恢复重复的错误事件
// *** retry: 通过无限地重新订阅可观察序列来恢复重复的错误事件
print("***** retry *****")
var count = 1 //外界变量控制流程
let sequenceRetryErrors = Observable<String>.create { observer in
observer.onNext("A")
if count == 1 { // 流程进来之后就会过度-这里的条件可以作为出口,失败的次数
observer.onError(self.mgError)// 接收到了错误序列,重试序列发生
print("错误序列来了")
count += 1
}
observer.onNext("1")
return Disposables.create()
}
sequenceRetryErrors.retry()
.subscribe {
print($0)
}
.disposed(by: disposeBag)
//打印结果: A,错误序列来了,A,1
5.4 retry(_:)
通过重新订阅可观察序列,重复的从错误事件中恢复,直到重试次数达到Max未遂计数
// **** retry(_:): 通过重新订阅可观察到的序列,重复地从错误事件中恢复,直到重试次数达到max未遂计数
print("***** retry(_:) *****")
let sequenceThatErrors = Observable<String>.create { observer in
observer.onNext("A")
if count < 5 { // 这里设置的错误出口是没有太多意义的额,因为我们设置重试次数
observer.onError(self.mgError)
print("错误序列来了")
count += 1
}
observer.onNext("1")
observer.onCompleted()
return Disposables.create()
}
sequenceThatErrors
.retry(3) //重复次数
.subscribe {
print($0)
}
.disposed(by: disposeBag)
//打印结果 A,错误序列来了,A,错误序列来了,A,错误序列来了,error
六. Rx流程操作符
6.1 debug
打印所有订阅、事件和处理
// 打印所有订阅、事件和处理。
print("***** debug *****")
var count = 1
let sequenceThatErrors = Observable<String>.create { observer in
observer.onNext("A")
if count < 5 {
observer.onError(self.mgError)
print("错误序列来了")
count += 1
}
observer.onNext("1")
observer.onCompleted()
return Disposables.create()
}
sequenceThatErrors
.retry(1) //重复次数
.debug()
.subscribe {
print($0)
}
.disposed(by: disposeBag)
6.2 RxSwift.Resources.total:
提供所有Rx资源分配的计数,这对于在开发期间检测泄漏非常有用
// ** RxSwift.Resources.total: 提供所有Rx资源分配的计数,这对于在开发期间检测泄漏非常有用。
print("***** RxSwift.Resources.total *****")
print(RxSwift.Resources.total)
let subject = BehaviorSubject(value: "Cooci")
let subscription1 = subject.subscribe(onNext: { print($0) })
print(RxSwift.Resources.total)
let subscription2 = subject.subscribe(onNext: { print($0) })
print(RxSwift.Resources.total)
subscription1.dispose()
print(RxSwift.Resources.total)
subscription2.dispose()
print(RxSwift.Resources.total)
七.链接操作符
7.1 multicast
将源可观察序列转换为可连接序列,并通过指定的主题广播其发射
// *** multicast : 将源可观察序列转换为可连接序列,并通过指定的主题广播其发射。
print("***** multicast *****")
let subject = PublishSubject<Any>()
subject.subscribe{ print("00:\($0)") }
.disposed(by: disposeBag)
let netOB = Observable<Any>.create { observer -> Disposable in
sleep(2)// 模拟网络延迟
print("我开始请求网络了")
observer.onNext("请求到的网络数据")
observer.onNext("请求到的本地")
observer.onCompleted()
return Disposables.create {
print("销毁回调了")
}
}.publish()
netOB.subscribe(onNext: { anything in
print("订阅1:", anything)
})
.disposed(by: disposeBag)
// 我们有时候不止一次网络订阅,因为有时候我们的数据可能用在不同的地方
// 所以再订阅一次 会出现什么问题?
netOB.subscribe(onNext: { anything in
print("订阅2:", anything)
})
.disposed(by: disposeBag)
_ = netOB.connect()
7.2 replay
将源可观察序列转换为可连接的序列,并将向每个新订阅服务器重放以前排放的缓冲大小
拥有和publish一样的能力,共享Observable sequence,其次使用replay还需要我们传入一个参数(buffer size)来缓存已发送的事件,当有新的订阅者订阅了,会把缓存的事件发送给新的订阅者
// **** replay: 将源可观察序列转换为可连接的序列,并将向每个新订阅服务器重放以前排放的缓冲大小
// 首先拥有和publish一样的能力,共享 Observable sequence, 其次使用replay还需要我们传入一个参数(buffer size)来缓存已发送的事件,当有新的订阅者订阅了,会把缓存的事件发送给新的订阅者
print("***** replay *****")
let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).replay(5)
interval.subscribe(onNext: { print(Date.time, "订阅: 1, 事件: \($0)") })
.disposed(by: self.disposeBag)
delay(2) { _ = interval.connect() }
delay(4) {
interval.subscribe(onNext: { print(Date.time, "订阅: 2, 事件: \($0)") })
.disposed(by: self.disposeBag)
}
delay(8) {
interval.subscribe(onNext: { print(Date.time, "订阅: 3, 事件: \($0)") })
.disposed(by: self.disposeBag)
}
delay(20, closure: {
self.disposeBag = DisposeBag()
})
/**
订阅: 1, 事件: 4
订阅: 1, 事件: 0
2019-05-28 21-32-42 订阅: 2, 事件: 0
2019-05-28 21-32-42 订阅: 1, 事件: 1
2019-05-28 21-32-42 订阅: 2, 事件: 1
2019-05-28 21-32-45 订阅: 2, 事件: 4
2019-05-28 21-32-46 订阅: 3, 事件: 0
2019-05-28 21-32-46 订阅: 3, 事件: 1
2019-05-28 21-32-46 订阅: 3, 事件: 2
2019-05-28 21-32-46 订阅: 3, 事件: 3
2019-05-28 21-32-46 订阅: 3, 事件: 4
// 序列从 0开始
// 定时器也没有断层 sub2 sub3 和 sub1 是同步的
*/
7.3 push
将源可观察序列转换为可连接序列
共享一个Observable的事件序列,避免创建多个Observable sequence
注意:需要调用connect之后才会开始发送事件
// **** push:将源可观察序列转换为可连接序列
// 共享一个Observable的事件序列,避免创建多个Observable sequence。
// 注意:需要调用connect之后才会开始发送事件
print("***** testPushConnect *****")
let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).publish()
interval.subscribe(onNext: { print("订阅: 1, 事件: \($0)") })
.disposed(by: disposeBag)
delay(2) {
_ = interval.connect()
}
delay(4) {
interval.subscribe(onNext: { print("订阅: 2, 事件: \($0)") })
.disposed(by: self.disposeBag)
}
delay(6) {
interval.subscribe(onNext: { print("订阅: 3, 事件: \($0)") })
.disposed(by: self.disposeBag)
}
delay(10, closure: {
self.disposeBag = DisposeBag()
})
/**
订阅: 1, 事件: 1
订阅: 2, 事件: 1
订阅: 1, 事件: 2
订阅: 2, 事件: 2
订阅: 1, 事件: 3
订阅: 2, 事件: 3
订阅: 3, 事件: 3
订阅: 2 从1开始
订阅: 3 从3开始
*/
// 但是后面来的订阅者,却无法得到之前已发生的事件