RxJava常用操作

一、 拆分使用

先创建被观察者和观察者,然后建立订阅关系,这样在观察者中就会接收到个生命周期的回调:

    @Test
    public void test(){
        //1. 创建被观察者
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                // 发送消息
                e.onNext(1);
                e.onNext(2);
                e.onComplete();
            }
        });

        //2. 创建观察者
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("建立订阅关系");
            }

            @Override
            public void onNext(Integer integer) {
                //接受到消息
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                System.out.println("完成");
            }
        };

        //3. 建立订阅关系
        observable.subscribe(observer);
    }

运行结果:

建立订阅关系
1
2
完成

二、 链式调用(一般都是这种写法):

    @Test
    public void test2() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("建立订阅关系");
            }

            @Override
            public void onNext(Integer integer) {
                //接受到消息
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                System.out.println("完成");
            }
        });
    }

三、更简单的观察者

    @Test
    public void test3() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });
    }

Consumer相对Observer简化了很多,没有了onSubscribe() onError () onComplete (),当然也无法对这些进行监听了。

四、创建操作符

上面用的creat是创建被观察者的一种操作符,另外常用的还有justjustArratrangeempty,直接看运行结果去理解就好了。
empty这里说下,这个使用场景比如一个耗时操作不要任何数据反馈去更新UI,只是显示和隐藏加载动画。(先不用去纠结耗时操作在哪里添加)

    @Test
    public void test4() {
        System.out.println("-----------------just");
        Observable.just(1, 2, 3).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });

        System.out.println("-----------------fromArray");
        Observable.fromArray(new Integer[]{1,2,3}).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });

        System.out.println("-----------------range");
        Observable.range(0, 3).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });

        System.out.println("-----------------empty");
        Observable.empty().subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("建立订阅关系");
            }

            @Override
            public void onNext(Object object) {
                //接受到消息
                System.out.println(object);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                System.out.println("完成");
            }
        });

运行结果:

-----------------just
1
2
3
-----------------fromArray
1
2
3
-----------------range
0
1
2
-----------------empty
建立订阅关系
完成

五、合并操作符

合并操作是指合并被观察者,用同一个观察者去接受,常用的有concatWithstartWithconcatmergezip,这里为了显示出合并的区别,用了另一个创建创建操作符intervalRange,比如Observable.intervalRange(0, 10, 0, 1, TimeUnit.SECONDS),这个代表从0开始发送10个数,延迟0秒后开始执行,每1秒发送一次。
用这两个被观察者测试上面几个合并操作符:

    //发送0-4
    Observable observable1 = Observable.intervalRange(0, 5, 0, 1, TimeUnit.SECONDS);
    //发送10-14
    Observable observable2 = Observable.intervalRange(10, 5, 0, 1, TimeUnit.SECONDS);

测试函数:

    private void concatWith() {
        Log.e(TAG, "-----------------concatWith");
        observable1.concatWith(observable2).subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e(TAG, "concatWith: " + aLong);
                    }
                });
    }

    private void startWith() {
        Log.e(TAG, "-----------------startWith");
        observable1.startWith(observable2).subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e(TAG, "startWith: " + aLong);
                    }
                });
    }

    public void concat() {
        Log.e(TAG, "-----------------concat");
        Observable.concat(observable1,observable2).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.e(TAG, "concat: " + aLong);
            }
        });
    }

    public void merge() {
        Log.e(TAG, "-----------------merge");
        Observable.merge(observable1,observable2).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.e(TAG, "merge: " + aLong);
            }
        });
    }

运行结果:

E/MainActivity: -----------------concatWith
E/MainActivity: accept: 0
E/MainActivity: accept: 1
E/MainActivity: accept: 2
E/MainActivity: accept: 3
E/MainActivity: accept: 4
E/MainActivity: accept: 10
E/MainActivity: accept: 11
E/MainActivity: accept: 12
E/MainActivity: accept: 13
E/MainActivity: accept: 14
E/MainActivity: -----------------startWith
E/MainActivity: accept: 10
E/MainActivity: accept: 11
E/MainActivity: accept: 12
E/MainActivity: accept: 13
E/MainActivity: accept: 14
E/MainActivity: accept: 0
E/MainActivity: accept: 1
E/MainActivity: accept: 2
E/MainActivity: accept: 3
E/MainActivity: accept: 4
E/MainActivity: -----------------concat
E/MainActivity: accept: 0
E/MainActivity: accept: 1
E/MainActivity: accept: 2
E/MainActivity: accept: 3
E/MainActivity: accept: 4
E/MainActivity: accept: 10
E/MainActivity: accept: 11
E/MainActivity: accept: 12
E/MainActivity: accept: 13
E/MainActivity: accept: 14
E/MainActivity: -----------------merge
E/MainActivity: -----------------merge
E/MainActivity: accept: 10
E/MainActivity: accept: 0
E/MainActivity: accept: 1
E/MainActivity: accept: 11
E/MainActivity: accept: 2
E/MainActivity: accept: 12
E/MainActivity: accept: 3
E/MainActivity: accept: 13
E/MainActivity: accept: 14
E/MainActivity: accept: 4

根据上面结果总结:

  1. concatWithstartWith是执行的先后顺序不一样,是同步执行的
  2. concatWithconcat都是顺序执行,只是写法不一样
  3. concatmerge写法一样,但是merge是异步的,两个被观察者没有先后顺序,各自执行。

还有一种zip操作符,把被观察者合并时一一对应,直接看使用方式:

    private void zip() {
        Observable observable1 = Observable.just("语文", "数学", "英语");
        Observable observable2 = Observable.just("100", "80", "60");
        Observable.zip(observable1, observable2, new BiFunction() {
            @Override
            public Object apply(Object o, Object o2) throws Exception {
                return o.toString() + ":" + o2.toString();
            }
        })
                .subscribe(new Consumer() {
                    @Override
                    public void accept(Object o) throws Exception {
                        Log.e(TAG, "accept: " + o);
                    }
                });
    }

运行结果:

E/MainActivity: accept: 语文:100
E/MainActivity: accept: 数学:80
E/MainActivity: accept: 英语:60

六、变换操作符

常见的有mapconcatMapflatMapgroupBybuffer
先通过最简单的map来看看变换操作符是干什么的

    private void map() {
        Log.e(TAG, "-----------------map");
        Observable.just(1)
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception {
                        return "转化为String" + integer;
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String string) throws Exception {
                        Log.e(TAG, "merge: " + string);
                    }
                });
    }

运行结果:

E/MainActivity: -----------------map
E/MainActivity: accept: 转化为String1

也就是说map里可以把被观察者传递过来的数据转换成另一种数据格式传递给观察者,这里是Integer转String,比如你也可以被观察者传递过来一个URL,在Function直接网络请求,转化成请求结果给观察者。
扯多了,继续看上面的操作符flatMap

    private void flatMap() {
        Observable.just(1)
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(final Integer integer) throws Exception {
                        return Observable.create(new ObservableOnSubscribe<String>() {
                            @Override
                            public void subscribe(ObservableEmitter<String> e) throws Exception {
                                e.onNext("转化为String" + integer);
                                e.onNext("我还可以再发送" + integer);
                                e.onNext("我还可以随便发送" + integer);
                            }
                        });
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String string) throws Exception {
                        Log.e(TAG,  "accept: " + string);
                    }
                });
    }

运行结果:

E/MainActivity: accept: 转化为String1
E/MainActivity: accept: 我还可以再发送1
E/MainActivity: accept: 我还可以随便发送1

这个相对map更灵活,map是的Function里直接返回的是转换之后的数据,一对一的,而flatMap的Function返回的是另一个被观察者,所以这个可以在里面随意发送给观察者。
在用concatMap之前先看flatMap的另一种操作:

    private void flatMap2() {
        Observable.just(1, 2, 3)
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        List<String> list = new ArrayList<>();
                        for (int i = 0; i < 3; i++) {
                            list.add(integer + "." + (1 + i));
                        }
                        return Observable.fromIterable(list).delay(1, TimeUnit.SECONDS);
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String string) throws Exception {
                        Log.e(TAG, "accept: " + string);
                    }
                });
    }

在收到被观察者发来的数据后,生产一个List再延迟1s发送给观察者,看下运行结果:

E/MainActivity: accept: 2.1
E/MainActivity: accept: 2.2
E/MainActivity: accept: 2.3
E/MainActivity: accept: 1.1
E/MainActivity: accept: 3.1
E/MainActivity: accept: 3.2
E/MainActivity: accept: 3.3
E/MainActivity: accept: 1.2
E/MainActivity: accept: 1.3

每个都是先.1再.2再.3没错,但是整体并没有按照1、2、3顺序执行,说明他们是异步执行的,类似合并操作符中的merge(其实内部调用的就是merge)。看完这个问题,就可以猜到concatMap的作用了,就不贴了,是完全按顺序同步输出的。
然后来groupBy操作符:

    private void group() {
        Observable.just(20, 40, 60, 80, 100)
                .groupBy(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception {
                        return integer >= 60 ? "及格" : "不及格";
                    }
                })
                .subscribe(new Consumer<GroupedObservable<String, Integer>>() {
                    @Override
                    public void accept(final GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception {
                        stringIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
                            @Override
                            public void accept(Integer integer) throws Exception {
                                Log.e(TAG, "accept: " + integer + ":" + stringIntegerGroupedObservable.getKey());
                            }
                        });
                    }
                });
    }

输出结果:

E/MainActivity: accept: 20:不及格
E/MainActivity: accept: 40:不及格
E/MainActivity: accept: 60:及格
E/MainActivity: accept: 80:及格
E/MainActivity: accept: 100:及格

buffer操作符:

    private void buffer() {
        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        for (int i = 0; i < 100; i++) {
                            e.onNext(i);
                        }
                        e.onComplete();
                    }
                })
                .buffer(20)
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integer) throws Exception {
                        Log.d(TAG, "accept: " + integer);
                    }
                });
    }

运行结果:

E/MainActivity: accept: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
E/MainActivity: accept: [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
E/MainActivity: accept: [40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
E/MainActivity: accept: [60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
E/MainActivity: accept: [80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]

这两种没什么特殊的,groupBy是按条件分组, buffer是分批发送。

七、过滤操作符

filtertakedistinctelementAl

    //条件筛选,输出B、C
    public void filter() {
        Observable.just("A", "B", "C")
                .filter(new Predicate<String>() {
                    @Override
                    public boolean test(String s) throws Exception {
                        if ("A".equals(s)) {
                            return false;
                        }

                        return true;
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG, "accept: " + s);
                    }
                });
    }



    //用于停止定时器,输出0、1、2、3、4
    public void take() {
        Observable.interval(1, TimeUnit.SECONDS)
                .take(5)// 5次之后停下
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e(TAG, "accept: " + aLong);
                    }
                });

    }

    //过滤重复,输出1、2、3
    public void distinct() {
        Observable.just(1,1,2,3,3)
                .distinct()
                .subscribe(new Consumer<Integer>() { // 下游 观察者
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "accept: " + integer);
                    }
                });
    }

    //制定发送角标,输出B
    public void elementAt() {
        Observable.just("A", "B", "C")
                .elementAt(1, "X")
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d(TAG, "accept: " + s);
                    }
                });


    }

八、条件操作符

anyallcontains,这些就是改变Java中if的书写方式,与、或、包含
all:全部为true,才是true,只要有一个为false,就是false
any:全部为 false,才是false, 只要有一个为true,就是true
contains :是否包含

    //等于Java中if的连续判断,有一个等于C就返回false,输出false
    public void all() {
        Observable.just("A", "B", "C", "D")
                .all(new Predicate<String>() {
                    @Override
                    public boolean test(String s) throws Exception {
                        return !s.equals("C");
                    }
                })
                .subscribe(new Consumer<Boolean>() { // 下游 观察者
                    @Override
                    public void accept(Boolean s) throws Exception {
                        Log.d(TAG, "accept: " + s);
                    }
                });
    }

    //判断包含
    public void contains() {
        Observable.just("A", "B", "C", "D")
                .contains("C")
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept(Boolean s) throws Exception {
                        Log.d(TAG, "accept: " + s);
                    }
                });
    }

    //和上面的All相反,有一个等于C就返回true,输出true
    public void any() {
        Observable.just("A", "B", "C", "D")
                .any(new Predicate<String>() {
                    @Override
                    public boolean test(String s) throws Exception {
                        return !s.equals("C");
                    }
                })
                .subscribe(new Consumer<Boolean>() { // 下游 观察者
                    @Override
                    public void accept(Boolean s) throws Exception {
                        Log.d(TAG, "accept: " + s);
                    }
                });
    }

九、异常处理操作符

onErrorReturnonErrorResumeNextonExceptionResumeNextretry
先模拟个错误:

    public void onError() {

        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        for (int i = 0; i < 10; i++) {
                            if (i == 5) {
                               e.onError(new Throwable("模拟一个错误"));
                            }
                            e.onNext(i);
                        }
                        e.onComplete();
                    }
                })

                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete: ");
                    }
                });
    }

运行结果:

D/MainActivity: onNext: 0
D/MainActivity: onNext: 1
D/MainActivity: onNext: 2
D/MainActivity: onNext: 3
D/MainActivity: onNext: 4
D/MainActivity: onError: 模拟一个错误

上面代码会在观察者的onError中收到回调,然后来看一下异常操作符能干什么,先看onErrorReturnonErrorResumeNext,区别就是onErrorReturn发送一次,onErrorResumeNext可以任意发,跟上面很多其他的操作符一样:

    private void onErrorReturn() {
        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        for (int i = 0; i < 10; i++) {
                            if (i == 5) {
                                e.onError(new Throwable("模拟一个错误"));
                            }
                            e.onNext(i);
                        }
                        e.onComplete();
                    }
                })
                .onErrorReturn(new Function<Throwable, Integer>() {
                    @Override
                    public Integer apply(Throwable throwable) throws Exception {
                        return 400;
                    }
                })

                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete: ");
                    }
                });
    }

运行结果:

D/MainActivity: onNext: 400
D/MainActivity: onComplete: 

onErrorResumeNext:

    public void onErrorResumeNext() {

        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        for (int i = 0; i < 10; i++) {
                            if (i == 5) {
                                e.onError(new Throwable("模拟一个错误"));
                            }
                            e.onNext(i);
                        }
                        e.onComplete();
                    }
                })

                .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
                    @Override
                    public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {

                        return Observable.create(new ObservableOnSubscribe<Integer>() {
                            @Override
                            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                                e.onNext(400);
                                e.onNext(4000);
                                e.onNext(40000);
                                e.onComplete();
                            }
                        });
                    }
                })

                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete: ");
                    }
                });

    }

运行结果:

D/MainActivity: onNext: 0
D/MainActivity: onNext: 1
D/MainActivity: onNext: 2
D/MainActivity: onNext: 3
D/MainActivity: onNext: 4
D/MainActivity: onNext: 400
D/MainActivity: onNext: 4000
D/MainActivity: onNext: 40000
D/MainActivity: onComplete: 

这里两个注意点:

  • onErrorReturn发生error后会自动调用onComplete(),而onErrorResumeNext需要根据需要手动调用
  • 都不会再触发观察者的onError()回调,除非onErrorResumeNext中再手动调用e.onError()

然后看下onExceptionResumeNext代码:

    public void onExceptionResumeNext() {

        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        for (int i = 0; i < 10; i++) {
                            if (i == 5) {
                                e.onError(new Throwable("模拟一个错误"));
                            }
                            e.onNext(i);
                        }
                        e.onComplete();
                    }
                })

                .onExceptionResumeNext(new ObservableSource<Integer>() {
                    @Override
                    public void subscribe(Observer<? super Integer> observer) {
                        observer.onNext(400);
                        observer.onNext(4000);
                        observer.onNext(40000);
                    }
                })

                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete: ");
                    }
                });
    }

运行结果:

D/MainActivity: onNext: 0
D/MainActivity: onNext: 1
D/MainActivity: onNext: 2
D/MainActivity: onNext: 3
D/MainActivity: onNext: 4
D/MainActivity: onError: 模拟一个错误

onErrorResumeNext的运行结果对比,很明显没有400、4000、40000,说明新的Observer并不会起作用,这里用的是Throwable,如果是用Exception,同样也会有400、4000、40000,所以:onErrorResumeNextonExceptionResumeNext对Exception的处理是一样的流程,区别在于对Error处理的时候,是否会使用新的Observer发送消息,也就是onExceptionResumeNext不处理Error,直接回调观察者的onError ()onErrorResumeNext都处理,不会再调用观察者的onError ()
然后是retry这个操作符,这个很简单,贴出三种常用的:

    public void retry() {

        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        for (int i = 0; i < 10; i++) {
                            if (i == 5) {
                                e.onError(new IllegalAccessError("模拟错误"));
                            }
                            e.onNext(i);
                        }
                        e.onComplete();
                    }
                })
                
                //不设置重试次数
                .retry( new Predicate<Throwable>() {
                    @Override
                    public boolean test(Throwable throwable) throws Exception {
                        //true表示不停地重试 ,  false表示不重试
                        return true;
                    }
                })

//                //设置重试次数
//                .retry(3, new Predicate<Throwable>() {
//                    @Override
//                    public boolean test(Throwable throwable) throws Exception {
//                        //true表示按设置的次数重试 ,  false表示不重试
//                        return true;
//                    }
//                })
//
//                //可获取重试次数
//                .retry(new BiPredicate<Integer, Throwable>() {
//                    @Override
//                    public boolean test(Integer integer, Throwable throwable) throws Exception {
//                        //相对上面两种,这个integer表示重试次数, 返回值跟上面一样
//                        return true;
//                    }
//                })

                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete: ");
                    }
                });
    }

十、线程切换

默认发送和接收都是在主线程:

    private void schedulers() {
        Observable
                .create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> e) throws Exception {
                        Log.e(TAG, "发送: " + Thread.currentThread().getName());
                        e.onNext("123");
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG, "接收: " + Thread.currentThread().getName());
                    }
                });
    }

输出:

E/MainActivity: 发送: main
E/MainActivity: 接收: main

可以通过subscribeOn()会同时修改观察者和被观察者的线程,通过observeOn()只设置观察者线程,通过AndroidSchedulers.mainThread()得到主线程,通过Schedulers.io()得到子线程:

    private void schedulers() {
        Observable
                .create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> e) throws Exception {
                        Log.e(TAG, "发送: " + Thread.currentThread().getName());
                        e.onNext("123");
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG, "接收: " + Thread.currentThread().getName());
                    }
                });
    }

这一段,先通过subscribeOn(Schedulers.io())把观察者和被观察者都设置到子线程,如果不写下面这句observeOn(AndroidSchedulers.mainThread()),会输出:

E/MainActivity: 发送: RxCachedThreadScheduler-1
E/MainActivity: 接收:  RxCachedThreadScheduler-1

但是下面又用observeOn(AndroidSchedulers.mainThread())把观察者改回子线程,所以输出:

E/MainActivity: 发送: RxCachedThreadScheduler-1
E/MainActivity: 接收: main

十一、背压模式

当上下游运行在不同的线程中,且上游发射数据的速度大于下游接收处理数据的速度时,就会产生背压问题,内存使用越来越多,这时候就需要用Flowable去处理。Flowable会对上游发送的时间进行缓存,缓存池也满了(超出128)的时候会有4种不通的处理方式:

  • BackpressureStrategy.ERROR:就会抛出异常
  • BackpressureStrategy.DROP:把后面发射的事件丢弃
  • BackpressureStrategy.LATEST:把前面发射的事件丢弃
  • BackpressureStrategy.BUFFER:这种不会有上限,但是如果上游发送太多,也会造成内存使用越来越大

Flowable的使用跟Observable很类似,简单使用:

    private void backpressure() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        // 改成129就会崩溃
                        for (int i = 0; i < 128; i++) {
                            e.onNext(i); // todo 1
                        }
                        e.onComplete();
                    }
                }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e(TAG, "接收: " + integer);
                    }
                });
    }

然后看一种完整模式的观察者:

    private void backpressure() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        // 改成129就会崩溃
                        for (int i = 0; i < 128; i++) {
                            e.onNext(i); // todo 1
                        }
                        e.onComplete();
                    }
                }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.e(TAG, "接收: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }

这一会发现观察者收不到任何消息,这里跟Observable有个区别,就是订阅的方法subscribe()的参数,Observable订阅对应的是Observer,而Flowable对应的是Subscriber,Observer和Subscriber对应的回调onSubscribe(..)参数不同,Subscriber的onSubscribe(..)参数拿到的是一个Subscription,这个需要主动去取数据,比如:

                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(10);
                    }

这样就会onNext()中就会收到前10个。那这个使用就很灵活了,根据代码需要,可以在需要的地方主动调用s.request(..),让观察者接收到数据。

十二、一个展示网络图片的例子

    private void getImage(final String path) {
        Observable.just(path)
                // 通过map变换操作符把String转换成Bitmap
                .map(new Function<String, Bitmap>() {
                    @Override
                    public Bitmap apply(String s) throws Exception {
                        URL url = new URL(path);
                        HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
                        httpURLConnection.setConnectTimeout(5000);
                        int responseCode = httpURLConnection.getResponseCode();
                        if (HttpURLConnection.HTTP_OK == responseCode) {
                            Bitmap bitmap = BitmapFactory.decodeStream(httpURLConnection.getInputStream());
                            return bitmap;
                        }

                        return null;
                    }
                })
                // 下载图片在子线程中
                .subscribeOn(Schedulers.io())
                // 设置图片在主线程中
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Bitmap>() {

                    @Override
                    // 开始操作前
                    public void onSubscribe(Disposable d) {
                        progressDialog = new ProgressDialog(MainActivity.this);
                        progressDialog.setMessage("正在下载中...");
                        progressDialog.show();
                    }

                    @Override
                    // 收到Bitmap
                    public void onNext(Bitmap bitmap) {
                        if (imageView != null) {
                            imageView.setImageBitmap(bitmap);
                        }
                    }

                    @Override
                    // 下载错误
                    public void onError(Throwable e) {
                        if (progressDialog != null) {
                            progressDialog.dismiss();
                        }
                        if (imageView != null) {
                            imageView.setImageResource(R.mipmap.ic_launcher);
                        }
                        Log.e(TAG, "onError: " + e.toString());
                    }

                    @Override
                    // 下载完成
                    public void onComplete() {
                        if (progressDialog != null) {
                            progressDialog.dismiss();
                        }
                    }
                });
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容