前言
对于RxSwift这种函数式编程
的框架,熟练运用高阶函数能便捷地实现响应的功能,在不同的应用场景,都有响应的高阶函数,学好高阶函数的使用在以后项目中是很重要的。
1、startWith
在源序列元素前插入序列元素
Observable.of("1","2","3","4")
.startWith("A")
.startWith("B")
.startWith("C","a","b")
.subscribe(onNext: {print($0)})
.disposed(by: disposeBag)
//输出:C a b B A 1 2 3 4
2、merge
将源可观察序列中的元素组合成一个新的可观察序列,被合并的序列内部元素须相同类型
。
let sub1 = PublishSubject<String>()
let sub2 = PublishSubject<String>()
Observable.of(sub1,sub2)
.merge()
.subscribe(onNext: {print($0)})
.disposed(by: disposeBag)
sub1.onNext("L")
sub1.onNext("c")
sub2.onNext("r")
sub2.onNext("s")
sub1.onNext("q")
//输出 L c r s q
3、zip
将多个源可观测序列组合成一个新的可观测序列,但只有被观测的序列的相同位置上都有数据时候才会响应。否则暂时保存相应位置的数据,等待其余序列相同位置的数据有了再一起响应。
let stringSub = PublishSubject<String>()
let intSub = PublishSubject<Int>()
let intSub1 = PublishSubject<Int>()
Observable.zip(stringSub, intSub,intSub1) { strElement, intElement,intE in
"\(strElement) \(intElement) \(intE)"
}
.subscribe(onNext: {print($0)})
.disposed(by: disposeBag)
stringSub.onNext("A")
// 到这里存储了 AB 但是不会响应,除非另一个响应
stringSub.onNext("B")
//
intSub.onNext(1)
intSub.onNext(2)
//此时三个序列的第一个位置都有数据才会响应
intSub1.onNext(4)
stringSub.onNext("C")
intSub.onNext(3)
//输出: A 1 4
4、combineLatest
在zip的基础上取每个序列里最新的一个元素,即被观测序列里只保留最新
一个元素。前提也是满足被观测序列都有元素时才会响应。
let stringSub = PublishSubject<String>()
let intSub = PublishSubject<Int>()
Observable.combineLatest(stringSub, intSub) { strElement, intElement in
"\(strElement) \(intElement)"
}
.subscribe(onNext: {print($0)})
.disposed(by: disposeBag)
stringSub.onNext("A") // intSub没有元素,不响应 只保存 A
stringSub.onNext("B") //覆盖 A intSub没有元素,不响应
intSub.onNext(1) // 两个序列都有 会响应 B 1
intSub.onNext(2) //覆盖 1 => B 2
stringSub.onNext("C") //覆盖 B => C 2
intSub.onNext(3) //覆盖 2 => C 3
//输出: B 1
// B 2
// C 2
// C 3
注意
:应用非常频繁, 比如账户和密码同时满足相应条件->才能登录,不关心账户密码怎么变化,只要查看最后有值就可以 loginEnable
5、switchLatest
将可观察序列发出的元素转换为可观察序列,并从最近的内部可观察序列发出元素
let swiSub1 = BehaviorSubject(value: "L")
let swiSub2 = BehaviorSubject(value: "1")
let swiSub = BehaviorSubject(value: swiSub1)
//选择了 swiSub1 就不会监听 swiSub2
swiSub.asObservable()
.switchLatest()
.subscribe(onNext: {print($0)})
.disposed(by: disposeBag)
//先响应 L
swiSub1.onNext("c") // 覆盖 L
swiSub1.onNext("r") // 覆盖 c
swiSub2.onNext("2") // 暂时不监听 保存 2 覆盖 1
swiSub2.onNext("3") // 暂时不监听 保存 3 覆盖 2
swiSub.onNext(swiSub2) // 切换 监听 swiSub2 响应最新 3
swiSub1.onNext("s") // 暂时不监听 保存 s 覆盖 r
swiSub1.onNext("q") // 暂时不监听 保存 q 覆盖 s
swiSub2.onNext("4") //覆盖 3
swiSub.onNext(swiSub1) // 响应 最新 q
//输出:L
// c
// r
// 3
// 4
// q
同一时间只响应最新观测的序列最新
的发出元素。
6、map
转换闭包应用于可观察序列发出的元素,并返回转换后的元素的新可观察序列。
let ob = Observable.of(1,2,3,4)
ob.map { num -> Int in
return num + 2
}
.subscribe{ print($0.element as Any)}
.disposed(by: disposeBag)
7、flatMap、flatMapLatest
将可观测序列发射的元素转换为可观测序列,并将两个可观测序列的发射合并为一个可观测序列。
这也很有用,例如,当你有一个可观察的序列,它本身发出可观察的序列,你想能够对任何一个可观察序列的新发射做出反应(序列中序列:比如网络序列
中还有模型序列
)
//类声明
struct LcrPlayer {
init(score: Int) {
self.score = BehaviorSubject(value: score)
}
let score: BehaviorSubject<Int>
}
//
let boy = LcrPlayer(score: 100)
let girl = LcrPlayer(score: 90)
let player = BehaviorSubject(value: boy)
player.asObservable()
.flatMap {
$0.score.asObservable()
}
.subscribe(onNext: {print($0)})
.disposed(by: disposeBag)
boy.score.onNext(70)
player.onNext(girl)
boy.score.onNext(50)
boy.score.onNext(40)
// 如果flatMap切换到 flatMapLatest 50 和 40 就不会打印
girl.score.onNext(10)
girl.score.onNext(0)
//输出:100 70 90 50 40 10 0
flatMap和flatMapLatest不同点就是flatMapLatest只会打印被观测在观测期间最新一个
,flatMapLatest实际上是map和switchLatest操作符的组合。
8、filter
仅从满足指定条件的可观察序列中发出元素
Observable.of(1,2,3,4,5,6,7,8,9,0)
.filter { $0 % 2 == 0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//输出:2 4 6 8
9、distinctUntilChanged
抑制可观察序列发出的顺序重复元素,相邻元素相同的话就忽略。
Observable.of("1","1","2","3","2","3","3","4","5","6")
.distinctUntilChanged()
.subscribe(onNext: {print($0)})
.disposed(by: disposeBag)
//输出:1 2 3 2 3 4 5 6
10、element
仅在可观察序列发出的所有元素的指定索引处发出元素
Observable.of("L", "c", "r", "s", "q")
.element(at: 3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
11、single
只发出可观察序列发出的第一个元素(或满足条件的第一个元素)。如果可观察序列发出多个元素,将抛出一个错误。
如果指定某一个,就会返回某一个元素,但如果没有指定的元素的话,会响应.error(Sequence doesn't contain any elements.)
Observable.of("Lcr", "sq")
.single()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//输出:Lcr
//Unhandled error happened: Sequence contains more than one element.
Observable.of("Lcr", "sq")
.single { $0 == "sq" }
.subscribe { print($0) }
.disposed(by: disposeBag)
//输出:next(sq)
//completed
Observable.of("Lcr", "sq")
.single { $0 == "s" }
.subscribe { print($0) }
.disposed(by: disposeBag)
//输出:error(Sequence doesn't contain any elements.)
12、take、takeLast、take(while:)
如果single只发出一个,那take可以指定发出的个数,take从前往后发出多少个,takeLast从左往右发出后面多少个,take(while:)发出满足条件的元素。
Observable.of("Tom", "Jack", "Jim", "Lily")
.take(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//输出:Tom
// Jack
Observable.of("Tom", "Jack", "Jim", "Lily")
.takeLast(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//输出:Jim
//Lily
Observable.of(1,2,3,4,5,6,7)
.take(while: {$0 < 5})
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//输出:1 2 3 4
13、take(until:)
响应有效期截至后面接的序列响应为止。
let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()
sourceSequence
.take(until: referenceSequence)
.subscribe { print($0) }
.disposed(by: disposeBag)
sourceSequence.onNext("A")
sourceSequence.onNext("B")
sourceSequence.onNext("C")
referenceSequence.onNext("D") // 条件一出来,下面就走不了
sourceSequence.onNext("1")
sourceSequence.onNext("2")
sourceSequence.onNext("3")
//输出:next(A)
//next(B)
//next(C)
//completed
14、skip
从源可观察序列发出元素,直到指定位置参考可观察序列发出元素。
略过前面指定多少次响应,订阅忽略的下一次响应开始。这个要重点,应用非常频繁 不用解释 textfiled 都会有默认序列产生
。
Observable.of(1, 2, 3, 4, 5, 6)
.skip(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//输出: 3 4 5 6
抑制从源可观察序列发出元素,直到参考可观察序列发出元素。
Observable.of(1, 2, 3, 4, 5, 6)
.skip { $0 < 4 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//输出:1 2 3
sourceSeq
.skip(until: referenceSeq)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//作用是和.take(until: referenceSequence)恰恰相反。
15、reduce
给定一个初始值,然后逐个增加发出的元素。
Observable.of(10, 100, 1000)
.reduce(1, accumulator: +) // 1 + 10 + 100 + 1000 = 1111
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//输出: 1111
16、concat
以顺序方式连接来自一个可观察序列的内部可观察序列的元素,在从下一个序列发出元素之前,等待每个序列成功终止。
即前一个序列为完成时,后一个序列即使订阅了也响应不了。
平时开发可用来控制顺序。
let subject1 = BehaviorSubject(value: "A")
let subject2 = BehaviorSubject(value: "1")
let subjectsSubject = BehaviorSubject(value: subject1)
subjectsSubject.asObservable()
.concat()
.subscribe { print($0) }
.disposed(by: disposeBag)
subject1.onNext("B")
subject1.onNext("C")
subjectsSubject.onNext(subject2)
subject2.onNext("打印不出来")
subject2.onNext("2")
//
//subject1.onCompleted() // 必须要等subject1 完成了才能订阅到! 用来控制顺序 网络数据的异步
subject2.onNext("3")
subject2.onNext("4")
//输出:next(A)
//next(B)
//next(C)
放开subject1.onCompleted()注释,才能相继响应输出2 3 4 ,只响应subject2中最新的。
17、catchAndReturn
从错误事件中恢复,方法是返回一个可观察到的序列,该序列发出单个元素,然后终止
let sequenceThatFails = PublishSubject<String>()
sequenceThatFails
.catchAndReturn("2")
.subscribe { print($0) }
.disposed(by: disposeBag)
sequenceThatFails.onNext("A")
sequenceThatFails.onNext("B") // 正常序列发送成功的
sequenceThatFails.onError(self.lgError) //发送失败的序列,一旦订阅到位 返回我们之前设定的错误的预案
//输出:next(Hank)
//next(Kody)
//next(Cooci)
//completed
18、retry
通过无限地重新订阅可观察序列来恢复重复的错误事件。
var count = 1 // 外界变量控制流程
let sequenceRetryErrors = Observable<String>.create { observer in
observer.onNext("A")
observer.onNext("B")
observer.onNext("C")
if count == 1 { // 流程进来之后就会过度-这里的条件可以作为出口,失败的次数
observer.onError(self.lgError) // 接收到了错误序列,重试序列发生
print("错误序列来了")
count += 1
}
observer.onNext("D")
observer.onNext("E")
observer.onNext("F")
observer.onCompleted()
return Disposables.create()
}
sequenceRetryErrors
.retry()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//输出:A B C 错误序列来了 A B C D E F
也可设置retry(3),来设置重试的次数。
19、multicast
将源可观察序列转换为可连接序列,并通过指定的主题广播其发射。
//
let netOB = Observable<Any>.create { observer -> Disposable in
sleep(2)// 模拟网络延迟
print("我开始请求网络了")
observer.onNext("请求到的网络数据")
observer.onNext("请求到的本地")
observer.onCompleted()
return Disposables.create {
print("销毁回调了")
}
}.publish()
//
netOB.subscribe(onNext: { anything in
print("订阅1:", anything)
})
.disposed(by: disposeBag)
//
// 我们有时候不止一次网络订阅,因为有时候我们的数据可能用在不同的地方
// 所以再订阅一次 会出现什么问题?
netOB.subscribe(onNext: { anything in
print("订阅2:", anything)
})
.disposed(by: disposeBag)
_ = netOB.connect()
//输出:我开始请求网络了
//订阅1: 请求到的网络数据
//订阅2: 请求到的网络数据
//订阅1: 请求到的本地
//订阅2: 请求到的本地
//销毁回调了
当我们注释掉publish
以及netOB.connect()
两行代码。运行结果如下:
可见网络请求来了两次,即订阅了两次网络请求来了两次。
那为什么利用publish以及connect能达到避免多次请求的问题呢?
publish内部封装了
multicast
:就来到了ConnectableObservableAdapter的初始化方法:
source为外界源序列,makeSubject为publish里面的
{PublishSubject()}
,即也是一个序列。查看ConnectableObservableAdapter的方法中有自己实现的scbscribe订阅函数,因为ConnectableObservableAdapter并没有继承自Producer,就自己实现了subscribe函数。不然外面序列无法用到
.subscribe
函数,也就无法订阅了。
以及变量lazySubject和connect()函数的实现:
lazySubject这么写是保证只会产生一个,避免外界不同订阅会产生不同的lazySubject。
由ConnectableObservableAdapter初始化可知,makeSubject就是.publish方法中的生成PublishSubject
实例的闭包。那么lazySubject.subscribe就需要去PublishSubject中寻找实现了。
observer为外界源序列订阅时候产生的·AnonymousObserver·序列,所以源序列每订阅一次,就会产生一个新的
observer
,所以两次订阅就会来到PublishSubject()的两次subscribe函数的调用。PublishSubject().subscribe函数中也就是将·observer.on·保存在observers的属性_pairs数组及属性字典_dictionary中。
此处的observer.on即为ObserverBase基类中的.on
函数。
接下来我们把目光放到connect函数里面,生成的connection作为外界序列订阅闭包的参数,那么connection必定会响应.on函数,不然响应不到外界序列的订阅。
所以除了初始化方法,我们果然看到.on函数,里面说的也就是上面生成的lazySubject调用on函数,即
所以就来到dispatch()方法中,最终响应观察者.on方法也就是在这里:
依次在订阅时候保存observer.on的字典及数组中带入外界的.onNext()
事件(断点调试发现是在pairs数组里面调用了相关的事件函数),最终就会来到下面的调用:
总结: publish封装的multicast
就是将多个订阅的序列合并为一个序列,并按序响应外界的订阅事件。将所有事物保存在一起,虽然生成不一样的观察者,但响应订阅事件是在一块处理的,所以就达到了一次请求网络,多处使用的目的。