Android RxJava 2.x入门例子详解(四)

线程调度器

上游默认在主线程发送事件,下游默认也是主线程中接收事件,
上下游默认是在同一个线程工作

//create创建一个上游 Observable(被观察者)
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());

                Log.d(TAG, "Observable发出:1");
                e.onNext(1);//向下游(观察者)发射内容1
                Log.d(TAG, "Observable发出:2");
                e.onNext(2);
                Log.d(TAG, "Observable发出:3");
                e.onNext(3);
            }
        });

        Consumer<Integer> consumer = new Consumer<Integer>() {

            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Consumer thread is :" + Thread.currentThread().getName());
                Log.d(TAG, "onNext收到:" + integer);
            }
        };

        observable.subscribe(consumer);

而我们更多时候想要的是,在子线程中做耗时的操作, 然后回到主线程操作UI。通过RxJava内置的线程调度器,我们可以很轻松的做到这一点,如下面的例子:

//create创建一个上游 Observable(被观察者)
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());

                Log.d(TAG, "Observable发出:1");
                e.onNext(1);//向下游(观察者)发射内容1
                Log.d(TAG, "Observable发出:2");
                e.onNext(2);
                Log.d(TAG, "Observable发出:3");
                e.onNext(3);
            }
        });

        Consumer<Integer> consumer = new Consumer<Integer>() {

            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Consumer thread is :" + Thread.currentThread().getName());
                Log.d(TAG, "onNext收到:" + integer);
            }
        };

        //subscribeOn() 指定上游发送事件的线程, observeOn() 指定下游接收事件的线程.
        observable.subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(consumer);

subscribeOn只能调用一次,如果调用多次,只有第一次有效。
而observeOn可以多次调用,每次调用下游都可以切换一次线程。
如下面的例子:

//create创建一个上游 Observable(被观察者)
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());

                Log.d(TAG, "Observable发出:1");
                e.onNext(1);//向下游(观察者)发射内容1
                Log.d(TAG, "Observable发出:2");
                e.onNext(2);
                Log.d(TAG, "Observable发出:3");
                e.onNext(3);
            }
        });

        Consumer<Integer> consumer = new Consumer<Integer>() {

            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Consumer thread is :" + Thread.currentThread().getName());
                Log.d(TAG, "onNext收到:" + integer);
            }
        };

        //subscribeOn() 指定上游发送事件的线程, observeOn() 指定下游接收事件的线程.
        observable.subscribeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.io())//多次调用subscribeOn()只有第一次的有效
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "observeOn(mainThread) current thread is: " + Thread.currentThread().getName());
                    }
                })
                .observeOn(Schedulers.io())//每调用一次observeOn() , 下游的线程就会切换一次
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "observeOn(io) current thread is : " + Thread.currentThread().getName());
                    }
                })
                .subscribe(consumer);

RxJava内置了很多线程选项供我们选择
1、Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
2、Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
3、Schedulers.newThread() 代表一个常规的新线程
4、AndroidSchedulers.mainThread() 代表Android的主线程

这些内置的Scheduler已经足够满足我们开发的需求,因此我们应该使用内置的这些选项,在RxJava内部使用的是线程池,效率也比较高。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 转载自:https://xiaobailong24.me/2017/03/18/Android-RxJava2.x...
    Young1657阅读 6,094评论 1 9
  • 怎么如此平静, 感觉像是走错了片场.为什么呢, 因为上下游工作在同一个线程呀骚年们! 这个时候上游每次调用emit...
    Young1657阅读 5,445评论 2 1
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,314评论 19 139
  • 前言我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard...
    占导zqq阅读 12,991评论 6 151
  • 碧落情岛花花飞, 满天蝴蝶翼翼追。 清丽俏皮醉佳人, 恬静可人动我心。
    付尔摩豪阅读 1,449评论 0 0