Hi,前两天有点事情耽搁了,抱歉。
我把思路梳理了一下,先介绍 Backpressure Strategy
会有较大的知识代沟,所以今天我们就从 异常处理
开始,明天介绍 ReactiveX
中的 并发
。之后再回来谈 Backpressure Strategy
。
本节代码中的 observer 就是第二节中的
OK,先来看个例子。
// 8.1.kt
import io.reactivex.Observable
inline fun Any.toIntOrError(): Int = toString().toInt()
// 把任意的对象转化为数字, 肯定有转化不了的(比如"haha"), 此时 toIntOrError 会抛出异常
// 这是 Kotlin 中的 `扩展方法` , 即, 可以给已有的类增加方法
fun main(args: Array<String>) {
Observable.just(1, 2, "Errr", 3) // 执行到 "Errr" 处会抛出异常, 那么 3 还会不会被传递下去呢?
.map { it.toIntOrError() }
.subscribe(observer)
}
输出
New Subscription
Next 1
Next 2 // Next 3 没有被输出
Error Occured For input string: "Errr" // 执行了 onError 方法
代码主旨
当 Observable
中存在异常时会触发 observer
的 onError
, 订阅
会立刻终止。所以 Next 3 没有被输出。
它甚至都没给我们一个改正错误的机会 : (
怎么办?用 Error Handling Operator
Error Handling Operator
种类
在 ReactiveX
规范中一共有两种
- Catch
- Retry
这两种抽象 Operator
在 RxKotlin
中常用的有五种具体实现
- Catch
- onErrorReturn
- onErrorResumeNext
- Retry
- retry
- retry
- retry
这三个 retry
有什么区别我下面介绍
Catch
onErrorRetur
用一个指定值替换异常, 并且取消订阅
// 8.2.kt
import io.reactivex.Observable
fun main(args: Array<String>) {
Observable.just(1, 2, "Errr", 3)
.map { it.toIntOrError() }
.onErrorReturn { -1 }
.subscribe(observer)
}
输出
New Subscription
Next 1
Next 2
Next -1
// 并没有输出 Next 3
All Completed
// 执行了 observer 的 onComplete 方法
onErrorResumeNext
在异常值处取消对原 Observable 的订阅, 并订阅另一个 Observable
下图中的 Catch 是 onErrorResumeNext
, 并不适用于 onErrorReturn
// 8.3.kt
import io.reactivex.Observable
fun main(args: Array<String>) {
Observable.just(1, 2, "Errr", 3)
.map { it.toIntOrError() }
.onErrorResumeNext(Observable.range(10, 2)) // 即订阅了 Observable.just(10,11)
.subscribe(observer)
}
输出
New Subscription
Next 1
Next 2
Next 10
Next 11
// 并没有输出 Next 3
All Completed
// 执行了 observer 的 onComplete 方法
Retry
retry ---- 没有参数
上游有异常就重新订阅, 直到正常为止
这里不举栗子了, 注意避免死循环....
retry ---- 接受 1 个 参数(times
: 可以尝试重新订阅的最大次数)
上游有异常会重新订阅, 直到达到 times
。如果仍然没有恢复,则向下游抛出最后一次订阅产生的异常
// 8.4.kt
import io.reactivex.Observable
fun main(args: Array<String>) {
Observable.just(1, 2, "Errr", 3)
.map { it.toIntOrError() }
.retry(2) // 最多可以尝试重新订阅 2 次
.subscribe(observer)
}
输出
New Subscription
Next 1 \_ 正常程序流程
Next 2 /
Next 1 \_ 第一次重复尝试
Next 2 /
Next 1 \_ 第二次重复尝试
Next 2 / // 两次还不行就向下游抛出最后一次订阅产生的异常
Error Occured For input string: "Errr"
retry ---- 接受 1 个参数(predicate function
: 判断函数)
这个判断函数的类型是 (Int, Throwable)->Boolean
- 第一个参数: 目前已经重新订阅的次数
- 第二个参数: 上游抛出的异常
- 返回值: 如果为
true
则继续重新订阅, 否则, 向下游抛出最后一次订阅产生的异常
下面的例子我看着都牵强....凑合着看吧, 但是还是没超过 15 行 : )
// 8.5.kt
import io.reactivex.Observable
fun main(args: Array<String>) {
var retryCount = 0
Observable.just(1, 2, "Errr", 3)
.map { it.toIntOrError() }
.retry { _, _ ->
(++retryCount) < 3
}
.subscribe(observer)
}
输出
和 8.4.kt
相同
OK, 今天到这里。
明天我们来说 ReactiveX
中的 并发
Good night ~
推荐阅读:
Rxjava2入门教程五:Flowable背压支持
2018.5.16 更新
距离第一稿已经过去了两个月,目前做的项目中对并发没有要求,实在是写不下去了 ....原谅我吧,就到这里
祝顺
StandbyMe
RxKotlin 例子不超过15行教程 1----环境配置与初体验
RxKotlin 例子不超过15行教程 2----Observable Observer 与 Subscribe 简介
RxKotlin 例子不超过15行教程 3----Observable 的创建
RxKotlin 例子不超过15行教程 4----Observer Subscribe 与 Hot/Cold Observable
RxKotlin 例子不超过15行教程 5----Subject
RxKotlin 例子不超过15行教程 6----Operator 与 Marble Diagram
RxKotlin 例子不超过15行教程 7----Backpressure Flowable 与 Subscriber 简介