Rxjava(五)之背压模式

前言

我们在上面的篇幅讲解了Rxjava的操作符的使用,那么这篇文章我们将讲解Rxjava的背压模式。
在Rxjava1.0的时候还没有背压模式,当我们被观察者大量发送事件,远远大于观察者处理事件的速度的时候,会造成内存溢出。这时候背压模式就产生了。

背压模式的代码实现

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                for (int i = 0; i < 200; i++) {
                    e.onNext(i);
                }
                e.onComplete();
            }
        },
        BackpressureStrategy.ERROR
//        BackpressureStrategy.BUFFER
//        BackpressureStrategy.DROP
//        BackpressureStrategy.LATEST
        ).
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(3);
            }

            @Override
            public void onNext(Integer integer) {
                try {
                    Thread.currentThread().sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Log.d(TAG, "onNext: " + integer);
            }

            @Override
            public void onError(Throwable t) {
                Log.d(TAG, "onError: " + t.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });

这里有一个BackpressureStrategy参数,一共有四种MODE。
BackpressureStrategy.ERROR
被观察者发送大量事件,当观察者处理不过来时,就放入缓存池,如果缓存池满了,就会抛出异常。
BackpressureStrategy.BUFFER
被观察者发送大量事件,当观察者处理不过来时,就放入缓存池,如果缓存池满了,就会等待下游处理。
BackpressureStrategy.DROP
被观察者发送大量事件,当观察者处理不过来时,就放入缓存池,如果缓存池满了,就会丢弃多余事件。
BackpressureStrategy.LATEST
被观察者发送大量事件,当观察者处理不过来时,就放入缓存池,只会存储128个事件。
这里需要注意的是,缓存池大小就是128。

我们运行上述代码,会发现程序直接报错了(onError: create: could not emit value due to lack of requests)。因为我们采用了BackpressureStrategy.ERROR模式,并且发送的事件大于128。
我们将发送的事件改为100,然后在被观察者的onSubscribe方法中请求了三次,这时就会打印三次。结果如下图:

onNext: 0
onNext: 1
onNext: 2

然后我们在请求数据的时候新建一个线程:

s.request(3);
new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            Thread.sleep(5000);
            Log.d(TAG,"Thread request");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        s.request(3);
    }
}).start();
onNext: 0
onNext: 1
onNext: 2
Thread request
onNext: 3
onNext: 4
onNext: 5

这时候从结果可以看出,当我们采用异步线程时,我们可以在线程中请求数据,并且可以多次请求。
注意:异步线程切换时,被观察者的线程只会切换一次,而观察者的线程每次都会切换。

总结:
Flowable就是Observable的升级版,用法大致相同,他的主要不同点有以下几点:
1.Flowable在create方法中多了一个BackpressureStrategy参数,一共有四种MODE。
2.Observable有一个切断被观察者和观察者dispose方法,而在Flowable方法中这是采用request方法请求被观察者的事件。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容