目的
gitHub地址: https://github.com/ReactiveX/RxSwift
RxSwift的目的是让数据/事件流和异步任务能够更方便的序列化处理, 能够使用Swift进行响应式编程。
RxSwift做了什么
RxSwift把我们程序中每一个操作都看成一个事件,比如一个TextField中的文本改变,一个按钮被点击,或者一个网络请求结束等,每一个事件源就可以看成一个管道,也就是sequence,比如TextField,当我们改变里面的文本的时候,这个TextField就会不断的发出事件,从他的这个sequence中不断的流出,我们只需要监听这个sequence,每流出一个事件就做相应的处理。同理,Button也是一个sequence,每点击一次就流出一个事件。
RxSwift的核心思想是 Observable
sequence,observable表示可监听或者可观察,也就是说RxSwift的核心思想是可监听的序列。并且Observable sequence可以接受异步信号,也就是说,信号是可以异步给监听者的,Observable(ObservableType) 和 SequenceType类似,ObservableType.subscribe 和 SequenceType.generate类似,由于RxSwift支持异步获得信号,所以用ObservableType.subscribe,这和indexGenerator.next()类似其中SequenceType是Swift(2.3以前版本,之后的版本没有该协议)中的一个协议,比如Swift中的Array就遵循这个协议,通过这个协议,你可以这样的去操作一个Array
let array = [1, 2, 3, 4, 5]
let array2 = array.filter({$0 > 1}).map({$0 * 2})//4 6 8 10
var indexGenerator = array2.generate()
let fisrt = indexGenerator.next() // 4
let seoncd = indexGenerator.next() //6
RxSwift中,ObservableType.subscribe的回调(新的信号到来)一共有三个
enum Event<Element> {
case Next(Element) // 新的信号到来
case Error(ErrorType) // 信号发生错误,序列不会再产生信号
case Completed // 序列发送信号完成,不会再产生新的信号
}
Observable分为两种
1 在有限的时间内会自动结束(Completed/Error),比如一个网络请求当作一个序列,当网络请求完成的时候,Observable自动结束,资源会被释放。
2 信号不会自己结束,最简单的比如一个Timer,每隔一段时间发送一个新的信号过来,这时候需要手动取消监听,来释放相应的资源,比如一个label.rx.text是一个Obserable,通常需要这样调用addDisposableTo(disposeBag)来让其在deinit,也就是所有者要释放的时候,自动取消监听。
class Observable<Element> {
func subscribe(observer: Observer<Element>) -> Disposable //调用Disposable的方法来取消
}
当然,除了手动释放,RxSwift提供了一些操作符,比如 takeUntil来根据条件取消
sequence .takeUntil(self.rx__deallocated) //当对象要释放的时候,取消监听 .subscribe { print($0) }
Operator运算符相关
never
never就是创建一个sequence,但是不发出任何事件信号。
Observable<String>.never().subscribe { _ in
print("This will never be printed")
}.disposed(by: disposeBag)
empty
empty就是创建一个空的sequence,只能发出一个complected事件。
Observable<Int>.empty().subscribe { event in
let text = "operator: empty__\(event)"
self.showText(text: text)
print(text)
}.disposed(by: disposeBag)
create
我们也可以自定义可观察的sequence,那就是使用create。
create操作符传入一个观察者observer,然后调用observer的onNext,onCompleted和onError方法。返回一个可观察的obserable序列。
无参创建create
Observable<Any>.create { (observal:AnyObserver<Any>) -> Disposable in
observal.onNext("abc")
observal.onNext("12")
observal.onCompleted()
return Disposables.create()
}.subscribe(onNext: { str in
let text = "operator: create__\(str)"
self.showText(text: text)
print(text)
}).disposed(by: disposeBag)
添加参数创建create
func createObservable(element:String) -> Observable<String> {
return Observable.create { (observal:AnyObserver<String>) -> Disposable in
observal.onNext(element)
observal.onCompleted()
return Disposables.create()
}
}
createObservable(element: "element").subscribe(onNext: { str in
let text = "operator: create with element__\(str)"
self.showText(text: text)
print(text)
}).disposed(by: disposeBag)
deferred
延时创建Observable对象,当subscribe的时候才去创建,它为每一个Observer创建一个新的Observable,也就是说每个订阅者订阅的对象都是内容相同但是完全独立的序列。
deferr采用一个Factory函数型作为参数,Factory函数返回的是Observable类型。这也是其延时创建Observable的主要实现。
let defObservable = Observable<String>.deferred { () -> Observable<String> in
return Observable.create { observer -> Disposable in
observer.onNext("defObservable create")
observer.onCompleted()
return Disposables.create()
}
}
defObservable.subscribe { event in
let text = "operator: deferred one__\(event)"
self.showText(text: text)
print(text)
}.disposed(by: disposeBag)
defObservable.subscribe { event in
let text = "operator: deferred two__\(event)"
self.showText(text: text)
print(text)
}.disposed(by: disposeBag)
/// deferred 延迟创建的例子
var value1:String?
let observable1 = Observable<String>.from(optional: value1)
value1 = "Darren1"
observable1.subscribe { event in
/// 只打印出 completed
/// 并没有像我们想象中的那样也会打印出 onNext事件,这个是为什么呢?因为在我们订阅的时候,数据未必已经初始化完成
let text = "operator: deferred__\(event)"
self.showText(text: text)
print(text)
}.disposed(by: disposeBag)
/// 把这个例子使用defer重新测试一下
var value2:String?
let observable2 = Observable<String>.deferred { () -> Observable<String> in
return Observable<String>.from(optional: value2)
}
value2 = "Darren2"
observable2.subscribe { event in
let text = "operator: deferred__\(event)"
self.showText(text: text)
/// next(Darren2) completed
print(text)
}.disposed(by: disposeBag)
of/from
这两个方法都是把一个对象或者数据转换为可观察序列,这在使用Swift中的SequenceType时很有用。
Observable<String>.of("hello", "RxSwift").subscribe { event in
let text = "operator: of__\(event)"
self.showText(text: text)
/// next(hello) next(RxSwift) completed
print(text)
}.disposed(by: disposeBag)
Observable<String>.from(["hello", "RxSwift"]).subscribe { event in
let text = "operator: from__\(event)"
self.showText(text: text)
/// next(hello) next(RxSwift) completed
print(text)
}.disposed(by: disposeBag)
just
将一个对象或者一个Sequence转换为 一个可观察序列,请注意这里与From是完全不相同的:from是转换为一个或者多个可观察序列(这取决于你是要将一个还是一个序列进行转换)。也就是说just只能包含一个观察序列,请注意与上面例子结果进行对比。
Observable<Array<String>>.just(["hello", "RxSwift"]).subscribe { event in
let text = "operator: just__\(event)"
self.showText(text: text)
/// next(["hello", "RxSwift"]) completed
print(text)
}.disposed(by: disposeBag)
range
给定范围,依次显示,就是创建一个sequence,他会发出这个范围中的从开始到结束的所有事件,Observable必须指定数据类型。
Observable<Int>.range(start: 1, count: 4).subscribe(onNext: { value in
let text = "operator: range__\(value)"
self.showText(text: text)
print(text)
}).disposed(by: disposeBag)
interval
创建一个可观察序列,以特定的时间间隔释放一系列整数(E -> Int/NSInteger)。
Observable<Int>.interval(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance).take(3).subscribe { event in
let text = "operator: interval__\(event)"
self.showText(text: text)
/// operator: interval__next(0)
/// operator: interval__next(1)
/// operator: interval__next(2)
/// operator: interval__completed
print(text)
}.disposed(by: disposeBag)
timer
在指定的时间后,发送一个特定的item (E -> Int/NSInteger),请注意这里与interval的区别(interval是发送一系列特定item,而timer只会发送一个)。
Observable<Int>.timer(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance).subscribe { event in
let text = "operator: timer__\(event)"
self.showText(text: text)
/// operator: timer__next(0)
/// operator: timer__completed
print(text)
}.disposed(by: disposeBag)
repeatElement
创建一个sequence,发出特定的事件n次。
/// 如果没有.take(3), 就会一直执行下去
Observable.repeatElement("darren").take(3).subscribe(onNext: { value in
let text = "operator: repeat__\(value)"
self.showText(text: text)
print(text)
}).disposed(by: disposeBag)
generate
类似于for循环,创建一个可观察sequence,当初始化的条件为true的时候,他就会发出所对应的事件。
Observable.generate(initialState: 0) {
$0 < 5
} iterate: {
$0 + 2
}.subscribe(onNext: { value in
let text = "operator: generate__\(value)"
self.showText(text: text)
print(text)
}).disposed(by: disposeBag)
error
发出错误信号,创建一个可观察序列,但不发出任何正常的事件,只发出error事件并结束。
let error = NSError(domain: "error", code: 10, userInfo: ["This is error" : "xxxxxx"]) as Error
Observable<Any>.error(error).subscribe(onNext: { value in
let text = "operator: error__\(value)"
self.showText(text: text)
print(text)
}).disposed(by: disposeBag)
Transform 变换相关
buffer
定期的将需要发射的items随机到一个buffer的包中,分批次的发射这些包,而不是一次发射一个item:例如你有[1, 2, 3, 4] ,你可以一次发射一个,也可以一次发射两个item或者三个。
/// 一次发射1个Item事件
Observable<Int>.of(1, 2, 3, 4).buffer(timeSpan: RxTimeInterval.seconds(1), count: 1, scheduler: MainScheduler.instance).subscribe { event in
let text = "transform: buffer__\(event)"
self.showText(text: text)
/// transform: buffer__next([1])
/// transform: buffer__next([2])
/// transform: buffer__next([3])
/// transform: buffer__next([4])
/// transform: buffer__next([])
/// transform: buffer__completed
print(text)
}.disposed(by: disposeBag)
/// 一次发射3个Item事件
Observable<Int>.of(1, 2, 3, 4).buffer(timeSpan: RxTimeInterval.seconds(1), count: 3, scheduler: MainScheduler.instance).subscribe { event in
let text = "transform: buffer__\(event)"
self.showText(text: text)
/// transform: buffer__next([1, 2, 3])
/// transform: buffer__next([4])
/// transform: buffer__completed
print(text)
}.disposed(by: disposeBag)
window
与buffer类似,但是每次发射的不是item,而是Observables序列(请注意与buffer的结果比较)。
Observable<Int>.of(1, 2, 3, 4).window(timeSpan: RxTimeInterval.seconds(1), count: 3, scheduler: MainScheduler.instance).subscribe { event in
let text = "transform: window__\(event)"
self.showText(text: text)
/// transform: window__next(RxSwift.AddRef<Swift.Int>)
/// transform: window__next(RxSwift.AddRef<Swift.Int>)
/// transform: window__completed
print(text)
}.disposed(by: disposeBag)
flatMap
将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable。
这个方法是很有用的,例如当你有一个这样的Observable:它发射一个数据序列,这些数据本身包含Observable成员或者可以变换为Observable,因此你可以创建一个新的Observable发射这些次级Observable发射的数据的完整集合。
/// 我需要在每一个Item后跟一个新的Item叫做RxSwift
Observable<Int>.of(0, 1, 2).flatMap { (element:Int) -> Observable<String> in
return Observable<String>.of("\(element)", "RxSwift")
}.subscribe { event in
let text = "transform: flatMap__\(event)"
self.showText(text: text)
/// transform: flatMap__next(0)
/// transform: flatMap__next(RxSwift)
/// transform: flatMap__next(1)
/// transform: flatMap__next(RxSwift)
/// transform: flatMap__next(2)
/// transform: flatMap__next(RxSwift)
/// transform: flatMap__completed
print(text)
}.disposed(by: disposeBag)
groupBy
将一个Observable分拆为一些Observables集合,它们中的每一个发射原始Observable的一个子序列。
/// 我需要将奇数偶数分成两组
Observable<Int>.of(0, 1, 2, 3, 4, 5).groupBy { (element) -> String in
return element % 2 == 0 ? "偶数" : "奇数"
}.subscribe { event in
switch event {
case .next(let group):
group.asObservable().subscribe { event in
let text = "transform: groupBy__key: \(group.key) \(event)"
self.showText(text: text)
/// transform: groupBy__ key: 偶数 next(0)
/// transform: groupBy__ key: 奇数 next(1)
/// transform: groupBy__ key: 偶数 next(2)
/// transform: groupBy__ key: 奇数 next(3)
/// transform: groupBy__ key: 偶数 next(4)
/// transform: groupBy__ key: 奇数 next(5)
/// transform: groupBy__ key: 偶数 completed
/// transform: groupBy__ key: 奇数 completed
print(text)
}.disposed(by: self.disposeBag)
default:break
}
}.disposed(by: disposeBag)
map
通过一个闭包将原来的序列转换为一个新序列的操作。
Observable<Int>.of(1, 2, 3).map { return "hello " + "\($0)" }.subscribe { event in
let text = "transform: map__\(event)"
self.showText(text: text)
/// transform: map__next(hello 1)
/// transform: map__next(hello 2)
/// transform: map__next(hello 3)
/// transform: map__completed
print(text)
}.disposed(by: disposeBag)
scan
从字面意思可以看出是扫描,也就是说该方法会给出一个初始值(seed),每次通过一个函数将上一次的结果与序列中的Item进行处理,每处理完成都会发射.next事件。
Observable<String>.of("Rx", "Swift").scan("hello ") { acum, element in
return acum + element
}.subscribe { event in
let text = "transform: scan__\(event)"
self.showText(text: text)
/// transform: scan__next(hello Rx)
/// transform: scan__next(hello RxSwift)
/// transform: scan__completed
print(text)
}.disposed(by: disposeBag)
reduce
与上述scan类似,都是初始一个seed,每次通过函数将上一次的结果与序列中的item进行处理,但是唯一不同的一点是,只会在最后发射一次.next事件,将其拿来作数学计算很有用,这个我们将会在后面讲到 (请注意与上述scan的结果比较)。
Observable<String>.of("Rx", "Swift").reduce("hello ") { acum, element in
return acum + element
}.subscribe { event in
let text = "transform: reduce__\(event)"
self.showText(text: text)
/// transform: reduce__next(hello RxSwift)
/// transform: reduce__completed
print(text)
}.disposed(by: disposeBag)
Filter过滤器相关
debounce
在规定的时间内过滤item。
Observable<Int>.of(1, 2, 3, 4).debounce(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance).subscribe { event in
let text = "filter: debounce__\(event)"
self.showText(text: text)
/// filter: debounce__next(4)
/// filter: debounce__completed
print(text)
}.disposed(by: disposeBag)
distinctUntilChanged
过滤掉可观察到的重复item,表示如果发射的事件与上一次不相同那么才会发射此次事件。
Observable<Int>.of(1, 2, 2, 2, 3).distinctUntilChanged().subscribe { event in
let text = "filter: distinctUntilChanged__\(event)"
self.showText(text: text)
/// filter: distinctUntilChanged__next(1)
/// filter: distinctUntilChanged__next(2)
/// filter: distinctUntilChanged__next(3)
/// filter: distinctUntilChanged__completed
print(text)
}.disposed(by: disposeBag)
elementAt
发射第 n个item。
Observable<Int>.of(1, 2, 3, 4, 5).element(at: 3).subscribe { event in
let text = "filter: elementAt__\(event)"
self.showText(text: text)
/// filter: elementAt__next(4)
/// filter: elementAt__completed
print(text)
}.disposed(by: disposeBag)
filter
仅发射谓词测试通过的Items。
Observable<Int>.of(9, 10, 11, 12).filter { element -> Bool in
return element > 10
}.subscribe { event in
let text = "filter: filter__\(event)"
self.showText(text: text)
/// filter: filter__next(11)
/// filter: filter__next(12)
/// filter: filter__completed
print(text)
}.disposed(by: disposeBag)
skip
发射第n(包含n)之后的items。
Observable<Int>.of(9, 10, 11, 12).skip(2).subscribe { event in
let text = "filter: skip__\(event)"
self.showText(text: text)
/// filter: skip__next(11)
/// filter: skip__next(12)
/// filter: skip__completed
print(text)
}.disposed(by: disposeBag)
take
发射第n(不包含n)之前的items,与skip相反效果。
Observable<Int>.of(9, 10, 11, 12).take(2).subscribe { event in
let text = "filter: take__\(event)"
self.showText(text: text)
/// filter: take__next(9)
/// filter: take__next(10)
/// filter: take__completed
print(text)
}.disposed(by: disposeBag)
takeLast
发射第n(包含n)之后的items,与skip相同效果。
Observable<Int>.of(9, 10, 11, 12).takeLast(2).subscribe { event in
let text = "filter: takeLast__\(event)"
self.showText(text: text)
/// filter: takeLast__next(11)
/// filter: takeLast__next(12)
/// filter: takeLast__completed
print(text)
}.disposed(by: disposeBag)
Combine结合相关
merge
将多个序列的items合并为一个序列的items。
let observable1 = Observable<Int>.of(1, 3, 5, 7, 9)
let observable2 = Observable<Int>.of(2, 4, 6)
Observable<Int>.merge(observable1, observable2).subscribe { event in
let text = "combine: merge__\(event)"
self.showText(text: text)
/// combine: merge__next(1)
/// combine: merge__next(2)
/// combine: merge__next(3)
/// combine: merge__next(4)
/// combine: merge__next(5)
/// combine: merge__next(6)
/// combine: merge__next(7)
/// combine: merge__next(9)
/// combine: merge__completed
print(text)
}.disposed(by: disposeBag)
startWith
在发射序列items前新增一个item。
Observable<String>.of(" ", "RxSwift", "!").startWith("hello").reduce("") { (accum, element) -> String in
return accum + element
}.subscribe { event in
let text = "combine: startWith__\(event)"
self.showText(text: text)
/// combine: startWith__next(hello RxSwift!)
/// combine: startWith__completed
print(text)
}.disposed(by: disposeBag)
zip
将多个序列的items进行一一合并,但是需要注意的是,它会等到item对其后合并,未对齐的会舍弃。
let observable3 = Observable<Int>.of(1, 2, 3, 4, 5, 6, 7)
let observable4 = Observable<String>.of("A", "B", "C", "D")
Observable<String>.zip(observable3, observable4) { (e1:Int, e2:String) -> String in
return "\(e1)\(e2)"
}.subscribe { event in
let text = "combine: zip__\(event)"
self.showText(text: text)
/// combine: zip__next(1A)
/// combine: zip__next(2B)
/// combine: zip__next(3C)
/// combine: zip__next(4D)
/// combine: zip__completed
print(text)
}.disposed(by: disposeBag)
combineLatest
如果存在两条事件队列,需要同时监听,那么每当有新的事件发生的时候,combineLatest 会将每个队列的最新的一个元素进行合并。
类似于zip,但是只有当原始的Observable中的每一个都发射了一条数据时zip才发射数据。
combineLatest则在原始的Observable中任意一个发射了数据时发射一条数据。当原始Observables的任何一个发射了一条数据时,combineLatest使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。
let observable5 = Observable<Int>.of(1, 2, 3, 4, 5)
let observable6 = Observable<String>.of("A", "B", "C", "D")
Observable<String>.combineLatest(observable5, observable6) { (e1:Int, e2:String) -> String in
return "\(e1)\(e2)"
}.subscribe { event in
let text = "combine: combineLatest__\(event)"
self.showText(text: text)
/// combine: combineLatest__next(1A)
/// combine: combineLatest__next(2A)
/// combine: combineLatest__next(2B)
/// combine: combineLatest__next(3B)
/// combine: combineLatest__next(3C)
/// combine: combineLatest__next(4C)
/// combine: combineLatest__next(4D)
/// combine: combineLatest__next(5D)
/// combine: combineLatest__completed
print(text)
}.disposed(by: disposeBag)
Observable<String>.combineLatest(observable6, observable5) { (e1:String, e2:Int) -> String in
return "\(e1)\(e2)"
}.subscribe { event in
let text = "combine: combineLatest__\(event)"
self.showText(text: text)
/// combine: combineLatest__next(A1)
/// combine: combineLatest__next(B1)
/// combine: combineLatest__next(B2)
/// combine: combineLatest__next(C2)
/// combine: combineLatest__next(C3)
/// combine: combineLatest__next(D3)
/// combine: combineLatest__next(D4)
/// combine: combineLatest__next(D5)
/// combine: combineLatest__completed
print(text)
}.disposed(by: disposeBag)
Error错误处理相关
catch
在收到序列的异常事件时,通过返回另一个序列来持续发送非error事件。
Observable<UInt8>.create { observer in
observer.onNext(0)
observer.onError(NSError(domain: "error", code: 110, userInfo: nil))
return Disposables.create()
}.catch { error -> Observable<UInt8> in
let text = "combine: catch error__\(error)"
self.showText(text: text)
/// combine: catch error__Error Domain=error Code=110 "(null)"
print(text)
return Observable<UInt8>.of(1, 2)
}.subscribe { event in
let text = "combine: catch__\(event)"
self.showText(text: text)
/// combine: catch__next(0)
/// combine: catch__next(1)
/// combine: catch__next(2)
print(text)
}.disposed(by: disposeBag)
retry
出现错误事件后,重新发送所有事件信息。
Observable<UInt8>.create { observer in
observer.onNext(0)
observer.onError(NSError(domain: "error", code: 110, userInfo: nil))
return Disposables.create()
}.retry(3).subscribe { event in
let text = "combine: retry__\(event)"
self.showText(text: text)
/// combine: retry__next(0)
/// combine: retry__next(0)
/// combine: retry__next(0)
/// combine: retry__error(Error Domain=error Code=110 "(null)")
print(text)
}.disposed(by: disposeBag)
PracticalOperation实用操作相关
delay
延迟发射事件。
let text = "practicalOperation: delay__start time: \(Date())"
showText(text: text)
/// practicalOperation: delay__start time: 2021-03-14 09:38:36 +0000
print(text)
Observable<Int>.of(1, 2).delay(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance).subscribe { event in
if event.isCompleted {
let text = "practicalOperation: delay__end time: \(Date())"
self.showText(text: text)
/// practicalOperation: delay__end time: 2021-03-14 09:38:37 +0000
print(text)
}
let text = "practicalOperation: delay__\(event)"
self.showText(text: text)
/// practicalOperation: delay__next(1)
/// practicalOperation: delay__next(2)
/// practicalOperation: delay__completed
print(text)
}.disposed(by: disposeBag)
do
在一个序列的每个事件执行之前添加一个执行动作。
Observable<Int>.of(1, 2, 3).do { _ in
let text = "practicalOperation: do__previous next"
self.showText(text: text)
print(text)
} onError: { _ in
let text = "practicalOperation: do__previous error"
self.showText(text: text)
print(text)
} onCompleted: {
let text = "practicalOperation: do__previous complete"
self.showText(text: text)
print(text)
}.subscribe { event in
let text = "practicalOperation: do__\(event)"
self.showText(text: text)
/// practicalOperation: do__previous next
/// practicalOperation: do__next(1)
/// practicalOperation: do__previous next
/// practicalOperation: do__next(2)
/// practicalOperation: do__previous next
/// practicalOperation: do__next(3)
/// practicalOperation: do__previous complete
/// practicalOperation: do__completed
print(text)
}.disposed(by: disposeBag)
observeOn
observer在指定scheduler中观察序列事件。
Observable<Int>.of(1).observe(on: ConcurrentDispatchQueueScheduler(queue: DispatchQueue(label: "test"))).subscribe { event in
let text = "practicalOperation: observeOn__isMainThread: \(Thread.current.isMainThread) \(event)"
DispatchQueue.main.async {
self.showText(text: text)
}
/// practicalOperation: observeOn__isMainThread: false next(1)
/// practicalOperation: observeOn__isMainThread: false completed
print(text)
}.disposed(by: disposeBag)
subscribeOn
在指定的scheduler中操作,参考observeOn。
Observable<Int>.of(1).subscribe(on: MainScheduler.instance).subscribe { event in
let text = "practicalOperation: subscribeOn__isMainThread: \(Thread.current.isMainThread) \(event)"
self.showText(text: text)
/// practicalOperation: subscribeOn__isMainThread: true next(1)
/// practicalOperation: subscribeOn__isMainThread: true completed
print(text)
}.disposed(by: disposeBag)
timeout
一个序列在指定时间内未发射完成所有事件,那么将会进入.onError。
Observable<Int>.of(1).delay(RxTimeInterval.seconds(2), scheduler: MainScheduler.instance).timeout(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance).subscribe { event in
let text = "practicalOperation: timeout__\(event)"
self.showText(text: text)
/// practicalOperation: timeout__error(Sequence timeout.)
print(text)
}.disposed(by: disposeBag)
ifEmpty
如果是序列中没有任何item,那么给定一个default。
Observable<Int>.empty().ifEmpty(default: 0).subscribe { event in
let text = "practicalOperation: defaultIfEmpty__\(event)"
self.showText(text: text)
/// practicalOperation: defaultIfEmpty__next(0)
/// practicalOperation: defaultIfEmpty__completed
print(text)
}.disposed(by: disposeBag)
skipUntil
丢弃掉第一个序列的所有items,直到第二个序列的item出现。
let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()
sourceSequence.skip(until: referenceSequence).subscribe { event in
let text = "practicalOperation: skipUntil__\(event)"
self.showText(text: text)
/// practicalOperation: skipUntil__next(4)
/// practicalOperation: skipUntil__next(5)
/// practicalOperation: skipUntil__completed
print(text)
}.disposed(by: disposeBag)
sourceSequence.onNext("1")
sourceSequence.onNext("2")
sourceSequence.onNext("3")
referenceSequence.onNext("1")
sourceSequence.onNext("4")
sourceSequence.onNext("5")
/// 必须调用onCompleted, 不然返回, 所在控制器, 不会被释放
sourceSequence.onCompleted()
skipWhile
丢弃掉所有的items,直到满足某个不满足条件的item出现。
Observable<String>.of("AD", "BD", "CD").skip { element -> Bool in
return element.contains("A")
}.subscribe { event in
let text = "practicalOperation: skipWhile__\(event)"
self.showText(text: text)
/// practicalOperation: skipWhile__next(BD)
/// practicalOperation: skipWhile__next(CD)
/// practicalOperation: skipWhile__completed
print(text)
}.disposed(by: disposeBag)
takeUntil
取得第一个序列所有items,直到第二个序列发射item或者终止。
Observable<Int>.of(1, 2, 3, 4, 5, 6, 7, 8, 9).take(until: { element -> Bool in
return element > 4
}).subscribe { event in
let text = "practicalOperation: takeUntil__\(event)"
self.showText(text: text)
/// practicalOperation: takeUntil__next(1)
/// practicalOperation: takeUntil__next(2)
/// practicalOperation: takeUntil__next(3)
/// practicalOperation: takeUntil__next(4)
/// practicalOperation: takeUntil__completed
print(text)
}.disposed(by: disposeBag)
takeWhile
取得第一个序列的所有items,直到出现不满足条件的item (请仔细体会与skipWhile的不同之处)。
Observable<Int>.of(1, 2, 3, 4, 5, 6, 7, 8, 9).take(while: { element -> Bool in
return element < 5
}).subscribe { event in
let text = "practicalOperation: takeWhile__\(event)"
self.showText(text: text)
/// practicalOperation: takeWhile__next(1)
/// practicalOperation: takeWhile__next(2)
/// practicalOperation: takeWhile__next(3)
/// practicalOperation: takeWhile__next(4)
/// practicalOperation: takeWhile__completed
print(text)
}.disposed(by: disposeBag)
Observer观察者&Observable被观察者相关
AnyObserver
let observer1 = AnyObserver<Int> { event in
let text = "observer: anyObserver__\(event)"
self.showText(text: text)
/// observer: anyObserver__next(1)
/// observer: anyObserver__next(2)
/// observer: anyObserver__next(3)
/// observer: anyObserver__completed
print(text)
}
Observable<Int>.of(1, 2, 3).subscribe(observer1).disposed(by: disposeBag)
AsyncSubject
Subject在ReactiveX的一些实现中扮演了一种桥梁或者代理的角色,它既可以作为Observer也可以作为Observable来使用。
作为观察者来说它可以订阅一个或者多个可观察序列,作为可观察者来说它可以通过items的reemitting来观察,并且还可以发射新的items事件,我们将从如下四个Subject进行学习:
AsyncSubject仅仅只发送订阅之后的最后一个item以及.onCompleted,如果出现错误,那么仅仅将只发送.onError。
let asyncSubject = AsyncSubject<Int>()
asyncSubject.onNext(1)
asyncSubject.onNext(2)
asyncSubject.subscribe { event in
let text = "observer: asyncSubject__\(event)"
self.showText(text: text)
/// observer: asyncSubject__next(4)
/// observer: asyncSubject__completed
print(text)
}.disposed(by: disposeBag)
asyncSubject.onNext(3)
asyncSubject.onNext(4)
/// 没有调用onCompleted, 则收不到subscribe, 因为不知道哪个是最后一个
asyncSubject.onCompleted()
ReplaySubject
订阅ReplaySubject的时候,可以接收到订阅他之后的事件,但也可以接受订阅他之前发出的事件,接受几个事件取决与bufferSize的大小。
createUnbounded()表示接受所有事件。
create(bufferSize: 4) 表示可接受到的订阅他之前的事件的个数,但是订阅他之后的事件一定会触发。
let replaySubject = ReplaySubject<Int>.createUnbounded()
replaySubject.onNext(1)
replaySubject.onNext(2)
replaySubject.subscribe { event in
let text = "observer: replaySubject__\(event)"
self.showText(text: text)
/// observer: replaySubject__next(1)
/// observer: replaySubject__next(2)
/// observer: replaySubject__next(3)
/// observer: replaySubject__completed
print(text)
}.disposed(by: disposeBag)
replaySubject.onNext(3)
replaySubject.onCompleted()
let replaySubject1 = ReplaySubject<Int>.create(bufferSize: 1)
replaySubject1.onNext(1)
replaySubject1.onNext(2)
replaySubject1.subscribe { event in
let text = "observer: replaySubject__\(event)"
self.showText(text: text)
/// observer: replaySubject__next(2)
/// observer: replaySubject__next(3)
/// observer: replaySubject__completed
print(text)
}.disposed(by: disposeBag)
replaySubject1.onNext(3)
replaySubject1.onCompleted()
PublishSubject
订阅PublishSubject的时候,只能接收到订阅他之后发生的事件。subject.onNext()发出onNext事件,对应的还有onError()和onCompleted()事件,可以把他看成一个bufferSize=0的ReplaySubject。
let publishSubject = PublishSubject<Int>()
publishSubject.onNext(1)
publishSubject.onNext(2)
publishSubject.subscribe { event in
let text = "observer: publishSubject__\(event)"
self.showText(text: text)
/// observer: publishSubject__next(3)
/// observer: publishSubject__completed
print(text)
}.disposed(by: disposeBag)
publishSubject.onNext(3)
publishSubject.onCompleted()
BehaviorSubject
订阅了BehaviorSubject,会接受到订阅之前的最后一个事件,订阅之后的事件一定会触发。
let behaviorSubject = BehaviorSubject<Int>(value: 0)
behaviorSubject.onNext(1)
behaviorSubject.onNext(2)
behaviorSubject.subscribe { event in
let text = "observer: behaviorSubject__\(event)"
self.showText(text: text)
/// observer: behaviorSubject__next(2)
/// observer: behaviorSubject__next(3)
/// observer: behaviorSubject__completed
print(text)
}.disposed(by: disposeBag)
behaviorSubject.onNext(3)
behaviorSubject.onCompleted()
BehaviorRelay
BehaviorReplay是Swift5 替换 Swift4 中的 Variable。
1 可以明确的是它不是subject类型,因为它只是一个可观察序列,但是它又包含subject对象(私有的BehaviorSubject)。
2 初始化的时候也需要一个初始值。
3 既然它不是一个订阅者,那么就不能发出onNext:、complete和error事件。
4 只能通过accept发出event。
总结:BehaviorRelay 跟 BehaviorSubject 很像,只是不是发出complete、error事件。
let behaviorRelay = BehaviorRelay(value: 0)
behaviorRelay.accept(1)
behaviorRelay.accept(2)
behaviorRelay.subscribe { event in
let text = "observer: behaviorRelay__\(event)"
self.showText(text: text)
/// observer: behaviorRelay__next(2)
/// observer: behaviorRelay__next(3)
/// observer: behaviorRelay__next(4)
print(text)
}.disposed(by: disposeBag)
behaviorRelay.accept(3)
behaviorRelay.accept(4)
Driver
Driver从名字上可以理解为驱动,在功能上它类似被观察者(Observable),而它本身也可以与被观察者相互转换(Observable: asDriver, Driver: asObservable),它驱动着一个观察者,当它的事件流中有事件涌出时,被它驱动着的观察者就能进行相应的操作。一般我们会将一个Observable被观察者转换成Driver后再进行驱动操作。
Driver的drive方法与Observable的方法bindTo用法非常相似,事实上,它们的作用也是一样,说白了就是被观察者与观察者的绑定。
那为什么RxSwift的作者又搞出Driver这么个东西来呢?
比较与Observable,Driver有以下的特性:
1 它不会发射出错误(Error)事件。
2 对它的观察订阅是发生在主线程(UI线程)的。
3 自带shareReplayLatestWhileConnected。
let categorysDriver = categorysRelay.map { models -> [ServiceCategoryItemModel] in
return [ServiceCategoryItemModel(model: Void(), items: models)]
}.asDriver(onErrorJustReturn: []).drive(headerView.categorysRelay).disposed(by: disposeBag)
Scheduler调度器相关
对于Scheduler来说,我们需要了解Concurrent(并行)、Serial(串行)Scheduler就可以了。
Observable<Int>.of(1, 2, 3).observe(on: SerialDispatchQueueScheduler(internalSerialQueueName: "serialDispatchQueue")).map { element -> Int in
let text = "scheduler: map --> Main Thread: \(Thread.current.isMainThread) element__\(element)"
DispatchQueue.main.async {
self.showText(text: text)
}
/// scheduler: map --> Main Thread: false element__1
/// scheduler: map --> Main Thread: false element__2
/// scheduler: map --> Main Thread: false element__3
print(text)
return element * 2
/// shareReplay(1)或shareReplayLatestWhileConnected,以防止以后被观察者被多次订阅观察后,map中的语句会多次调用:
}.share(replay: 1).subscribe(on: MainScheduler.instance).observe(on: MainScheduler.instance).subscribe { event in
let text = "scheduler: subscribe --> Main Thread: \(Thread.current.isMainThread) event__\(event)"
self.showText(text: text)
/// scheduler: subscribe --> Main Thread: true event__next(2)
/// scheduler: subscribe --> Main Thread: true event__next(4)
/// scheduler: subscribe --> Main Thread: true event__next(6)
/// scheduler: subscribe --> Main Thread: true event__completed
print(text)
}.disposed(by: disposeBag)
Observable<Int>.of(4, 5, 6).observe(on: ConcurrentDispatchQueueScheduler(queue: DispatchQueue(label: "concurrentDispatchQueue"))).map { element -> Int in
let text = "scheduler: map --> Main Thread: \(Thread.current.isMainThread) element__\(element)"
DispatchQueue.main.async {
self.showText(text: text)
}
/// scheduler: map --> Main Thread: false element__4
/// scheduler: map --> Main Thread: false element__5
/// scheduler: map --> Main Thread: false element__6
print(text)
return element * 2
/// shareReplay(1)或shareReplayLatestWhileConnected,以防止以后被观察者被多次订阅观察后,map中的语句会多次调用:
}.share(replay: 1).subscribe(on: MainScheduler.instance).observe(on: MainScheduler.instance).subscribe { event in
let text = "scheduler: subscribe --> Main Thread: \(Thread.current.isMainThread) event__\(event)"
self.showText(text: text)
/// scheduler: subscribe --> Main Thread: true event__next(8)
/// scheduler: subscribe --> Main Thread: true event__next(10)
/// scheduler: subscribe --> Main Thread: true event__next(12)
/// scheduler: subscribe --> Main Thread: true event__completed
print(text)
}.disposed(by: disposeBag)
Observable<Int>.of(7, 8, 9).observe(on: ConcurrentMainScheduler.instance).map { element -> Int in
let text = "scheduler: map --> Main Thread: \(Thread.current.isMainThread) element__\(element)"
DispatchQueue.main.async {
self.showText(text: text)
}
/// scheduler: map --> Main Thread: true element__7
/// scheduler: map --> Main Thread: true element__8
/// scheduler: map --> Main Thread: true element__9
print(text)
return element * 2
/// shareReplay(1)或shareReplayLatestWhileConnected,以防止以后被观察者被多次订阅观察后,map中的语句会多次调用:
}.share(replay: 1).subscribe(on: MainScheduler.instance).observe(on: MainScheduler.instance).subscribe { event in
let text = "scheduler: subscribe --> Main Thread: \(Thread.current.isMainThread) event__\(event)"
self.showText(text: text)
/// scheduler: subscribe --> Main Thread: true event__next(14)
/// scheduler: subscribe --> Main Thread: true event__next(16)
/// scheduler: subscribe --> Main Thread: true event__next(18)
/// scheduler: subscribe --> Main Thread: true event__completed
print(text)
}.disposed(by: disposeBag)
Disposable
订阅了一个可观察序列,如果有特殊需求需要提前取消订阅时使用。也就是说Disposable是用来取消订阅的一个工具,通过Disposables工具创建。
let dis1 = Disposables.create()
let dis2 = Disposables.create {
print("在Dispose之前所做一些工作")
}
let _ = Disposables.create([dis1, dis2])
/// dispose:通过.dispose()取消或者添加到disposeBag(可以将它看成一个非ARC机制下的AutoReleasePool)
let disposable = Observable<Int>.of(1, 2, 3).subscribe { event in
print(event)
}
disposable.dispose()
/// 或者
disposable.disposed(by: disposeBag)
常用方法
监听点击事件和输入事件
closeButton.rx.tap.subscribe(onNext: {[weak self] _ in
self?.closeClosure?()
}).disposed(by: disposeBag)
phoneTextField.rx.text.orEmpty.subscribe(onNext: {[weak self] (text) in
self?.textDidChange()
self?.textDidChangeClosure?(text)
}).disposed(by: disposeBag)
phoneTextField.rx.controlEvent(.editingDidBegin).subscribe(onNext: {[weak self] _ in
self?.editingDidBeginClosure?()
}).disposed(by: disposeBag)
phoneTextField.rx.controlEvent(.editingDidEnd).subscribe(onNext: {[weak self] _ in
self?.editingDidEndClosure?()
}).disposed(by: disposeBag)
代替通知
// MARK: public
extension NotificationManager {
/**
注册通知
*/
static func registNotification(_ name:NSNotification.Name, disposeBag:DisposeBag, callBack:@escaping ((_ noti:Notification) -> ())) {
NotificationCenter.default.rx.notification(name).subscribe(onNext: {(noti) in
callBack(noti)
}).disposed(by: disposeBag)
}
/**
发送通知
*/
static func post(name aName: NSNotification.Name, object anObject: Any? = nil, userInfo aUserInfo: [AnyHashable : Any]? = nil) {
NotificationCenter.default.post(name: aName, object: anObject, userInfo: aUserInfo)
}
}
代替观察者
/// 只发出与下一个间隔超过0.25秒的元素
scrollView.rx.observe(String.self, "contentSize").debounce(RxTimeInterval.milliseconds(250), scheduler: MainScheduler.instance).subscribe(onNext: {[weak self] _ in
self?.contentSizeClosure?(self?.scrollView.contentSize ?? .zero)
}).disposed(by: self.disposeBag)
信号合并
/// 将下拉刷新/上拉加载更多的信号和请求参数信号合并, 确定最后发起请求的请求参数
Observable<NewsListParams>.combineLatest(input.paramsRelay, input.reloadDataSubject) {[weak self] (params:NewsListParams, reloadData:Bool) -> NewsListParams in
params.page = self?.caculatePage(reloadData) ?? 1
params.row = self?.pageSize ?? 10
return params
}.subscribe(onNext: {[weak self] params in
self?.showHUD()
NewsManager.requestNewsListData(params: params) { datas in
HUD.dismiss()
self?.handleData(datas?.rows ?? [], scrollView: self?.tableView ?? BaseTableView(), isFirstFetch: params.page == 1, totalCount: datas?.total ?? 0)
}
}).disposed(by: disposeBag)
数据绑定
let categorysDriver = categorysRelay.map { models -> [ServiceCategoryItemModel] in
return [ServiceCategoryItemModel(model: Void(), items: models)]
}.asDriver(onErrorJustReturn: []).drive(headerView.categorysRelay).disposed(by: disposeBag)
代替定时器
/// Interval:创建一个可观察序列,以特定的时间间隔释放一系列整数(E -> Int/NSInteger)
Observable<Int>.interval(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance).take(3).subscribe { event in
let text = "operator: interval__\(event)"
self.showText(text: text)
/// operator: interval__next(0)
/// operator: interval__next(1)
/// operator: interval__next(2)
/// operator: interval__completed
print(text)
}.disposed(by: disposeBag)
/// Timer:在指定的时间后,发送一个特定的Item (E -> Int/NSInteger),请注意这里与Interval的区别(Interval是发送一系列特定Item,而Timer只会发送一个)
Observable<Int>.timer(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance).subscribe { event in
let text = "operator: timer__\(event)"
self.showText(text: text)
// operator: timer__next(0)
/// operator: timer__completed
print(text)
}.disposed(by: disposeBag)