一、简介
RxJava是响应式编程(Reactive Extensions)的java实现,它基于观察者模式的实现了异步编程接口。
Rxjava 3.x 的github官网;
RxJava2将被支持到2021年2月28日,错误的会同时在2.x和3.x修复,但新功能只会在3.x上添加;
Rxjava 3.0的一些改变:官方Wiki;
Rxjava 3.x 文档可以在官方javadoc中找到;
使用Rxjava3.x之前的准备工作:
1.1 添加依赖:
//RxJava的依赖包
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
//RxAndroid的依赖包
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
1.2 将项目的编译目标设置更改为 java8:
android {
compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8
}
}
二、Rx概念
2.1 字段含义
Reactive 直译为反应性的,有活性的,根据上下文一般翻译为反应式、响应式
Iterable 可迭代对象,支持以迭代器的形式遍历,许多语言中都存在这个概念
Observable 可观察对象,在Rx中定义为更强大的Iterable,在观察者模式中是被观察的对象,一旦数据产生或发生变化,会通过某种方式通知观察者或订阅者
Observer 观察者对象,监听Observable发射的数据并做出响应,Subscriber是它的一个特殊实现
emit 直译为发射,发布,发出,含义是Observable在数据产生或变化时发送通知给Observer,调用Observer对应的方法,文章里一律译为发射
items 直译为项目,条目,在Rx里是指Observable发射的数据项,文章里一律译为数据,数据项。
2.2 上/下流
在RxJava中,数据以流的方式组织:Rxjava包括一个源数据流,源数据流后跟着若干个用于消费数据流的步骤。
source
.operator1()
.operator2()
.operator3()
.subscribe(consumer)
在代码中,对于operator2来说,在它前面叫做上流,在它后面的叫做下流。
2.3 流对象
在RxJava的文档中,emission, emits, item, event, signal, data and message都被认为在数据流中被传递的数据对象。
2.4 背压(Backpressure)
当上下游在不同的线程中,通过Observable发射,处理,响应数据流时,如果上游发射数据的速度快于下游接收处理数据的速度,这样对于那些没来得及处理的数据就会造成积压,这些数据既不会丢失,也不会被垃圾回收机制回收,而是存放在一个异步缓存池中,如果缓存池中的数据一直得不到处理,越积越多,最后就会造成内存溢出,这便是响应式编程中的背压(backpressure)问题。
为此,RxJava带来了backpressure的概念。背压是一种流量的控制步骤,在不知道上流还有多少数据的情形下控制内存的使用,表示它们还能处理多少数据。背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略
在Rxjava1.0中,有的Observable支持背压,有的不支持,为了解决这种问题,2.0把支持背压和不支持背压的Observable区分开来:支持背压的有Flowable类,不支持背压的有Observable,Single, Maybe and Completable类。
- 在订阅的时候如果使用FlowableSubscriber,那么需要通过s.request(Long.MAX_VALUE)去主动请求上游的数据项。如果遇到背压报错的时候,FlowableSubscriber默认已经将错误try-catch,并通过onError()进行回调,程序并不会崩溃;
- 在订阅的时候如果使用Consumer,那么不需要主动去请求上游数据,默认已经调用了s.request(Long.MAX_VALUE)。如果遇到背压报错、且对Throwable的Consumer没有new出来,则程序直接崩溃;
- 背压策略的上游的默认缓存池是128。
背压策略:
error, 缓冲区大概在128
buffer, 缓冲区在1000左右
drop, 把存不下的事件丢弃
latest, 只保留最新的
missing, 缺省设置,不做任何操作
public enum BackpressureStrategy {
/**
* OnNext events are written without any buffering or dropping.
* Downstream has to deal with any overflow.
* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
*/
MISSING,
/**
* Signals a MissingBackpressureException in case the downstream can't keep up.
*/
ERROR,
/**
* Buffers <em>all</em> onNext values until the downstream consumes it.
*/
BUFFER,
/**
* Drops the most recent onNext value if the downstream can't keep up.
*/
DROP,
/**
* Keeps only the latest onNext value, overwriting any previous value if the
* downstream can't keep up.
*/
LATEST
}
2.5 线程调度器(Schedulers)
对于Android开发者而言,RxJava最简单的是通过调度器来方便地切换线程。在不同平台还有不同的调度器,例如我们Android的主线程:AndroidSchedulers.mainThread()。
属性 | 类型 |
---|---|
AndroidSchedulers.mainThread() | 需要引用rxandroid, 切换到UI线程 |
Schedulers.computation() | 用于计算任务,如事件循环和回调处理,默认线程数等于处理器数量 |
Schedulers.io() | 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需求,它默认是一个CacheThreadScheduler |
Schedulers.newThread() | 为每一个任务创建一个新线程 |
Schedulers.trampoline() | 在当前线程中立刻执行,如当前线程中有任务在执行则将其暂停, 等插入进来的任务执行完成之后,在将未完成的任务继续完成。 |
Scheduler.from(executor) | 指定Executor作为调度器 |
2.6 事件调度器
RxJava事件发出去并不是置之不顾,要有合理的管理者来管理它们,在合适的时机要进行释放事件,这样才不会导致内存泄漏,这里的管理者我们称为事件调度器(或事件管理者)CompositeDisposable。
2.7 基类
RxJava 3 中的基类相比RxJava 2 没啥改变,主要有以下几个基类:
- io.reactivex.Flowable:发送0个N个的数据,支持Reactive-Streams和背压
- io.reactivex.Observable:发送0个N个的数据,不支持背压,
- io.reactivex.Single:只能发送单个数据或者一个错误
- io.reactivex.Completable:没有发送任何数据,但只处理 onComplete 和 onError 事件。
- io.reactivex.Maybe:能够发射0或者1个数据,要么成功,要么失败。
2.8 Observables的"热"和"冷"
Observable什么时候开始发射数据序列?这取决于Observable的实现,一个"热"的Observable可能一创建完就开始发射数据,因此所有后续订阅它的观察者可能从序列中间的某个位置开始接受数据(有一些数据错过了)。一个"冷"的Observable会一直等待,直到有观察者订阅它才开始发射数据,因此这个观察者可以确保会收到整个数据序列。
在一些ReactiveX实现里,还存在一种被称作Connectable的Observable,不管有没有观察者订阅它,这种Observable都不会开始发射数据,除非Connect方法被调用。
基本使用
需要知道的是,RxJava以观察者模式为骨架,有两种常见的观察者模式:
- Observable(被观察者)/Observer(观察者):不支持背压
- Flowable(被观察者)/Subscriber(观察者):支持背压
3.1 Observable/Observer用法:
Observable mObservable=Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onComplete();
}
});
Observer mObserver=new Observer<Integer>() {
//这是新加入的方法,在订阅后发送数据之前,
//回首先调用这个方法,而Disposable可用于取消订阅
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.e("lucas", "onNext: "+value );
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
mObservable.subscribe(mObserver);
这种观察者模型不支持背压:当被观察者快速发送大量数据时,下游不会做其他处理,即使数据大量堆积,调用链也不会报MissingBackpressureException,消耗内存过大只会OOM。所以,当我们使用Observable/Observer的时候,我们需要考虑的是,数据量是不是很大(官方给出以1000个事件为分界线作为参考)。
3.2 Flowable/Subscriber用法
Flowable.range(0,10)
.subscribe(new Subscriber<Integer>() {
Subscription sub;
//当订阅后,会首先调用这个方法,其实就相当于onStart(),
//传入的Subscription s参数可以用于请求数据或者取消订阅
@Override
public void onSubscribe(Subscription s) {
Log.w("TAG","onsubscribe start");
sub=s;
sub.request(1);
Log.w("TAG","onsubscribe end");
}
@Override
public void onNext(Integer o) {
Log.w("TAG","onNext--->"+o);
sub.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
Log.w("TAG","onComplete");
}
});
Flowable是支持背压的,也就是说,一般而言,上游的被观察者会响应下游观察者的数据请求,下游调用request(n)来告诉上游发送多少个数据。这样避免了大量数据堆积在调用链上,使内存一直处于较低水平。
当然,Flowable也可以通过creat()来创建:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
}
//需要指定背压策略
, BackpressureStrategy.BUFFER);
Flowable虽然可以通过create()来创建,但是你必须指定背压的策略,以保证你创建的Flowable是支持背压的。
根据上面的代码的结果输出中可以看到,当我们调用subscription.request(n)方法的时候,不等onSubscribe()中后面的代码执行,就会立刻执行到onNext方法,因此,如果你在onNext方法中使用到需要初始化的类时,应当尽量在subscription.request(n)这个方法调用之前做好初始化的工作;
当然,这也不是绝对的,我在测试的时候发现,通过create()自定义Flowable的时候,即使调用了subscription.request(n)方法,也会等onSubscribe()方法中后面的代码都执行完之后,才开始调用onNext。
TIPS: 尽可能确保在request()之前已经完成了所有的初始化工作,否则就有空指针的风险。
3.3 其他的观察者
最常用的其实就是上面说的两种订阅观察者,但是一些情况下,我们也会用到一些其他的一类观察者比如
- Single/SingleObserver
- Completable/CompletableObserver
- Maybe/MaybeObserver
3.3.1 Single/SingleObserver用法
//被观察者
Single<String> single = Single.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(SingleEmitter<String> e) throws Exception {
e.onSuccess("test");
e.onSuccess("test2");//错误写法,重复调用也不会处理,因为只会调用一次
}
});
//订阅观察者SingleObserver
single.subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
//相当于onNext和onComplete
Log.d("lucas", s );
}
@Override
public void onError(Throwable e) {
}
});
//运行结果
2020-04-03 23:02:37.337 15462-15462/com.ysalliance.getfan.myapplication D/lucas: test
Single类似于Observable,不同的是,它总是只发射一个值,或者一个错误通知,而不是发射一系列的值(当然就不存在背压问题),所以当你使用一个单一连续事件流,这样你可以使用Single。Single观察者只包含两个事件,一个是正常处理成功的onSuccess,另一个是处理失败的onError。因此,不同于Observable需要三个方法onNext, onError, onCompleted,订阅Single只需要两个方法:
onSuccess - Single发射单个的值到这个方法
onError - 如果无法发射需要的值,Single发射一个Throwable对象到这个方法
Single只会调用这两个方法中的一个,而且只会调用一次,调用了任何一个方法之后,订阅关系终止。
Single的操作符:
Single也可以组合使用多种操作,一些操作符让你可以混合使用Observable和Single:
详细可参考:Single操作符
3.3.2 Completable/CompletableObserver
如果你的观察者连onNext事件都不关心,可以使用Completable,它只有onComplete和onError两个事件:
Completable.create(new CompletableOnSubscribe() {//被观察者
@Override
public void subscribe(CompletableEmitter e) throws Exception {
e.onComplete();//单一onComplete或者onError
}
}).subscribe(new CompletableObserver() {//观察者
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onComplete() {
Log.e("lucas", "onComplete: ");
}
@Override
public void onError(Throwable e) {
}
});
//打印结果
2020-04-03 23:12:08.099 16264-16264/com.ysalliance.getfan.myapplication E/lucas: onComplete:
3.3.3 Maybe/MaybeObserver
如果你有一个需求是可能发送一个数据或者不会发送任何数据,这时候你就需要Maybe,它类似于Single和Completable的混合体。
Maybe可能会调用以下其中一种情况(也就是所谓的Maybe):
onSuccess或者onError
onComplete或者onError
可以看到onSuccess和onComplete是互斥的存在,例子代码如下:
//被观察者
Maybe<String> maybe = Maybe.create(new MaybeOnSubscribe<String>() {
@Override
public void subscribe(MaybeEmitter<String> e) throws Exception {
e.onSuccess("test");//发送一个数据的情况,或者onError,不需要再调用onComplete(调用了也不会触发onComplete回调方法)
//e.onComplete();//不需要发送数据的情况,或者onError
}
});
//订阅观察者
maybe.subscribe(new MaybeObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
//发送一个数据时,相当于onNext和onComplete,但不会触发另一个方法onComplete
Log.i("lucas", s);
}
@Override
public void onComplete() {
//无数据发送时候的onComplete事件
Log.i("lucas", "onComplete");
}
@Override
public void onError(Throwable e) {
}
});
//打印结果
2020-04-03 23:14:40.266 16558-16558/com.ysalliance.getfan.myapplication I/lucas: test
要转换成其他类型的被观察者,也是可以使用toFlowable()、toObservable()等方法去转换。
//判断是否登陆
Maybe.just(isLogin())
//可能涉及到IO操作,放在子线程
.subscribeOn(Schedulers.newThread())
//取回结果传到主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new MaybeObserver<Boolean>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Boolean value) {
if(value){
...
}else{
...
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
上面就是Maybe/MaybeObserver的普通用法,你可以看到,实际上,这种观察者模式并不用于发送大量数据,而是发送单个数据,也就是说,当你只想要某个事件的结果(true or false)的时候,你可以用这种观察者模式
3.4 事件调度器释放事件
public class Main {
private static CompositeDisposable mRxEvent = new CompositeDisposable();
public static void main(String[] args) {
Disposable subscribe = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("俊俊俊很帅");
e.onNext("你值得拥有");
e.onNext("取消关注");
e.onNext("但还是要保持微笑");
e.onComplete();
}
}).subscribe(
new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
//对应onNext()
System.out.println("accept=" + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
//对应onError()
}
}, new Action() {
@Override
public void run() throws Exception {
//对应onComplete()
}
}, new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
//对应onSubscribe()
}
});
mRxEvent.add(subscribe);
mRxEvent.clear();
}
}
CompositeDisposable提供的方法中,都是对事件的管理
- dispose():释放所有事件
- clear():释放所有事件,实现同dispose()
- add():增加某个事件
- addAll():增加所有事件
- remove():移除某个事件并释放
- delete():移除某个事件