前言
第一次接触RxJava是在学习Retrofit的时候,那个时候经常看到别人都是Retrofit+RxJava一起使用的,于是后来自己也上网研究了一下,经过一段时间的学习总算是把RxJava给弄懂了,在这里就分享一下我的使用心得,给想入门的同学引一引路.
概述
RxJava是什么
对于RxJava,官方给的说法是一个使用Java虚拟机观察序列异步和基于事件的程序库;绕过这些官方语言,以我自己的话来说,它就是一个用观察者模式对程序进行异步控制的这么一个库.
观察者模式
既然提到了RxJava的核心是观察者模式,那么这里就简单的说说什么是观察者模式.观察者模式完美的将观察者和被观察的对象分离开,在这种模式中有两个对象:观察者(Observer)和被观察者(Observerable),他们两个通过订阅(Subscribe)的方式产生联系,当两者建立起联系以后,那么当被观察者作出一些变化之后,观察者能够立即获知到,并根据被观察者作出的变化作出相应的反应.
举个例子:护士和病人即观察者和被观察者,当病人不舒服的时候就按铃,护士听到铃声以后赶来照顾病人,它们之间通过铃作为纽带把它们联系到一起,这样一来护士就不需要时时刻刻盯着病人
RxJava的优点
说了那么多,那到底RxJava有啥好处呢,异步和简洁应该是它最大的优点,它可以随时随地的切换线程,同时又能保证代码的清晰简明.至于它是怎么做到的,后面我会一点一点的来分析.
入门
环境搭建
1.RxJava的GitHub地址:https://github.com/ReactiveX/RxJava
2.RxAndroid的GitHub地址:https://github.com/ReactiveX/RxAndroid
3.导入Jar包
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'com.squareup.retrofit2:adapter-rxjava2:latest.version'
rxandroid是一个对rxjava的扩展库,这个库里包含了一些和Android有关的内容,下面我会具体介绍到
入门案例
实现RxJava的步骤分三步
第一步:创建被观察者(Observable)
第二步:创建观察者(Observer)
第三步:订阅(Subscribe),即让观察者和被观察者产生联系
基于以上理论,下面我们用代码来演示一下
1.创建Observable
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("123");
e.onComplete();
}
});
2.创建Observer
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.i(TAG,"onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
Log.i(TAG,"onNext:"+s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.i(TAG,"onError");
}
@Override
public void onComplete() {
Log.i(TAG,"onComplete");
}
};
3.订阅
observable.subscribe(observer);
最终输出结果为:onSubscribe、onNext:123、onComplete,可以看到在创建Observable的时候,传入了一个ObservableOnSubscribe对象作为参数,它决定了事件序列执行顺序,而observer里的方法决定了,执行内容,最后通过订阅将两个对象绑定在一起
create()
是最基本的一种创建Observeable的方法,基于这个方法,RxJava为我们提供了很多种方法来创建事件队列.
fromArray(T...item)
传入多个参数,并将这些参数依次发给观察者
just(T...)
和fromArray(T...item)
差不多
操作符
可以说RxJava那么受欢迎,很大程度得益于操作符的存在,操作符可以让事件队列在发送的过程中进行转换加工处理,比如你创建的时候往队列里放了一个香蕉,但是经过中间转换以后,香蕉变成了苹果,这么说可能会比较抽象,下面我就简单的介绍几个常用操作符,来加深一下对它的理解
map
Observable.fromArray("小明") //输入内容
.map(new Function<String, String>() { //map转换
@Override
public String apply(@NonNull String s) throws Exception {
return s + "的爸爸";
}
})
.subscribe(new Consumer<String>() { //订阅
@Override
public void accept(@NonNull String s) throws Exception {
Log.i(TAG, s);
}
});
这里出现了几个新东西Function,Consumer,咱们一一来解释其作用,首先是Function<T,R>
,它就是一个接口,作用是传入一个值,然后传出另一个值,T代表传入值的类型,R代表传出值的类型,用它可以对输入值进行一些运算,返回一些其他值.map的作用就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列.而Consumer<T>
则是一个接受单一值的函数接口,作用类似于Observer里的onNext
用来处理接收过来的值;现在我们可以这样来理解:Observable原先的序列中仅仅存储了小明这个事件,而如今经过map的变换,就把事件源中的小明改造成了小明的爸爸这一事件
flatMap
flatMap
算是一个比较有用的变换,它的作用和map
类似,但是我觉得它又比map
高级多了,到底是哪里高级呢,下面我们通过一段代码来分析一下
Observable.fromArray("1","2")
.flatMap(new Function<String, Observable<String>>() {
@Override
public Observable<String> apply(@NonNull String s) throws Exception {
Log.e(TAG,"原来的事件:"+s);
return Observable.fromArray("一","二");
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.i(TAG,"改造后的事件:"+s);
}
});
打印的Log如下
从结果看出
map
和flatMap
类似,都是对数据进行了变换,不同的是flatMap
返回的是Observeable
对象,我们可以把之前的事件处理以后放到一个新的Observeable
中,需要注意的是这个新的Observeable
并不是直接发送事件,它还是被放到了原先的Observeable
中由原先的Observeable
来发送事件,下面画个图来理解一下这个过程从图里可以看出,原先的
Observeable
序列中的事件经过变换,又变成了一个新的序列,最后再由原先Observeable
把这些事件统一交给Subscriber
的回调方法
当然了,类似的操作符还有很多,比如zip操作符,它就是把多个数据流合并到一处,最终发送到一个地方;总而言之,操作符的作用就是在事件发送过程中对数据进行处理的.
线程调度(Scheduler)
说到这线程调度嘛,这就是RxJava另一个牛逼的地方了;利用RxJava给的Scheduler咱们可以随时随地的切换线程,这就是RxJava实现异步的方法,具体怎么使用呢,且听我娓娓道来.
案例
在看代码前,有几个概念需要先说明一下,subscribeOn()
和 observeOn()
这两个方法可以对线程进行切换,而具体切换到哪个线程则是由调度器Scheduler
来控制,下面讲讲两个方法的使用以及作用范围.
observeOn
observeOn()可以多次调用,一般放在map和flatmap等操作符前,用来控制操作符中的操作,作用范围在下一个observeOn出现前
observeable.just(T...)
.observeOn(Schedulers.io())
.map(1)
.flatMap(2);
.observeOn(Schedulers.newThread())
.map(3)
.subscribe(4)
结果是1、2是在io线程里执行的;3、4是在newThread线程里执行的
subscribeOn
subscribeOn()位置放在哪里都行,但只能调用一次;一般就是指定Observable创建和doOnSubscribe的线程;比如:Observeable.create(...)
;Observeable.fromArray(...)
或者Observeable.doOnSubscribe(...)
都由subscribeOn
来控制,如果程序中没有调用过observeOn
,而只调用了subscribeOn
,那么程序里所有的map
、flatMap
都会在subscribeOn
指定的线程中执行直到出现下个observeOn
Observable.fromArray(...)
.map(1)
.flatMap(2)
.subscribeOn(Schedulers.io())
1、2都是在io线程中执行, 除非在程序中某处执行了observeOn
常用的Scheduler
调度器类型 | 作用范围 |
---|---|
AndroidSchedulers.mainThread() | AndroidUI线程 |
Schedulers.io() | IO线程(读写文件、数据库和网络信息交互等)都可以放在IO线程里来执行,它的模式和newThread()差不多,但效率比newThread()高 |
Schedulers.newThread() | 开启一个新线程 |
Schedulers.single() | 单例线程,可以把两段程序放在同一个线程里顺序执行 |
背压(Backpressure)
概念
在rxjava中会经常遇到一种情况就是被观察者发送消息太快以至于它的操作符或者订阅者不能及时处理相关的消息。那么随之而来的就是如何处理这些未处理的消息。举个例子,使用zip操作符将两个无限大的Observable压缩在一起,其中一个被观察者发送消息的速度是另一个的两倍。一个比较不靠谱的做法就是把发送比较快的消息缓存起来,当比较慢的Observable发送消息的时候取出来并将他们结合在一起。这样做就使得rxjava变得笨重而且十分占用系统资源。
解决这个问题的方法就是使用背压策略,在RxJava1.0中,如果使用Observeable的时候产生了背压问题系统是会抛MissingBackpressureException异常的,而在RxJava2.0中Observable不再支持背压,多出了Flowable
(也是被观察者)来支持背压,当然了这里既然提到了一个新的被观察者,那么就不得不提它的搭档观察者了Subscriber
通常这两个是一起使用的,下面来看看使用Flowable
是如何使用背压策略的
Flowable.create(FlowableOnSubscribe<T> source, BackpressureStrategy mode)
Flowable的几种Backpressure策略
MISSING
如果流的速度无法保持同步,可能会抛出MissingBackpressureException或IllegalStateException。
BUFFER
上游不断的发出onNext请求,直到下游处理完,也就是和Observable一样了,缓存池无限大,最后直到程序崩溃
ERROR
会在下游跟不上速度时抛出MissingBackpressureException。
DROP
会在下游跟不上速度时把onNext的值丢弃。
LATEST
会一直保留最新的onNext的值,直到被下游消费掉。
下面咱们通过一段代码来具体分析
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
for(int i = 0; i < 129; ++i){
e.onNext(i);
}
e.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(3);
}
@Override
public void onNext(Integer s) {
Log.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable t) {
Log.d(TAG, "onError"+t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
在Flowable中背压采取拉取响应的方式来进行水流控制,也就是说Subscription是控制上下游水流的主要方式,一般的,我们需要调用Subscription.request()来传入一个下游目前能处理的事件的数量,不过你不传也是可以的,因为在Flowable内部已经设置了一个默认值(128),具体的可以去看Flowable的源码,如上面所写,我设置了s.request(3);表示下游只能出来3个事件,而上游我传了129个事件进来,下游显然是无法处理的,因此程序会抛出MissingBackpressureException异常;如果感兴趣的同学可以试试其他几种策略模式,这里就不再赘述了.
补充
也不知道该补充些啥,那么简单的介绍几个知识点吧(可能有点乱,凑合着看吧)
- 用
Observablet.timer(long delay, TimeUnit unit)
可以定时发送事件 -
Filter
操作符可以在数据数据发送过程中过滤掉一些数据 - 一般来说,Observable不会抛异常。它会调用 onError 终止Observable序列,以此通知所有的观察者发生了一个不可恢复的错误. 但是,也存在一些异常.例如:如果
onError
调用失败了,Observable不会尝试再次调用onError
去通知观察者,它会抛出RuntimeException,OnErrorFailedException或者OnErrorNotImplementedException.
4.暂时就这些了,以后想到的话我还会继续补充
结语
看完上面的介绍,首先你得明白我们到底拿RxJava来做什么,怎么做,有啥好处,如果你把上面的内容都看懂了,那么恭喜你你已经入门了;剩下的只需要在实际开发中去应用了,最后:如果有写的不对的地方还请见谅,也欢迎各位大神指正.