【Android】RxJava2可以这么学!

一、简介

关于RxJava的介绍与使用,在网上已经有很多的相关的教程,无奈自身水平有限,绝大多数讲解的原理噼里啪啦一堆,说的一堆高大上的名词把自己绕的云里雾里,领悟不透;刚好最近公司要求做分享,自己就做了一份关于RxJava的介绍。
这里我将用河水的上游和下游代替被观察者和观察者, 用通俗易懂的话把它们的关系解释清楚, 在这里我将从事件流向这个角度来说明RxJava的基本工作原理。


二、简单原理分析

首先,我们来看一张图:

简单原理图

上图中河流的上游产生了三个事件,而下游就会接收到三个事件,在这里上游按照顺序发送了1,2,3三个事件,而下游在收到的事件顺序也是先1,后2,再3的顺序;简单来说,这里的上游下游就分别对应着RxJava中的ObservableObserver,它们之间的连接就对应着subscribe()。下面用简单的代码来说明这种关系。


三、简单使用

3.1 上游Observable的创建
//创建一个上游 即Observable:
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
      @Override public void subscribe(@NonNull ObservableEmitter<Integer> emitter)
          throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
        //e.onError(new IOException());
      }
    });

在创建上游Observable的时候有涉及到的三个方法说明:
** 1. onNext(T t):**
Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于你的实现。
2. onComplete():
正常终止,如果没有遇到错误,Observable在最后一次调用onNext之后调用此方法。
3. onError(Exception ex):
当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出的异常。
注:以上三个方法都是在被ObservableEmitter这个方法所调用的,关于这个方法又是做什么的呢?


3.2 关于上游Observable中的ObservableEmitter()方法说明

Emitter是发射器的意思,那就很好猜了,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。
然而,这并不意味着我们就可以随意的发送乱七八糟的事件,我们需要满足以下的规则才行:
1. 上游可以发送无限个onNext, 下游也可以接收无限个onNext;
2. 当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件;
3. 当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件;
4. 上游可以不发送onComplete或onError;
5. 最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然。
说了这么多,那么我们的下游到底要如何接收处理呢,下面我们来看一下下游的创建。


3.3 下游Observer的创建
//创建一个下游 即Observer
    Observer<Integer> observer = new Observer<Integer>() {
      @Override public void onSubscribe(@NonNull Disposable d) {
        Log.d(TAG, "onSubscribe");
      }
      @Override public void onNext(@NonNull Integer integer) {
        Log.d(TAG, "onNext" + integer);
      }
      @Override public void onError(@NonNull Throwable e) {
        Log.d(TAG, "onError");
      }
      @Override public void onComplete() {
        Log.d(TAG, "onComplete");
      }
    };

在创建下游的时候我么发现,在下游除了接收了onNext、onError、onComplete三个方法之外,还有一个onSubscribe方法,那么这个方法是做啥的?这里我简单介绍下:

关于onSubscribe(Disposable d)方法的说明:

onSubscribe是在Observer中执行的第一个方法,在onNext方法之前执行,关于它传入的参数Disposable,可以将其理解成一个上游和下游之间的机关,当我们调用其对应的dispose()方法的时候,他就会将上游和下游之间切断,从而导致下游不会接收到新的事件。

注:调用dispose()并不会导致上游不再继续发送事件, 上游会继续发送剩余的事件。
上面说了这么多,那么onNext、onError、onCompleted方法到底有何区别。


3.4 onNext、onComplete、onError三个方法的区别

我们通过以下三幅图来了解关于这三个方法的区别。

只发送onNext()事件
发送onNext和onComplete事件
发送onNext和onError事件

3.6 上游Observable与下游Observer的关联

我们现在已经创建好了上游和下游,该如何关联起来呢,很简单,我们可以用subscribe()方法进行关联:

//建立连接
observable.subscribe(observer);

关于observable()方法,它其实有很多的重载方法,具体如下:

observable重载的方法.png

下面我简单的对说明一下这些重载方法:

  1. 不带任何参数的 subscribe() 表示下游不关心任何事件,你上游尽管发你的数据去吧, 老子可不管你发什么.
  1. 带有一个Consumer参数的方法表示下游只关心onNext事件, 其他的事件我假装没看见, 因此我们如果只需要onNext事件可以这么写:
Observable.create(new ObservableOnSubscribe<Integer>() {
      @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
        emitter.onNext(4);
      }
    }).subscribe(new Consumer<Integer>() {
      @Override public void accept(Integer integer) throws Exception {
        Log.d(TAG, "onNext: " + integer);
      }
    });

打印结果如下:


打印结果

3.7 关于Observable的其他创建方法

除了我们使用Observable.create()方法创建Observable之外还有:
Observable.just():
注:just()方法里存放的其实就是onNext()发射的内容,其存放的个数最多为10个,超过10个则会报错,这是因为just()方法重载了10次。

Observable.fromArray():
注:fromArray()方法里存放的同样也是onNext()发射的内容,与just()方法区别的是fromArray()里面的参数是可变数组,存放的个数没有限制。

Observable.fromIterable():
注: fromIterable()方法里传入的参数为一个集合,集合中每个条目对应的就是一个onNext()发射的内容。

下面一一举例说明:

//Observable.just()创建方法
    Observable.just(1, 2, 3, 4).subscribe(new Consumer<Integer>() {
      @Override public void accept(Integer integer) throws Exception {
        Log.d(TAG, "onNext: " + integer);
      }
    });
  //Observable.fromArray()创建方法
    Observable.fromArray(1, 2, 3, 4).subscribe(new Consumer<Integer>() {
      @Override public void accept(Integer integer) throws Exception {
        Log.d(TAG, "onNext: " + integer);
      }
    });
  //Observable.fromIterable()创建方法
    List<Integer> list = new ArrayList<>();
    list.add(1);
    list.add(2);
    list.add(3);
    list.add(4);
    Observable.fromIterable(list).subscribe(new Consumer<Integer>() {
      @Override public void accept(Integer integer) throws Exception {
        Log.d(TAG, "onNext: " + integer);
      }
    });

3.8 RxJava的线程控制

正常情况下, 上游和下游是工作在同一个线程中的, 也就是说上游在哪个线程发事件, 下游就在哪个线程接收事件.,例如我们在Activity的按钮点击事件中做如下操作:

Observable.create(new ObservableOnSubscribe<Integer>() {
      @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
        Log.d(TAG, "Observable所在的线程:" + Thread.currentThread().getName());
        e.onNext(1);
      }
    }).subscribe(new Consumer<Integer>() {
      @Override public void accept(@NonNull Integer integer) throws Exception {
        Log.d(TAG, "Observer所在的线程:" + Thread.currentThread().getName());
        Log.d(TAG, "onNext: " + integer);
      }
    });
打印结果

上面这样肯定是满足不了我们的需求的, 我们更多想要的是这么一种情况, 在子线程中做耗时的操作, 然后回到主线程中来操作UI,该如何做?
案例分析说明RxJava中的线程切换:
需求:从网上获取云南电信所有频道的信息,将频道中的频道名显示到UI界面上。

 Observable.create(new ObservableOnSubscribe<Channel>() {
      @Override public void subscribe(@NonNull ObservableEmitter<Channel> e) throws Exception {
        //网络请求频道的数据
        Channel channel = testApi.getChannel().execute().body();
        e.onNext(channel);
      }
    }).subscribe(new Consumer<Channel>() {
          @Override public void accept(@NonNull Channel channel) throws Exception {
            List<Channel.DataBean> data = channel.getData();
            for (Channel.DataBean dataBean : data) {
              tv.append(dataBean.getChannelName() + "\n");
            }
          }
        });

上面的方法在按钮点击事件里面执行的,即在主线程执行的;然而我们的Observable方法执行的是网络请求,一个耗时操作,应该在子线程执行才对,当我们执行上面代码会出现以下错误:

异常错误(一)

该如何将Observable切换到子线程呢?
RxJava中有一个subscribeOn()方法,可以指定Observable的线程,因此,我们将之前的代码修改如下:

Observable.create(new ObservableOnSubscribe<Channel>() {
      @Override public void subscribe(@NonNull ObservableEmitter<Channel> e) throws Exception {
        //网络获取频道数据
        Channel channel = testApi.getChannel().execute().body();
        e.onNext(channel);
      }
    })
        .subscribeOn(Schedulers.io())    //切换上游线程操作
        .subscribe(new Consumer<Channel>() {
          @Override public void accept(@NonNull Channel channel) throws Exception {
            List<Channel.DataBean> data = channel.getData();
            for (Channel.DataBean dataBean : data) {
              tv.append(dataBean.getChannelName() + "\n");
            }
          }
        });

继续运行,发现还是报错了,什么原因?那是因为我们开始的时候将Observable切换到了子线程,根据之前说的,默认Observer里面执行的线程跟Observable线程是一致的,而这里Observer涉及到更新UI操作,应该在主线程执行才对。

错误异常(二)

该如何将Observer切换到子线程呢?
RxJava中有一个subscribeOn()方法,可以指定Observer的线程,因此,我们将之前的代码修改如下:

Observable.create(new ObservableOnSubscribe<Channel>() {
      @Override public void subscribe(@NonNull ObservableEmitter<Channel> e) throws Exception {
        //网络获取频道数据
        Channel channel = testApi.getChannel().execute().body();
        e.onNext(channel);
      }
    })
        .subscribeOn(Schedulers.io())     //切换上游线程操作到io线程
        .observeOn(AndroidSchedulers.mainThread())  //切换下游线程到主线程
        .subscribe(new Consumer<Channel>() {
          @Override public void accept(@NonNull Channel channel) throws Exception {
            List<Channel.DataBean> data = channel.getData();
            for (Channel.DataBean dataBean : data) {
              tv.append(dataBean.getChannelName() + "\n");
            }
          }
        });

再来开一下运行结果:

正确的运行结果

以上表明我们的线程切换达到了我们需要的目的。

在RxJava中已经内置了很多线程选项供我们选择, 例如有:

Schedulers.io() : 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作

Schedulers.computation(): 代表CPU计算密集型的操作, 例如需要大量计算的操作

Schedulers.newThread(): 代表一个常规的新线程

AndroidSchedulers.mainThread(): 代表Android的主线程

这些内置的Scheduler已经足够满足我们开发的需求, 因此我们应该使用内置的这些选项, 在RxJava内部使用的是线程池来维护这些线程, 所有效率也比较高.


四、总结

关于RxJava的简单使用就介绍到这里,后期有空我会向大家介绍一下RxJava2的其他实用的方法。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容