4. 集合控制操作符
4.1 toArray
- 将一个可观察序列转换为一个数组,将该数组作为一个新的单元素可观察序列发出,然后终止
print("*****toArray*****")
Observable.range(start: 1, count: 10)
.toArray()
.subscribe { print($0) }
.disposed(by: disposeBag)
4.2 reduce
- 从一个设置的初始化值开始,然后对一个可观察序列发出的所有元素应用累加器闭包,并以单个元素可观察序列的形式返回聚合结果 - 类似
scan
print("*****reduce*****")
Observable.of(10, 100, 1000)
.reduce(1, accumulator: +) // 1 + 10 + 100 + 1000 = 1111
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
4.3 concat
- 以顺序方式连接来自一个可观察序列的内部可观察序列的元素,在从下一个序列发出元素之前,等待每个序列成功终止, 用来控制顺序
print("*****concat*****")
let subject1 = BehaviorSubject(value: "AAA")
let subject2 = BehaviorSubject(value: "1")
let subjectsSubject = BehaviorSubject(value: subject1)
subjectsSubject.asObservable()
.concat()
.subscribe { print($0) }
.disposed(by: disposeBag)
subject1.onNext("BBB")
subject1.onNext("CCC")
subjectsSubject.onNext(subject2)
subject2.onNext("打印不出来")
subject2.onNext("2")
subject1.onCompleted() // 必须要等subject1 完成了才能订阅到! 用来控制顺序 网络数据的异步
subject2.onNext("3")
5. 从可观察对象的错误通知中恢复的操作符
5.1 catchErrorJustReturn
- 从错误事件中恢复,方法是返回一个可观察到的序列,该序列发出单个元素,然后终止
print("*****catchErrorJustReturn*****")
let sequenceThatFails = PublishSubject<String>()
sequenceThatFails
.catchErrorJustReturn("Cooci")
.subscribe { print($0) }
.disposed(by: disposeBag)
sequenceThatFails.onNext("AAA")
sequenceThatFails.onNext("BBB") // 正常序列发送成功的
sequenceThatFails.onError(self.lgError) //发送失败的序列,一旦订阅到位 返回我们之前设定的错误的预案
5.2 catchError
print("*****catchError*****")
let recoverySequence = PublishSubject<String>()
sequenceThatFails
.catchError {
print("Error:", $0)
return recoverySequence // 获取到了错误序列-我们在中间的闭包操作处理完毕,返回给用户需要的序列(showAlert)
}
.subscribe { print($0) }
.disposed(by: disposeBag)
sequenceThatFails.onNext("AAA")
sequenceThatFails.onNext("BBB") // 正常序列发送成功的
sequenceThatFails.onError(lgError) // 发送失败的序列
recoverySequence.onNext("CCC")
5.3 retry
var count = 1 // 外界变量控制流程
let sequenceRetryErrors = Observable<String>.create { observer in
observer.onNext("AAA")
observer.onNext("BBB")
observer.onNext("CCC")
if count == 1 { // 流程进来之后就会过度-这里的条件可以作为出口,失败的次数
observer.onError(self.lgError) // 接收到了错误序列,重试序列发生
print("错误序列来了")
count += 1
}
observer.onNext("DDD")
observer.onNext("EEE")
observer.onNext("FFF")
observer.onCompleted()
return Disposables.create()
}
sequenceRetryErrors
.retry()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
5.4 retry(_:)
- 通过重新订阅可观察到的序列,重复地从错误事件中恢复,直到重试次数达到max未遂计数
print("*****retry(_:)*****")
let sequenceThatErrors = Observable<String>.create { observer in
observer.onNext("AAA")
observer.onNext("BBB")
observer.onNext("CCC")
if count < 5 { // 这里设置的错误出口是没有太多意义的额,因为我们设置重试次数
observer.onError(self.lgError)
print("错误序列来了")
count += 1
}
observer.onNext("DDD")
observer.onNext("EEE")
observer.onNext("FFF")
observer.onCompleted()
return Disposables.create()
}
sequenceThatErrors
.retry(3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
6. debug Rx流程操作符
6.1 debug
print("*****debug*****")
var count = 1
let sequenceThatErrors = Observable<String>.create { observer in
observer.onNext("AAA")
observer.onNext("BBB")
observer.onNext("CCC")
if count < 5 {
observer.onError(self.lgError)
print("错误序列来了")
count += 1
}
observer.onNext("DDD")
observer.onNext("EEE")
observer.onNext("FFF")
observer.onCompleted()
return Disposables.create()
}
sequenceThatErrors
.retry(3)
.debug()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
7. RxSwift.Resources.total 操作符
7. 1 RxSwift.Resources.total
- 提供所有Rx资源分配的计数,这对于在开发期间检测泄漏非常有用。
print("*****RxSwift.Resources.total*****")
print(RxSwift.Resources.total)
let subject = BehaviorSubject(value: "Cooci")
let subscription1 = subject.subscribe(onNext: { print($0) })
print(RxSwift.Resources.total)
let subscription2 = subject.subscribe(onNext: { print($0) })
print(RxSwift.Resources.total)
subscription1.dispose()
print(RxSwift.Resources.total)
subscription2.dispose()
print(RxSwift.Resources.total)
8. 链接操作符
8.1 multicast
- 将源可观察序列转换为可连接序列,并通过指定的主题广播其发射。
public func publish() -> ConnectableObservable<Element> {
return self.multicast { PublishSubject() }
}
print("*****multicast*****")
let subject = PublishSubject<Any>()
subject.subscribe{print("00:\($0)")}
.disposed(by: disposeBag)
let netOB = Observable<Any>.create { (observer) -> Disposable in
sleep(2)// 模拟网络延迟
print("我开始请求网络了")
observer.onNext("请求到的网络数据")
observer.onNext("请求到的本地")
observer.onCompleted()
return Disposables.create {
print("销毁回调了")
}
}.publish()
netOB.subscribe(onNext: { (anything) in
print("订阅1:",anything)
})
.disposed(by: disposeBag)
// 我们有时候不止一次网络订阅,因为有时候我们的数据可能用在不同的额地方
// 所以在订阅一次 会出现什么问题?
netOB.subscribe(onNext: { (anything) in
print("订阅2:",anything)
})
.disposed(by: disposeBag)
_ = netOB.connect()
8.2 replay
- 将源可观察序列转换为可连接的序列,并将向每个新订阅服务器重放以前排放的缓冲大小
- 首先拥有和
publish
一样的能力,共享 Observable sequence
, 其次使用replay
还需要我们传入一个参数(buffer size)
来缓存已发送的事件,当有新的订阅者订阅了,会把缓存的事件发送给新的订阅者
print("*****replay*****")
let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).replay(5)
interval.subscribe(onNext: { print(Date.time,"订阅: 1, 事件: \($0)") })
.disposed(by: self.disposeBag)
delay(2) { _ = interval.connect() }
delay(4) {
interval.subscribe(onNext: { print(Date.time,"订阅: 2, 事件: \($0)") })
.disposed(by: self.disposeBag)
}
delay(8) {
interval.subscribe(onNext: { print(Date.time,"订阅: 3, 事件: \($0)") })
.disposed(by: self.disposeBag)
}
delay(20, closure: {
self.disposeBag = DisposeBag()
})
/**
订阅: 1, 事件: 4
订阅: 1, 事件: 0
2019-05-28 21-32-42 订阅: 2, 事件: 0
2019-05-28 21-32-42 订阅: 1, 事件: 1
2019-05-28 21-32-42 订阅: 2, 事件: 1
2019-05-28 21-32-45 订阅: 2, 事件: 4
2019-05-28 21-32-46 订阅: 3, 事件: 0
2019-05-28 21-32-46 订阅: 3, 事件: 1
2019-05-28 21-32-46 订阅: 3, 事件: 2
2019-05-28 21-32-46 订阅: 3, 事件: 3
2019-05-28 21-32-46 订阅: 3, 事件: 4
// 序列从 0开始
// 定时器也没有断层 sub2 sub3 和 sub1 是同步的
*/
8.3 push
- 将源可观察序列转换为可连接序列,共享一个Observable的事件序列,避免创建多个
Observable sequence
注意:需要调用connect之后才会开始发送事件
print("*****testPushConnect*****")
let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).publish()
interval.subscribe(onNext: { print("订阅: 1, 事件: \($0)") })
.disposed(by: disposeBag)
delay(2) {
_ = interval.connect()
}
delay(4) {
interval.subscribe(onNext: { print("订阅: 2, 事件: \($0)") })
.disposed(by: self.disposeBag)
}
delay(6) {
interval.subscribe(onNext: { print("订阅: 3, 事件: \($0)") })
.disposed(by: self.disposeBag)
}
delay(10, closure: {
self.disposeBag = DisposeBag()
})
/**
订阅: 1, 事件: 1
订阅: 2, 事件: 1
订阅: 1, 事件: 2
订阅: 2, 事件: 2
订阅: 1, 事件: 3
订阅: 2, 事件: 3
订阅: 3, 事件: 3
订阅: 2 从1开始
订阅: 3 从3开始
*/
// 但是后面来的订阅者,却无法得到之前已发生的事件
8.4 没有共享序列
print("*****testWithoutConnect*****")
let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
interval.subscribe(onNext: { print("订阅: 1, 事件: \($0)") })
.disposed(by: disposeBag)
delay(3) {
interval.subscribe(onNext: { print("订阅: 2, 事件: \($0)") })
.disposed(by: self.disposeBag)
}
delay(10, closure: {
self.disposeBag = DisposeBag()
})
// 发现有一个问题:在延时3s之后订阅的Subscription: 2的计数并没有和Subscription: 1一致,而是又从0开始了,如果想共享,怎么办?