【Android】RxJava的使用(二)线程切换

理论
总所周知 RxJava 在切换线程时用到了两个方法 subscribeOn() 和 observeOn() 下面来分别解释一下这两个方法

  • subscribeOn() : 影响的是最开始的被观察者所在的线程。当使用多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用;
  • observeOn() : 影响的是跟在后面的操作(指定观察者运行的线程)。所以如果想要多次改变线程,可以多次使用 observeOn;

为什么 subscribeOn() 只有第一次切换有效
因为 RxJava 最终能影响 ObservableOnSubscribe 这个匿名实现接口的运行环境的只能是最后一次运行的 subscribeOn() ,又因为 RxJava 订阅的时候是从下往上订阅,所以从上往下第一个 subscribeOn() 就是最后运行的,这就造成了写多个 subscribeOn() 并没有什么乱用的现象。

线程控制 —— Scheduler (一)
在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。

Scheduler 的 API (一)
正常情况下, 上游和下游是工作在同一个线程中的, 也就是说上游在哪个线程发事件, 下游就在哪个线程接收事件.
在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:

在RxJava中, 已经内置了很多线程选项供我们选择, 例如有

  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。

  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。

  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和
    newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。

  • Schedulers.computation(): 代表CPU计算密集型的操作, 例如需要大量计算的操作 ,计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

  • 另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

这些内置的Scheduler已经足够满足我们开发的需求, 因此我们应该使用内置的这些选项, 在RxJava内部使用的是线程池来维护这些线程, 所有效率也比较高.

有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。 * subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。 * observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。

回到RxJava中, 当我们在主线程中去创建一个上游Observable来发送事件, 则这个上游默认就在主线程发送事件.
当我们在主线程去创建一个下游Observer来接收事件, 则这个下游默认就在主线程中接收事件。
但是实际情况是 ,我们更多想要的是这么一种情况, 在子线程中做耗时的操作, 然后回到主线程中来操作UI,

要达到这个目的, 我们需要先改变上游发送事件的线程, 让它去子线程中发送事件, 然后再改变下游的线程, 让它去主线程接收事件. 通过RxJava内置的线程调度器可以很轻松的做到这一点. 接下来看一段代码

//创建一个下游 观察者Observer

Observer<Integer> observer = new Observer<Integer>() {
       @Override
       public void onSubscribe(Disposable d) {

       }

       @Override
       public void onNext(Integer value) {
           Log.w(TAG, "" + value);
           Log.d(TAG, "observer thread is : " + Thread.currentThread().getName());

       }

       @Override
       public void onError(Throwable e) {
       }

       @Override
       public void onComplete() {
       }
   };


   //创建一个上游  被观察者 Observable:
   Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
       @Override
       public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
           Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
           emitter.onNext(1);
       }
   });

//建立连接
observable.subscribeOn(Schedulers.newThread())
                  .observeOn(AndroidSchedulers.mainThread())
                  .subscribe(observer);
打印结果
08-29 18:53:38.313 5128-5155/tongxunlu.com.myapplication D/MainActivity: Observable thread is : RxNewThreadScheduler-1
08-29 18:53:38.314 5128-5128/tongxunlu.com.myapplication W/MainActivity: 1
08-29 18:53:38.314 5128-5128/tongxunlu.com.myapplication D/MainActivity: observer thread is : main

可以看到, 上游发送事件的线程的确改变了, 是在一个叫 RxNewThreadScheduler-2的线程中发送的事件, 而下游仍然在主线程中接收事件, 这说明我们的目的达成了, 接下来看看是如何做到的.

和上一段代码相比,这段代码只不过是增加了两行代码:

.subscribeOn(Schedulers.newThread())                                              
.observeOn(AndroidSchedulers.mainThread())

简单的来说, subscribeOn() 指定的是上游发送事件的线程, observeOn() 指定的是下游接收事件的线程.

需要注意的是

  • 多次指定上游(Observable)的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略.
  • 多次指定下游(Observer)的线程是可以的, 也就是说每调用一次observeOn() , 下游的线程就会切换一次.
    举个栗子:
//建立连接
               observable.subscribeOn(Schedulers.newThread())
                       .subscribeOn(Schedulers.io())
                       .observeOn(AndroidSchedulers.mainThread())
                       .observeOn(Schedulers.io())
                       .subscribe(observer);

** doOnNext官方介绍:**
The doOnNext operator is much like doOnEach(Action1) except that the Action that you pass it as a parameter does not accept a Notification but instead simply accepts the emitted item.

可以这么理解:

  • do系列的作用是side effect,当onNext发生时,它被调用,不改变数据流。
  • doOnNext()允许我们在每次输出一个元素之前做一些额外的事情。

实践
对于我们Android开发人员来说, 经常会将一些耗时的操作放在后台, 比如网络请求或者读写文件,操作数据库等等,等到操作完成之后回到主线程去更新UI, 有了上面的这些基础, 那么现在我们就可以轻松的去做到这样一些操作.

读写数据库

上面说了网络请求的例子, 接下来再看看读写数据库, 读写数据库也算一个耗时的操作, 因此我们也最好放在IO线程里去进行, 这个例子就比较简单, 直接上代码:

public Observable<List<Record>> readAllRecords() {
      return Observable.create(new ObservableOnSubscribe<List<Record>>() {
          @Override
          public void subscribe(ObservableEmitter<List<Record>> emitter) throws Exception {
              Cursor cursor = null;
              try {
                  cursor = getReadableDatabase().rawQuery("select * from " + TABLE_NAME, new String[]{});
                  List<Record> result = new ArrayList<>();
                  while (cursor.moveToNext()) {
                      result.add(Db.Record.read(cursor));
                  }
                  emitter.onNext(result);
                  emitter.onComplete();
              } finally {
                  if (cursor != null) {
                      cursor.close();
                  }
              }
          }
      }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
  }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,826评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,968评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,234评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,562评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,611评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,482评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,271评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,166评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,608评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,814评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,926评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,644评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,249评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,866评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,991评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,063评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,871评论 2 354