所有事物都是序列
Observable可以用于描述元素异步产生的序列。
如何创建序列
这里介绍一下创建序列最基本的方法,例如,我们创建一个 [0, 1, ... 8, 9] 的序列:
let numbers: Observable<Int> = Observable.create { observer -> Disposable in
observer.onNext(0)
observer.onNext(1)
observer.onNext(2)
observer.onNext(3)
observer.onNext(4)
observer.onNext(5)
observer.onNext(6)
observer.onNext(7)
observer.onNext(8)
observer.onNext(9)
observer.onCompleted()
return Disposables.create()
}
创建序列最直接的方法就是调用 Observable.create,然后在构建函数里面描述元素的产生过程。 observer.onNext(0) 就代表产生了一个元素,他的值是 0。后面又产生了 9 个元素分别是 1, 2, ... 8, 9 。最后,用 observer.onCompleted() 表示元素已经全部产生,没有更多元素了。
你可以用这种方式来封装功能组件,例如,闭包回调:
typealias JSON = Any
let json: Observable<JSON> = Observable.create { (observer) -> Disposable in
let task = URLSession.shared.dataTask(with: ...) { data, _, error in
guard error == nil else {
observer.onError(error!)
return
}
guard let data = data,
let jsonObject = try? JSONSerialization.jsonObject(with: data, options: .mutableLeaves)
else {
observer.onError(DataError.cantParseJSON)
return
}
observer.onNext(jsonObject)
observer.onCompleted()
}
task.resume()
return Disposables.create { task.cancel() }
}
在闭包回调中,如果任务失败,就调用 observer.onError(error!)。如果获取到目标元素,就调用 observer.onNext(jsonObject)。由于我们的这个序列只有一个元素,所以在成功获取到元素后,就直接调用 observer.onCompleted() 来表示任务结束。最后 Disposables.create { task.cancel() } 说明如果数据绑定被清除(订阅被取消)的话,就取消网络请求。
这样一来我们就将传统的闭包回调转换成序列了。然后可以用 subscribe 方法来响应这个请求的结果:
json
.subscribe(onNext: { json in
print("取得 json 成功: \(json)")
}, onError: { error in
print("取得 json 失败 Error: \(error.localizedDescription)")
}, onCompleted: {
print("取得 json 任务成功完成")
})
.disposed(by: disposeBag)
这里subscribe后面的onNext,onError, onCompleted 分别响应我们创建 json 时,构建函数里面的onNext,onError, onCompleted 事件。我们称这些事件为 Event:
Event-事件
public enum Event<Element> {
case next(Element)
case error(Swift.Error)
case completed
}
- next - 序列产生了一个新的元素
- error - 创建序列时产生了一个错误,导致序列终止
- completed - 序列的所有元素都已经成功产生,整个序列已经完成
你可以合理的利用这些 Event 来实现业务逻辑。
特征序列
Single
Single是Observable的另一个版本。但是它只能发出一个元素,或者产生一个error事件。
- 发出一个元素,或者一个error事件
- 不会共享状态变化
例如执行Http请求,返回应答或错误。你也可以用Single描述只有一个元素的序列。
如何创建Single?
创建:
func getRepo(_ repo: String) -> Single<[String: Any]> {
return Single<[String: Any]>.create { single in
let url = URL(string: "https://api.github.com/repos/\(repo)")!
let task = URLSession.shared.dataTask(with: url) {
data, _, error in
if let error = error {
single(.error(error))
return
}
guard let data = data,
let json = try? JSONSerialization.jsonObject(with: data, options: .mutableLeaves),
let result = json as? [String: Any] else {
single(.error(DataError.cantParseJSON))
return
}
single(.success(result))
}
task.resume()
return Disposables.create { task.cancel() }
}
}
使用:
getRepo("ReactiveX/RxSwift")
.subscribe(onSuccess: { json in
print("JSON: ", json)
}, onError: { error in
print("Error: ", error)
})
.disposed(by: disposeBag)
SingleEvent:
public enum SingleEvent<Element> {
case success(Element)
case error(Swift.Error)
}
- success - 产生一个单独的元素
- error - 产生一个错误
你同样可以对 Observable 调用 .asSingle() 方法,将它转换为 Single。
Completable
Completable只能产生一个completed事件或者一个error事件。
- 发出0个元素
- 产生一个completed事件或者一个error事件
- 不会共享状态变化
Completable适用于那种你只关心任务是否完成,而不需要在意任务的返回值的场景下。
如何创建 Completable?
创建:
func cacheLocally() -> Completable {
return Completable.create { completable in
// Store some data locally
...
...
guard success else {
completable(.error(CacheError.failedCaching))
return Disposables.create {}
}
completable(.completed)
return Disposables.create {}
}
}
使用:
cacheLocally()
.subscribe(onCompleted: {
print("Completed with no error")
}, onError: { error in
print("Completed with an error: \(error.localizedDescription)")
})
.disposed(by: disposeBag)
CompletableEvent:
public enum CompletableEvent {
case error(Swift.Error)
case completed
}
- completed - 产生完成事件
- error - 产生一个错误
Maybe
Maybe只能发出一个元素,要么产生一个 completed 事件,要么产生一个 error 事件。
- 发出一个元素或者一个 completed 事件或者一个 error 事件
- 不会共享状态变化
如果你遇到那种可能需要发出一个元素,又可能不需要发出时,就可以使用 Maybe。
如何创建Maybe?
创建:
func generateString() -> Maybe<String> {
return Maybe<String>.create { maybe in
maybe(.success("RxSwift"))
// OR
maybe(.completed)
// OR
maybe(.error(error))
return Disposables.create {}
}
}
使用:
generateString()
.subscribe(onSuccess: { element in
print("Completed with element \(element)")
}, onError: { error in
print("Completed with an error \(error.localizedDescription)")
}, onCompleted: {
print("Completed with no element")
})
.disposed(by: disposeBag)
可以对 Observable 调用 .asMaybe() 方法,将它转换为 Maybe。
Driver
Driver(司机?)主要是为了简化UI层的代码。主要有以下特征:
- 不会产生 error 事件
- 一定在 MainScheduler 监听(主线程监听)
- 共享状态变化
这些都是驱动 UI 的序列所具有的特征。
为什么要使用 Driver ?
我们举个例子:
let results = query.rx.text
.throttle(0.3, scheduler: MainScheduler.instance)
.flatMapLatest { query in
fetchAutoCompleteItems(query)
}
results
.map { "\($0.count)" }
.bind(to: resultCount.rx.text)
.disposed(by: disposeBag)
results
.bind(to: resultsTableView.rx.items(cellIdentifier: "Cell")) {
(_, result, cell) in
cell.textLabel?.text = "\(result)"
}
.disposed(by: disposeBag)
这段代码的主要目的是:
- 取出用户输入稳定后的内容
- 向服务器请求一组结果
- 将返回的结果绑定到两个 UI 元素上:tableView 和 显示结果数量的label
存在的问题:
- 如果 fetchAutoCompleteItems 的序列产生了一个错误(网络请求失败),这个错误将取消所有绑定,当用户输入一个新的关键字时,是无法发起新的网络请求。
- 如果 fetchAutoCompleteItems 在后台返回序列,那么刷新页面也会在后台进行,这样就会出现异常崩溃。
- 返回的结果被绑定到两个 UI 元素上。那就意味着,每次用户输入一个新的关键字时,就会分别为两个 UI 元素发起 HTTP 请求,这并不是我们想要的结果。
一个更好的方案是这样的:
let results = query.rx.text
.throttle(0.3, scheduler: MainScheduler.instance)
.flatMapLatest { query in
fetchAutoCompleteItems(query)
.observeOn(MainScheduler.instance) // 结果在主线程返回
.catchErrorJustReturn([]) // 错误被处理了,这样至少不会终止整个序列
}
.share(replay: 1) // HTTP 请求是被共享的
results
.map { "\($0.count)" }
.bind(to: resultCount.rx.text)
.disposed(by: disposeBag)
results
.bind(to: resultsTableView.rx.items(cellIdentifier: "Cell")) {
(_, result, cell) in
cell.textLabel?.text = "\(result)"
}
.disposed(by: disposeBag)
在一个大型系统内,要确保每一步不被遗漏是一件不太容易的事情。所以更好的选择是合理运用编译器和特征序列来确保这些必备条件都已经满足。
以下是使用Driver优化后的代码:
let results = query.rx.text.asDriver() // 将普通序列转换为 Driver
.throttle(0.3, scheduler: MainScheduler.instance)
.flatMapLatest { query in
fetchAutoCompleteItems(query)
.asDriver(onErrorJustReturn: []) // 仅仅提供发生错误时的备选返回值
}
results
.map { "\($0.count)" }
.drive(resultCount.rx.text) // 这里改用 `drive` 而不是 `bindTo`
.disposed(by: disposeBag) // 这样可以确保必备条件都已经满足了
results
.drive(resultsTableView.rx.items(cellIdentifier: "Cell")) {
(_, result, cell) in
cell.textLabel?.text = "\(result)"
}
.disposed(by: disposeBag)
首先第一个 asDriver 方法将 ControlProperty 转换为 Driver
然后第二个变化是:
.asDriver(onErrorJustReturn: [])
任何可被监听的序列都可以被转换为 Driver,只要他满足 3 个条件:
- 不会产生 error 事件
- 一定在 MainScheduler 监听(主线程监听)
- 共享状态变化
那么要如何确定条件都被满足?通过 Rx 操作符来进行转换。asDriver(onErrorJustReturn: []) 相当于以下代码:
let safeSequence = xs
.observeOn(MainScheduler.instance) // 主线程监听
.catchErrorJustReturn(onErrorJustReturn) // 无法产生错误
.share(replay: 1, scope: .whileConnected)// 共享状态变化
return Driver(raw: safeSequence) // 封装
最后使用 drive 而不是 bindTo
drive 方法只能被 Driver 调用。这意味着,如果你发现代码所存在 drive,那么这个序列不会产生错误事件并且一定在主线程监听。这样你可以安全的绑定UI元素。
ControlEvent
ControlEvent 专门用于描述 UI 控件所产生的事件,它具有以下特征:
- 不会产生 error 事件
- 一定在 MainScheduler 订阅(主线程订阅)
- 一定在 MainScheduler 监听(主线程监听)
- 共享状态变化