前言
上一节中,我们一次性发送128个事件没有任何问题,但是一旦超过128个立马抛出MissingBackpressureException异常,提示你上游发送事件太多,下游处理不过来,如何解决?这一节就来解决下。
1. 解决方法
思路一:
发送128个事件没问题是因为Flowable内部有一个大小为128的水缸,超过则会溢出,既然水缸这么小,就换一个大点的水缸试下,代码如下:
/**
* BUFFER:大的水缸,
* 让上游无限的发送事件,下游一个也不处理,结果容易造成OOM
*/
public static void demo1(){
// 创建一个上游:Flowable
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) { // 无限for循环
Log.e("TAG" , "emit " + i) ;
emitter.onNext(i);
}
}
} , BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io()) // 让上游的for循环在子线程中执行
.observeOn(AndroidSchedulers.mainThread()) // for循环执行完后切回到主线程
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.e("TAG" , "subscribe") ;
mSubscription = s ;
}
@Override
public void onNext(Integer integer) {
Log.e("TAG" , "next" + integer) ;
}
@Override
public void onError(Throwable t) {
Log.e("TAG" , "error") ;
}
@Override
public void onComplete() {
Log.e("TAG" , "complete") ;
}
}) ;
}
换大的水缸,在上游使用无限for循环发射数据,下游一个也不处理,导致OOM;
思路二:
之前解决Observable如何解决上游发送事件太快,有两种方法:从数量和速度两个方面解决,同样的,Flowable也有对应的两种方法:BackpressureStrategy.DRAP和BackpressureStrategy.LATEST。
- Drap:直接把存不下的事件丢掉;
- Latest:只保留最新的数据;
Drop代码如下:
/**
* Drop:丢弃存储不下的事件
*/
public static void demo2(){
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 10000; i++) {
emitter.onNext(i);
}
}
} , BackpressureStrategy.DROP).subscribeOn(Schedulers.io()) // 让上游的for循环在子线程中执行
.observeOn(AndroidSchedulers.mainThread()) // 切换到主线程执行下边的操作
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.e("TAG" , "subscribe -> ") ;
mSubscription = s ;
s.request(128);
}
@Override
public void onNext(Integer integer) {
Log.e("TAG" , "next -> " + integer) ;
}
@Override
public void onError(Throwable t) {
Log.e("TAG" , "error") ;
}
@Override
public void onComplete() {
Log.e("TAG" , "complete") ;
}
}) ;
}
效果如如下:
- 结果就是:第一次调用 s.request(128); 方法时候,下游收到 0-127共128个事件,第二次调用 s.request(128)就不确定,上游在一直发送事件,内存正常,drop丢弃了存不下事件;
latest代码如下:
让上游无限发送事件,把Subscription保存起来,方便在外边调用 s.request(128)方法
/**
* latest:只保留最新数据
*/
public static void demo3(){
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 10000; i++) {
emitter.onNext(i);
}
}
} , BackpressureStrategy.LATEST).subscribeOn(Schedulers.io()) // 让上游的for循环在子线程中执行
.observeOn(AndroidSchedulers.mainThread()) // 切换到主线程中执行下边操作
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.e("TAG" , "subscribe") ;
mSubscription = s ;
s.request(128);
}
@Override
public void onNext(Integer integer) {
Log.e("TAG" , "next -> " + integer) ;
}
@Override
public void onError(Throwable t) {
Log.e("TAG" , "error") ;
}
@Override
public void onComplete() {
Log.e("TAG" , "complete") ;
}
}) ;
}
interval操作符:
/**
* interval操作符
*/
public static void demo4(){
Flowable.interval(1 , TimeUnit.MICROSECONDS)
.onBackpressureDrop() // 添加背压策略
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onSubscribe(Subscription s) {
Log.e("TAG" , "subscribe") ;
mSubscription = s ;
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Long aLong) {
Log.e("TAG" , "next -> " + aLong) ;
}
@Override
public void onError(Throwable t) {
Log.e("TAG" , "error") ;
}
@Override
public void onComplete() {
Log.e("TAG" , "complete") ;
}
}) ;
}
八篇文章所有代码已上传至github:
https://github.com/shuai999/RxJava2Demo.git