RxJava2.0从入门到放弃(二)

前言

RxJava2.0从入门到放弃(一)中简单介绍了我对RxJava的理解以及RxJava最基本的一个写法。这一部分继续讲讲RxJava最重要的一个环节关于线程得调度。(本文案例用kotlin来做案例。)

至于为什么说是最重要的?RxJava在github是这么定义自己的RxJava is a Java VM implementation of [Reactive Extensions](http://reactivex.io/): a library for composing asynchronous and event-based programs by using observable sequences. 也就是说RxJava是一个专注于用来解决异步以及事件驱动的库。

讲解之前我们先抛出一个问题吧:

先从IO线程独读一个文件夹,
再把文件夹里面的png图片筛选出来,
然后在主线程中把这些图片加载在UI上。

面对这样一个需求该怎么处理。用Thread应该差不多这样实现

final File[] files = file.listFiles();
        new Thread(new Runnable() {
            @Override
            public void run() {
                for(File f:files) {
                    if(f.getName().endsWith(".png")){
                       final Bitmap bitmap = transFileToBitmap(f);
                       runOnUiThread(new Runnable() {
                           @Override
                           public void run() {
                               updateUI(bitmap);
                           }
                       });
                    }
                }
            }
        }).start();

后面再来用RxJava来实现一下这个需求。

正文

RxJava整体的线程调度涉及到三个关键点分别是subscribeOn observeOn Scheduler

RxJava在不指定线程的情况下,RxJava保持者线程不变的原则。也就是说『上游』在哪个线程上创建事件,『下游』就是在哪个线程上处理事件,『上游』和『下游』线程保持一致。

用代码来验证下:

 Observable.create<Int> { e ->
            for (i in 0..5) {
                Log.e(TAG, "Observable thread ${Thread.currentThread().name}")
                Log.e(TAG, "observable  $i")
                e.onNext(i)
            }
        }
                .subscribe { int ->
                    Log.e(TAG, "onNext  $int")
                    Log.e(TAG, "subscribe thread ${Thread.currentThread().name}")
                }

输出结果是这样:

08-23 17:55:33.635 19473 19473 E RxTag   : Observable thread main
08-23 17:55:33.635 19473 19473 E RxTag   : observable  0
08-23 17:55:33.635 19473 19473 E RxTag   : onNext  0
08-23 17:55:33.635 19473 19473 E RxTag   : subscribe thread main
08-23 17:55:33.635 19473 19473 E RxTag   : Observable thread main
08-23 17:55:33.635 19473 19473 E RxTag   : observable  1
08-23 17:55:33.635 19473 19473 E RxTag   : onNext  1
08-23 17:55:33.635 19473 19473 E RxTag   : subscribe thread main
08-23 17:55:33.635 19473 19473 E RxTag   : Observable thread main
08-23 17:55:33.635 19473 19473 E RxTag   : observable  2
08-23 17:55:33.635 19473 19473 E RxTag   : onNext  2
08-23 17:55:33.635 19473 19473 E RxTag   : subscribe thread main
08-23 17:55:33.635 19473 19473 E RxTag   : Observable thread main
08-23 17:55:33.635 19473 19473 E RxTag   : observable  3
08-23 17:55:33.635 19473 19473 E RxTag   : onNext  3
08-23 17:55:33.635 19473 19473 E RxTag   : subscribe thread main
08-23 17:55:33.635 19473 19473 E RxTag   : Observable thread main
08-23 17:55:33.635 19473 19473 E RxTag   : observable  4
08-23 17:55:33.635 19473 19473 E RxTag   : onNext  4
08-23 17:55:33.635 19473 19473 E RxTag   : subscribe thread main
08-23 17:55:33.635 19473 19473 E RxTag   : Observable thread main
08-23 17:55:33.635 19473 19473 E RxTag   : observable  5
08-23 17:55:33.635 19473 19473 E RxTag   : onNext  5
08-23 17:55:33.635 19473 19473 E RxTag   : subscribe thread main

可以看到所有的运行都是在main线程运行的,可以验证:

RxJava在不指定线程的情况下,『上游』和『下游』线程保持一致。

如果指定线程的话该怎么做?
在RxJava中可以分别通过 subscribeOn()observerOn()这两个方法来指定『上游』事件产生的线程以及『下游』事件响应的线程。
具体怎么做我们来看代码:

Observable.create<Int> { e ->
            for (i in 0..2) {
                Log.e(TAG, "Observable thread ${Thread.currentThread().name}")
                Log.e(TAG, "observable  $i")
                e.onNext(i)
            }
        }
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe { int ->
                    Log.e(TAG, "onNext  $int")
                    Log.e(TAG, "subscribe thread ${Thread.currentThread().name}")
                }

输出结果是:

08-24 11:02:28.614 19473 21708 E RxTag   : Observable thread RxCachedThreadScheduler-1
08-24 11:02:28.614 19473 21708 E RxTag   : observable  0
08-24 11:02:28.614 19473 21708 E RxTag   : Observable thread RxCachedThreadScheduler-1
08-24 11:02:28.615 19473 21708 E RxTag   : observable  1
08-24 11:02:28.615 19473 21708 E RxTag   : Observable thread RxCachedThreadScheduler-1
08-24 11:02:28.615 19473 21708 E RxTag   : observable  2
08-24 11:02:28.982 19473 19473 E RxTag   : onNext  0
08-24 11:02:28.982 19473 19473 E RxTag   : subscribe thread main
08-24 11:02:28.982 19473 19473 E RxTag   : onNext  1
08-24 11:02:28.983 19473 19473 E RxTag   : subscribe thread main
08-24 11:02:28.983 19473 19473 E RxTag   : onNext  2
08-24 11:02:28.983 19473 19473 E RxTag   : subscribe thread main

『上游』事件产生在RxCachedThreadScheduler-1这个线程,『下游』事件响应的onNext()在main线程。

那么我们关注下Scheduler的几个线程名称:

  • Schedulers.trampoline() : 相当于不指定线程。直接在之前的线程运行,依赖于调用操作的线程。
  • Schedulers.io():(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率;
  • Schedulers.newThread(): 总是启用新线程,并在新线程中执行操作;
  • Schedulers.single(): 启用一个线程池大小为1的线程池,相当于(newScheduledThreadPool(1)),重复利用这个线程;
  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

在RxAndroid中就会有这么一个线程:

  • AndroidSchedulers.mainThread():运行在Android主线程中。main UI线程。

那么在Android中最简单的异步或者请求网络就这么写了:

Observable.create<String> { e ->
            //请求网络,返回一个String
            val str:String = api.getString()
            e.onNext(str)
        }.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({
                    str ->   
                  //获得String,更新到UI
                    updateUI(str)
                },{
                    error->
                    //连接错误,提示错误信息
                    netWorkError(error.message)
                })

回顾文章最开始提出的问题我们就可以这么的用 RxJava实现出来:

Observable.fromArray(f.listFiles())
                .filter(new Predicate<File>() {
                    @Override
                    public boolean test(File file) throws Exception {
                        return file.getName().endsWith(".png");
                    }
                })
                .map(new Function<File, Bitmap>() {
                    @Override
                    public Bitmap apply(File file) throws Exception {
                        return getBitMapFromFile(file);
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Bitmap>() {
                    @Override
                    public void accept(Bitmap bitmap) throws Exception {
                        updateUI(bitmap);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        showError(throwable.getMessage());
                    }
                });
    }

但是有两点需要注意:

  • subscribeOn 讲『上游』事件的发射切换到 Scheduler 所定义的线程, 如果多次调用 subscribeOn(),那么只有第一个 subscribeOn 操作有效 ;
  • observeOn 指定 observeOn 后续操作所在线程。也就是说 可以多次调用observeOn 可以多次切换接下来『下游』事件处理的线程 ;

举个栗子吧:

    Observable.create<Int> { emitter ->
            for (i in 0..2) {
                Log.e(TAG, "Observable thread ${Thread.currentThread().name}")
                Log.e(TAG, "observable  $i")
                emitter.onNext(i)
            }
        }
                .subscribeOn(Schedulers.io())
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.computation())
                .doOnNext(consumer())
                .observeOn(Schedulers.io())
                .doOnNext(consumer())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(consumer())
    }

     private fun consumer(): Consumer<Int> {
        return Consumer { i ->
            Log.e(TAG, "onNext thread ${Thread.currentThread().name}")
            Log.e(TAG, "onNext $i")
        }
    }

看一看输出结果就证明之前的注意点:

08-24 14:54:41.670 26674 27052 E RxTag   : Observable thread RxCachedThreadScheduler-6
08-24 14:54:41.670 26674 27052 E RxTag   : observable  0
08-24 14:54:41.670 26674 27052 E RxTag   : Observable thread RxCachedThreadScheduler-6
08-24 14:54:41.670 26674 27052 E RxTag   : observable  1
08-24 14:54:41.670 26674 27052 E RxTag   : Observable thread RxCachedThreadScheduler-6
08-24 14:54:41.670 26674 27052 E RxTag   : observable  2
08-24 14:54:41.672 26674 26880 E RxTag   : onNext thread RxComputationThreadPool-2
08-24 14:54:41.688 26674 26880 E RxTag   : onNext 0
08-24 14:54:41.688 26674 26880 E RxTag   : onNext thread RxComputationThreadPool-2
08-24 14:54:41.688 26674 26880 E RxTag   : onNext 1
08-24 14:54:41.688 26674 26880 E RxTag   : onNext thread RxComputationThreadPool-2
08-24 14:54:41.688 26674 26880 E RxTag   : onNext 2
08-24 14:54:41.691 26674 27054 E RxTag   : onNext thread RxCachedThreadScheduler-7
08-24 14:54:41.691 26674 27054 E RxTag   : onNext 0
08-24 14:54:41.697 26674 27054 E RxTag   : onNext thread RxCachedThreadScheduler-7
08-24 14:54:41.697 26674 27054 E RxTag   : onNext 1
08-24 14:54:41.698 26674 27054 E RxTag   : onNext thread RxCachedThreadScheduler-7
08-24 14:54:41.698 26674 27054 E RxTag   : onNext 2
08-24 14:54:42.058 26674 26674 E RxTag   : onNext thread main
08-24 14:54:42.058 26674 26674 E RxTag   : onNext 0
08-24 14:54:42.058 26674 26674 E RxTag   : onNext thread main
08-24 14:54:42.059 26674 26674 E RxTag   : onNext 1
08-24 14:54:42.059 26674 26674 E RxTag   : onNext thread main
08-24 14:54:42.059 26674 26674 E RxTag   : onNext 2

我们调用了两次 subscribeOn()分别是 io()和newThread(),但是输出结果就只有在RxCachedThreadScheduler -6线程中。但是每次调用doOnNext()都切换了一个线程,也就是说可以随时随地切换事件的处理线程。

总结

线程的调度就到这结束了,把握好subscribeOn observableOn 以及scheduler 的切换,就能随心所欲的进行切换线程切换啦。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352

推荐阅读更多精彩内容