1、决策树
我想要创建一个 Observable
- 产生特定的一个元素:
just
* 经过一段延时:timer
- 从一个序列拉取元素:
from
- 重复的产生某一个元素:
repeatElement
- 存在自定义逻辑:
create
- 每次订阅时产生:
deferred
- 每隔一段时间,发出一个元素:
interval
* 在一段延时后:timer
- 一个空序列,只有一个完成事件:
empty
- 一个任何事件都没有产生的序列:
never
我想要创建一个 Observable
通过组合其他的 Observables
- 任意一个
Observable
产生了元素,就发出这个元素:merge
- 让这些
Observables
一个接一个的发出元素,当上一个Observable
元素发送完毕后,下一个Observable
才能开始发出元素:concat
- 组合多个
Observables
的元素
* 当每一个Observable
都发出一个新的元素:zip
* 当任意一个Observable
发出一个新的元素:combineLatest
我想要转换 Observable
的元素后,再将它们发出来
- 对每个元素直接转换:
map
- 转换到另一个
Observable
:flatMap
* 只接收最新的元素转换的Observable
所产生的元素:flatMapLatest
* 每一个元素转换的Observable
按顺序产生元素:concatMap
- 基于所有遍历过的元素:
scan
我想要将产生的每一个元素,拖延一段时间后再发出:delay
我想要将产生的事件封装成元素发送出来
- 将他们封装成
Event<Element>
:materialize
* 然后解封出来:dematerialize
我想要忽略掉所有的 next
事件,只接收 completed
和 error
事件:ignoreElements
我想创建一个新的 Observable
在原有的序列前面加入一些元素:startWith
我想从 Observable
中收集元素,缓存这些元素之后在发出:buffer
我想将 Observable
拆分成多个 Observables
:window
- 基于元素的共同特征:
groupBy
我想只接收 Observable
中特定的元素
- 发出唯一的元素:
single
我想重新从 Observable
中发出某些元素
- 通过判定条件过滤出一些元素:
filter
- 仅仅发出头几个元素:
take
- 仅仅发出尾部的几个元素:
takeLast
- 仅仅发出第 n 个元素:
elementAt
- 跳过头几个元素
* 跳过头 n 个元素:skip
* 跳过头几个满足判定的元素:skipWhile
,skipWhileWithIndex
* 跳过某段时间内产生的头几个元素:skip
* 跳过头几个元素直到另一个Observable
发出一个元素:skipUntil
- 只取头几个元素
* 只取头几个满足判定的元素:takeWhile
,takeWhileWithIndex
* 只取某段时间内产生的头几个元素:take
* 只取头几个元素直到另一个Observable
发出一个元素:takeUntil
- 周期性的对
Observable
抽样:sample
- 发出那些元素,这些元素产生后的特定的时间内,没有新的元素产生:
debounce
- 直到元素的值发生变化,才发出新的元素:
distinctUntilChanged
* 并提供元素是否相等的判定函数:distinctUntilChanged
- 在开始发出元素时,延时后进行订阅:
delaySubscription
我想要从一些 Observables
中,只取第一个产生元素的 Observable
:amb
我想评估 Observable
的全部元素
- 并且对每个元素应用聚合方法,待所有元素都应用聚合方法后,发出结果:
reduce
- 并且对每个元素应用聚合方法,每次应用聚合方法后,发出结果:
scan
我想把 Observable
转换为其他的数据结构:as...
我想在某个 Scheduler
应用操作符:subscribeOn
- 在某个
Scheduler
监听:observeOn
我想要 Observable
发生某个事件时, 采取某个行动:do
我想要 Observable
发出一个 error
事件:error
- 如果规定时间内没有产生元素:
timeout
我想要 Observable
发生错误时,优雅的恢复
- 如果规定时间内没有产生元素,就切换到备选
Observable
:timeout
- 如果产生错误,将错误替换成某个元素 :
catchErrorJustReturn
- 如果产生错误,就切换到备选
Observable
:catchError
- 如果产生错误,就重试 :
retry
我创建一个 Disposable
资源,使它与 Observable
具有相同的寿命:using
我创建一个 Observable
,直到我通知它可以产生元素后,才能产生元素:publish
- 并且,就算是在产生元素后订阅,也要发出全部元素:
replay
- 并且,一旦所有观察者取消观察,他就被释放掉:
refCount
- 通知它可以产生元素了:
connect
2、Observable 的创建方式
just() :创建 Observable 发出唯一的一个元素。该方法通过传入一个默认值来初始化,构建一个只有一个元素的 Observable 队列,订阅完信息自动 complete。
let just = Observable.just(0)
//它相当于
let just = Observable<Int>.create { observer in
observer.onNext(0)
observer.onCompleted()
return Disposables.create()
}
Observable<[String]>.just(["Jack","Rose"])
.subscribe{(event) in
print(event)
}.disposed(by: disposeBag)
//next(["Jack", "Rose"])
//completed
of():此方法创建一个新的可观察实例,该实例具有可变数量的元素。该方法可以接受可变数量的参数(必需要是同类型的)。
// 多个元素 - 针对序列处理
Observable<String>.of("Jack","Rose")
.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
//next(Jack)
//next(Rose)
//completed
// 字典
Observable<[String: Any]>.of(["name":"Jack","age":22])
.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
//next(["age": 22, "name": "Jack"])
//completed
// 数组
Observable<[String]>.of(["Jack","Rose"])
.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
//next(["Jack", "Rose"])
//completed
from():将其他类型或者数据结构转换为 Observable。
//将一个数组转换为 Observable:
let numbers = Observable.from([0, 1, 2])
//相当于
let numbers = Observable<Int>.create { observer in
observer.onNext(0)
observer.onNext(1)
observer.onNext(2)
observer.onCompleted()
return Disposables.create()
}
//将一个可选值转换为 Observable:
let num: Int? = 1
let value = Observable.from(optional: num)
//相当于
let num: Int? = 1
let value = Observable<Int>.create { observer in
if let element = num {
observer.onNext(element)
}
observer.onCompleted()
return Disposables.create()
}
empty():empty 操作符将创建一个 Observable,这个 Observable 只有一个完成事件。
//使用
let emtyOb = Observable<Int>.empty()
//相当于
let emtyOb = Observable<Int>.create { observer in
observer.onCompleted()
return Disposables.create()
}
_ = emtyOb.subscribe(onNext: { (number) in
print("订阅:",number)
}, onError: { (error) in
print("error:",error)
}, onCompleted: {
print("完成回调")
}) {
print("释放回调")
}
//完成回调
//释放回调
never():never 操作符将创建一个 Observable,这个 Observable 不会产生任何事件。
let id = Observable<Int>.never()
//它相当于
let id = Observable<Int>.create { observer in
return Disposables.create()
}
Observable<String>.never()
.subscribe { (event) in
print("走你",event)
}
.disposed(by: disposeBag)
error():error 操作符将创建一个 Observable,这个 Observable 只会产生一个 error 事件。
Observable<String>.error((NSError.init(domain: "domain", code: 4444, userInfo: ["errorInfo":"errorMsg"]))
.subscribe { (event) in
print("订阅:",event)
}
.disposed(by: disposeBag)
range():使用指定的调度程序生成并发送观察者消息,生成指定范围内的可观察整数序列。
Observable.range(start: 2, count: 5)
.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
// 底层源码
init(start: E, count: E, scheduler: ImmediateSchedulerType) {
self._start = start
self._count = count
self._scheduler = scheduler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
let sink = RangeSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
repeatElement():创建一个 Observable,这个 Observable 将无止尽地发出同一个元素。
let id = Observable.repeatElement(0)
//相当于
let id = Observable<Int>.create { observer in
observer.onNext(0)
observer.onNext(0)
observer.onNext(0)
observer.onNext(0)
... // 无数次
return Disposables.create()
}
Observable<Int>.repeatElement(5)
.subscribe { (event) in
// print("订阅:",event)
}
.disposed(by: disposeBag)
generate():创建一个只有当提供的所有的判断条件都为 true 的时候,才会给出动作的 Observable 序列
Observable.generate(initialState: 0,// 初始值
condition: { $0 < 10}, // 条件1
iterate: { $0 + 2 }) // 条件2 +2
.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
// 数组遍历
let arr = ["Jack_1","Jack_2","Jack_3","Jack_4","Jack_5","Jack_6","Jack_7","Jack_8","Jack_9","Jack_10"]
Observable.generate(initialState: 0,// 初始值
condition: { $0 < arr.count}, // 条件1
iterate: { $0 + 1 }) // 条件2 +2
.subscribe(onNext: {
print("遍历arr:",arr[$0])
})
.disposed(by: disposeBag)
create():通过一个构建函数完整的创建一个 Observable。create 操作符将创建一个 Observable,你需要提供一个构建函数,在构建函数里面描述事件(next,error,completed)的产生过程。通常情况下一个有限的序列,只会调用一次观察者的 onCompleted 或者 onError 方法。并且在调用它们后,不会再去调用观察者的其他方法。
//创建一个 [0, 1, ... 8, 9] 的序列:
let id = Observable<Int>.create { observer in
observer.onNext(0)
observer.onNext(1)
observer.onNext(2)
observer.onNext(3)
observer.onNext(4)
observer.onNext(5)
observer.onNext(6)
observer.onNext(7)
observer.onNext(8)
observer.onNext(9)
observer.onCompleted()
return Disposables.create()
}
deferred():deferred 操作符将等待观察者订阅它,才创建一个 Observable,它会通过一个构建函数为每一位订阅者创建新的 Observable。看上去每位订阅者都是对同一个 Observable 产生订阅,实际上它们都获得了独立的序列。在一些情况下,直到订阅时才创建 Observable 是可以保证拿到的数据都是最新的。
//用于标记是奇数、还是偶数
var isOdd = true
//使用deferred()方法延迟Observable序列的初始化,通过传入的block来实现Observable序列的初始化并且返回。
let factory : Observable<Int> = Observable.deferred {
//让每次执行这个block时候都会让奇、偶数进行交替
isOdd = !isOdd
//根据isOdd参数,决定创建并返回的是奇数Observable、还是偶数Observable
if isOdd {
return Observable.of(1, 3, 5 ,7)
}else {
return Observable.of(2, 4, 6, 8)
}
}
//第1次订阅测试
factory.subscribe { event in
print("\(isOdd)", event)
}
//第2次订阅测试
factory.subscribe { event in
print("\(isOdd)", event)
}
interval():interval 操作符将创建一个 Observable,它每隔一段设定的时间,发出一个索引数的元素。它将发出无数个元素。
Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.subscribe{(event) in
print(event)
}
//.disposed(by: disposeBag)
timer():timer 操作符将创建一个 Observable,它在经过设定的一段时间后,产生唯一的一个元素。
//5秒种后发出唯一的一个元素0
let observable = Observable<Int>.timer(.seconds(5), scheduler: MainScheduler.instance)
observable.subscribe { event in
print(event)
}
//.disposed(by: disposeBag)
另一种是创建的 Observable 序列在经过设定的一段时间后,每隔一段时间产生一个元素.
//延时5秒种后,每隔1秒钟发出一个元素
let observable = Observable<Int>.timer(.seconds(5), period: .seconds(1), scheduler: MainScheduler.instance)
observable.subscribe { event in
print(event)
}
//.disposed(by: disposeBag)
3、高阶函数
1、组合
startWith:startWith 操作符会在 Observable
头部插入一些元素。
(如果你想在尾部加入一些元素可以用 concat)
let disposeBag = DisposeBag()
Observable.of("🐶", "🐱", "🐭", "🐹")
.startWith("1")
.startWith("2")
.startWith("3", "🅰️", "🅱️")
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//result
3
🅰️
🅱️
2
1
🐶
🐱
🐭
🐹
merge:将多个 Observables 合并成一个。通过使用 merge 操作符你可以将多个 Observables 合并成一个,当某一个 Observable 发出一个元素时,他就将这个元素发出。如果,某一个 Observable 发出一个 onError 事件,那么被合并的 Observable 也会将它发出,并且立即终止序列。
let disposeBag = DisposeBag()
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
Observable.of(subject1, subject2)
.merge()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext("🅰️")
subject1.onNext("🅱️")
subject2.onNext("①")
subject2.onNext("②")
subject1.onNext("🆎")
subject2.onNext("③")
//result
🅰️
🅱️
①
②
🆎
③
zip:zip 操作符将多个(最多不超过8个) Observables 的元素通过一个函数组合起来,然后将这个组合的结果发出来。它会严格的按照序列的索引数进行组合。它的元素数量等于源 Observables 中元素数量最少的那个。
let disposeBag = DisposeBag()
let first = PublishSubject<String>()
let second = PublishSubject<String>()
Observable.zip(first, second) { $0 + $1 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
first.onNext("1")
second.onNext("A")
first.onNext("2")
second.onNext("B")
second.onNext("C")
second.onNext("D")
first.onNext("3")
first.onNext("4")
//result
1A
2B
3C
4D
combineLatest:当多个 Observables 中任何一个 Observable 发出一个新元素,combineLatest 就会发出一个元素。这个元素是由这些 Observables 中最新的元素,通过一个函数组合起来的。combineLatest 操作符将多个 Observables 中最新的元素通过一个函数组合起来,然后将这个组合的结果发出来。这些源 Observables 中任何一个发出一个元素,他都会发出一个元素(前提是,这些 Observables 曾经都发出过元素)。
let disposeBag = DisposeBag()
let first = PublishSubject<String>()
let second = PublishSubject<String>()
Observable.combineLatest(first, second) { $0 + $1 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
first.onNext("1")
second.onNext("A")
first.onNext("2")
second.onNext("B")
second.onNext("C")
second.onNext("D")
first.onNext("3")
first.onNext("4")
//结果
1A
2A
2B
2C
2D
3D
4D
switchLatest:将可观察序列发出的元素转换为可观察序列,并从最近的内部可观察序列发出元素。
let switchLatestSub1 = BehaviorSubject(value: "L")
let switchLatestSub2 = BehaviorSubject(value: "1")
let switchLatestSub = BehaviorSubject(value: switchLatestSub1)// 选择了 switchLatestSub1 就不会监听 switchLatestSub2
switchLatestSub.asObservable()
.switchLatest()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//L
switchLatestSub1.onNext("G")//G
switchLatestSub1.onNext("_")//_
switchLatestSub2.onNext("2")// 不会监听,但是默认保存由 2 覆盖 1
switchLatestSub2.onNext("3") // 不会监听,但是默认保存由 3 覆盖 2
switchLatestSub.onNext(switchLatestSub2) // 切换到 switchLatestSub2
switchLatestSub1.onNext("*")//不会监听,但是默认保存由 * 覆盖 _
switchLatestSub1.onNext("Cooci") //不会监听,但是默认保存由 Cooci 覆盖 *
switchLatestSub2.onNext("4")//4
2、映射
map:通过一个转换函数,将 Observable 的每个元素转换一遍。map 操作符将源 Observable 的每个元素应用你提供的转换方法,然后返回含有转换结果的 Observable。
let disposeBag = DisposeBag()
Observable.of(1, 2, 3)
.map { $0 * 10 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//result
10
20
30
*/
flatMap:先将 Observable 的元素转换成其他的 Observable,然后将这些 Observables 合并。flatMap 操作符将源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。 然后将这些 Observables 的元素合并之后再发送出来。
let disposeBag = DisposeBag()
let first = BehaviorSubject(value: "👦🏻")
let second = BehaviorSubject(value: "🅰️")
let subject = BehaviorSubject(value: first)
subject.asObservable()
.flatMap { $0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
first.onNext("🐱")
subject.onNext(second)
second.onNext("🅱️")
first.onNext("🐶")
//result
👦🏻
🐱
🅰️
🅱️
🐶
flatMapLatest:将 Observable 的元素转换成其他的 Observable,然后取这些 Observables 中最新的一个。flatMapLatest 操作符将源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。一旦转换出一个新的 Observable,就只发出它的元素,旧的 Observables 的元素将被忽略掉。
let disposeBag = DisposeBag()
let first = BehaviorSubject(value: "👦🏻")
let second = BehaviorSubject(value: "🅰️")
let subject = BehaviorSubject(value: first)
subject.asObservable()
.flatMapLatest { $0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
first.onNext("🐱")
subject.onNext(second)
second.onNext("🅱️")
first.onNext("🐶")
//result
👦🏻
🐱
🅰️
🅱️
scan:scan 操作符将对第一个元素应用一个函数,将结果作为第一个元素发出。然后,将结果作为参数填入到第二个元素的应用函数中,创建第二个元素。以此类推,直到遍历完全部的元素。
let disposeBag = DisposeBag()
Observable.of(10, 100, 1000)
.scan(1) { aggregateValue, newValue in
aggregateValue + newValue
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//result
11
111
1111
3、过滤
filter:仅仅发出 Observable 中通过判定的元素。filter 操作符将通过你提供的判定方法过滤一个 Observable。
Observable.of(1,2,3,4,5,6,7,8,9,0)
.filter { $0 % 2 == 0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
/*
2
4
6
8
0
*/
distinctUntilChanged:distinctUntilChanged 操作符将阻止 Observable 发出相同的元素。如果后一个元素和前一个元素是相同的,那么这个元素将不会被发出来。如果后一个元素和前一个元素不相同,那么这个元素才会被发出来。
let disposeBag = DisposeBag()
Observable.of("🐱", "🐷", "🐱", "🐱", "🐱", "🐵", "🐱")
.distinctUntilChanged()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//result
🐱
🐷
🐱
🐵
🐱
elementAt:elementAt 操作符将拉取 Observable 序列中指定索引数的元素,然后将它作为唯一的元素发出。
let disposeBag = DisposeBag()
Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
.elementAt(3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//result
🐸
single:single 操作符将限制 Observable
只产生一个元素。如果 Observable
只有一个元素,它将镜像这个 Observable
。如果 Observable
没有元素或者元素数量大于一,它将产生一个 error
事件。
Observable.of("Jack", "Rose")
.single()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//Jack
//Unhandled error happened: Sequence contains more than one element.
Observable.of("Jack", "Rose")
.single { $0 == "Rose" }
.subscribe { print($0) }
.disposed(by: disposeBag)
//next(Rose)
//completed
take:通过 take 操作符你可以只发出头 n 个元素。并且忽略掉后面的元素,直接结束序列。
Observable.of("Hank", "Kody","Cooci", "CC")
.take(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//Hank
//Kody
takeLast:通过 takeLast 操作符你可以只发出尾部 n 个元素。并且忽略掉前面的元素。
Observable.of("Hank", "Kody","Cooci", "CC")
.takeLast(3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
/*
Kody
Cooci
CC
*/
takeWhile:takeWhile 操作符将镜像源 Observable 直到某个元素的判定为 false。此时,这个镜像的 Observable 将立即终止。
let disposeBag = DisposeBag()
Observable.of(1, 2, 3, 4, 3, 2, 1)
.takeWhile { $0 < 4 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//result
1
2
3
takeUntil:takeUntil 操作符将镜像源 Observable,它同时观测第二个 Observable。一旦第二个 Observable 发出一个元素或者产生一个终止事件,那个镜像的 Observable 将立即终止。
let disposeBag = DisposeBag()
let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()
sourceSequence
.takeUntil(referenceSequence)
.subscribe { print($0) }
.disposed(by: disposeBag)
sourceSequence.onNext("🐱")
sourceSequence.onNext("🐰")
sourceSequence.onNext("🐶")
referenceSequence.onNext("🔴")
sourceSequence.onNext("🐸")
sourceSequence.onNext("🐷")
sourceSequence.onNext("🐵")
//result
next(🐱)
next(🐰)
next(🐶)
completed
skip:skip 操作符可以让你跳过 Observable 中头 n 个元素,只关注后面的元素。
let disposeBag = DisposeBag()
Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
.skip(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//result
🐶
🐸
🐷
🐵
skipUntil:skipUntil 操作符可以让你忽略源 Observable 中头几个元素,直到另一个 Observable 发出一个元素后,它才镜像源 Observable。
let disposeBag = DisposeBag()
let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()
sourceSequence
.skipUntil(referenceSequence)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
sourceSequence.onNext("🐱")
sourceSequence.onNext("🐰")
sourceSequence.onNext("🐶")
referenceSequence.onNext("🔴")
sourceSequence.onNext("🐸")
sourceSequence.onNext("🐷")
sourceSequence.onNext("🐵")
//result
🐸
🐷
🐵
4、集合
toArray:将一个可观察序列转换为一个数组,将该数组作为一个新的单元素可观察序列发出,然后终止。
Observable.range(start: 1, count: 10)
.toArray()
.subscribe { print($0) }
.disposed(by: disposeBag)
//success([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
reduce:reduce 操作符将对第一个元素应用一个函数。然后,将结果作为参数填入到第二个元素的应用函数中。以此类推,直到遍历完全部的元素后发出最终结果。
Observable.of(10, 100, 1000)
.reduce(1, accumulator: +)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 1 + 10 + 100 + 1000 = 1111
concat:让两个或多个 Observables 按顺序串连起来。concat 操作符将多个 Observables 按顺序串联起来,当前一个 Observable 元素发送完毕后,后一个 Observable 才可以开始发出元素。concat 将等待前一个 Observable 产生完成事件后,才对后一个 Observable 进行订阅。如果后一个是“热” Observable ,在它前一个 Observable 产生完成事件前,所产生的元素将不会被发送出来。
let disposeBag = DisposeBag()
let subject1 = BehaviorSubject(value: "🍎")
let subject2 = BehaviorSubject(value: "🐶")
let subject = BehaviorSubject(value: subject1)
subject
.asObservable()
.concat()
.subscribe { print($0) }
.disposed(by: disposeBag)
subject1.onNext("🍐")
subject1.onNext("🍊")
subject.onNext(subject2)
subject2.onNext("I would be ignored")
subject2.onNext("🐱")
subject1.onCompleted()
subject2.onNext("🐭")
//result
next(🍎)
next(🍐)
next(🍊)
next(🐱)
next(🐭)
5、error
catchErrorJustReturn:操作符会将 error 事件替换成其他的一个元素,然后结束该序列。
let disposeBag = DisposeBag()
let sequenceThatFails = PublishSubject<String>()
sequenceThatFails
.catchErrorJustReturn("😊")
.subscribe { print($0) }
.disposed(by: disposeBag)
sequenceThatFails.onNext("😬")
sequenceThatFails.onNext("😨")
sequenceThatFails.onNext("😡")
sequenceThatFails.onNext("🔴")
sequenceThatFails.onError(TestError.test)
//结果
next(😬)
next(😨)
next(😡)
next(🔴)
next(😊)
completed
catchError:从一个错误事件中恢复,将错误事件替换成一个备选序列。catchError 操作符将会拦截一个 error 事件,将它替换成其他的元素或者一组元素,然后传递给观察者。这样可以使得 Observable 正常结束,或者根本都不需要结束。
let disposeBag = DisposeBag()
let sequenceThatFails = PublishSubject<String>()
let recoverySequence = PublishSubject<String>()
sequenceThatFails
.catchError {
print("Error:", $0)
return recoverySequence
}
.subscribe { print($0) }
.disposed(by: disposeBag)
sequenceThatFails.onNext("😬")
sequenceThatFails.onNext("😨")
sequenceThatFails.onNext("😡")
sequenceThatFails.onNext("🔴")
sequenceThatFails.onError(TestError.test)
recoverySequence.onNext("😊")
//结果
next(😬)
next(😨)
next(😡)
next(🔴)
Error: test
next(😊)
retry:如果源 Observable 产生一个错误事件,重新对它进行订阅,希望它不会再次产生错误。retry 操作符将不会将 error 事件,传递给观察者,然而,它会重新订阅源 Observable,给这个 Observable 一个重试的机会,让它有机会不产生 error 事件。retry 总是对观察者发出 next 事件,即便源序列产生了一个 error 事件,所以这样可能会产生重复的元素。
let disposeBag = DisposeBag()
var count = 1
let sequenceThatErrors = Observable<String>.create { observer in
observer.onNext("🍎")
observer.onNext("🍐")
observer.onNext("🍊")
if count == 1 {
observer.onError(TestError.test)
print("Error encountered")
count += 1
}
observer.onNext("🐶")
observer.onNext("🐱")
observer.onNext("🐭")
observer.onCompleted()
return Disposables.create()
}
sequenceThatErrors
.retry()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//result
🍎
🍐
🍊
Error encountered
🍎
🍐
🍊
🐶
🐱
🐭
let disposeBag = DisposeBag()
var count = 1
let sequenceThatErrors = Observable<String>.create { observer in
observer.onNext("🍎")
observer.onNext("🍐")
observer.onNext("🍊")
if count < 5 {
observer.onError(TestError.test)
print("Error encountered")
count += 1
}
observer.onNext("🐶")
observer.onNext("🐱")
observer.onNext("🐭")
observer.onCompleted()
return Disposables.create()
}
sequenceThatErrors
.retry(3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//result
🍎
🍐
🍊
Error encountered
🍎
🍐
🍊
Error encountered
🍎
🍐
🍊
Error encountered
Unhandled error happened: test
subscription called from:
retry(_:)::通过重新订阅可观察到的序列,重复地从错误事件中恢复,直到重试次数达到 max 未遂计数。
let sequenceThatErrors = Observable<String>.create { observer in
observer.onNext("Hank")
observer.onNext("Kody")
observer.onNext("CC")
if count < 5 { // 这里设置的错误出口是没有太多意义的额,因为我们设置重试次数
observer.onError(self.lgError)
print("错误序列来了")
count += 1
}
observer.onNext("Lina")
observer.onNext("小雁子")
observer.onNext("婷婷")
observer.onCompleted()
return Disposables.create()
}
sequenceThatErrors
.retry(3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
6、流程
debug:打印所有的订阅,事件以及销毁信息。
let disposeBag = DisposeBag()
let sequence = Observable<String>.create { observer in
observer.onNext("🍎")
observer.onNext("🍐")
observer.onCompleted()
return Disposables.create()
}
sequence
.debug("Fruit")
.subscribe()
.disposed(by: disposeBag)
//result
Fruit -> subscribed
Fruit -> Event next(🍎)
Fruit -> Event next(🍐)
Fruit -> Event completed
Fruit -> isDisposed
RxSwift.Resources.total::提供所有Rx资源分配的计数,这对于在开发期间检测泄漏非常有用。
print(RxSwift.Resources.total)
let subject = BehaviorSubject(value: "Cooci")
let subscription1 = subject.subscribe(onNext: { print($0) })
print(RxSwift.Resources.total)
let subscription2 = subject.subscribe(onNext: { print($0) })
print(RxSwift.Resources.total)
subscription1.dispose()
print(RxSwift.Resources.total)
subscription2.dispose()
print(RxSwift.Resources.total)
7、链接
multicast:将源可观察序列转换为可连接序列,并通过指定的主题广播其发射。
func testMulticastConnectOperators(){
print("*****multicast*****")
let subject = PublishSubject<Any>()
subject.subscribe{print("00:\($0)")}
.disposed(by: disposeBag)
let netOB = Observable<Any>.create { (observer) -> Disposable in
sleep(2)// 模拟网络延迟
print("我开始请求网络了")
observer.onNext("请求到的网络数据")
observer.onNext("请求到的本地")
observer.onCompleted()
return Disposables.create {
print("销毁回调了")
}
}.publish()
netOB.subscribe(onNext: { (anything) in
print("订阅1:",anything)
})
.disposed(by: disposeBag)
// 我们有时候不止一次网络订阅,因为有时候我们的数据可能用在不同的额地方
// 所以在订阅一次 会出现什么问题?
netOB.subscribe(onNext: { (anything) in
print("订阅2:",anything)
})
.disposed(by: disposeBag)
_ = netOB.connect()
}
分析:
底层逻辑探索中间变量 ConnectableObservableAdapter 保存了源序列 source、中间序列 makeSubject 。
订阅流程 self.lazySubject.subscribe(observer) 一个懒加载的序列,保证了中间变量 ConnectableObservableAdapter 每一次都是同一个响应序列。
剩下就是 PublishSubject 的订阅效果。
完事等待源序列的响应,但是我们的源序列的订阅是在 connect 函数里面!如果没有调用 connect 函数,意味着就永远不会发送响应。这样背后的逻辑就是,前面所以的发送响应在 connect 函数之前的都没有任何的意义!
以上也就说明了我们的 publish 就是状态共享的:connnect 一次我们序列发送一次响应(响应所有订阅)。
replay:确保观察者接收到同样的序列,即使是在 Observable 发出元素后才订阅。replay 操作符将 Observable 转换为可被连接的 Observable,并且这个可被连接的 Observable 将缓存最新的 n 个元素。当有新的观察者对它进行订阅时,它就把这些被缓存的元素发送给观察者。
func testReplayConnectOperators(){
let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).replay(5)
interval.subscribe(onNext: { print(Date.time,"订阅: 1, 事件: \($0)") })
.disposed(by: self.disposeBag)
delay(2) { _ = interval.connect() }
delay(4) {
interval.subscribe(onNext: { print(Date.time,"订阅: 2, 事件: \($0)") })
.disposed(by: self.disposeBag)
}
delay(8) {
interval.subscribe(onNext: { print(Date.time,"订阅: 3, 事件: \($0)") })
.disposed(by: self.disposeBag)
}
delay(20, closure: {
self.disposeBag = DisposeBag()
})
/**
订阅: 1, 事件: 4
订阅: 1, 事件: 0
2019-05-28 21-32-42 订阅: 2, 事件: 0
2019-05-28 21-32-42 订阅: 1, 事件: 1
2019-05-28 21-32-42 订阅: 2, 事件: 1
2019-05-28 21-32-45 订阅: 2, 事件: 4
2019-05-28 21-32-46 订阅: 3, 事件: 0
2019-05-28 21-32-46 订阅: 3, 事件: 1
2019-05-28 21-32-46 订阅: 3, 事件: 2
2019-05-28 21-32-46 订阅: 3, 事件: 3
2019-05-28 21-32-46 订阅: 3, 事件: 4
// 序列从 0开始
// 定时器也没有断层 sub2 sub3 和 sub1 是同步的
*/
}
4、其他方法
amb:在多个源 Observables 中, 取第一个发出元素或产生事件的 Observable,然后只发出 这个 Observable 中的元素。当你传入多个 Observables 到 amb 操作符时,它将取其中一个 Observable:第一个产生事件的那个 Observable,可以是一个 next,error 或者 completed 事件。 amb 将忽略掉其他的 Observables。
buffer:缓存元素,然后将缓存的元素集合,周期性的发出来。buffer 操作符将缓存 Observable 中发出的新元素,当元素达到某个数量,或者经过了特定的时间,它就会将这个元素集合发送出来。
concatMap:将 Observable 的元素转换成其他的 Observable,然后将这些 Observables 串连起来。concatMap 操作符将源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。然后让这些 Observables 按顺序的发出元素,当前一个 Observable 元素发送完毕后,后一个 Observable 才可以开始发出元素。等待前一个 Observable 产生完成事件后,才对后一个 Observable 进行订阅。
let disposeBag = DisposeBag()
let subject1 = BehaviorSubject(value: "🍎")
let subject2 = BehaviorSubject(value: "🐶")
let subject = BehaviorSubject(value: subject1)
subject.asObservable()
.concatMap { $0 }
.subscribe { print($0) }
.disposed(by: disposeBag)
subject1.onNext("🍐")
subject1.onNext("🍊")
subject.onNext(subject2)
subject2.onNext("I would be ignored")
subject2.onNext("🐱")
subject1.onCompleted()
subject2.onNext("🐭")
//result
next(🍎)
next(🍐)
next(🍊)
next(🐱)
next(🐭)
connect:通知 ConnectableObservable 可以开始发出元素了。ConnectableObservable 和普通的 Observable 十分相似,不过在被订阅后不会发出元素,直到 connect 操作符被应用为止。这样一来你可以等所有观察者全部订阅完成后,才发出元素。
let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.publish()
_ = intSequence
.subscribe(onNext: { print("Subscription 1:, Event: \($0)") })
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
_ = intSequence.connect()
}
DispatchQueue.main.asyncAfter(deadline: .now() + 4) {
_ = intSequence
.subscribe(onNext: { print("Subscription 2:, Event: \($0)") })
}
DispatchQueue.main.asyncAfter(deadline: .now() + 6) {
_ = intSequence
.subscribe(onNext: { print("Subscription 3:, Event: \($0)") })
}
//result
Subscription 1:, Event: 0
Subscription 1:, Event: 1
Subscription 2:, Event: 1
Subscription 1:, Event: 2
Subscription 2:, Event: 2
Subscription 1:, Event: 3
Subscription 2:, Event: 3
Subscription 3:, Event: 3
Subscription 1:, Event: 4
Subscription 2:, Event: 4
Subscription 3:, Event: 4
Subscription 1:, Event: 5
Subscription 2:, Event: 5
Subscription 3:, Event: 5
Subscription 1:, Event: 6
Subscription 2:, Event: 6
Subscription 3:, Event: 6
...
debounce:元素发出频率过高时会被过滤。
过滤掉高频产生的元素。debounce 操作符将发出这种元素,在 Observable
产生这种元素后,一段时间内没有新元素产生。
delay:将 Observable 的每一个元素拖延一段时间后发出。delay 操作符将修改一个 Observable,它会将 Observable 的所有元素都拖延一段设定好的时间, 然后才将它们发送出来。
delaySubscription:进行延时订阅。delaySubscription 操作符将在经过所设定的时间后,才对 Observable 进行订阅操作。
dematerialize:dematerialize 操作符将 materialize 转换后的元素还原。
do:当 Observable 的某些事件产生时,你可以使用 do 操作符来注册一些回调操作。这些回调会被单独调用,它们会和 Observable 原本的回调分离。
groupBy:将源 Observable 分解为多个子 Observable,并且每个子 Observable 将源 Observable 中“相似”的元素发送出来。
ignoreElements:忽略掉所有的元素,只发出 error 或 completed 事件。ignoreElements 操作符将阻止 Observable 发出 next 事件,但是允许他发出 error 或 completed 事件。如果你并不关心 Observable 的任何元素,你只想知道 Observable 在什么时候终止,那就可以使用 ignoreElements 操作符。
materialize:将序列产生的事件,转换成元素。通常,一个有限的 Observable 将产生零个或者多个 onNext 事件,然后产生一个 onCompleted 或者 onError 事件。materialize 操作符将 Observable 产生的这些事件全部转换成元素,然后发送出来。
observeOn:指定 Observable 在那个 Scheduler 发出通知。ReactiveX 使用 Scheduler 来让 Observable
支持多线程。使用 observeOn 操作符,来指示 Observable
在哪个 Scheduler 发出通知。
注:一旦产生了 onError 事件, observeOn 操作符将立即转发。他不会等待 onError 之前的事件全部被收到。这意味着 onError 事件可能会跳过一些元素提前发送出去。
publish:publish 会将 Observable
转换为可被连接的 Observable
。可被连接的 Observable
和普通的 Observable
十分相似,不过在被订阅后不会发出元素,直到 connect
操作符被应用为止。这样一来你可以控制 Observable
在什么时候开始发出元素。
let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.publish()
_ = intSequence
.subscribe(onNext: { print("Subscription 1:, Event: \($0)") })
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
_ = intSequence.connect()
}
DispatchQueue.main.asyncAfter(deadline: .now() + 4) {
_ = intSequence
.subscribe(onNext: { print("Subscription 2:, Event: \($0)") })
}
DispatchQueue.main.asyncAfter(deadline: .now() + 6) {
_ = intSequence
.subscribe(onNext: { print("Subscription 3:, Event: \($0)") })
}
//result
Subscription 1:, Event: 0
Subscription 1:, Event: 1
Subscription 2:, Event: 1
Subscription 1:, Event: 2
Subscription 2:, Event: 2
Subscription 1:, Event: 3
Subscription 2:, Event: 3
Subscription 3:, Event: 3
Subscription 1:, Event: 4
Subscription 2:, Event: 4
Subscription 3:, Event: 4
Subscription 1:, Event: 5
Subscription 2:, Event: 5
Subscription 3:, Event: 5
Subscription 1:, Event: 6
Subscription 2:, Event: 6
Subscription 3:, Event: 6
...
refCount:将可被连接的 Observable 转换为普通 Observable。
可被连接的 Observable 和普通的 Observable 十分相似,不过在被订阅后不会发出元素,直到 connect 操作符被应用为止。这样一来你可以控制 Observable 在什么时候开始发出元素。
refCount 操作符将自动连接和断开可被连接的 Observable。它将可被连接的 Observable 转换为普通 Observable。当第一个观察者对它订阅时,那么底层的 Observable 将被连接。当最后一个观察者离开时,那么底层的 Observable 将被断开连接。
sample:sample 操作符将不定期的对源 Observable
进行取样操作。通过第二个 Observable
来控制取样时机。一旦第二个 Observable
发出一个元素,就从源 Observable
中取出最后产生的元素。
shareReplay:使观察者共享 Observable,观察者会立即收到最新的元素,即使这些元素是在订阅前产生的。shareReplay 操作符将使得观察者共享源 Observable
,并且缓存最新的 n 个元素,将这些元素直接发送给新的观察者。
skipWhile:skipWhile 操作符可以让你忽略源 Observable 中头几个元素,直到元素的判定为否后,它才镜像源 Observable。
let disposeBag = DisposeBag()
Observable.of(1, 2, 3, 4, 3, 2, 1)
.skipWhile { $0 < 4 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//result
4
3
2
1
subscribeOn:指定 Observable 在那个 Scheduler 执行。
(observeOn 操作符非常相似。它指示 Observable
在哪个 Scheduler 发出通知)
timeout:如果 Observable
在一段时间内没有产生元素,timeout 操作符将使它发出一个 error
事件。
using:通过使用 using 操作符创建 Observable
时,同时创建一个可被清除的资源,一旦 Observable
终止了,那么这个资源就会被清除掉了。
window:将 Observable 分解为多个子 Observable,周期性的将子 Observable 发出来。 window 操作符和 buffer 十分相似,buffer 周期性的将缓存的元素集合发送出来,而 window 周期性的将元素集合以 Observable
的形态发送出来。buffer 要等到元素搜集完毕后,才会发出元素序列。而 window 可以实时发出元素序列。
withLatestFrom:withLatestFrom 操作符将两个 Observables 中最新的元素通过一个函数组合起来,然后将这个组合的结果发出来。当第一个 Observable 发出一个元素时,就立即取出第二个 Observable 中最新的元素,通过一个组合函数将两个最新的元素合并后发送出去。
let disposeBag = DisposeBag()
let firstSubject = PublishSubject<String>()
let secondSubject = PublishSubject<String>()
firstSubject
.withLatestFrom(secondSubject)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
firstSubject.onNext("🅰️")
firstSubject.onNext("🅱️")
secondSubject.onNext("1")
secondSubject.onNext("2")
firstSubject.onNext("🆎")
//result
2
let disposeBag = DisposeBag()
let firstSubject = PublishSubject<String>()
let secondSubject = PublishSubject<String>()
firstSubject
.withLatestFrom(secondSubject) {
(first, second) in
return first + second
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
firstSubject.onNext("🅰️")
firstSubject.onNext("🅱️")
secondSubject.onNext("1")
secondSubject.onNext("2")
firstSubject.onNext("🆎")
//result
🆎2
5、应用
1、UISlider 监听
slider.rx.value.asObservable()
.subscribe(onNext: {
print("当前值为:\($0)")
})
.disposed(by: disposeBag)
注:任何对象通过 asObservable 都可以实现监听,如:collectionView.mj_footer!.rx.refreshing.asObservable()
2、列表刷新
let item = Observable<[SocilaModel]>.just(self.dataArr!)
//或
let item = Observable.from(optional: self.dataArr!)
item.bind(to: self.listTab.rx.items) { (tableView, row, element) in
let cell = tableView.dequeueReusableCell(withIdentifier: "cell")
as! SecondSocialTableViewCell
cell.leftImageView.kf.setImage(with: URL(string: element.image_list ?? ""))
cell.titleLabel.text = element.theme_name
cell.descripLabel.text = element.info
//cell中按钮点击事件订阅
cell.RXButton.rx.tap.asDriver()
.drive(onNext: { [weak self] in
self?.showAlert(title: "\(row)", message: "哈哈哈")
}).disposed(by: cell.disposeBag)
return cell
}
.disposed(by: self.disposeBag)
}
}
3、按钮事件
lazy var bag: DisposeBag = DisposeBag()
button.rx.tap.subscribe { event in
print("按钮点击")
}.disposed(by: bag)
4、监听 UITextfield
//on 方法
tf.rx.text.subscribe { event in
print("输入变化:\(event.element!!)")
}.disposed(by: bag)
//onNext 方法
tf.rx.text.subscribe (onNext: {string in
print("输入变化:\(string!)")
}).disposed(by: bag)
//绑定数据
tf.rx.text.bind(to: label.rx.text).disposed(by: bag)
//多个 textField 监听,改变 button 显示
Observable.combineLatest(phoneTextField.rx.text, passwordCodeTextField.rx.text)
.map({ (userName, password) in
if (userName?.count ?? 0) >= 11 && (password?.count ?? 0) >= 4 {
return CGFloat(1)
}
return CGFloat(0.2)
})
.bind(to: loginButton.rx.alpha)
.disposed(by: disposeBag)
5、监听 label 属性
label.rx.observe(String.self, "text").subscribe { (str: String?) in
print("str")
}.disposed(by: bag)
label.rx.observe(CGRect.self, "frame").subscribe { (rect: CGRect?) in
print("rect")
}.disposed(by: bag)
6、UIScrollView
scrollView.rx.contentOffset.subscribe(onNext: { point in
print("滚动偏移:\(point.element!)")
}).disposed(by: bag)
7、网络请求
//MARK: - RxSwift应用-网络请求
func setupNextwork() {
let url = URL(string: "https://www.baidu.com")
URLSession.shared.rx.response(request: URLRequest(url: url!))
.subscribe(onNext: { (response, data) in
print("response ==== \(response)")
print("data ===== \(data)")
}, onError: { (error) in
print("error ===== \(error)")
}).disposed(by: disposeBag)
}
8、通知
NotificationCenter.default.rx
.notification(.UIApplicationWillEnterForeground)
.subscribe(onNext: { (notification) in
print("Application Will Enter Foreground")
})
.disposed(by: disposeBag)
9、KVO
// 系统KVO 还是比较麻烦的
// person.addObserver(self, forKeyPath: "name", options: .new, context: nil)
person.rx.observeWeakly(String.self, "name").subscribe(onNext: { (change) in
print(change ?? "helloword")
}).disposed(by: disposeBag)
10、手势事件
tap.rx.event.subscribe { (event) in
print("点了label")
}.disposed(by: disposeBag)
11、定时器
//MARK: - RxSwift应用-timer定时器
func setupTimer() {
timer = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
timer.subscribe(onNext: { (num) in
print("hello word \(num)")
}).disposed(by: disposeBag)
}
12、监听事件的生命周期
通过 doOn 方法来监听事件的生命周期,它会在每一次事件发送前被调用,同时也可以通过不同 block 回调处理不同类型的 event。
let observable = Observable.of("A", "B", "C")
observable
.do(onNext: { element in
print("Intercepted Next:", element)
}, onError: { error in
print("Intercepted Error:", error)
}, onCompleted: {
print("Intercepted Completed")
}, onDispose: {
print("Intercepted Disposed")
})
.subscribe(onNext: { element in
print(element)
}, onError: { error in
print(error)
}, onCompleted: {
print("completed")
}, onDisposed: {
print("disposed")
})