RxJava-入门就是那么简单

前言

第一次接触RxJava是在学习Retrofit的时候,那个时候经常看到别人都是Retrofit+RxJava一起使用的,于是后来自己也上网研究了一下,经过一段时间的学习总算是把RxJava给弄懂了,在这里就分享一下我的使用心得,给想入门的同学引一引路.

概述

RxJava是什么

对于RxJava,官方给的说法是一个使用Java虚拟机观察序列异步和基于事件的程序库;绕过这些官方语言,以我自己的话来说,它就是一个用观察者模式对程序进行异步控制的这么一个库.

观察者模式

既然提到了RxJava的核心是观察者模式,那么这里就简单的说说什么是观察者模式.观察者模式完美的将观察者和被观察的对象分离开,在这种模式中有两个对象:观察者(Observer)和被观察者(Observerable),他们两个通过订阅(Subscribe)的方式产生联系,当两者建立起联系以后,那么当被观察者作出一些变化之后,观察者能够立即获知到,并根据被观察者作出的变化作出相应的反应.
举个例子:护士和病人即观察者和被观察者,当病人不舒服的时候就按铃,护士听到铃声以后赶来照顾病人,它们之间通过铃作为纽带把它们联系到一起,这样一来护士就不需要时时刻刻盯着病人

RxJava的优点

说了那么多,那到底RxJava有啥好处呢,异步和简洁应该是它最大的优点,它可以随时随地的切换线程,同时又能保证代码的清晰简明.至于它是怎么做到的,后面我会一点一点的来分析.

入门

环境搭建

1.RxJava的GitHub地址:https://github.com/ReactiveX/RxJava
2.RxAndroid的GitHub地址:https://github.com/ReactiveX/RxAndroid
3.导入Jar包

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'com.squareup.retrofit2:adapter-rxjava2:latest.version'

rxandroid是一个对rxjava的扩展库,这个库里包含了一些和Android有关的内容,下面我会具体介绍到

入门案例

实现RxJava的步骤分三步
第一步:创建被观察者(Observable)
第二步:创建观察者(Observer)
第三步:订阅(Subscribe),即让观察者和被观察者产生联系
基于以上理论,下面我们用代码来演示一下
1.创建Observable

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
        e.onNext("123");
        e.onComplete();
    }
});

2.创建Observer

Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        Log.i(TAG,"onSubscribe");
    }

    @Override
    public void onNext(@NonNull String s) {
        Log.i(TAG,"onNext:"+s);
    }

    @Override
    public void onError(@NonNull Throwable e) {
        Log.i(TAG,"onError");
    }

    @Override
    public void onComplete() {
        Log.i(TAG,"onComplete");
    }
};

3.订阅

observable.subscribe(observer);

最终输出结果为:onSubscribe、onNext:123、onComplete,可以看到在创建Observable的时候,传入了一个ObservableOnSubscribe对象作为参数,它决定了事件序列执行顺序,而observer里的方法决定了,执行内容,最后通过订阅将两个对象绑定在一起
create() 是最基本的一种创建Observeable的方法,基于这个方法,RxJava为我们提供了很多种方法来创建事件队列.
fromArray(T...item) 传入多个参数,并将这些参数依次发给观察者
just(T...)fromArray(T...item) 差不多

操作符

可以说RxJava那么受欢迎,很大程度得益于操作符的存在,操作符可以让事件队列在发送的过程中进行转换加工处理,比如你创建的时候往队列里放了一个香蕉,但是经过中间转换以后,香蕉变成了苹果,这么说可能会比较抽象,下面我就简单的介绍几个常用操作符,来加深一下对它的理解

map

Observable.fromArray("小明")      //输入内容
        .map(new Function<String, String>() {   //map转换
            @Override
            public String apply(@NonNull String s) throws Exception {
                return s + "的爸爸";
            }
        })
        .subscribe(new Consumer<String>() {     //订阅
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.i(TAG, s);
            }
        });

这里出现了几个新东西Function,Consumer,咱们一一来解释其作用,首先是Function<T,R>,它就是一个接口,作用是传入一个值,然后传出另一个值,T代表传入值的类型,R代表传出值的类型,用它可以对输入值进行一些运算,返回一些其他值.map的作用就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列.而Consumer<T>则是一个接受单一值的函数接口,作用类似于Observer里的onNext 用来处理接收过来的值;现在我们可以这样来理解:Observable原先的序列中仅仅存储了小明这个事件,而如今经过map的变换,就把事件源中的小明改造成了小明的爸爸这一事件

flatMap

flatMap算是一个比较有用的变换,它的作用和map类似,但是我觉得它又比map高级多了,到底是哪里高级呢,下面我们通过一段代码来分析一下

Observable.fromArray("1","2")
        .flatMap(new Function<String, Observable<String>>() {
            @Override
            public Observable<String> apply(@NonNull String s) throws Exception {
                Log.e(TAG,"原来的事件:"+s);
                return Observable.fromArray("一","二");
            }
        })
        .subscribe(new Consumer<String>() {

            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.i(TAG,"改造后的事件:"+s);
            }
        });

打印的Log如下

打印的Log

从结果看出mapflatMap类似,都是对数据进行了变换,不同的是flatMap返回的是Observeable对象,我们可以把之前的事件处理以后放到一个新的Observeable 中,需要注意的是这个新的Observeable 并不是直接发送事件,它还是被放到了原先的Observeable 中由原先的Observeable 来发送事件,下面画个图来理解一下这个过程
这里写图片描述

从图里可以看出,原先的Observeable序列中的事件经过变换,又变成了一个新的序列,最后再由原先Observeable把这些事件统一交给Subscriber的回调方法

当然了,类似的操作符还有很多,比如zip操作符,它就是把多个数据流合并到一处,最终发送到一个地方;总而言之,操作符的作用就是在事件发送过程中对数据进行处理的.

线程调度(Scheduler)

说到这线程调度嘛,这就是RxJava另一个牛逼的地方了;利用RxJava给的Scheduler咱们可以随时随地的切换线程,这就是RxJava实现异步的方法,具体怎么使用呢,且听我娓娓道来.

案例

在看代码前,有几个概念需要先说明一下,subscribeOn()observeOn() 这两个方法可以对线程进行切换,而具体切换到哪个线程则是由调度器Scheduler来控制,下面讲讲两个方法的使用以及作用范围.
observeOn
observeOn()可以多次调用,一般放在map和flatmap等操作符前,用来控制操作符中的操作,作用范围在下一个observeOn出现前

 observeable.just(T...)
            .observeOn(Schedulers.io())
            .map(1)
            .flatMap(2);
            .observeOn(Schedulers.newThread())
            .map(3)
            .subscribe(4)

结果是1、2是在io线程里执行的;3、4是在newThread线程里执行的
subscribeOn
subscribeOn()位置放在哪里都行,但只能调用一次;一般就是指定Observable创建和doOnSubscribe的线程;比如:Observeable.create(...);Observeable.fromArray(...)或者Observeable.doOnSubscribe(...)都由subscribeOn来控制,如果程序中没有调用过observeOn,而只调用了subscribeOn,那么程序里所有的mapflatMap都会在subscribeOn指定的线程中执行直到出现下个observeOn

Observable.fromArray(...)
          .map(1)
          .flatMap(2)
          .subscribeOn(Schedulers.io())

1、2都是在io线程中执行, 除非在程序中某处执行了observeOn

常用的Scheduler

调度器类型 作用范围
AndroidSchedulers.mainThread() AndroidUI线程
Schedulers.io() IO线程(读写文件、数据库和网络信息交互等)都可以放在IO线程里来执行,它的模式和newThread()差不多,但效率比newThread()高
Schedulers.newThread() 开启一个新线程
Schedulers.single() 单例线程,可以把两段程序放在同一个线程里顺序执行

背压(Backpressure)

概念

在rxjava中会经常遇到一种情况就是被观察者发送消息太快以至于它的操作符或者订阅者不能及时处理相关的消息。那么随之而来的就是如何处理这些未处理的消息。举个例子,使用zip操作符将两个无限大的Observable压缩在一起,其中一个被观察者发送消息的速度是另一个的两倍。一个比较不靠谱的做法就是把发送比较快的消息缓存起来,当比较慢的Observable发送消息的时候取出来并将他们结合在一起。这样做就使得rxjava变得笨重而且十分占用系统资源。

解决这个问题的方法就是使用背压策略,在RxJava1.0中,如果使用Observeable的时候产生了背压问题系统是会抛MissingBackpressureException异常的,而在RxJava2.0中Observable不再支持背压,多出了Flowable(也是被观察者)来支持背压,当然了这里既然提到了一个新的被观察者,那么就不得不提它的搭档观察者了Subscriber 通常这两个是一起使用的,下面来看看使用Flowable 是如何使用背压策略的
Flowable.create(FlowableOnSubscribe<T> source, BackpressureStrategy mode)

Flowable的几种Backpressure策略

MISSING
如果流的速度无法保持同步,可能会抛出MissingBackpressureException或IllegalStateException。
BUFFER
上游不断的发出onNext请求,直到下游处理完,也就是和Observable一样了,缓存池无限大,最后直到程序崩溃
ERROR
会在下游跟不上速度时抛出MissingBackpressureException。
DROP
会在下游跟不上速度时把onNext的值丢弃。
LATEST
会一直保留最新的onNext的值,直到被下游消费掉。

下面咱们通过一段代码来具体分析

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                for(int i = 0; i < 129; ++i){
                    e.onNext(i);
                }
                e.onComplete();

            }
        }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(3);
                    }
                    @Override
                    public void onNext(Integer s) {
                        Log.d(TAG, "onNext: " + s);
                    }
                    @Override
                    public void onError(Throwable t) {
                        Log.d(TAG, "onError"+t);
                    }
                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");

                    }
                });

在Flowable中背压采取拉取响应的方式来进行水流控制,也就是说Subscription是控制上下游水流的主要方式,一般的,我们需要调用Subscription.request()来传入一个下游目前能处理的事件的数量,不过你不传也是可以的,因为在Flowable内部已经设置了一个默认值(128),具体的可以去看Flowable的源码,如上面所写,我设置了s.request(3);表示下游只能出来3个事件,而上游我传了129个事件进来,下游显然是无法处理的,因此程序会抛出MissingBackpressureException异常;如果感兴趣的同学可以试试其他几种策略模式,这里就不再赘述了.

补充

也不知道该补充些啥,那么简单的介绍几个知识点吧(可能有点乱,凑合着看吧)

  1. Observablet.timer(long delay, TimeUnit unit)可以定时发送事件
  2. Filter操作符可以在数据数据发送过程中过滤掉一些数据
  3. 一般来说,Observable不会抛异常。它会调用 onError 终止Observable序列,以此通知所有的观察者发生了一个不可恢复的错误. 但是,也存在一些异常.例如:如果onError调用失败了,Observable不会尝试再次调用onError去通知观察者,它会抛出RuntimeException,OnErrorFailedException或者OnErrorNotImplementedException.
    4.暂时就这些了,以后想到的话我还会继续补充

结语

看完上面的介绍,首先你得明白我们到底拿RxJava来做什么,怎么做,有啥好处,如果你把上面的内容都看懂了,那么恭喜你你已经入门了;剩下的只需要在实际开发中去应用了,最后:如果有写的不对的地方还请见谅,也欢迎各位大神指正.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容