Rxjava2.0笔记-003-操作符-搭配Retrofit

1. 使用interval进行轮询操作,类似于请求用户消息(之前一直使用handler)

 /**
     * 轮询查询接口-使用操作符interval
     * 此处主要展示无限次轮询,若要实现有限次轮询,仅需将interval()改成intervalRange()即可
     */
    private void init() {
        /**
         * 参数说明:
         * 参数1==第一次延迟时间,1秒后发送查询请求
         * 参数2==间隔时间
         * 参数3==实践单位
         * 该例子发送的事件特点:延迟2s后发送事件,每隔1秒产生1个数字(从0开始递增1,无限个)
         */
        Observable.interval(2, 1, TimeUnit.SECONDS)
                /**
                 * 步骤2:每次发送数字前发送1次网络请求(doOnNext()在执行Next事件前调用)
                 * 即每隔1秒产生1个数字前,就发送1次网络请求,从而实现轮询需求
                 */
                .doOnNext(aLong -> {
                    KLog.d(TTAG, "第" + aLong + "次查询");

                    retrofitApi.getCall()
                            .subscribeOn(Schedulers.io())
                            .observeOn(AndroidSchedulers.mainThread())
                            .subscribe(new Observer<Translation>() {
                                @Override
                                public void onSubscribe(Disposable d) {
                                    //切断
                                    d.dispose();
                                }
                                @Override
                                public void onNext(Translation translation) {
                                    translation.show();
                                }
                                @Override
                                public void onError(Throwable e) {
                                    KLog.d(TTAG, "请求失败了:失败原因是:" + e.getMessage());
                                }
                                @Override
                                public void onComplete() {
                                    KLog.d(TTAG, "本次请求结束了");
                                }
                            });

                }).subscribe(aLong -> {

            KLog.d(TTAG, "接收到请求,这是第" + aLong + "次");

        });

    }

2. 变换操作符,对时间序列进行加工处理,使其转变成不同的事件/序列

常用变换操作符有:

  • map()
  • flatMap()
  • concatMap()
  • buffer()

2.1 Map()

作用:对 被观察者发送的每1个事件都通过 指定的函数 处理,从而变换成另外一种事件,
即, 将被观察者发送的事件转换为任意的类型事件。

例子如下:使用map()操作符使事件参数从整形变换成字符串类型

  Observable.create((ObservableOnSubscribe<Integer>) emitter -> {

            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
        }).map(integer ->
                "这是发送的第" + integer + "条消息")
                .subscribe(s ->
                        KLog.d(TTAG, "接收事件::" + s));

2.2 flatMap()

作用:将被观察者发送的事件序列进行 拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送
无序的将被观察者发送的整个事件序列进行变换

 private void flatMap() {

        Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
        }).flatMap(integer -> {
            final ArrayList<String> strings = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                strings.add("我是事件"+integer+"拆分后的子事件"+i);
            }
            return Observable.fromIterable(strings);

        }).subscribe(s -> {

            KLog.d(TTAG, s);
        });

    }

2.3 ConcatMap()

作用:
类似于flatMap(),不过是有序的

新合并生成的事件序列顺序是有序的,即 严格按照旧序列发送事件的顺序

日志如下:

(Main3Activity.java:61)#lambda$flatMap$2$Main3Activity ] 我是事件1拆分后的子事件0
(Main3Activity.java:61)#lambda$flatMap$2$Main3Activity ] 我是事件1拆分后的子事件1
(Main3Activity.java:61)#lambda$flatMap$2$Main3Activity ] 我是事件1拆分后的子事件2
(Main3Activity.java:61)#lambda$flatMap$2$Main3Activity ] 我是事件2拆分后的子事件0
(Main3Activity.java:61)#lambda$flatMap$2$Main3Activity ] 我是事件2拆分后的子事件1
(Main3Activity.java:61)#lambda$flatMap$2$Main3Activity ] 我是事件2拆分后的子事件2
(Main3Activity.java:61)#lambda$flatMap$2$Main3Activity ] 我是事件3拆分后的子事件0
(Main3Activity.java:61)#lambda$flatMap$2$Main3Activity ] 我是事件3拆分后的子事件1
(Main3Activity.java:61)#lambda$flatMap$2$Main3Activity ] 我是事件3拆分后的子事件2

实例:接口嵌套

 /**
     * 接口合并,实例,注册登录
     */
    private void concatMap() {

        retrofitApi.getCall().subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(translation -> {
                    translation.show();
                }).observeOn(Schedulers.io())//注册线程结束,作为新的观察者,切换到io此线程(理应为设置subscribeOn(Schedulers.io()))
                //作为观察者,下面又有新的观察者,他就作为老的观察者,也就是新的被观察者,所以调控线程用observeOn(Schedulers.io())
                .concatMap(translation ->
                        //添加注册失败是的判断返回空对象
                        null != translation ? retrofitApi.getCall() : Observable.empty())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(translation -> {
                    translation.show();
                }, throwable -> {
                    KLog.d(TTAG, throwable.getMessage());
                });
    }


2.4 buffer()

作用:定期从 被观察者(Obervable)需要发送的事件中 获取一定数量的事件 & 放到缓存区中,最终发送

 /**
     * buffer 操作符接受两个参数,buffer(count,skip),
     * 作用是将 Observable 中的数据按 skip (步长) 分成最大不超过 count 的 buffer ,然后生成一个  Observable 。
     * <p>
     * 意思就是取count个,发射之后,重头开始跳过skip个,在选count个发射,一直到最后一个
     */
    private void buffer() {

        Observable.just(1,2,3,4,5,6,7)
                .buffer(3,1)//设置缓存区大小==每次从被观察者中获取的事件数量
        //步长:每次获取新事件数量
        .subscribe(integers -> {

            KLog.d(TTAG, "缓存区数量"+integers.size());
            for (Integer integer : integers) {
                KLog.d(TTAG, "事件"+integer);
            }
        });
    }

3. 组合操作符

3.1 concat()以及concatArray()

作用:组合多个被观察者一起发送数据,合并后 按发送顺序串行执行

二者区别:组合被观察者的数量,即concat()组合被观察者数量≤4个,而concatArray()则可>4个

  /**
     * 该类型的操作符的作用 = 组合多个被观察者
     * 组合多个被观察者一起发送数据,合并后 按发送顺序串行执行
     *concat()
     * concatArray()
     */
    private void concat() {
        
        Observable.concat(Observable.just(1,2)//发射者数量不超过4个
        ,Observable.just(3,4)
        ,Observable.just(7,8))
                .subscribe(integer -> {
                });
        
        
        Observable.concatArray(Observable.just(1,2)//被观察者数量不受限制
        ,Observable.just(4,5)
        ,Observable.just(7,8)
        ,Observable.just(3,6))
                .subscribe(integer -> {
                    
                });
    }

3.2 merge()以及mergeArray()

作用:组合多个被观察者一起发送数据,合并后 按时间线并行执行

区别为:merge()组合被观察者数量小于等于4,合并后按时间线执行

 /**
     * 合并发射者,按时间线执行
     */
    private void merge() {

        Observable.merge(
                //延迟发送操作符
                //从0开始发送,工发送3个数据,第一次发件延迟时间1秒。间隔时间1s
                //
                Observable.intervalRange(0,3,1,1,TimeUnit.SECONDS),
                Observable.intervalRange(2,3,1,1,TimeUnit.SECONDS)
        ).subscribe(aLong -> {

        });

    }

3.3 concatDelayError()以及mergeDelayError()

作用:使用conat以及merge操作符时,如果某个发射者发出error()时间,则会总结整个流程,我们希望onError()事件推迟到其他发射者都发送完时间之后后才会触发,即可使用concatDelayError()以及mergeDelayError()

 /**
     * 使用conat以及merge操作符时,如果某个发射者发出error()时间,则会总结整个流程,
     * 我们希望onError()事件推迟到其他发射者都发送完时间之后后才会触发,
     * 即可使用` concatDelayError()`以及`mergeDelayError()`
     */
    private void concatArrayDelayErrorTest() {

        Observable.concatArrayDelayError(Observable.create(emitter -> {

            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            // 发送Error事件,因为使用了concatDelayError,所以第2个Observable将会发送事件,等发送完毕后,再发送错误事件
            emitter.onError(new NullPointerException());
            emitter.onComplete();

        }),Observable.just(4,5,6))
                .subscribe(integer -> {
                });
    }

4. 合并多个事件

作用:该类型的操作符主要是对多个发射者中的事件进行合并处理

4.1 zip()操作符

作用:合并 多个被观察者(Observable)发送的事件,生成一个新的事件序列(即组合过后的事件序列),并最终发送

 /**
     * zip,可以用于接口合并
     * 操作符使用例子
     * zip 专用于合并事件,该合并不是连接(连接操作符后面会说),
     * 而是两两配对,也就意味着,最终配对出的 Observable 发射事件数目只和少的那个相同。
     * <p>
     * zip 组合事件的过程就是分别从发射器 A 和发射器 B 各取出一个事件来组合,并且一个事件只能被使用一次,
     * 组合的顺序是严格按照事件发送的顺序来进行的,所以上面截图中,可以看到,1 永远是和 A 结合的,2 永远是和 B 结合的
     */
    public static void useZip() {

        Observable.zip(getStringObservable(), getIntegerObservable(),
                (s, integer) -> s + integer).subscribe(s -> KLog.d(TTAG, "新的消息字段是" + s));
    }

    private static Observable<String> getStringObservable() {
        return Observable.create((ObservableOnSubscribe<String>) e -> {
            if (!e.isDisposed()) {
                aaa.append("asd");
                e.onNext("A");
                aaa.append("asd");
                e.onNext("B");
                aaa.append("asd");
                aaa.append("zxczxc");
                e.onNext("C");
            }
        }).subscribeOn(Schedulers.io());
    }

    private static Observable<Integer> getIntegerObservable() {
        return Observable.create((ObservableOnSubscribe<Integer>) e -> {
            if (!e.isDisposed()) {

                e.onNext(1);
                aaa.append("--" + 1);

                e.onNext(2);
                aaa.append("--" + 2);

                e.onNext(3);
                aaa.append("--" + 3);

                e.onNext(4);
                aaa.append("--" + 4);

                e.onNext(5);
                aaa.append("--" + 5);
            }
        }).subscribeOn(Schedulers.io());
    }

4.2 combineLatest()

作用:当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据 与 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据

 /**
     * 当两个Observables中的任何一个发送了数据后,
     * 将先发送了数据的Observables 的最新(最后)一个数据 与
     * 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据
     */
    private void combineLatest() {

        Observable.combineLatest(
                Observable.just(1L, 2L, 3L, 4L, 5L),
                Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS),
                (aLong, aLong2) -> {
                    KLog.d(TTAG, aLong);
                    KLog.d(TTAG, aLong2);
                    return aLong+aLong2;
                }
        ).subscribe(aLong -> {
            KLog.d(TTAG, aLong);
        });

    }

事件接收结果是:

3  0
3
3  1
4
3  2
5

4.3 reduce()

作用:把被观察者需要发送的事件聚合成1个事件 & 发送

/**
     * 每次用一个方法处理一个值,可以有一个 seed 作为初始值
     */
    public static void useReduce() {

        Observable.just(1, 2, 3, 4)
                .reduce((integer, integer2) -> {
                    KLog.d(TTAG, integer + "");
                    KLog.d(TTAG, integer2 + "");
                    
                    //是所有事件相加
                    return integer + integer2;
                }).subscribe(integer -> KLog.d(TTAG, integer + ""));
    }

4.4 collect()

作用:将被观察者Observable发送的时间收集到一个数据结构里面

  /**
     * 将被观察者Observable发送的数据事件收集到一个数据结构里
     */
    private void collect() {

        Observable.just(1,2,3,4,5,6,7,8)
                .collect((Callable<ArrayList<Integer>>) () ->
                        new ArrayList<>(),
                        (integers, integer) -> {
                    integers.add(integer);
                }).subscribe(integers ->
                KLog.d(TTAG, integers.toString()));

    }

5. 发送事件前追加发送事件

5.1 startWIth()以及 startWithArray()

作用: 在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者

  /**
     *  在一个被观察者发送事件前,追加发送一些数据
     *  后追加,先调用,组合模式
     */
    private void startWith() {

        Observable.just(2,3,4,5)
                .startWith(0)
                .startWith(Observable.just(7,8))
                .startWithArray(1)
                .subscribe(integer -> {

                });
    }

6. 统计发送事件数量

6.1 count()

作用:统计被观察者发送事件的数量

/**
     * 统计被观察者发送事件的数量
     */
    private void count() {

        Observable.just(1,2,3,4)
                .count()
                .subscribe(aLong -> {
                    KLog.d(TTAG, "发送事件数量是:"+aLong);

                });
    }

7. 在事件的生命周期中操作

在事件发送以及接收的整个生命周期中进行操作,如发送时间前的初始化,发送事件后的回调请求等

7.1 操作符 do( )

作用:在某个时间的生命周期中调用

类型分为:

  • 当Observable每发送一次数据事件就会调用1次

    • doOnEach()
    • 含onNext()、onError()和onCompleted()
  • Next事件

    • 执行Next事件前调用- doOnNext( )
    • 执行Next事件后调用- doAfterNext()
  • 发送事件完毕后调用

    • 发送错误事件后- doOnError()
    • 正常发送事件完毕后- doOnCompleted()
    • 无论正常发送完毕或者异常终止-doOnTerminate()
    • 最后执行: doFinally()
  • 订阅相关:

    • 观察者订阅时调用-doOnSubscribe()
    • 观察者取消订阅时调用- doOnUnsubscribe()

具体代码如下:

 /**
     * do操作符
     */
    private void useDo() {

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onError(new Throwable("发送错误"));
            }
            //1. 当Observable每发送1次数据事件就会调用1次
        }).doOnEach(new Consumer<Notification<Integer>>() {
            @Override
            public void accept(Notification<Integer> integerNotification) throws Exception {

                KLog.d(TTAG, "doOnEach:" + integerNotification);
            }
            // 2. 执行Next事件前调用
        }).doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                KLog.d(TTAG, "doOnNext:" + integer);
            }
            //3.执行Next事件后调用
        }).doAfterNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {

            }
        }).doOnComplete(new Action() {
            @Override
            public void run() throws Exception {
                KLog.d(TTAG, "doOnCompleted:");
            }
        }).doOnError(new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {

                KLog.d(TTAG, "doOnError:" + throwable.getMessage());
            }
        }).doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                KLog.d(TTAG, "doOnSubscribe:");
            }
        }).doAfterTerminate(new Action() {
            @Override
            public void run() throws Exception {
                KLog.d(TTAG, "doAfterTerminate");
            }
        }).doFinally(new Action() {
            @Override
            public void run() throws Exception {
                KLog.d(TTAG, "doFinally");
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                KLog.d(TTAG, "开始发射了");
            }

            @Override
            public void onNext(Integer integer) {
                KLog.d(TTAG, "接收到事件:" + integer);
            }

            @Override
            public void onError(Throwable e) {
                KLog.d(TTAG, "发生错误了:" + e.getMessage());
            }

            @Override
            public void onComplete() {
                KLog.d(TTAG, "处理完成了");
            }
        });

    }

(Main3Activity.java:116)#accept ] doOnSubscribe:
(Main3Activity.java:131)#onSubscribe ] 开始发射了
(Main3Activity.java:88)#accept ] doOnEach:OnNextNotification[1]
(Main3Activity.java:94)#accept ] doOnNext:1
(Main3Activity.java:136)#onNext ] 接收到事件:1
(Main3Activity.java:88)#accept ] doOnEach:OnNextNotification[2]
(Main3Activity.java:94)#accept ] doOnNext:2
(Main3Activity.java:136)#onNext ] 接收到事件:2
 (Main3Activity.java:88)#accept ] doOnEach:OnNextNotification[3]
(Main3Activity.java:94)#accept ] doOnNext:3
(Main3Activity.java:136)#onNext ] 接收到事件:3
(Main3Activity.java:88)#accept ] doOnEach:OnErrorNotification[java.lang.Throwable: 发送错误]
(Main3Activity.java:111)#accept ] doOnError:发送错误
(Main3Activity.java:141)#onError ] 发生错误了:发送错误
(Main3Activity.java:126)#run ] doFinally
(Main3Activity.java:121)#run ] doAfterTerminate

8. 关于错误处理:

  • onErrorReturn( )
  • onErrorResumeNext( )
  • onExceptionResumeNext( )
  • retry( )
  • retryUntil( )
  • retryWhen( )

解决方案:

  1. 发送数据:
  • 发送一个特殊事件&正常终止- onErrorReturn( )

  • 发送一个新的Observable,有如下两种:

    • onErrorResumeNext( )
    • onExceptionResumeNext( )
  1. 重试:
    -直接重试 retry()
  • 让Observable重新订阅 - retryUntil( )

  • 将错误传递给另一个Observable来决定是否要重新订阅改Observable- retryWhen( )

具体使用见下面代码:

8.1 onErrorReturn

/**
     * 关于错误的解决方案
     */
    private void onErrorReturn() {


        /**
         * 方案1
         * 发送一个特殊书剑,正常结束
         */
        Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
            emitter.onComplete();
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onNext(4);
            emitter.onError(new Throwable("发生错误了"));
        }).onErrorReturn(throwable -> {
            KLog.d(TTAG, "在onErrorReturn处理了错误::" + throwable.getMessage());

            //发生错误时发送一个事件,正常结束
            return 666;
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                KLog.d(TTAG, "接收到事件:" + integer);
            }

            @Override
            public void onError(Throwable e) {
                KLog.d(TTAG, "失败了");
            }

            @Override
            public void onComplete() {
                KLog.d(TTAG, "结束了");
            }
        });
    }

8.2 onErrorResumeNext

 /**
     * 方案2
     * 发送新的eObservable
     * 两种方式
     * onErrorResumeNext( )拦截的错误=Throwable;需要拦截Exception使用下面的方式
     * <p>
     * onExceptionResumeNext( )如果拦截的错误=Exception,则会发送新的Observable,不会走onerror()方法
     * 如果拦截到Throwable错误,会将错误传递给观察者的onError方法,不在发送新的Observable
     */
    private void onErrorResumeNext() {

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onError(new Throwable("发生错误了呢"));
            }
        }).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
            @Override
            public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {

                KLog.d(TTAG, "onErrorResumeNext:" + throwable.getMessage());

                return Observable.just(7, 3, 6, 8);
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {

                KLog.d(TTAG, "接收到事件:" + integer);
            }

            @Override
            public void onError(Throwable e) {
                KLog.d(TTAG, "失败了");
            }

            @Override
            public void onComplete() {
                KLog.d(TTAG, "结束了");
            }
        });
    }

8.3 onExceptionResumeNext

 private void onExceptionResumeNext() {

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onError(new Exception("发生错误了呢"));
            }
        }).onExceptionResumeNext(new Observable<Integer>() {
            @Override
            protected void subscribeActual(Observer<? super Integer> observer) {

                observer.onNext(11);
                observer.onNext(22);
                observer.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {

                KLog.d(TTAG, "接收到事件:" + integer);
            }

            @Override
            public void onError(Throwable e) {
                KLog.d(TTAG, "失败了");
            }

            @Override
            public void onComplete() {
                KLog.d(TTAG, "结束了");
            }
        });

    }

8.4 retry

 /**
     * 当出现错误时,让被观察者(Observable)重新发射数据
     * Throwable 和 Exception都可拦截
     * <p>
     * 1. retry()
     * 作用:出现错误时,让被观察者重新发送数据
     * 注:若一直错误,则一直重新发送
     * <p>
     * 2. retry(long time)
     * 作用:出现错误时,让被观察者重新发送数据(具备重试次数限制
     * 参数 = 重试次数
     * <p>
     * 3. retry(Predicate predicate)
     * 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送& 持续遇到错误,则持续重试)
     * 参数 = 判断逻辑
     * <p>
     * 4. retry(new BiPredicate<Integer, Throwable>)
     * 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送 & 持续遇到错误,则持续重试
     * 参数 =  判断逻辑(传入当前重试次数 & 异常错误信息)
     * <p>
     * 5. retry(long time,Predicate predicate)
     * 作用:出现错误后,判断是否需要重新发送数据(具备重试次数限制
     * 参数 = 设置重试次数 & 判断逻辑
     */
    private void retry() {

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onError(new Throwable("发送错误了"));
                emitter.onNext(5);
                emitter.onNext(6);
            }
            //遇到错误时,让被观察者重新发射数据(若一直错误,则一直重新发送
        }).retry()
                //遇到错误时,重试3次
                .retry(3)
                //拦截错误后,判断是否需要重新发送请求
                .retry(new Predicate<Throwable>() {
                    @Override
                    public boolean test(Throwable throwable) throws Exception {
                        KLog.d(TTAG, "错误是:" + throwable.getMessage());

                        //返回false = 不重新重新发送数据 & 调用观察者的onError结束
                        //返回true = 重新发送请求(若持续遇到错误,就持续重新发送)
                        return throwable.getMessage().equals("我是判定错误");
                    }
                    //出现错误后,判断是否需要重新发送数据(若需要重新发送 & 持续遇到错误,则持续重试
                    // 参数 =  判断逻辑(传入当前重试次数 & 异常错误信息)
                }).retry(new BiPredicate<Integer, Throwable>() {
            @Override
            public boolean test(Integer integer, Throwable throwable) throws Exception {
                KLog.d(TTAG, "错误是:" + throwable.getMessage());
                KLog.d(TTAG, "重试次数是:" + integer);

                return true;
            }
            // 作用:出现错误后,判断是否需要重新发送数据(具备重试次数限制
            // 参数 = 设置重试次数 & 判断逻辑
        }).retry(3, new Predicate<Throwable>() {
            @Override
            public boolean test(Throwable throwable) throws Exception {
                KLog.d(TTAG, "错误是:" + throwable.getMessage());

                //返回false = 不重新重新发送数据 & 调用观察者的onError()结束
                //返回true = 重新发送请求(最多重新发送3次)
                return true;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });


    }

8.5 retryUntil

 /**
     * 出现错误后,判断是否需要重新发送数据
     * <p>
     * 若需要重新发送 & 持续遇到错误,则持续重试
     * 作用类似于retry(Predicate predicate)
     * 返回false就一直重试
     * 返回true结束
     */
    private void retryUntil() {

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onError(new Exception("发生错误了"));
                emitter.onNext(5);
                emitter.onNext(8);
            }
        }).retryUntil(new BooleanSupplier() {
            @Override
            public boolean getAsBoolean() throws Exception {

                //返回false就一直重试
                //返回true结束
                return true;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {

                KLog.d(TTAG, "onNext:" + integer);
            }

            @Override
            public void onError(Throwable e) {

                KLog.d(TTAG, "错误是:" + e.getMessage());

            }

            @Override
            public void onComplete() {

            }
        });

    }

8.6 retryWhen

  /**
     * 出现错误后,判断是否需要重新发送数据
     * <p>
     * 若需要重新发送 & 持续遇到错误,则持续重试
     * 作用类似于retry(Predicate predicate)
     * 返回false就一直重试
     * 返回true结束
     */
    private void retryUntil() {

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onError(new Exception("发生错误了"));
                emitter.onNext(5);
                emitter.onNext(8);
            }
        }).retryUntil(new BooleanSupplier() {
            @Override
            public boolean getAsBoolean() throws Exception {

                //返回false就一直重试
                //返回true结束
                return true;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {

                KLog.d(TTAG, "onNext:" + integer);
            }

            @Override
            public void onError(Throwable e) {

                KLog.d(TTAG, "错误是:" + e.getMessage());

            }

            @Override
            public void onComplete() {

            }
        });

    }

9. 重复发送操作:

9.1 repeat()

作用:无条件地、重复发送 被观察者事件,具备重载方法,可设置重复创建次数

/**
     * 无条件地、重复发送 被观察者事件
     */
    private void repeat() {

        Observable.just(1, 2, 3, 4)
                //设置重复发送次数3次
                .repeat(3)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        KLog.d(TTAG, "开始了链接");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        KLog.d(TTAG, "接收事件是:" + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        KLog.d(TTAG, "收到错误了");
                    }

                    @Override
                    public void onComplete() {
                        KLog.d(TTAG, "完成");
                    }
                });

    }

9.2 repeartWhen()

作用:有条件地、重复发送 被观察者事件,将原始 Observable 停止发送事件的标识(Complete() / Error()) 转换成1个 Object 类型数据传递给1个新被观察者(Observable),以此决定是否重新订阅 & 发送原来的 Observable

**
     * 有条件地、重复发送 被观察者事件
     * 将原始 Observable 停止发送事件的标识(Complete() /  Error())
     * 转换成1个 Object 类型数据传递给1个新被观察者(Observable),以此决定是否重新订阅 & 发送原来的 Observable
     * <p>
     * 返回结果分为两种情况:
     * 1.若新被观察者(Observable)返回1个Complete / Error事件,则不重新订阅 & 发送原来的 Observable
     * 2.若新被观察者(Observable)返回其余事件时,则重新订阅 & 发送原来的 Observable
     */
    private void repeatWhen() {

        Observable.just(1,2,3)
                .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {

                        // 在Function函数中,必须对输入的 Observable<Object>进行处理,这里使用的是flatMap操作符接收上游的数据

                        return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
                            @Override
                            public ObservableSource<?> apply(Object o) throws Exception {

                                //情况1:若新被观察者(Observable)返回1个Complete() /  Error()事件,则不重新订阅 & 发送原来的 Observable

                                // Observable.empty() = 发送Complete事件,但不会回调观察者的onComplete()

                                // return Observable.error(new Throwable("不再重新订阅事件"));
                                // 返回Error事件 = 回调onError()事件,并接收传过去的错误信息。

                                // 情况2:若新被观察者(Observable)返回其余事件,则重新订阅 & 发送原来的 Observable
                                 return Observable.just(1);
                                // 仅仅是作为1个触发重新订阅被观察者的通知,发送的是什么数据并不重要,只要不是Complete() /  Error()事件
                            }
                        });
                    }
                }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                KLog.d(TTAG, "开始连接");
            }
            @Override
            public void onNext(Integer integer) {
                KLog.d(TTAG, "收到事件:" + integer);
            }
            @Override
            public void onError(Throwable e) {
                KLog.d(TTAG, "收到错误是:"+e.getMessage());
            }
            @Override
            public void onComplete() {
                KLog.d(TTAG, "完成");
            }
        });

    }

10. 线程调控

注意:Observable.subscribeOn()可以多次指定,但是只有第一次有效,Observable.observeOn()每次都会生效

类型 含义 应用场景
Schedulers.immediate() 当前线程 == 不指定线程 默认线程
AndroidSchedulers.mainThread() Android主线程 操作UI
Schedulers.newThread() 常规新线程 进行耗时操作
Schedulers.io() IO操作线程 网络请求,读写文件等IO密集型操作
Schedulers.computation() CPU计算操作线程 大量计算操作

11. 取消订阅

使用Disposable.dispose()

多个Disposable时,可采用RxJava内置容器CompositeDisposable进行统一管理


CompositeDisposable compositeDisposable = new CompositeDisposable();
compositeDisposable.add();
compositeDisposable.clear();

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,711评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,079评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,194评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,089评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,197评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,306评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,338评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,119评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,541评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,846评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,014评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,694评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,322评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,026评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,257评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,863评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,895评论 2 351

推荐阅读更多精彩内容