@[TOC](RxSwift学习之十 (基础使用篇 1- 序列,订阅,销毁))
- 从 GitHub 上下载最新的代码:https://github.com/ReactiveX/RxSwift
1. RxSwift简介
- RxSwift 的作用
1)在编写代码时我们经常会需要检测某些值的变化(比如:textFiled 输入值的变化、数据请求完成或失败的变化),然后进行相应的处理。
过去针对不同的情况,我们需要采用不同的事件传递方法去处理,比如:delegate、notification、target-action、KVO 等等。
而 RectiveX 机制(由 RxSwift 实现)的出现,让程序里的事件传递响应方法做到统一。将之前那些常用的事件传递方法(比如:delegate、notification、target-action 等等),全部替换成 Rx 的“信号链”方式。
(2)如果我们平时使用的是 MVVM 开发模式的话,通过 RxSwift 可以获得更加方便的数据绑定的方法,使得 MVVM 开发更加如虎添翼。
- RxSwift 与 RxCocoa
RxSwift:它只是基于 Swift 语言的 Rx 标准实现接口库,所以 RxSwift 里不包含任何 Cocoa 或者 UI 方面的类。
RxCocoa:是基于 RxSwift 针对于 iOS 开发的一个库,它通过 Extension 的方法给原生的比如 UI 控件添加了 Rx 的特性,使得我们更容易订阅和响应这些控件的事件。
2. RxSwift简单使用
2.1 响应式编程与传统式编程的比较样例
- 实例2.1
-
有这么一个需求:
表格中显示的是歌曲信息(歌名,以及歌手)
点击选中任意一个单元格,在控制台中打印出对应的歌曲信息。
- 按传统方式:首先我们创建一个 Music 的结构体,用来保存歌曲名称、歌手名字。此外它还遵循 CustomStringConvertible 协议,方便我们输出调试。
import UIKit
//歌曲结构体
struct Music {
let name: String //歌名
let singer: String //演唱者
init(name: String, singer: String) {
self.name = name
self.singer = singer
}
}
//实现 CustomStringConvertible 协议,方便输出调试
extension Music: CustomStringConvertible {
var description: String {
return "name:\(name) singer:\(singer)"
}
}
2.1.1 传统编程
- 首先写一个 ViewModel
import Foundation
//歌曲列表数据源
struct MusicListViewModel {
let data = [
Music(name: "无条件", singer: "陈奕迅"),
Music(name: "你曾是少年", singer: "S.H.E"),
Music(name: "从前的我", singer: "陈洁仪"),
Music(name: "在木星", singer: "朴树"),
]
}
- 视图控制器代码(ViewController.swift)
- 接着我们设置 UITableView 的委托,并让视图控制器实现 UITableViewDataSource 和 UITableViewDelegate 协议,及相关的协议方法。
- 这个大家肯定都写过无数遍了,也没什么好讲的。算一下,这里一共需要 43 行代码。
import UIKit
import RxSwift
class ViewController: UIViewController {
//tableView对象
@IBOutlet weak var tableView: UITableView!
//歌曲列表数据源
let musicListViewModel = MusicListViewModel()
override func viewDidLoad() {
super.viewDidLoad()
//设置代理
tableView.dataSource = self
tableView.delegate = self
}
}
extension ViewController: UITableViewDataSource {
//返回单元格数量
func tableView(_ tableView: UITableView, numberOfRowsInSection section: Int) -> Int {
return musicListViewModel.data.count
}
//返回对应的单元格
func tableView(_ tableView: UITableView, cellForRowAt indexPath: IndexPath)
-> UITableViewCell {
let cell = tableView.dequeueReusableCell(withIdentifier: "musicCell")!
let music = musicListViewModel.data[indexPath.row]
cell.textLabel?.text = music.name
cell.detailTextLabel?.text = music.singer
return cell
}
}
extension ViewController: UITableViewDelegate {
//单元格点击
func tableView(_ tableView: UITableView, didSelectRowAt indexPath: IndexPath) {
print("你选中的歌曲信息【\(musicListViewModel.data[indexPath.row])】")
}
}
- 下面来看一下Rxswift的编程
2.1.2 Rxswift编程
- 对
ViewModel
做些修改- 这里我们将 data 属性变成一个可观察序列对象(
Observable Squence
),而对象当中的内容和我们之前在数组当中所包含的内容是完全一样的。 - 关于可观察序列对象在后面的文章中我会详细介绍。简单说就是“序列”可以对这些数值进行“订阅(
Subscribe
)”,有点类似于“通知(NotificationCenter
)”
- 这里我们将 data 属性变成一个可观察序列对象(
import RxSwift
//歌曲列表数据源
struct MusicListViewModel {
let data = Observable.just([
Music(name: "无条件", singer: "陈奕迅"),
Music(name: "你曾是少年", singer: "S.H.E"),
Music(name: "从前的我", singer: "陈洁仪"),
Music(name: "在木星", singer: "朴树"),
])
}
- 视图控制器代码(
ViewController.swift
)- 这里我们不再需要实现数据源和委托协议了。而是写一些响应式代码,让它们将数据和 UITableView 建立绑定关系。
- 算了下这里我们只需要 31 行代码,同之前的相比,一下减少了 1/4 代码量。而且代码也更清爽了些。
代码的简单说明:
DisposeBag
:作用是 Rx 在视图控制器或者其持有者将要销毁的时候,自动释法掉绑定在它上面的资源。它是通过类似“订阅处置机制”方式实现(类似于 NotificationCenter 的 removeObserver)。
rx.items(cellIdentifier:
):这是 Rx 基于 cellForRowAt 数据源方法的一个封装。传统方式中我们还要有个 numberOfRowsInSection 方法,使用 Rx 后就不再需要了(Rx 已经帮我们完成了相关工作)。
rx.modelSelected
: 这是 Rx 基于 UITableView 委托回调方法 didSelectRowAt 的一个封装。
import UIKit
import RxSwift
import RxCocoa
class ViewController: UIViewController {
//tableView对象
@IBOutlet weak var tableView: UITableView!
//歌曲列表数据源
let musicListViewModel = MusicListViewModel()
//负责对象销毁
let disposeBag = DisposeBag()
override func viewDidLoad() {
super.viewDidLoad()
//将数据源数据绑定到tableView上
musicListViewModel.data
.bind(to: tableView.rx.items(cellIdentifier:"musicCell")) { _, music, cell in
cell.textLabel?.text = music.name
cell.detailTextLabel?.text = music.singer
}.disposed(by: disposeBag)
//tableView点击响应
tableView.rx.modelSelected(Music.self).subscribe(onNext: { music in
print("你选中的歌曲信息【\(music)】")
}).disposed(by: disposeBag)
}
}
2.2 Observable介绍、创建可观察序列
Observable 作为 Rx 的根基,我们首先对它要有一些基本的了解。
-
Observable<T>:
Observable<T> 这个类就是 Rx 框架的基础,我们可以称它为可观察序列。它的作用就是可以异步地产生一系列的 Event(事件),即一个 Observable<T> 对象会随着时间推移不定期地发出 event(element : T) 这样一个东西。
而且这些 Event 还可以携带数据,它的泛型 <T> 就是用来指定这个 Event 携带的数据的类型。
有了可观察序列,我们还需要有一个 Observer(订阅者)来订阅它,这样这个订阅者才能收到 Observable<T> 不时发出的 Event。 Event
查看 RxSwift 源码可以发现,事件 Event 的定义如下:
public enum Event<Element> {
/// Next element is produced.
case next(Element)
/// Sequence terminated with an error.
case error(Swift.Error)
/// Sequence completed successfully.
case completed
}
- 可以看到 Event 就是一个枚举,也就是说一个 Observable 是可以发出 3 种不同类型的 Event 事件:
-
next:
next
事件就是那个可以携带数据 <T> 的事件,可以说它就是一个“最正常”的事件。 -
error:
error
事件表示一个错误,它可以携带具体的错误内容,一旦Observable
发出了error event
,则这个Observable
就等于终止了,以后它再也不会发出 event 事件了。 -
completed:
completed
事件表示Observable
发出的事件正常地结束了,跟 error 一样,一旦Observable
发出了completed event
,则这个Observable
就等于终止了,以后它再也不会发出 event 事件了。
-
next:
2.2.1 Observable 与 Sequence比较
- 1)为更好地理解,我们可以把每一个
Observable
的实例想象成于一个 Swift 中的 Sequence:- 即一个
Observable
(ObservableType
)相当于一个序列Sequence
(SequenceType
)。 -
ObservableType.subscribe(_:)
方法其实就相当于SequenceType.generate()
- 即一个
- 2)但它们之间还是有许多区别的:
- Swift 中的
SequenceType
是同步的循环,而Observable
是异步的。 -
Observable
对象会在有任何 Event 时候,自动将 Event 作为一个参数通过ObservableType.subscribe(_:)
发出,并不需要使用 next 方法。
- Swift 中的
2.2.2 创建 Observable 序列
- 我们可以通过如下几种方法来创建一个 Observable 序列
- just() 方法
(1)该方法通过传入一个默认值来初始化。
(2)下面样例我们显式地标注出了 observable 的类型为 Observable<Int>,即指定了这个 Observable 所发出的事件携带的数据类型必须是 Int 类型的。
let observable = Observable<Int>.just(5)
- of() 方法
(1)该方法可以接受可变数量的参数(必需要是同类型的)
(2)下面样例中我没有显式地声明出 Observable 的泛型类型,Swift 也会自动推断类型。
let observable = Observable.of("A", "B", "C")
- from() 方法
(1)该方法需要一个数组参数。
(2)下面样例中数据里的元素就会被当做这个 Observable 所发出 event 携带的数据内容,最终效果同上面饿 of() 样例是一样的。
let observable = Observable.from(["A", "B", "C"])
- empty() 方法
该方法创建一个空内容的 Observable 序列。
let observable = Observable<Int>.never()
- never() 方法
该方法创建一个永远不会发出 Event(也不会终止)的 Observable 序列。
let observable = Observable<Int>.never()
- error() 方法
该方法创建一个不做任何操作,而是直接发送一个错误的 Observable 序列。
enum MyError: Error {
case A
case B
}
let observable = Observable<Int>.error(MyError.A)
- range() 方法
(1)该方法通过指定起始和结束数值,创建一个以这个范围内所有值作为初始值的 Observable 序列。
(2)下面样例中,两种方法创建的 Observable 序列都是一样的。
//使用range()
let observable = Observable.range(start: 1, count: 5)
//使用of()
let observable = Observable.of(1, 2, 3 ,4 ,5)
- repeatElement() 方法
该方法创建一个可以无限发出给定元素的 Event 的 Observable 序列(永不终止)。
let observable = Observable.repeatElement(1)
- generate() 方法
(1)该方法创建一个只有当提供的所有的判断条件都为 true 的时候,才会给出动作的 Observable 序列。
(2)下面样例中,两种方法创建的 Observable 序列都是一样的。
//使用generate()方法
let observable = Observable.generate(
initialState: 0,
condition: { $0 <= 10 },
iterate: { $0 + 2 }
)
//使用of()方法
let observable = Observable.of(0 , 2 ,4 ,6 ,8 ,10)
- create() 方法
(1)该方法接受一个 block 形式的参数,任务是对每一个过来的订阅进行处理。
(2)下面是一个简单的样例。为方便演示,这里增加了订阅相关代码
//这个block有一个回调参数observer就是订阅这个Observable对象的订阅者
//当一个订阅者订阅这个Observable对象的时候,就会将订阅者作为参数传入这个block来执行一些内容
let observable = Observable<String>.create{observer in
//对订阅者发出了.next事件,且携带了一个数据"hangge.com"
observer.onNext("hangge.com")
//对订阅者发出了.completed事件
observer.onCompleted()
//因为一个订阅行为会有一个Disposable类型的返回值,所以在结尾一定要returen一个Disposable
return Disposables.create()
}
//订阅测试
observable.subscribe {
print($0)
}
- deferred() 方法
(1)该个方法相当于是创建一个 Observable 工厂,通过传入一个 block 来执行延迟 Observable 序列创建的行为,而这个 block 里就是真正的实例化序列对象的地方。
(2)下面是一个简单的演示样例:
//用于标记是奇数、还是偶数
var isOdd = true
//使用deferred()方法延迟Observable序列的初始化,通过传入的block来实现Observable序列的初始化并且返回。
let factory : Observable<Int> = Observable.deferred {
//让每次执行这个block时候都会让奇、偶数进行交替
isOdd = !isOdd
//根据isOdd参数,决定创建并返回的是奇数Observable、还是偶数Observable
if isOdd {
return Observable.of(1, 3, 5 ,7)
}else {
return Observable.of(2, 4, 6, 8)
}
}
//第1次订阅测试
factory.subscribe { event in
print("\(isOdd)", event)
}
//第2次订阅测试
factory.subscribe { event in
print("\(isOdd)", event)
}
运行结果如下,可以看到我们两次订阅的得到的 Observable 是不一样的:
- interval() 方法
(1)这个方法创建的 Observable 序列每隔一段设定的时间,会发出一个索引数的元素。而且它会一直发送下去。
(2)下面方法让其每 1 秒发送一次,并且是在主线程(MainScheduler)发送。
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable.subscribe { event in
print(event)
}
- timer() 方法
- (1) 这个方法有两种用法,一种是创建的 Observable 序列在经过设定的一段时间后,产生唯一的一个元素。
//5秒种后发出唯一的一个元素0
let observable = Observable<Int>.timer(5, scheduler: MainScheduler.instance)
observable.subscribe { event in
print(event)
}
- (2) 另一种是创建的 Observable 序列在经过设定的一段时间后,每隔一段时间产生一个元素。
//延时5秒种后,每隔1秒钟发出一个元素
let observable = Observable<Int>.timer(5, period: 1, scheduler: MainScheduler.instance)
observable.subscribe { event in
print(event)
}
2.3 Observable订阅、事件监听、订阅销毁
2.3.1 Observable订阅
- 有了 Observable,我们还要使用 subscribe() 方法来订阅它,接收它发出的 Event。
2.3.1.1 第一种订阅方式
- (1)我们使用 subscribe() 订阅了一个 Observable 对象,该方法的 block 的回调参数就是被发出的 event 事件,我们将其直接打印出来。
let observable = Observable.of("A", "B", "C")
observable.subscribe { event in
print(event)
}
运行结果如下,可以看到:
初始化 Observable 序列时设置的默认值都按顺序通过 .next 事件发送出来。
当 Observable 序列的初始数据都发送完毕,它还会自动发一个 .completed 事件出来。
- (2)如果想要获取到这个事件里的数据,可以通过 event.element 得到。
let observable = Observable.of("A", "B", "C")
observable.subscribe { event in
print(event.element)
}
运行结果如下:
2.3.1.2 第二种订阅方式
- (1)RxSwift 还提供了另一个
subscribe
方法,它可以把 event 进行分类:- 通过不同的 block 回调处理不同类型的 event。(其中 onDisposed 表示订阅行为被 dispose 后的回调)
- 同时会把 event 携带的数据直接解包出来作为参数,方便我们使用。
let observable = Observable.of("A", "B", "C")
observable.subscribe(onNext: { element in
print(element)
}, onError: { error in
print(error)
}, onCompleted: {
print("completed")
}, onDisposed: {
print("disposed")
})
运行结果如下:
- (2)
subscribe()
方法的onNext
、onError
、onCompleted
和onDisposed
这四个回调 block 参数都是有默认值的,即它们都是可选的。所以我们也可以只处理onNext
而不管其他的情况。
let observable = Observable.of("A", "B", "C")
observable.subscribe(onNext: { element in
print(element)
})
运行结果如下:
A
B
C
2.3.2 事件监听
-
doOn 介绍
(1)我们可以使用 doOn 方法来监听事件的生命周期,它会在每一次事件发送前被调用。
(2)同时它和 subscribe 一样,可以通过不同的 block 回调处理不同类型的 event。比如:
do(onNext:) 方法就是在 subscribe(onNext:) 前调用
而 do(onCompleted:) 方法则会在 subscribe(onCompleted:) 前面调用。 使用样例
let observable = Observable.of("A", "B", "C")
observable
.do(onNext: { element in
print("Intercepted Next:", element)
}, onError: { error in
print("Intercepted Error:", error)
}, onCompleted: {
print("Intercepted Completed")
}, onDispose: {
print("Intercepted Disposed")
})
.subscribe(onNext: { element in
print(element)
}, onError: { error in
print(error)
}, onCompleted: {
print("completed")
}, onDisposed: {
print("disposed")
})
2.3.3 订阅销毁
2.3.3.1 Observable 从创建到终结流程
- (1)一个
Observable
序列被创建出来后它不会马上就开始被激活从而发出 Event,而是要等到它被某个人订阅了才会激活它。 - (2)而
Observable
序列激活之后要一直等到它发出了.error
或者.completed
的event
后,它才被终结。
2.3.3.2 dispose() 方法
-(1)使用该方法我们可以手动取消一个订阅行为。
-(2)如果我们觉得这个订阅结束了不再需要了,就可以调用 dispose()
方法把这个订阅给销毁掉,防止内存泄漏。
-(3)当一个订阅行为被 dispose
了,那么之后 observable
如果再发出 event
,这个已经 dispose
的订阅就收不到消息了。下面是一个简单的使用样例。
let observable = Observable.of("A", "B", "C")
//使用subscription常量存储这个订阅方法
let subscription = observable.subscribe { event in
print(event)
}
//调用这个订阅的dispose()方法
subscription.dispose()
2.3.3.13 DisposeBag
- (1)除了 dispose() 方法之外,我们更经常用到的是一个叫 DisposeBag 的对象来管理多个订阅行为的销毁:
- 我们可以把一个 DisposeBag 对象看成一个垃圾袋,把用过的订阅行为都放进去。
- 而这个 DisposeBag 就会在自己快要 dealloc 的时候,对它里面的所有订阅行为都调用 dispose() 方法。
- (2)下面是一个简单的使用样例。
let disposeBag = DisposeBag()
//第1个Observable,及其订阅
let observable1 = Observable.of("A", "B", "C")
observable1.subscribe { event in
print(event)
}.disposed(by: disposeBag)
//第2个Observable,及其订阅
let observable2 = Observable.of(1, 2, 3)
observable2.subscribe { event in
print(event)
}.disposed(by: disposeBag)
2.4 AnyObserver、Binder
2.4.1 观察者(Observer)
- 观察者(Observer)的作用就是监听事件,然后对这个事件做出响应。或者说任何响应事件的行为都是观察者。比如:
- 当我们点击按钮,弹出一个提示框。那么这个“弹出一个提示框”就是观察者 Observer<Void>
- 当我们请求一个远程的 json 数据后,将其打印出来。那么这个“打印 json 数据”就是观察者 Observer<JSON>
2.4.2 创建观察者
2.4.2.1 直接在 subscribe、bind 方法中创建观察者
- 在 subscribe 方法中创建
(1)创建观察者最直接的方法就是在 Observable 的 subscribe 方法后面描述当事件发生时,需要如何做出响应。
(2)比如下面的样例,观察者就是由后面的 onNext,onError,onCompleted 这些闭包构建出来的。
let observable = Observable.of("A", "B", "C")
observable.subscribe(onNext: { element in
print(element)
}, onError: { error in
print(error)
}, onCompleted: {
print("completed")
})
运行结果:
- 在 bind 方法中创建
(1)下面代码我们创建一个定时生成索引数的 Observable 序列,并将索引数不断显示在 label 标签上:
//Observable序列(每隔1秒钟发出一个索引数)
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable
.map { "当前索引数:\($0 )"}
.bind { [weak self](text) in
//收到发出的索引数后显示到label上
self?.label.text = text
}
.disposed(by: disposeBag)
2.4.2.2 使用 AnyObserver 创建观察者
- AnyObserver 可以用来描叙任意一种观察者。
- 配合 subscribe 方法使用
//观察者
let observer: AnyObserver<String> = AnyObserver { (event) in
switch event {
case .next(let data):
print(data)
case .error(let error):
print(error)
case .completed:
print("completed")
}
}
let observable = Observable.of("A", "B", "C")
observable.subscribe(observer)
- 配合 bindTo 方法使用
//观察者
let observer: AnyObserver<String> = AnyObserver { [weak self] (event) in
switch event {
case .next(let text):
//收到发出的索引数后显示到label上
self?.label.text = text
default:
break
}
}
//Observable序列(每隔1秒钟发出一个索引数)
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable
.map { "当前索引数:\($0 )"}
.bind(to: observer)
.disposed(by: disposeBag)
}
2.4.2.3 使用 Binder 创建观察者
(1)相较于 AnyObserver 的大而全,Binder 更专注于特定的场景。Binder 主要有以下两个特征:
不会处理错误事件
确保绑定都是在给定 Scheduler 上执行(默认 MainScheduler)
(2)一旦产生错误事件,在调试环境下将执行 fatalError,在发布环境下将打印错误信息。
- 实例2.4.2.3
//观察者
let observer: Binder<String> = Binder(label) { (view, text) in
//收到发出的索引数后显示到label上
view.text = text
}
//Observable序列(每隔1秒钟发出一个索引数)
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable
.map { "当前索引数:\($0 )"}
.bind(to: observer)
.disposed(by: disposeBag)
//Observable序列(每隔1秒钟发出一个索引数)
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable
.map { $0 % 2 == 0 }
.bind(to: button.rx.isEnabled)
.disposed(by: disposeBag)
2.5 自定义可绑定属性
有时我们想让 UI 控件创建出来后默认就有一些观察者,而不必每次都为它们单独去创建观察者。比如我们想要让所有的 UIlabel 都有个 fontSize 可绑定属性,它会根据事件值自动改变标签的字体大小。
通过对 UI 类进行扩展
这里我们通过对 UILabel 进行扩展,增加了一个 fontSize 可绑定属性。
import UIKit
import RxSwift
import RxCocoa
class ViewController: UIViewController {
@IBOutlet weak var label: UILabel!
let disposeBag = DisposeBag()
override func viewDidLoad() {
//Observable序列(每隔0.5秒钟发出一个索引数)
let observable = Observable<Int>.interval(0.5, scheduler: MainScheduler.instance)
observable
.map { CGFloat($0) }
.bind(to: label.fontSize) //根据索引数不断变放大字体
.disposed(by: disposeBag)
}
}
extension UILabel {
public var fontSize: Binder<CGFloat> {
return Binder(self) { label, fontSize in
label.font = UIFont.systemFont(ofSize: fontSize)
}
}
}
- 通过对 Reactive 类进行扩展
既然使用了 RxSwift,那么更规范的写法应该是对 Reactive 进行扩展。这里同样是给 UILabel 增加了一个 fontSize 可绑定属性。
import UIKit
import RxSwift
import RxCocoa
class ViewController: UIViewController {
@IBOutlet weak var label: UILabel!
let disposeBag = DisposeBag()
override func viewDidLoad() {
//Observable序列(每隔0.5秒钟发出一个索引数)
let observable = Observable<Int>.interval(0.5, scheduler: MainScheduler.instance)
observable
.map { CGFloat($0) }
.bind(to: label.rx.fontSize) //根据索引数不断变放大字体
.disposed(by: disposeBag)
}
}
extension Reactive where Base: UILabel {
public var fontSize: Binder<CGFloat> {
return Binder(self.base) { label, fontSize in
label.font = UIFont.systemFont(ofSize: fontSize)
}
}
}
- RxSwift 自带的可绑定属性(UI 观察者)
其实 RxSwift 已经为我们提供许多常用的可绑定属性。比如 UILabel 就有 text 和 attributedText 这两个可绑定属性。
extension Reactive where Base: UILabel {
/// Bindable sink for `text` property.
public var text: Binder<String?> {
return Binder(self.base) { label, text in
label.text = text
}
}
/// Bindable sink for `attributedText` property.
public var attributedText: Binder<NSAttributedString?> {
return Binder(self.base) { label, text in
label.attributedText = text
}
}
}
那么上文那个定时显示索引数的样例,我们其实不需要自定义 UI 观察者,直接使用 RxSwift 提供的绑定属性即可。
//Observable序列(每隔1秒钟发出一个索引数)
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable
.map { "当前索引数:\($0 )"}
.bind(to: label.rx.text) //收到发出的索引数后显示到label上
.disposed(by: disposeBag)
2.6 Subjects、Variables
当我们创建一个 Observable 的时候就要预先将要发出的数据都准备好,等到有人订阅它时再将数据通过 Event 发出去。
但有时我们希望 Observable 在运行时能动态地“获得”或者说“产生”出一个新的数据,再通过 Event 发送出去。比如:订阅一个输入框的输入内容,当用户每输入一个字后,这个输入框关联的 Observable 就会发出一个带有输入内容的 Event,通知给所有订阅者。
这个就可以使用下面将要介绍的 Subjects 来实现。
2.6.1 Subjects
2.6.1.1 Subjects 简介
-(1)Subjects 既是订阅者,也是 Observable:
说它是订阅者,是因为它能够动态地接收新的值。
说它又是一个 Observable,是因为当 Subjects 有了新的值之后,就会通过 Event 将新值发出给他的所有订阅者。
-(2)一共有四种 Subjects,分别为:PublishSubject
、BehaviorSubject
、ReplaySubject
、Variable
。他们之间既有各自的特点,也有相同之处:
- 首先他们都是
Observable
,他们的订阅者都能收到他们发出的新的Event
。- 直到
Subject
发出.complete
或者.error
的Event
后,该Subject
便终结了,同时它也就不会再发出.next
事件。- 对于那些在
Subject
终结后再订阅他的订阅者,也能收到subject
发出的一条.complete
或.error
的event
,告诉这个新的订阅者它已经终结了。- .他们之间最大的区别只是在于:当一个新的订阅者刚订阅它的时候,能不能收到
Subject
以前发出过的旧Event
,如果能的话又能收到多少个。
-(3)Subject 常用的几个方法:
onNext(:)
:是 on(.next(:)) 的简便写法。该方法相当于 subject 接收到一个 .next 事件。onError(:)
:是 on(.error(:)) 的简便写法。该方法相当于 subject 接收到一个 .error 事件。onCompleted()
:是 on(.completed) 的简便写法。该方法相当于 subject 接收到一个 .completed 事件。
2.6.1.2 PublishSubject
- (1)基本介绍
PublishSubject
是最普通的Subject
,它不需要初始值就能创建。
PublishSubject
的订阅者从他们开始订阅的时间点起,可以收到订阅后Subject
发出的新Event
,而不会收到他们在订阅前已发出的Event
。
- (2)时序图
如下图:最上面一条是PublishSubject
。
下面两条分别表示两个新的订阅,它们订阅的时间点不同,可以发现PublishSubject
的订阅者只能收到他们订阅后的Event
。
- 实例 2.6.1.2 :
let disposeBag = DisposeBag()
//创建一个PublishSubject
let subject = PublishSubject<String>()
//由于当前没有任何订阅者,所以这条信息不会输出到控制台
subject.onNext("111")
//第1次订阅subject
subject.subscribe(onNext: { string in
print("第1次订阅:", string)
}, onCompleted:{
print("第1次订阅:onCompleted")
}).disposed(by: disposeBag)
//当前有1个订阅,则该信息会输出到控制台
subject.onNext("222")
//第2次订阅subject
subject.subscribe(onNext: { string in
print("第2次订阅:", string)
}, onCompleted:{
print("第2次订阅:onCompleted")
}).disposed(by: disposeBag)
//当前有2个订阅,则该信息会输出到控制台
subject.onNext("333")
//让subject结束
subject.onCompleted()
//subject完成后会发出.next事件了。
subject.onNext("444")
//subject完成后它的所有订阅(包括结束后的订阅),都能收到subject的.completed事件,
subject.subscribe(onNext: { string in
print("第3次订阅:", string)
}, onCompleted:{
print("第3次订阅:onCompleted")
}).disposed(by: disposeBag)
运行结果:
2.6.1.3 BehaviorSubject
-(1)基本介绍
BehaviorSubject
需要通过一个默认初始值来创建。
当一个订阅者来订阅它的时候,这个订阅者会立即收到BehaviorSubjects
上一个发出的event
。之后就跟正常的情况一样,它也会接收到BehaviorSubject
之后发出的新的event
。
-(2)时序图
如下图:最上面一条是 BehaviorSubject
。
下面两条分别表示两个新的订阅,它们订阅的时间点不同,可以发现 BehaviorSubject
的订阅者一开始就能收到 BehaviorSubjects
之前发出的一个 Event
。
- 实例 2.6.1.3
let disposeBag = DisposeBag()
//创建一个BehaviorSubject
let subject = BehaviorSubject(value: "111")
//第1次订阅subject
subject.subscribe { event in
print("第1次订阅:", event)
}.disposed(by: disposeBag)
//发送next事件
subject.onNext("222")
//发送error事件
subject.onError(NSError(domain: "local", code: 0, userInfo: nil))
//第2次订阅subject
subject.subscribe { event in
print("第2次订阅:", event)
}.disposed(by: disposeBag)
运行结果:
2.6.1.4 ReplaySubject
- (1)基本介绍
ReplaySubject
在创建时候需要设置一个bufferSize
,表示它对于它发送过的 event 的缓存个数。- 比如一个
ReplaySubject
的bufferSize
设置为 2,它发出了 3 个.next
的event
,那么它会将后两个(最近的两个)event
给缓存起来。此时如果有一个subscriber
订阅了这个ReplaySubject
,那么这个subscriber
就会立即收到前面缓存的两个.next
的event
。- 如果一个
subscriber
订阅已经结束的ReplaySubject
,除了会收到缓存的.next
的event
外,还会收到那个终结的.error
或者.complete
的event
。
- (2)时序图
如下图:最上面一条是ReplaySubject
(bufferSize
设为为 2)。
下面两条分别表示两个新的订阅,它们订阅的时间点不同。可以发现ReplaySubject
的订阅者一开始就能收到ReplaySubject
之前发出的两个Event
(如果有的话)。
- 实例 2.6.1.4 :
let disposeBag = DisposeBag()
//创建一个bufferSize为2的ReplaySubject
let subject = ReplaySubject<String>.create(bufferSize: 2)
//连续发送3个next事件
subject.onNext("111")
subject.onNext("222")
subject.onNext("333")
//第1次订阅subject
subject.subscribe { event in
print("第1次订阅:", event)
}.disposed(by: disposeBag)
//再发送1个next事件
subject.onNext("444")
//第2次订阅subject
subject.subscribe { event in
print("第2次订阅:", event)
}.disposed(by: disposeBag)
//让subject结束
subject.onCompleted()
//第3次订阅subject
subject.subscribe { event in
print("第3次订阅:", event)
}.disposed(by: disposeBag)
运行结果:
2.6.2 Variables
- 注意:由于 Variable 在之后版本中将被废弃,建议使用 Varible 的地方都改用下面介绍的 BehaviorRelay 作为替代
2.6.2.1 Variable
-(1)基本介绍
Variable
其实就是对BehaviorSubject
的封装,所以它也必须要通过一个默认的初始值进行创建。Variable
具有BehaviorSubject
的功能,能够向它的订阅者发出上一个event
以及之后新创建的event
。- 不同的是,
Variable
还把会把当前发出的值保存为自己的状态。同时它会在销毁时自动发送.complete
的event
,不需要也不能手动给Variables
发送completed
或者error
事件来结束它。- 简单地说就是
Variable
有一个value
属性,我们改变这个value
属性的值就相当于调用一般Subjects
的onNext()
方法,而这个最新的onNext()
的值就被保存在value
属性里了,直到我们再次修改它。Variables
本身没有subscribe()
方法,但是所有Subjects
都有一个asObservable()
方法。我们可以使用这个方法返回这个Variable
的Observable
类型,拿到这个Observable
类型我们就能订阅它了。
- 实例 2.6.2.1
import UIKit
import RxSwift
import RxCocoa
class ViewController: UIViewController {
override func viewDidLoad() {
super.viewDidLoad()
let disposeBag = DisposeBag()
//创建一个初始值为111的Variable
let variable = Variable("111")
//修改value值
variable.value = "222"
//第1次订阅
variable.asObservable().subscribe {
print("第1次订阅:", $0)
}.disposed(by: disposeBag)
//修改value值
variable.value = "333"
//第2次订阅
variable.asObservable().subscribe {
print("第2次订阅:", $0)
}.disposed(by: disposeBag)
//修改value值
variable.value = "444"
}
}
注意:由于 Variable 对象在 viewDidLoad() 方法内初始化,所以它的生命周期就被限制在该方法内。当这个方法执行完毕后,这个 Variable 对象就会被销毁,同时它也就自动地向它的所有订阅者发出 .completed 事件
运行结果:
2.6.2.2 BehaviorRelay
- 1)基本介绍
BehaviorRelay
是作为Variable
的替代者出现的。它的本质其实也是对BehaviorSubject
的封装,所以它也必须要通过一个默认的初始值进行创建。BehaviorRelay
具有BehaviorSubject
的功能,能够向它的订阅者发出上一个event
以及之后新创建的event
。- 与
BehaviorSubject
不同的是,不需要也不能手动给BehaviorReply
发送completed
或者error
事件来结束它(BehaviorRelay
会在销毁时也不会自动发送.complete
的event
)。BehaviorRelay
有一个value
属性,我们通过这个属性可以获取最新值。而通过它的accept()
方法可以对值进行修改。
- (2)上面的
Variable
样例我们可以改用成BehaviorRelay
,代码如下:
import UIKit
import RxSwift
import RxCocoa
class ViewController: UIViewController {
override func viewDidLoad() {
super.viewDidLoad()
let disposeBag = DisposeBag()
//创建一个初始值为111的BehaviorRelay
let subject = BehaviorRelay<String>(value: "111")
//修改value值
subject.accept("222")
//第1次订阅
subject.asObservable().subscribe {
print("第1次订阅:", $0)
}.disposed(by: disposeBag)
//修改value值
subject.accept("333")
//第2次订阅
subject.asObservable().subscribe {
print("第2次订阅:", $0)
}.disposed(by: disposeBag)
//修改value值
subject.accept("444")
}
}
运行结果:
- (3)如果想将新值合并到原值上,可以通过
accept()
方法与value
属性配合来实现。(这个常用在表格上拉加载功能上,BehaviorRelay
用来保存所有加载到的数据)
import UIKit
import RxSwift
import RxCocoa
class ViewController: UIViewController {
override func viewDidLoad() {
super.viewDidLoad()
let disposeBag = DisposeBag()
//创建一个初始值为包含一个元素的数组的BehaviorRelay
let subject = BehaviorRelay<[String]>(value: ["1"])
//修改value值
subject.accept(subject.value + ["2", "3"])
//第1次订阅
subject.asObservable().subscribe {
print("第1次订阅:", $0)
}.disposed(by: disposeBag)
//修改value值
subject.accept(subject.value + ["4", "5"])
//第2次订阅
subject.asObservable().subscribe {
print("第2次订阅:", $0)
}.disposed(by: disposeBag)
//修改value值
subject.accept(subject.value + ["6", "7"])
}
}
运行结果:
2.7 变换操作符:buffer、map、flatMap、scan等
- 变换操作指的是对原始的 Observable 序列进行一些转换,类似于 Swift 中 CollectionType 的各种转换。
2.7.1 buffer
- (1)基本介绍
buffer 方法作用是缓冲组合,第一个参数是缓冲时间,第二个参数是缓冲个数,第三个参数是线程。
该方法简单来说就是缓存Observable
中发出的新元素,当元素达到某个数量,或者经过了特定的时间,它就会将这个元素集合发送出来。
- 实例 2.7.1
import UIKit
import RxSwift
import RxCocoa
class ViewController: UIViewController {
let disposeBag = DisposeBag()
override func viewDidLoad() {
let subject = PublishSubject<String>()
//每缓存3个元素则组合起来一起发出。
//如果1秒钟内不够3个也会发出(有几个发几个,一个都没有发空数组 [])
subject
.buffer(timeSpan: 1, count: 3, scheduler: MainScheduler.instance)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject.onNext("a")
subject.onNext("b")
subject.onNext("c")
subject.onNext("1")
subject.onNext("2")
subject.onNext("3")
}
}
运行结果:
2.7.2 window
- (1)基本介绍
window 操作符和 buffer 十分相似。不过 buffer 是周期性的将缓存的元素集合发送出来,而 window 周期性的将元素集合以 Observable 的形态发送出来。
同时 buffer 要等到元素搜集完毕后,才会发出元素序列。而 window 可以实时发出元素序列。
- 实例 2.7.2
import UIKit
import RxSwift
import RxCocoa
class ViewController: UIViewController {
let disposeBag = DisposeBag()
override func viewDidLoad() {
let subject = PublishSubject<String>()
//每3个元素作为一个子Observable发出。
subject
.window(timeSpan: 1, count: 3, scheduler: MainScheduler.instance)
.subscribe(onNext: { [weak self] in
print("subscribe: \($0)")
$0.asObservable()
.subscribe(onNext: { print($0) })
.disposed(by: self!.disposeBag)
})
.disposed(by: disposeBag)
subject.onNext("a")
subject.onNext("b")
subject.onNext("c")
subject.onNext("1")
subject.onNext("2")
subject.onNext("3")
}
}
运行结果:
2.7.3 map
- (1)基本介绍
该操作符通过传入一个函数闭包把原来的 Observable 序列转变为一个新的 Observable 序列。
- 实例 2.7.3
let disposeBag = DisposeBag()
Observable.of(1, 2, 3)
.map { $0 * 10}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
运行结果:
10
20
30
2.7.4 flatMap
- (1)基本介绍
map
在做转换的时候容易出现“升维”的情况。即转变之后,从一个序列变成了一个序列的序列。- 而
flatMap
操作符会对源Observable
的每一个元素应用一个转换方法,将他们转换成Observables
。 然后将这些Observables
的元素合并之后再发送出来。即又将其 "拍扁"(降维)成一个Observable
序列。- 这个操作符是非常有用的。比如当
Observable
的元素本生拥有其他的Observable
时,我们可以将所有子Observables
的元素发送出来。
- 实例 2.7.4
let disposeBag = DisposeBag()
let subject1 = BehaviorSubject(value: "A")
let subject2 = BehaviorSubject(value: "1")
let variable = Variable(subject1)
variable.asObservable()
.flatMap { $0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext("B")
variable.value = subject2
subject2.onNext("2")
subject1.onNext("C")
运行结果:
2.7.5 flatMapLatest
- (1)基本介绍
flatMapLatest 与 flatMap 的唯一区别是:flatMapLatest 只会接收最新的 value 事件。
- 实例2.7.5 : 这里我们将上例中的 flatMap 改为 flatMapLatest
let disposeBag = DisposeBag()
let subject1 = BehaviorSubject(value: "A")
let subject2 = BehaviorSubject(value: "1")
let variable = Variable(subject1)
variable.asObservable()
.flatMapLatest { $0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext("B")
variable.value = subject2
subject2.onNext("2")
subject1.onNext("C")
运行结果:
2.7.6 flatMapFirst
- (1)基本介绍
- flatMapFirst 与 flatMapLatest 正好相反:flatMapFirst 只会接收最初的 value 事件。
- 该操作符可以防止重复请求:
比如点击一个按钮发送一个请求,当该请求完成前,该按钮点击都不应该继续发送请求。便可该使用 flatMapFirst 操作符。
- 实例2.7.6 :这里我们将上例中的 flatMapLatest 改为 flatMapFirst。
let disposeBag = DisposeBag()
let subject1 = BehaviorSubject(value: "A")
let subject2 = BehaviorSubject(value: "1")
let variable = Variable(subject1)
variable.asObservable()
.flatMapFirst { $0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext("B")
variable.value = subject2
subject2.onNext("2")
subject1.onNext("C")
运行结果:
2.7.7 concatMap
- (1)基本介绍
concatMap 与 flatMap 的唯一区别是:当前一个 Observable 元素发送完毕后,后一个Observable 才可以开始发出元素。或者说等待前一个 Observable 产生完成事件后,才对后一个 Observable 进行订阅。
- 实例2.7.7
let disposeBag = DisposeBag()
let subject1 = BehaviorSubject(value: "A")
let subject2 = BehaviorSubject(value: "1")
let variable = Variable(subject1)
variable.asObservable()
.concatMap { $0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext("B")
variable.value = subject2
subject2.onNext("2")
subject1.onNext("C")
subject1.onCompleted() //只有前一个序列结束后,才能接收下一个序列
运行结果:
2.7.8 scan
- (1)基本介绍
scan 就是先给一个初始化的数,然后不断的拿前一个结果和最新的值进行处理操作。
- 实例2.7.8
let disposeBag = DisposeBag()
Observable.of(1, 2, 3, 4, 5)
.scan(0) { acum, elem in
acum + elem
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
运行结果
2.7.9 groupBy
- (1)基本介绍
groupBy 操作符将源 Observable 分解为多个子 Observable,然后将这些子 Observable 发送出来。
也就是说该操作符会将元素通过某个键进行分组,然后将分组后的元素序列以 Observable 的形态发送出来。
- 实例2.7.9
let disposeBag = DisposeBag()
//将奇数偶数分成两组
Observable<Int>.of(0, 1, 2, 3, 4, 5)
.groupBy(keySelector: { (element) -> String in
return element % 2 == 0 ? "偶数" : "基数"
})
.subscribe { (event) in
switch event {
case .next(let group):
group.asObservable().subscribe({ (event) in
print("key:\(group.key) event:\(event)")
})
.disposed(by: disposeBag)
default:
print("")
}
}
.disposed(by: disposeBag)
运行结果:
2.8 过滤操作符:filter、take、skip等
- 过滤操作指的是从源 Observable 中选择特定的数据发送。
2.8.1 filter
- (1)基本介绍
该操作符就是用来过滤掉某些不符合要求的事件。
- 实例2.8.1
let disposeBag = DisposeBag()
Observable.of(2, 30, 22, 5, 60, 3, 40 ,9)
.filter {
$0 > 10
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
2.8.2 distinctUntilChanged
-
该操作符用于过滤掉连续重复的事件。
- 实例2.8.2
let disposeBag = DisposeBag()
Observable.of(1, 2, 3, 1, 1, 4)
.distinctUntilChanged()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
运行结果:
2.8.3 single
- (1)基本介绍
限制只发送一次事件,或者满足条件的第一个事件。
如果存在有多个事件或者没有事件都会发出一个 error 事件。
如果只有一个事件,则不会发出 error 事件。
- 实例2.8.3
let disposeBag = DisposeBag()
Observable.of(1, 2, 3, 4)
.single{ $0 == 2 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
Observable.of("A", "B", "C", "D")
.single()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
运行结果:
2.8.4 elementAt
-
该方法实现只处理在指定位置的事件。
- 实例2.8.4
let disposeBag = DisposeBag()
Observable.of(1, 2, 3, 4)
.elementAt(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
运行结果:
3
2.8.5 ignoreElements
- (1)基本介绍
该操作符可以忽略掉所有的元素,只发出 error 或 completed 事件。
如果我们并不关心 Observable 的任何元素,只想知道 Observable 在什么时候终止,那就可以使用 ignoreElements 操作符。
- 实例2.8.5
let disposeBag = DisposeBag()
Observable.of(1, 2, 3, 4)
.ignoreElements()
.subscribe{
print($0)
}
.disposed(by: disposeBag)
运行结果:
completed
2.8.6 take
-
该方法实现仅发送 Observable 序列中的前 n 个事件,在满足数量之后会自动 .completed。
- 实例2.8.6
let disposeBag = DisposeBag()
Observable.of(1, 2, 3, 4)
.take(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
运行结果:
1
2
2.8.7 takeLast
-
该方法实现仅发送 Observable 序列中的后 n 个事件。
- 实例2.8.7
let disposeBag = DisposeBag()
Observable.of(1, 2, 3, 4)
.takeLast(1)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
运行结果:
4
2.8.8 skip
-
该方法用于跳过源 Observable 序列发出的前 n 个事件。
- 实例 2.8.8
let disposeBag = DisposeBag()
Observable.of(1, 2, 3, 4)
.skip(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
运行结果:
3
4
2.8.9 Sample
- Sample 除了订阅源 Observable 外,还可以监视另外一个 Observable, 即 notifier 。
- 每当收到 notifier 事件,就会从源序列取一个最新的事件并发送。而如果两次 notifier 事件之间没有源序列的事件,则不发送值。
- 实例2.8.9
let disposeBag = DisposeBag()
let source = PublishSubject<Int>()
let notifier = PublishSubject<String>()
source
.sample(notifier)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
source.onNext(1)
//让源序列接收接收消息
notifier.onNext("A")
source.onNext(2)
//让源序列接收接收消息
notifier.onNext("B")
notifier.onNext("C")
source.onNext(3)
source.onNext(4)
//让源序列接收接收消息
notifier.onNext("D")
source.onNext(5)
//让源序列接收接收消息
notifier.onCompleted()
运行结果:
1
2
4
5
2.8.10 debounce
- (1)基本介绍
- debounce 操作符可以用来过滤掉高频产生的元素,它只会发出这种元素:该元素产生后,一段时间内没有新元素产生。
- 换句话说就是,队列中的元素如果和下一个元素的间隔小于了指定的时间间隔,那么这个元素将被过滤掉。
- debounce 常用在用户输入的时候,不需要每个字母敲进去都发送一个事件,而是稍等一下取最后一个事件。
- 实例2.8.10:
import UIKit
import RxSwift
import RxCocoa
class ViewController: UIViewController {
let disposeBag = DisposeBag()
override func viewDidLoad() {
//定义好每个事件里的值以及发送的时间
let times = [
[ "value": 1, "time": 0.1 ],
[ "value": 2, "time": 1.1 ],
[ "value": 3, "time": 1.2 ],
[ "value": 4, "time": 1.2 ],
[ "value": 5, "time": 1.4 ],
[ "value": 6, "time": 2.1 ]
]
//生成对应的 Observable 序列并订阅
Observable.from(times)
.flatMap { item in
return Observable.of(Int(item["value"]!))
.delaySubscription(Double(item["time"]!),
scheduler: MainScheduler.instance)
}
.debounce(0.5, scheduler: MainScheduler.instance) //只发出与下一个间隔超过0.5秒的元素
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
}
运行结果:
1
5
6
2.9 条件和布尔操作符:amb、takeWhile、skipWhile等
- 条件和布尔操作会根据条件发射或变换 Observables,或者对他们做布尔运算。
2.9.1 amb
-
当传入多个
Observables
到amb
操作符时,它将取第一个发出元素或产生事件的Observable
,然后只发出它的元素。并忽略掉其他的Observables
。
实例 2.9.1
let disposeBag = DisposeBag()
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
let subject3 = PublishSubject<Int>()
subject1
.amb(subject2)
.amb(subject3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject2.onNext(1)
subject1.onNext(20)
subject2.onNext(2)
subject1.onNext(40)
subject3.onNext(0)
subject2.onNext(3)
subject1.onNext(60)
subject3.onNext(0)
subject3.onNext(0)
运行结果:
1
2
3
2.9.2 takeWhile
-
该方法依次判断 Observable 序列的每一个值是否满足给定的条件。 当第一个不满足条件的值出现时,它便自动完成。
实例 2.9.2
let disposeBag = DisposeBag()
Observable.of(2, 3, 4, 5, 6)
.takeWhile { $0 < 4 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
运行结果:
2
3
2.9.3 takeUntil
- 基本介绍
- 除了订阅源
Observable
外,通过takeUntil
方法我们还可以监视另外一个Observable
, 即notifier
。- 如果
notifier
发出值或complete
通知,那么源Observable
便自动完成,停止发送事件。
- 实例 2.9.3
let disposeBag = DisposeBag()
let source = PublishSubject<String>()
let notifier = PublishSubject<String>()
source
.takeUntil(notifier)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
source.onNext("a")
source.onNext("b")
source.onNext("c")
source.onNext("d")
//停止接收消息
notifier.onNext("z")
source.onNext("e")
source.onNext("f")
source.onNext("g")
输出结果:
a
b
c
d
2.9.4 skipWhile
- 基本介绍
- 该方法用于跳过前面所有满足条件的事件。
一旦遇到不满足条件的事件,之后就不会再跳过了。
- 实例 2.9.4
let disposeBag = DisposeBag()
Observable.of(2, 3, 4, 5, 6)
.skipWhile { $0 < 4 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
}
运行结果:
4
5
6
2.9.5 skipUntil
- 基本介绍
- 同上面的
takeUntil
一样,skipUntil
除了订阅源Observable
外,通过skipUntil
方法我们还可以监视另外一个Observable
, 即notifier
。- 与
takeUntil
相反的是。源Observable
序列事件默认会一直跳过,直到notifier
发出值或complete
通知。
- 实例 2.9.5
let disposeBag = DisposeBag()
let source = PublishSubject<Int>()
let notifier = PublishSubject<Int>()
source
.skipUntil(notifier)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
source.onNext(1)
source.onNext(2)
source.onNext(3)
source.onNext(4)
source.onNext(5)
//开始接收消息
notifier.onNext(0)
source.onNext(6)
source.onNext(7)
source.onNext(8)
//仍然接收消息
notifier.onNext(0)
source.onNext(9)
运行结果:
6
7
8
9
2.10 结合操作符:startWith、merge、zip等
2.10.1 startWith
- 基本介绍:
该方法会在 Observable 序列开始之前插入一些事件元素。即发出事件消息之前,会先发出这些预先插入的事件消息
-
图解:
- 实例 2.10.1
let disposeBag = DisposeBag()
Observable.of("2", "3")
.startWith("1")
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
运行结果:
1
2
3
- 实例2.10.1.1: 可以插入多个数据
let disposeBag = DisposeBag()
Observable.of("2", "3")
.startWith("a")
.startWith("b")
.startWith("c")
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
运行结果:
c
b
a
2
3
2.10.2 merge
- 基本介绍:
该方法可以将多个(两个或两个以上的)
Observable
序列合并成一个Observable
序列。
-
图解:
- 实例 2.10.1
let disposeBag = DisposeBag()
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
Observable.of(subject1, subject2)
.merge()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext(20)
subject1.onNext(40)
subject1.onNext(60)
subject2.onNext(1)
subject1.onNext(80)
subject1.onNext(100)
subject2.onNext(1)
运行结果:
20
40
60
1
80
100
1
2.10.3 zip
- 基本介绍:
该方法可以将多个(两个或两个以上的)
Observable
序列压缩成一个Observable
序列。
而且它会等到每个Observable
事件一一对应地凑齐之后再合并。
-
图解:
- 实例 2.10.1
let disposeBag = DisposeBag()
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<String>()
Observable.zip(subject1, subject2) {
"\($0)\($1)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext(1)
subject2.onNext("A")
subject1.onNext(2)
subject2.onNext("B")
subject2.onNext("C")
subject2.onNext("D")
subject1.onNext(3)
subject1.onNext(4)
subject1.onNext(5)
运行结果:
1A
2B
3C
4D
- zip 常常用在整合网络请求上:
比如我们想同时发送两个请求,只有当两个请求都成功后,再将两者的结果整合起来继续往下处理。这个功能就可以通过 zip 来实现。
//第一个请求
let userRequest: Observable<User> = API.getUser("me")
//第二个请求
let friendsRequest: Observable<Friends> = API.getFriends("me")
//将两个请求合并处理
Observable.zip(userRequest, friendsRequest) {
user, friends in
//将两个信号合并成一个信号,并压缩成一个元组返回(两个信号均成功)
return (user, friends)
}
.observeOn(MainScheduler.instance) //加这个是应为请求在后台线程,下面的绑定在前台线程。
.subscribe(onNext: { (user, friends) in
//将数据绑定到界面上
//.......
})
.disposed(by: disposeBag)
2.10.4 combineLatest
- 基本介绍:
- 该方法同样是将多个(两个或两个以上的)
Observable
序列元素进行合并。- 但与
zip
不同的是,每当任意一个Observable
有新的事件发出时,它会将每个Observable
序列的最新的一个事件元素进行合并。
-
图解:
- 实例 2.10.1
let disposeBag = DisposeBag()
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<String>()
Observable.combineLatest(subject1, subject2) {
"\($0)\($1)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext(1)
subject2.onNext("A")
subject1.onNext(2)
subject2.onNext("B")
subject2.onNext("C")
subject2.onNext("D")
subject1.onNext(3)
subject1.onNext(4)
subject1.onNext(5)
运行结果:
2.10.5 withLatestFrom
- 基本介绍:
该方法将两个
Observable
序列合并为一个。每当self
队列发射一个元素时,便从第二个序列中取出最新的一个值。
图解:
实例 2.10.1
let disposeBag = DisposeBag()
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
subject1.withLatestFrom(subject2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext("A")
subject2.onNext("1")
subject1.onNext("B")
subject1.onNext("C")
subject2.onNext("2")
subject1.onNext("D")
运行结果:
1
1
2
2.10.6 switchLatest
- 基本介绍:
switchLatest
有点像其他语言的switch
方法,可以对事件流进行转换。- 比如本来监听的
subject1
,我可以通过更改variable
里面的value
更换事件源。变成监听subject2
。
-
图解:
- 实例 2.10.1
let disposeBag = DisposeBag()
let subject1 = BehaviorSubject(value: "A")
let subject2 = BehaviorSubject(value: "1")
let variable = Variable(subject1)
variable.asObservable()
.switchLatest()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext("B")
subject1.onNext("C")
//改变事件源
variable.value = subject2
subject1.onNext("D")
subject2.onNext("2")
//改变事件源
variable.value = subject1
subject2.onNext("3")
subject1.onNext("E")
运行结果: