在RxSwift中,有许多实用的操作符,也可以称之为高阶函数,可以帮助我们创建特定功能的序列,或者组合变换原有的序列,生成一个新的序列。这里记录一下,可以作为手册以供快速查看。
组合操作符
- merge
将多个序列合并成一个新序列,当这多个序列中某一个序列发出一个元素时,新序列就将这个元素发出,当某一个序列发出error时,新序列也发出error,并终止序列。
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
Observable.of(subject1, subject2)
.merge()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext("R")
subject1.onNext("x")
subject2.onNext("S")
subject2.onNext("w")
subject1.onNext("i")
subject2.onNext("f")
subject1.onNext("t")
打印:
R
x
S
w
i
f
t
- zip
将多个(最多不超过8个) 序列的元素组合压缩,而且是等到每个序列元素事件一一对应地凑齐之后再合并,然后将合并的结果元素发出来。组合时,严格按照序列的索引数进行组合。即返回序列第一个元素,是由每一个源序列的第一个元素组合出来,它的第二个元素 ,是由每一个源序列的第二个元素组合出来的,以此类推。它的元素数量等于源序列中元素数量最少的那个序列。
let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()
Observable.zip(stringSubject, intSubject) { stringElement, intElement in
"\(stringElement) \(intElement)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
stringSubject.onNext("R")
stringSubject.onNext("x") // 到这里存储了 R X, 但是不会响应
intSubject.onNext(1) // 与R 组合成一个新序列,元素:R 1
intSubject.onNext(2) // 与X 组合成一个新序列,元素:X 2
stringSubject.onNext("Swift") // 保存Swift
intSubject.onNext(3) // 与Swift 组合成一个新序列,元素:Swift 3
// 划重点: 只有两个序列同时有值的时候才会响应,否则存值
打印:
R 1
x 2
Swift 3
- combineLatest
将多个序列中最新的元素组合起来,然后将这个组合的结果发出来。这些源序列中任何一个发出一个元素,他都会发出一个元素(前提是,这些序列曾经发出过元素)。应用:同时满足条件下触发业务,如登录账号、密码合法性校验。
let stringSub = PublishSubject<String>()
let intSub = PublishSubject<Int>()
Observable.combineLatest(stringSub, intSub) { strElement, intElement in
"\(strElement) \(intElement)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
stringSub.onNext("R") // 保存R
stringSub.onNext("x") // 保存x覆盖了R,和zip不一样
intSub.onNext(1) // 这时两个序列都有元素,响应 x 1
intSub.onNext(2) // 保存1覆盖了2,都有值, 响应 x 2
stringSub.onNext("Swift") // 保存Swift覆盖了x, 都有值,响应 Swift 2
打印:
x 1
x 2
Swift 2
- switchLatest
将序列发出的元素转换成一个新序列,并从新序列发出元素。
let switchLatestSub1 = BehaviorSubject(value: "R")
let switchLatestSub2 = BehaviorSubject(value: "1")
// 选择了 switchLatestSub1 就不会监听 switchLatestSub2
let switchLatestSub = BehaviorSubject(value: switchLatestSub1)
switchLatestSub.asObservable()
.switchLatest()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
switchLatestSub1.onNext("x")
switchLatestSub1.onNext("_")
switchLatestSub2.onNext("2")
switchLatestSub2.onNext("3") // 2-3都会不会监听,保存的元素由2覆盖1,3覆盖2
switchLatestSub.onNext(switchLatestSub2) // 切换到 switchLatestSub2
switchLatestSub1.onNext("*")
switchLatestSub1.onNext("Swift") // 原理同上,最后保存的元素为Swift
switchLatestSub2.onNext("4")
switchLatestSub.onNext(switchLatestSub1) // 切换到 switchLatestSub1
打印:
R
x
_
3
4
Swift
- concat
将多个序列按顺序串联起来,只有当上一个序列发出了completed事件,才会开始发送下一个序列事件。
let concatSub1 = BehaviorSubject(value: "1")
let concatSub2 = BehaviorSubject(value: "A")
let concatSub = BehaviorSubject(value: concatSub1)
concatSub.asObservable()
.concat()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
concatSub1.onNext("2")
concatSub.onNext(concatSub2) // 切换到 concatSub2
concatSub2.onNext("B")
concatSub2.onNext("C") // B-C都不会监听,但是默认保存由 B覆盖A C覆盖B
concatSub1.onCompleted() // 结束concatSub1
concatSub2.onNext("D")
打印:
1
2
C
D
- startWith
在发出序列的事件元素之前,会先发出这些预先插入的事件元素。
Observable.of("1", "2", "3", "4")
.startWith("A")
.startWith("B")
.startWith("C", "a", "b")
.subscribe(onNext: { print($0) })
.disposed(by: DisposeBag())
打印:
C
a
b
B
A
1
2
3
4
- withLatestFrom
将两个序列最新的元素组合起来,当第一个序列发出一个元素,就将组合后的元素发送出来。
let withLatestFromStrSub = PublishSubject<String>()
let withLatestFromIntSub = PublishSubject<Int>()
// 当withLatestFromStrSub发出一个元素时,
// 就立即取出withLatestFromIntSub最新的元素,
// 将withLatestFromStrSub 中最新的元素strElement和withLatestFromIntSub最新的元素intElement组合,
//然后把组合结果 strElement+intElement发送出去
withLatestFromStrSub.withLatestFrom(withLatestFromIntSub) { strElement, intElement in
"\(strElement) \(intElement)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// withLatestFromStrSub发出一个元素时,
// 立即取出withLatestFromIntSub最新的元素,
// 然后把withLatestFromIntSub中最新的元素发送出去
// withLatestFromStrSub.withLatestFrom(withLatestFromIntSub)
// .subscribe(onNext: { print($0) })
// .disposed(by: disposeBag)
withLatestFromStrSub.onNext("R")
withLatestFromStrSub.onNext("x") // 存了x覆盖R
withLatestFromIntSub.onNext(1)
withLatestFromIntSub.onNext(2) // 存了2覆盖1
withLatestFromStrSub.onNext("Swift") // 存了Swift覆盖x,withLatestFromIntSub有最新元素2,发出组合序列事件
withLatestFromStrSub.onNext("Hello") // 存了Hello覆盖Swift,withLatestFromIntSub有最新元素2,发出组合序列事件
打印:
Swift 2
Hello 2
2
2
转换操作符
- map
通过传入一个闭包把原序列转变为一个新序列,map函数会将原序列的所有元素进行转换。
let ob = Observable.of(1,2,3,4)
ob.map { (number) -> Int in
return number+2
}
.subscribe{
print("\($0)")
}
.disposed(by: disposeBag)
打印:
next(3)
next(4)
next(5)
next(6)
completed
- flatMap
升维:flatMap 操作符会对源序列的每一个元素应用一个转换方法,将他们转换成 Observables,降维:然后将这些 Observables 的元素合并成一个序列之后再发送出来。
let strSub = BehaviorSubject(value: "A")
let intSub = BehaviorSubject(value: "1")
let flatSub = BehaviorSubject(value: strSub)
flatSub.asObservable()
.flatMap { $0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
strSub.onNext("B")
flatSub.onNext(intSub) // 加入intSub序列,发出intSub和strSub序列元素转化后的序列
intSub.onNext("2")
strSub.onNext("C")
intSub.onNext("3")
打印:
A
B
1
2
C
3
- flatMapLatest
将序列元素转换成 Observables,然后取这些 Observables 中最新的一个发送。若转换出一个新的 Observables,就只发出它的元素,旧的 Observables 的元素将被忽略掉。
let strSub = BehaviorSubject(value: "A")
let intSub = BehaviorSubject(value: "1")
let flatSub = BehaviorSubject(value: strSub)
flatSub.asObservable()
.flatMapLatest { $0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
strSub.onNext("B")
// 加入intSub序列,发出intSub元素转化后的序列,忽略strSub的转化序列
flatSub.onNext(intSub)
intSub.onNext("2")
strSub.onNext("C")
intSub.onNext("3")
打印:
A
B
1
2
3
flatMapLatest
实际上是map
和switchLatest
操作符的组合。
相对的有flatMapFirst
,flatMapFirst
只会接收最初加入序列的元素事件。
- concatMap
将源序列的每一个元素转换,转化成一个 Observables。然后将这些Observables 按顺序发出元素。与flatMap 的区别是:当前一个序列的元素发送完毕后,下一个 Observable 才可以开始发出元素。
let concatMapSubject1 = BehaviorSubject(value: "A")
let concatMapSubject2 = BehaviorSubject(value: "1")
let concatMapSubject = BehaviorSubject(value: concatMapSubject1)
concatMapSubject.asObservable()
.concatMap() { $0 }
.subscribe { print($0) }
.disposed(by: disposeBag)
concatMapSubject1.onNext("B")
concatMapSubject1.onNext("C")
concatMapSubject.onNext(concatMapSubject2)
concatMapSubject2.onNext("2")
concatMapSubject2.onNext("3") // 保存3覆盖2
// concatMapSubject1结束,开始订阅concatMapSubject2的元素
concatMapSubject1.onCompleted()
concatMapSubject2.onNext("4")
打印:
next(A)
next(B)
next(C)
next(3)
next(4)
- scan
对第一个元素和传入的初始参数使用传入的闭包运算,将结果作为第一个元素发出。然后,将结果作为参数填入到第二个元素的闭包运算中,创建第二个元素。以此类推,直到遍历完全部的元素。也就是scan 先给一个初始化的参数数,然后不断的拿前一个结果和最新的元素进行运算。
Observable.of(10, 100, 1000)
.scan(2) { aggregateValue, newValue in
aggregateValue + newValue // 10 + 2, 100 + 12, 1000 + 112
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打印:
12
112
1112
- reduce
将对第一个元素使用传入的闭包运算。然后,将结果作为参数填入到第二个元素的闭包运算中。以此类推,直到遍历完全部的元素后发出最终结果。
Observable.of(10, 100, 1000)
.reduce(2) { aggregateValue, newValue in
aggregateValue + newValue // 1000 + 100 + 10 + 2
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打印:
1112
- toArray
将一个序列转成一个数组,并作为一个单一的事件发送,然后结束。
Observable.of(1, 2, 3)
.toArray()
.subscribe({ print($0) })
.disposed(by: disposeBag)
打印:
success([1, 2, 3])
过滤操作符
- filter
用来过滤掉不符合条件的事件,仅仅发出序列中通过判定的元素。
Observable.of(1,2,3,4,5,6,7,8,9,0)
.filter { $0 % 2 == 0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打印:
2
4
6
8
0
- single
只发出可观察序列发出的第一个元素(或满足条件的第一个元素)。如果可观察序列发出多个元素,将抛出一个错误。
Observable.of("Rx", "Swift")
.single()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打印:
Rx
Unhandled error happened: Sequence contains more than one element.
subscription called from:
Observable.of("Rx", "Swift")
.single { $0 == "Swift" }
.subscribe { print($0) }
.disposed(by: disposeBag)
打印:
next(Swift)
completed
- take
只发出头n个元素,忽略掉后面的元素,直接结束序列。
print("*****take*****")
Observable.of("A", "B","C", "D")
.take(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打印:
A
B
- takeLast
仅发送序列的后n个元素,忽略前面的元素。
Observable.of("R", "x","S", "wift")
.takeLast(3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打印:
x
S
wift
- takeWhile
依次判断序列的每一个值是否满足给定的条件, 当第一个不满足条件的值出现时,序列便自动结束。
Observable.of(1, 2, 3, 4, 2, 6)
.takeWhile { $0 < 3 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打印:
1
2
- takeUntil
监听源Observable,同时监听一个条件序列。若条件序列发出一个元素或者产生一个终止事件,那么源Observable 将自动完成,停止发送事件。
let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()
sourceSequence
.takeUntil(referenceSequence)
.subscribe { print($0) }
.disposed(by: disposeBag)
sourceSequence.onNext("A")
sourceSequence.onNext("B")
sourceSequence.onNext("C")
referenceSequence.onNext("0") // 条件序列订阅发出,源序列就结束
sourceSequence.onNext("1")
sourceSequence.onNext("2")
sourceSequence.onNext("3")
打印:
next(A)
next(B)
next(C)
completed
- skip
用于跳过序列发出的前n 个元素事件。
Observable.of(1, 2, 3, 4, 5, 6)
.skip(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打印:
3
4
5
6
- skipWhile
忽略源序列中头几个满足条件的事件。
Observable.of(1, 7, 3, 4, 5, 6)
.skipWhile { $0 < 4 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打印:
7
3
4
5
6
- skipUntil
与takeUntil类似,监听源Observable,同时监听一个条件序列。跳过Observable 中头几个元素,直到条件序列发出一个元素。
let sourceSeq = PublishSubject<String>()
let referenceSeq = PublishSubject<String>()
sourceSeq.skipUntil(referenceSeq)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 没有条件序列事件,源序列无法订阅
sourceSeq.onNext("A")
sourceSeq.onNext("B")
sourceSeq.onNext("C")
referenceSeq.onNext("0") // 订阅条件序列,源序列就开始订阅
sourceSeq.onNext("1")
sourceSeq.onNext("2")
sourceSeq.onNext("3")
打印:
1
2
3
- elementAt
只发出出来中的第 n 个元素,即是只处理指定位置的元素事件。
Observable.of("R", "X", "S", "wift")
.elementAt(3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打印:
wift
- distinctUntilChanged
用于过滤掉连续重复的事件。如果后一个元素和前一个元素是相同的,那么这个元素将不会被发出来。不连续的相同元素不受影响。
Observable.of("1", "2", "2", "2", "3", "3", "4")
.distinctUntilChanged()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打印:
1
2
3
4
- amb
amb操作符在多个源 Observables中, 取第一个发出元素或产生事件的 Observable,这个事件可以是一个 next,error 或者 completed事件,然后就只发出这个Observable的元素事件,忽略掉其他的Observables。
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
let subject3 = PublishSubject<Int>()
subject1
.amb(subject2)
.amb(subject3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject2.onNext(2) // 先发出事件,之后就只发出subject2的事件
subject1.onNext(1)
subject2.onNext(2)
subject1.onNext(1)
subject3.onNext(3)
subject2.onNext(2)
subject1.onNext(1)
subject3.onNext(3)
subject3.onNext(3)
打印:
2
2
2
Debug 操作符
- debug
打印所有的订阅,事件以及销毁信息。
打开RxSwift的Debug Mode方式:在 podfile 的末端添加如下代码,重新 pod install 一下。
post_install do |installer|
installer.pods_project.targets.each do |target|
if target.name == 'RxSwift'
target.build_configurations.each do |config|
if config.name == 'Debug'
config.build_settings['OTHER_SWIFT_FLAGS'] ||= ['-D', 'TRACE_RESOURCES']
end
end
end
end
end
var count = 1
let error = NSError.init(domain: "com.xxx", code: 10000, userInfo: nil)
let sequenceThatErrors = Observable<String>.create { observer in
observer.onNext("A")
observer.onNext("B")
observer.onNext("C")
if count < 3 {
observer.onError(error)
print("onError")
count += 1
}
observer.onNext("D")
observer.onNext("E")
observer.onNext("F")
observer.onCompleted()
return Disposables.create()
}
sequenceThatErrors
.debug()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打印:
2019-08-27 17:44:45.792: ViewController.swift:572 (testDebug()) -> subscribed
2019-08-27 17:44:45.840: ViewController.swift:572 (testDebug()) -> Event next(A)
A
2019-08-27 17:44:45.840: ViewController.swift:572 (testDebug()) -> Event next(B)
B
2019-08-27 17:44:45.840: ViewController.swift:572 (testDebug()) -> Event next(C)
C
2019-08-27 17:44:45.842: ViewController.swift:572 (testDebug()) -> Event error(Error Domain=com.xxx Code=10000 "(null)")
Unhandled error happened: Error Domain=com.xxx Code=10000 "(null)"
subscription called from:
2019-08-27 17:44:45.845: ViewController.swift:572 (testDebug()) -> isDisposed
onError
- RxSwift.Resources.total
提供所有Rx资源分配的计数,在开发期间检测泄漏非常有用。
print(RxSwift.Resources.total)
let subject = BehaviorSubject(value: "RxSwift")
let subscription1 = subject.subscribe(onNext: { print($0) })
print(RxSwift.Resources.total)
let subscription2 = subject.subscribe(onNext: { print($0) })
print(RxSwift.Resources.total)
subscription1.dispose()
print(RxSwift.Resources.total)
subscription2.dispose()
print(RxSwift.Resources.total)
打印:
4
RxSwift
11
RxSwift
14
12
10
连接操作符
- publish
将序列转换为可被连接的序列。可被连接的序列 在被订阅后不会发出元素,直到connect
操作符被应用为止。这样你就可以控制序列在什么时候开始发出元素。
let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).publish()
interval.subscribe(onNext: { print("订阅: 1, 事件: \($0)") })
.disposed(by: disposeBag)
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
// 在connect之前的订阅不会发出元素,直到connect发出后
print("do connect")
_ = interval.connect()
}
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
interval.subscribe(onNext: { print("订阅: 2, 事件: \($0)") })
.disposed(by: self.disposeBag)
}
DispatchQueue.main.asyncAfter(deadline: .now() + 4) {
interval.subscribe(onNext: { print("订阅: 3, 事件: \($0)") })
.disposed(by: self.disposeBag)
}
DispatchQueue.main.asyncAfter(deadline: .now() + 6) {
self.disposeBag = DisposeBag()
}
打印:
订阅: 1, 事件: 0
订阅: 2, 事件: 0
订阅: 1, 事件: 1
订阅: 2, 事件: 1
订阅: 1, 事件: 2
订阅: 2, 事件: 2
订阅: 3, 事件: 2
订阅: 1, 事件: 3
订阅: 2, 事件: 3
订阅: 3, 事件: 3
- replay
将序列转换为可被连接的序列,并且这个可被连接的序列将缓存最新的 n 个元素。当有新的观察者对它进行订阅时,它就把这些被缓存的元素发送给观察者。
let sequenceThatErrors = Observable<String>.create { observer in
observer.onNext("A")
observer.onNext("B")
observer.onNext("C")
observer.onNext("D")
observer.onNext("E")
observer.onNext("F")
return Disposables.create()
}
.replay(5) // 缓存5个元素
sequenceThatErrors
.subscribe(onNext: {
print("before connect subscribe \($0)")
})
.disposed(by: disposeBag)
// 在connect之前的订阅不会发出元素,直到connect发出后
print("do connect")
_ = sequenceThatErrors.connect()
sequenceThatErrors
.subscribe(onNext: {
print("after connect first subscribe \($0)")
})
.disposed(by: disposeBag)
sequenceThatErrors
.subscribe(onNext: {
print("after connect second subscribe \($0)")
})
.disposed(by: disposeBag)
打印:
do connect
before connect subscribe A
before connect subscribe B
before connect subscribe C
before connect subscribe D
before connect subscribe E
before connect subscribe F
after connect first subscribe B
after connect first subscribe C
after connect first subscribe D
after connect first subscribe E
after connect first subscribe F
after connect second subscribe B
after connect second subscribe C
after connect second subscribe D
after connect second subscribe E
after connect second subscribe F
- multicast
将序列转换为可被连接的序列,同时传入一个Subject
,每当序列发送事件时都会触发这个Subject
的发送。
let subject = PublishSubject<Any>()
subject.subscribe{print("subject:\($0)")}
.disposed(by: disposeBag)
let netOB = Observable<Any>.create { (observer) -> Disposable in
sleep(2)// 模拟网络延迟
print("开始请求网络")
observer.onNext("请求到的网络数据")
observer.onNext("请求到的本地")
observer.onCompleted()
return Disposables.create {
print("销毁")
}
}.multicast(subject)
netOB.subscribe(onNext: { (anything) in
print("订阅1:",anything)
})
.disposed(by: disposeBag)
netOB.subscribe(onNext: { (anything) in
print("订阅2:",anything)
})
.disposed(by: disposeBag)
_ = netOB.connect()
打印:
subject:next(请求到的网络数据)
订阅1: 请求到的网络数据
订阅2: 请求到的网络数据
subject:next(请求到的本地)
订阅1: 请求到的本地
订阅2: 请求到的本地
subject:completed
销毁
错误恢复操作符
- timeout
规定时间内没有产生元素就产生一个超时的error
事件
let times = [
[ "value": 1, "time": 1 ],
[ "value": 2, "time": 2 ],
[ "value": 3, "time": 4.1 ]
]
let timeOutOb = Observable.from(times)
.flatMap({ (anyTime) in
return Observable.of(anyTime["value"]!)
.delaySubscription(Double(anyTime["time"]!), scheduler: MainScheduler.instance)
})
.timeout(2, scheduler: MainScheduler.instance)
timeOutOb.subscribe(onNext: { (value) in
print("onNext:", value)
}, onError: { (error) in
print("error:", error)
})
.disposed(by: disposeBag)
打印:
onNext: 1.0
onNext: 2.0
error: Sequence timeout.
- catchErrorJustReturn
如果产生错误,返回一个可观察序列,该序列发出单个元素,然后终止。
let error = NSError.init(domain: "com.xxx", code: 10000, userInfo: nil)
let sequenceThatFails = PublishSubject<String>()
sequenceThatFails
.catchErrorJustReturn("AA")
.subscribe { print($0) }
.disposed(by: disposeBag)
sequenceThatFails.onNext("BB")
sequenceThatFails.onNext("CC") // 正常序列发送
sequenceThatFails.onError(error) // 发送失败,返回设定的错误的预案
打印:
next(BB)
next(CC)
next(AA)
completed
- catchError
拦截一个error
事件,将它替换成其他序列,然后传递给观察者,订阅新的序列。
let recoverySequence = PublishSubject<String>()
sequenceThatFails
.catchError {
print("Error:", $0)
return recoverySequence // 截获到了错误序列,,返回一个新序列
}
.subscribe { print($0) }
.disposed(by: disposeBag)
sequenceThatFails.onNext("AA")
sequenceThatFails.onNext("BB") // 正常序列发送成功的
sequenceThatFails.onError(error) // 发送失败的序列
recoverySequence.onNext("CC") // 替换的序列发送信号
sequenceThatFails.onNext("DD") // 原序列已结束,不能放松信号
recoverySequence.onNext("EE")
next(AA)
next(BB)
Error: Error Domain=com.xxx Code=10000 "(null)"
next(CC)
next(EE)
- retry
如果源序列产生一个错误事件,重新对它进行订阅,让它有机会不产生error
事件。即便源序列产生了一个error
事件,retry
总是对观察者发出 next 事件,所以这样可能会产生重复的元素。
var count = 1 // 外界变量控制流程
let sequenceRetryErrors = Observable<String>.create { observer in
observer.onNext("AA")
observer.onNext("BB")
observer.onNext("CC")
if count == 1 { // 流程进来之后就会过度-这里的条件可以作为出口,失败的次数
observer.onError(error) // 接收到了错误序列,重试序列发生
print("onError")
count += 1
}
observer.onNext("DD")
observer.onNext("EE")
observer.onCompleted()
return Disposables.create()
}
sequenceRetryErrors
.retry()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打印:
AA
BB
CC
onError
AA
BB
CC
DD
EE
设置重新订阅次数(retry(_:))
let sequenceThatErrors = Observable<String>.create { observer in
observer.onNext("AA")
observer.onNext("BB")
observer.onNext("CC")
observer.onError(error)
observer.onNext("DD")
observer.onNext("EE")
observer.onCompleted()
return Disposables.create()
}
sequenceThatErrors
.retry(3) // 重新订阅3次
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
打印:
AA
BB
CC
AA
BB
CC
AA
BB
CC
Unhandled error happened: Error Domain=com.xxx Code=10000 "(null)"
subscription called from:
总结:
篇幅较长,想快速了解请直接看这里。
-
组合操作符
- merge
将多个序列合并成一个新序列,当这多个序列中某一个序列发出一个元素时,新序列就将这个元素发出,当某一个序列发出error时,新序列也发出error,并终止序列。 - zip
将多个(最多不超过8个) 序列的元素组合压缩,而且是等到每个序列元素事件一一对应地凑齐之后再合并,然后将合并的结果元素发出来。 - combineLatest
将多个序列中最新的元素组合起来,然后将这个组合的结果发出来。 - switchLatest
将序列发出的元素转换成一个新序列,并从新序列发出元素。 - concat
将多个序列按顺序串联起来,只有当上一个序列发出了completed事件,才会开始发送下一个序列事件。 - startWith
在发出序列的事件元素之前,会先发出这些预先插入的事件元素。 - withLatestFrom
将两个序列最新的元素组合起来,当第一个序列发出一个元素,就将组合后的元素发送出来。
- merge
-
转换操作符
- map
通过传入的闭包把原序列转变为一个新序列,map函数会将原序列的所有元素进行转换。 - flatMap
升维:对源序列的每一个元素应用转换方法,将他们转换成 Observables,降维:然后将这些 Observables 的元素合并成一个序列之后再发送出来。 - flatMapLatest
将序列元素转换成 Observables,然后取这些 Observables 中最新的一个发送。若转换出一个新的 Observables,就只发出它的元素,旧的 Observables 的元素将被忽略掉。 - concatMap
将源序列的每一个元素转换,转化成一个 Observables。然后将这些Observables 按顺序发出元素。与flatMap 的区别是:当前一个序列的元素发送完毕后,下一个 Observable 才可以开始发出元素。 - scan
遍历全部元素,对第一个元素和传入的初始参数使用传入的闭包运算,将结果作为第一个元素发出。然后不断的拿前一个结果和最新的元素进行运算,并发出结果。 - reduce
将对第一个元素使用传入的闭包运算。然后,将结果作为参数填入到第二个元素的闭包运算中。以此类推,直到遍历完全部的元素后发出最终结果。 - toArray
将一个序列转成一个数组,并作为一个单一的事件发送,然后结束。
- map
-
过滤操作符
- filter
用来过滤掉不符合条件的事件,仅仅发出序列中通过判定的元素。 - single
只发出可观察序列发出的第一个元素(或满足条件的第一个元素)。如果可观察序列发出多个元素,将抛出一个错误。 - take
只发出头n个元素,忽略掉后面的元素,直接结束序列。 - takeLast
仅发送序列的后n个元素,忽略前面的元素。 - takeWhile
依次判断序列的每一个值是否满足给定的条件, 当第一个不满足条件的值出现时,序列便自动结束。 - takeUntil
监听源Observable,同时监听一个条件序列。若条件序列发出一个元素或者产生一个终止事件,那么源Observable 将自动完成,停止发送事件。 - skip
用于跳过序列发出的前n 个元素事件。 - skipWhile
忽略源序列中头几个满足条件的事件。 - skipUntil
监听源Observable,同时监听一个条件序列。跳过Observable 中头几个元素,直到条件序列发出一个元素。 - elementAt
只发出出来中的第 n 个元素,即是只处理指定位置的元素事件。 - distinctUntilChanged
用于过滤掉连续重复的事件。如果后一个元素和前一个元素是相同的,那么这个元素将不会被发出来。不连续的相同元素不受影响。 - amb
在多个源 Observables中, 取第一个产生事件的 Observable,忽略掉其他的Observables。
- filter
-
Debug 操作符
- debug
打印所有的订阅,事件以及销毁信息。 - RxSwift.Resources.total
提供所有Rx资源分配的计数,在开发期间检测泄漏非常有用
- debug
-
连接操作符。
- publish
将序列转换为可被连接的序列。可被连接的序列 在被订阅后不会发出元素,直到 connect 操作符被应用为止。 - replay
将序列转换为可被连接的序列,并且这个可被连接的序列将缓存最新的 n 个元素。当有新的观察者对它进行订阅时,它就把这些被缓存的元素发送给观察者。 - multicast
将序列转换为可被连接的序列,同时传入一个 Subject,每当序列发送事件时都会触发这个 Subject 的发送。
- publish
-
错误恢复操作符
- timeout
规定时间内没有产生元素就产生一个超时的 error 事件。 - catchErrorJustReturn
如果产生错误,返回一个可观察序列,该序列发出单个元素,然后终止。 - catchError
拦截 error 事件,将它替换成其他序列,然后传递给观察者,订阅新的序列。 - retry
如果源序列产生一个错误事件,重新对它进行订阅,让它有机会不产生error
事件。可设置重新订阅次数,默认遇到一次错误事件重新订阅一次。
- timeout