RxJava 2.x: Creating a Flowable

I. Creating a Flowable

 public Flowable<Integer> getRxJavaFlowableData() {
        return Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 1);
                emitter.onNext(1);

                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 2);
                emitter.onNext(2);
//                Thread.sleep(5000);
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 3);
                emitter.onNext(3);
                emitter.onComplete();
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 4);
                emitter.onNext(4);
            }
        }, BackpressureStrategy.ERROR);
    }
 public void rxJavaFlowableCreateExample() {
        model.getRxJavaFlowableData()
                .subscribe(new FlowableSubscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onSubscribe-:" + "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onError-:" + "onError--:0" + t.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onComplete-:" + "onComplete");
                    }
                });
    }

Log

08-23 10:47:55.032 4992-4992/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onSubscribe-:onSubscribe
08-23 10:47:55.032 4992-4992/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:1
08-23 10:47:55.034 4992-4992/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onError-:onError--:0io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
08-23 10:47:55.034 4992-4992/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:2
    getRxJavaFlowableData---:main--:3
08-23 10:47:55.035 4992-4992/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:4

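Summary

1. If the downstream never tells the upstream how much data it can handle (it never calls request), the first emission fails immediately with the well-known backpressure error, MissingBackpressureException.
2. The code in the upstream's subscribe() still runs to the end (items 2, 3 and 4 are logged), but the subscriber receives none of those later items.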
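
Point 2 means that nothing stops the source loop by itself. The sketch below is plain Java with no RxJava dependency; it models why a source that checks for cancellation stops early. The `Emitter` class is a hypothetical stand-in for a `FlowableEmitter` created with `BackpressureStrategy.ERROR`: it only tracks a demand counter and a cancelled flag, and it assumes the emitter reports itself as cancelled once the error has been signalled.

```java
// Simplified single-threaded model; NOT RxJava code. All names are hypothetical.
class CancelAwareSourceModel {

    /** Stand-in for an ERROR-strategy emitter: a demand counter plus a cancelled flag. */
    static final class Emitter {
        private long requested;
        private boolean cancelled;
        private int delivered;

        Emitter(long initialDemand) {
            this.requested = initialDemand;
        }

        boolean isCancelled() {
            return cancelled;
        }

        int delivered() {
            return delivered;
        }

        void onNext(int value) {
            if (cancelled) {
                return; // emissions after termination are dropped
            }
            if (requested == 0) {
                cancelled = true; // stands in for signalling MissingBackpressureException
                return;
            }
            requested--;
            delivered++;
        }
    }

    /** Emits 1..count and returns how many onNext calls the source attempted. */
    static int emit(Emitter emitter, int count, boolean checkCancelled) {
        int attempts = 0;
        for (int i = 1; i <= count; i++) {
            if (checkCancelled && emitter.isCancelled()) {
                break; // stop producing once nobody is listening
            }
            attempts++;
            emitter.onNext(i);
        }
        return attempts;
    }

    public static void main(String[] args) {
        System.out.println(emit(new Emitter(0), 4, false)); // prints 4
        System.out.println(emit(new Emitter(0), 4, true));  // prints 1
    }
}
```

Without the check the source attempts all four emissions, as in the log above; with the check it stops after the first failed one.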

II. When the Subscriber signals how much data it can handle


    public void rxJavaFlowableCreateExample() {

        model.getRxJavaFlowableData()
                .subscribe(new FlowableSubscriber<Integer>() {
                    Subscription  subscription;
                    @Override
                    public void onSubscribe(Subscription s) {
                        subscription= s;
                        subscription.request(Integer.MAX_VALUE);
                        LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onSubscribe-:" + "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onError-:" + "onError--:0" + t.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onComplete-:" + "onComplete");
                    }
                });
    }

Log

08-23 11:33:56.631 11312-11312/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onSubscribe-:onSubscribe
08-23 11:33:56.632 11312-11312/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:1
08-23 11:33:56.633 11312-11312/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onNext-:1
08-23 11:33:56.633 11312-11312/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:2
08-23 11:33:56.633 11312-11312/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onNext-:2
08-23 11:33:56.633 11312-11312/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:3
08-23 11:33:56.633 11312-11312/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onNext-:3
    rxJavaFlowableCreateExample--:main-onComplete-:onComplete
08-23 11:33:56.633 11312-11312/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:4

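Summary

1. Once the downstream tells the upstream how much data it can handle, no error is raised as long as the upstream emits no more than the requested amount.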

Log (with request(2))

08-23 11:36:58.318 12001-12001/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onSubscribe-:onSubscribe
08-23 11:36:58.318 12001-12001/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:1
08-23 11:36:58.320 12001-12001/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onNext-:1
08-23 11:36:58.320 12001-12001/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:2
08-23 11:36:58.320 12001-12001/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onNext-:2
08-23 11:36:58.320 12001-12001/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:3
08-23 11:36:58.321 12001-12001/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onError-:onError--:0io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
08-23 11:36:58.321 12001-12001/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:4

2. The completion event sent by emitter.onComplete() does not count against the requested amount.
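
The request(2) log above ends in an error, so it does not show the completion case. The following ledger is a plain-Java model, not RxJava code (the class and method names are made up for illustration). It replays both situations: demand that runs out, and demand that is exactly used up before the terminal signal.

```java
// Simplified model of demand accounting; NOT RxJava code. Names are hypothetical.
class DemandLedger {

    /** Replays `items` onNext calls followed by onComplete against `demand` requested items. */
    static String replay(long demand, int items) {
        StringBuilder trace = new StringBuilder();
        long requested = demand;
        for (int i = 1; i <= items; i++) {
            if (requested == 0) {
                // No demand left: the ERROR strategy turns this emission into an error.
                return trace.append("onError").toString();
            }
            requested--; // only onNext consumes demand
            trace.append("onNext(").append(i).append(") ");
        }
        // The terminal signal is delivered even when requested is already 0.
        return trace.append("onComplete").toString();
    }

    public static void main(String[] args) {
        System.out.println(replay(2, 3)); // prints onNext(1) onNext(2) onError
        System.out.println(replay(3, 3)); // prints onNext(1) onNext(2) onNext(3) onComplete
    }
}
```

With request(3) all three items use up the demand and onComplete still goes through, which is what "does not count against the requested amount" means.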

III. Creating a Flowable asynchronously

  public void rxJavaFlowableCreateExample() {
        model.getRxJavaFlowable128Data()
                .subscribeOn(Schedulers.io())
                .unsubscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new FlowableSubscriber<Integer>() {
                    Subscription subscription;

                    @Override
                    public void onSubscribe(Subscription s) {
//                        subscription= s;
//                        subscription.request(3);
                        LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onSubscribe-:" + "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onError-:" + "onError--:0" + t.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onComplete-:" + "onComplete");
                    }
                });
    }

Log

08-23 16:42:44.141 21570-21570/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onSubscribe-:onSubscribe
08-23 16:42:44.145 21570-23129/com.example.zhang D/MainModel: getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:0
08-23 16:42:44.146 21570-23129/com.example.zhang D/MainModel: getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:1
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:2
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:3
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:4
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:5
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:6
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:7
08-23 16:42:44.147 21570-23129/com.example.zhang D/MainModel: getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:8
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:9
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:10
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:11
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:12
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:13
................................................................................................................
08-23 16:42:44.154 21570-23129/com.example.zhang D/MainModel: getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:123
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:124
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:125
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:126
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:127

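Summary

1. In the asynchronous case there is a "water tank" (a buffer) of size 128 between the upstream Flowable and the downstream subscriber.
2. So even when the downstream requests nothing, the upstream can emit 128 items before MissingBackpressureException is raised.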

Log (when 129 items are emitted)

................................................................................................................
  getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:125
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:126
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:127
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:128
08-23 16:49:16.869 24202-24202/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onError-:onError--:0io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
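
As for where the 128 comes from: to my understanding it is the value of `Flowable.bufferSize()`, which RxJava 2 derives from the system property `rx2.buffer-size` with a fallback of 128, and which `observeOn` uses as its default buffer size. The snippet below reproduces that lookup with the JDK only; treat the property name and the clamping to at least 1 as my recollection of the library source rather than a guarantee. The class name is made up.

```java
// Stdlib-only sketch of the default buffer size lookup; the class name is hypothetical.
class BufferSizeLookup {

    /** Reads the "rx2.buffer-size" system property, falling back to 128 and never going below 1. */
    static int bufferSize() {
        return Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
    }

    public static void main(String[] args) {
        // Shows 128 unless the JVM was started with -Drx2.buffer-size=<n>.
        System.out.println(bufferSize());
    }
}
```

If a different tank size is wanted for one chain only, the `observeOn(Scheduler, boolean delayError, int bufferSize)` overload should allow passing it explicitly.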

IV. Checking the value of requested

1. On a single thread

 public Flowable<Integer> getRxJavaFlowableData() {
        return Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 1);
                emitter.onNext(1);
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 2);
                emitter.onNext(2);
//                Thread.sleep(5000);
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 3);
                emitter.onNext(3);
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:onComplete");
                emitter.onComplete();
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 4);
                emitter.onNext(4);
            }
        }, BackpressureStrategy.ERROR);
    }
public void rxJavaFlowableSizeExample() {
        model.getRxJavaFlowableData()
                .subscribe(new FlowableSubscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onSubscribe-:");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onError-:" + t.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onComplete-:");
                    }
                });
    }

Log

08-24 14:29:06.026 22382-22382/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onSubscribe-:
08-24 14:29:06.026 22382-22382/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:0
    getRxJavaFlowableData---:main--:1
08-24 14:29:06.028 22382-22382/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onError-:io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
08-24 14:29:06.028 22382-22382/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:0
    getRxJavaFlowableData---:main--:2
    getRxJavaFlowableData---:main-request-:0
    getRxJavaFlowableData---:main--:3
    getRxJavaFlowableData---:main-request-:0
    getRxJavaFlowableData---:main--:onComplete
    getRxJavaFlowableData---:main-request-:0
    getRxJavaFlowableData---:main--:4

Summary

1. On a single thread, requested is 0 as long as the downstream subscriber has not requested any data.

  public void rxJavaFlowableSizeExample() {
        model.getRxJavaFlowableData()
                .subscribe(new FlowableSubscriber<Integer>() {
                    Subscription s;

                    @Override
                    public void onSubscribe(Subscription s) {
                        this.s = s;
                        s.request(100);
                        LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onSubscribe-:");

                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);

                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onError-:" + t.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onComplete-:");
                    }
                });
    }

Log

08-24 14:32:15.405 23251-23251/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onSubscribe-:
08-24 14:32:15.405 23251-23251/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:100
    getRxJavaFlowableData---:main--:1
08-24 14:32:15.406 23251-23251/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:1
08-24 14:32:15.406 23251-23251/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:99
    getRxJavaFlowableData---:main--:2
08-24 14:32:15.406 23251-23251/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:2
08-24 14:32:15.406 23251-23251/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:98
    getRxJavaFlowableData---:main--:3
08-24 14:32:15.406 23251-23251/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:3
08-24 14:32:15.406 23251-23251/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:97
    getRxJavaFlowableData---:main--:onComplete
08-24 14:32:15.406 23251-23251/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onComplete-:
08-24 14:32:15.406 23251-23251/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:97
    getRxJavaFlowableData---:main--:4

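Summary

1. Calling s.request(100) in the downstream subscriber sets requested to 100; each emitted item then reduces it by 1 (100, 99, 98, 97 in the log).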

2. Across threads (asynchronous)

   public Flowable<Integer> getRxJavaFlowableData() {
        return Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 1);
                emitter.onNext(1);
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 2);
                emitter.onNext(2);
//                Thread.sleep(5000);
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 3);
                emitter.onNext(3);
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:onComplete");
                emitter.onComplete();
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 4);
                emitter.onNext(4);
            }
        }, BackpressureStrategy.ERROR);
    }

    public Flowable<Integer> getRxJavaFlowable128Data() {
        return Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 129; i++) {
                    LogUtils.debug(TAG, "getRxJavaFlowable128Data---:" + Thread.currentThread().getName() + "--:" + i);
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.ERROR);
    }
  public void rxJavaFlowableSizeExample() {
        model.getRxJavaFlowableData()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new FlowableSubscriber<Integer>() {
                    Subscription s;

                    @Override
                    public void onSubscribe(Subscription s) {
                        this.s = s;
                        s.request(100);
                        LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onSubscribe-:");

                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);

                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onError-:" + t.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onComplete-:");
                    }
                });
    }

Log

08-24 14:48:05.367 24916-24916/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onSubscribe-:
08-24 14:48:05.374 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-:128
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:1
08-24 14:48:05.376 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-:127
08-24 14:48:05.376 24916-24916/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:1
08-24 14:48:05.376 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:2
08-24 14:48:05.377 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-:126
08-24 14:48:05.377 24916-24916/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:2
08-24 14:48:05.377 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:3
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-:125
08-24 14:48:05.377 24916-24916/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:3
08-24 14:48:05.377 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:onComplete
08-24 14:48:05.377 24916-24916/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onComplete-:
08-24 14:48:05.377 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-:125
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:4

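Summary

1. requested is 128 rather than 100: with observeOn in the chain, the upstream sees the size of the "water tank" (buffer), which is 128, not the amount the subscriber requested.
2. Each item the upstream emits reduces requested by 1.
3. onComplete() and onError() do not reduce requested.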

3. Across threads: what triggers the upstream to resume emitting

 public Flowable<Integer> getRxJavaFlowableRealExample() {
        return Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
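                for (int i = 0; ; i++) {
                    // Busy-wait until there is demand. Cancellation is checked here as well,
                    // otherwise a cancelled stream with requested() == 0 would spin forever.
                    while (emitter.requested() == 0 && !emitter.isCancelled()) {
                        // spin (acceptable for a demo, wasteful in production code)
                    }
                    if (emitter.isCancelled()) {
                        break;
                    }
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-emit:" + emitter.requested());
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + i);
                    emitter.onNext(i);
                }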

            }
        }, BackpressureStrategy.ERROR);
    }
 public void rxJavaFlowableRealExample() {
        model.getRxJavaFlowableRealExample()
                .subscribeOn(Schedulers.io())
                .unsubscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new FlowableSubscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
//                        s.request(96);
                        LogUtils.error(TAG, "rxJavaFlowableRealExample--:" + Thread.currentThread().getName() + "-onSubscribe-:");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.error(TAG, "rxJavaFlowableRealExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtils.error(TAG, "rxJavaFlowableRealExample--:" + Thread.currentThread().getName() + "-onError-:" + t.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.error(TAG, "rxJavaFlowableRealExample--:" + Thread.currentThread().getName() + "-onComplete-:");
                    }
                });
    }

Log

08-24 16:36:38.599 6546-6546/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onSubscribe-:
08-24 16:36:38.606 6546-8058/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:128
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:0
08-24 16:36:38.607 6546-8058/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:127
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:1
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:126
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:2
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:125
........................................
 getRxJavaFlowableData---:RxCachedThreadScheduler-1--:122
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:5
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:123
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:4
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:124
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:3
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:125
08-24 16:36:38.621 6546-8058/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:2
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:126
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:1
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:127

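Summary

1. When the subscriber never calls request, the upstream emits its 128 items and then stalls; the subscriber receives nothing.
2. With request(95), the subscriber receives 95 items, but this does not trigger the upstream to resume emitting: the tank's remaining capacity (requested) is not topped up.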

Log (with request(95))

08-24 16:20:38.610 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onSubscribe-:
08-24 16:20:38.617 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:128
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:0
08-24 16:20:38.619 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:127
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:1
........................................
08-24 16:20:38.624 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:0
08-24 16:20:38.624 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:29
08-24 16:20:38.624 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:1
08-24 16:20:38.624 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:98
08-24 16:20:38.624 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:2
08-24 16:20:38.625 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:30
08-24 16:20:38.625 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:3
    rxJavaFlowableRealExample--:main-onNext-:4
........................................
08-24 16:20:38.638 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:93
08-24 16:20:38.638 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:100
08-24 16:20:38.638 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:94
08-24 16:20:38.638 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:27
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:101
08-24 16:20:38.639 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:26
........................................
   getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:3
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:125
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:2
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:126
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:1
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:127

3. With request(96), the subscriber receives 96 items, and this also triggers the upstream to resume emitting: requested is topped up.

Log (with request(96))

08-24 16:23:23.690 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onSubscribe-:
08-24 16:23:23.696 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:128
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:0
08-24 16:23:23.697 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:127
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:1
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:126
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:2
08-24 16:23:23.697 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:0
08-24 16:23:23.697 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:125
........................................
08-24 16:23:23.701 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:97
08-24 16:23:23.701 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:29
08-24 16:23:23.701 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:31
08-24 16:23:23.701 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:30
08-24 16:23:23.701 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:96
08-24 16:23:23.701 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:31
08-24 16:23:23.701 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:32
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:95
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:33
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:94
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:34
08-24 16:23:23.701 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:32
08-24 16:23:23.701 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:93
........................................

08-24 16:23:23.710 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:94
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:33
08-24 16:23:23.710 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:94
08-24 16:23:23.710 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:95
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:32 // ① requested is 32 at this point
08-24 16:23:23.710 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:95 // ② the subscriber has handled one more item, reaching a total of 96
08-24 16:23:23.710 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:96 // ③ the upstream emits one more item
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:127 // ④ requested is now 32 + 96 - 1 = 127; 1 is subtracted for the item just emitted in ③
........................................
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:3
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:221
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:2
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:222
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:1
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:223

08-24 15:38:47.430 31071-31193/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:30
08-24 15:38:47.430 31071-31071/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:95
08-24 15:38:47.430 31071-31193/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:98
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:125
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:99
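
The jump from 32 to 127 annotated in the log above can be reproduced with a small model of the request bookkeeping. The sketch below does not use RxJava at all; it is a simplified simulation, assuming the default `observeOn` buffer size of 128 and its replenish threshold of 128 - (128 >> 2) = 96, and assuming the consumer handles each item right after the producer has logged `requested()`. The class and method names are hypothetical.

```java
public class RequestedSimulation {
    // Assumed default prefetch of observeOn (Flowable.bufferSize()).
    static final int BUFFER_SIZE = 128;
    // observeOn asks for more once it has consumed 75% of the prefetch.
    static final int LIMIT = BUFFER_SIZE - (BUFFER_SIZE >> 2); // 96

    /**
     * Returns the value the producer would log from requested()
     * right after emitting item i, for i = 0 .. count - 1.
     */
    static long[] requestedAfterEachEmit(int count) {
        long[] seen = new long[count];
        long requested = BUFFER_SIZE;      // initial request from observeOn
        int consumedSinceReplenish = 0;
        for (int i = 0; i < count; i++) {
            requested--;                   // producer calls onNext(i)
            seen[i] = requested;           // producer logs requested()
            consumedSinceReplenish++;      // consumer handles item i
            if (consumedSinceReplenish == LIMIT) {
                consumedSinceReplenish = 0;
                requested += LIMIT;        // consumer side requests 96 more
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        long[] seen = requestedAfterEachEmit(97);
        // Values logged after emitting items 94, 95 and 96.
        System.out.println(seen[94] + " " + seen[95] + " " + seen[96]);
    }
}
```

In this idealized lock-step model the values after items 94, 95 and 96 are 33, 32 and 127, matching the annotated log lines. In a real run the consumer thread can lag behind, so the replenishment may show up later than in the simulation.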

4. Asynchronous thread: Consumer

```java
public Flowable<Integer> getRxJavaFlowableRealExample() {
    return Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; ; i++) {
                // Busy-wait until downstream has requested more items.
                // Also leave the wait once the subscription is cancelled,
                // otherwise this loop could spin forever after disposal.
                while (emitter.requested() == 0 && !emitter.isCancelled()) {
                }
                if (emitter.isCancelled()) {
                    break;
                }
                LogUtils.debug(TAG, "getRxJavaFlowableRealExample---:" + Thread.currentThread().getName() + "-request-emit:" + emitter.requested());
                LogUtils.debug(TAG, "getRxJavaFlowableRealExample---:" + Thread.currentThread().getName() + "--:" + i);
                emitter.onNext(i);
            }
        }
    }, BackpressureStrategy.ERROR);
}

public void rxJavaFlowableConsumeExample() {
    model.getRxJavaFlowableRealExample()
            .subscribeOn(Schedulers.io())               // emit on an IO thread
            .unsubscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())  // consume on the main thread
            .compose(RxLifeCycleUtils.<Integer>bindUntilEvent(view, ActivityEvent.DESTROY))
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    LogUtils.error(TAG, "rxJavaFlowableConsumeExample--:" + Thread.currentThread().getName() + "-Consumer-:" + integer);
                }
            });
}
```

Log

....................................................
2019-02-12 17:53:21.381 29563-29563/com.example.zhang E/RxJavaPresenter: rxJavaFlowableConsumeExample--:main-Consumer-:67323
2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3--:67324
2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3-request-emit:99
2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3--:67331
2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3--:67333
2019-02-12 17:53:21.381 29563-29563/com.example.zhang E/RxJavaPresenter: rxJavaFlowableConsumeExample--:main-Consumer-:67339
2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3--:67342
2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3--:67344
2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3-request-emit:78
2019-02-12 17:53:21.382 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3--:67346

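Summary

When the observer is a Consumer, there is no need to call request(n) manually: requests are issued on its behalf and keep being replenished, so the emitter's requested() count never stays at 0 and emission continues, as the request-emit values in the log show.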
