前言
上一节我们讲解了线程调度器,这个时候我们可能需要考虑一个问题。上下游工作在不同的线程,这时候收发数据无需等待,当上游发事件的速度太快, 下游处理事件的速度太慢会发生什么事情?这也就是所谓的背压(Backpressure)问题。
我们先看一个例子:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; ; i++) { //无限循环发事件
Log.d(TAG, "Observable发出:"+i);
e.onNext(i);
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(1000);//一秒处理一次数据
Log.d(TAG, "" + integer);
}
});
没错,我们的内存会爆掉,最后OOM,那要怎么解决这个问题呢?
不用担心,RxJava2.x已经为你想到这个问题。
Flowable和Subscriber的使用
Flowable<Integer> flowable=Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
Log.d(TAG, "flowable:B1");
e.onNext(1);//向下游(观察者)发射内容1
Log.d(TAG, "flowable:B2");
e.onNext(2);
Log.d(TAG, "flowable:B3");
e.onNext(3);
Log.d(TAG, "flowable:B4");
e.onNext(4);
Log.d(TAG, "flowable:B5");
e.onNext(5);
e.onComplete();
}
}, BackpressureStrategy.ERROR);//增加了一个参数
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(Long.MAX_VALUE); //注意这句代码,这里告诉上游,下游的处理能力
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
flowable.subscribe(subscriber);
我们稍微修改一下代码:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
Log.d(TAG, "flowable:1");
e.onNext(1);//向下游(观察者)发射内容1
Log.d(TAG, "flowable:2");
e.onNext(2);
Log.d(TAG, "flowable:3");
e.onNext(3);
Log.d(TAG, "flowable:4");
e.onNext(4);
Log.d(TAG, "flowable:5");
e.onNext(5);
e.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(3); //这里告诉上游,下游的处理能力为3
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
//Log.d(TAG, "onError");
Log.w(TAG, "onError: ", t);//上游多发事件报警告
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
运行结果报异常警告,为什么呢?因为s.request(3)告诉上游处理能力为3,上游多发事件,则直接报警告。
而多发事件的处理是由BackpressureStrategy.ERROR决定
该参数还有以下选项:
BackpressureStrategy.DROP(多发则丢弃不处理)
BackpressureStrategy.BUFFER(多发则先放到缓存)
BackpressureStrategy.LATEST(多发则只保留最后的事件)
再看一个例子:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
for (int i = 0;i<100 ; i++) {
Log.d(TAG, "flowable:"+i);
e.onNext(i);
}
}
}, BackpressureStrategy.BUFFER)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
mSubscription=s;
Log.d(TAG, "onSubscribe");
s.request(10); //这里告诉上游,下游的处理能力为10
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
//Log.d(TAG, "onError");
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
我们使用BackpressureStrategy.BUFFER把多发的事件保存到缓存里,运行代码没有警告异常。
这时候我们可以通过外部控制mSubscription.request,例如点击按钮调用mSubscription.request(10);
系统会继续运行输出onNext。
上面都是同一线程下的结果,如果是不同线程会发生什么?
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
for (int i = 0; ; i++) {
Log.d(TAG, "flowable:"+i);
e.onNext(i);
}
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(10); //这里告诉上游,下游的处理能力为10,实际上并没什么卵用
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
//Log.d(TAG, "onError");
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
运行代码,下游虽然通过s.request(10)告诉上游处理能力为10,但是上游并不理会,而继续不断发送事件。这是为什么呢?是不是上游还不知道下游的处理能力?那有没有办法呢?
答案就是FlowableEmitter的requested()
再看看下面的例子:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
for (int i = 0; i<300 ; i++) {
//Log.d(TAG, "flowable:"+i);
Log.d(TAG,"requested:"+e.requested());
e.onNext(i);
}
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(3); //异步情况,上下游不同线程,这里告诉上游处理能力为3,实际是128+3,128是由Flowable初始化
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
//mSubscription.request(1);
}
@Override
public void onError(Throwable t) {
//Log.d(TAG, "onError");
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
运行结果发现e.requested()获取到的结果是128,每调用一次onNext减1。
为什么是128?是因为异步情况下,上下游不同线程,这里告诉上游处理能力为3,实际是128+3,128是由Flowable初始化。
如果是同一线程则得到结果一样哦,这里不展示(同学们自己尝试吧)
既然有e.requested(),那么好办了
我们稍微修改一下程序:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
boolean flag;
for (int i = 0; ; i++) { //无限循环发事件
Log.d(TAG,"requested:"+e.requested());
flag = false;
while (e.requested()==0){
if (!flag){
Log.d(TAG,"下游不能处理,暂不能发送");
flag=true;
}
}
e.onNext(i);
}
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
private Subscription mSubscription;
@Override
public void onSubscribe(Subscription s) {
mSubscription=s;
Log.d(TAG, "onSubscribe");
s.request(3); //异步情况,上下游不同线程,这里告诉上游处理能力为3,实际是128+3,128是由Flowable初始化
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
try {
Thread.sleep(1000);
mSubscription.request(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
//Log.d(TAG, "onError");
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
运行程序发现,首次上游调用了128次事件,随后下游处理96次后,上游再次调用96次事件,这样一直循环下去。可见程序很有规律,没有出现OOM。
为什么会下游处理96次后上游会继续调用事件?
这是因为Flowable内部实现了s.request请求,也就是说下游处理了一部分事件后,Flowable内部自动调用s.request(95),自动增加处理能力。