1 定义
Rxjava是基于事件流实现异步操作的库
2 解决了什么问题
通过基于事件流的链式调用可以简洁&优雅地实现异步操作
注:在逻辑复杂性越高的情况下,简洁&优雅的特点更突出
3 原理
RxJava 的异步实现,是通过一种扩展的观察者模式来实现的
3.1 4个成员
成员 | 作用 |
---|---|
被观察者(observable) | 产生事件 |
观察者 (observer) | 接受&响应事件 |
订阅(subscribe) | 连接 被观察者&观察者 |
事件(event) | 被观察者 & 观察者 沟通的载体 |
- 三种事件
Rxjava不仅把每个事件单独处理,还会把他们看做一个队列
事件类型 | 描述 | 使用 | 事件的回调方法 |
---|---|---|---|
Next | 普通事件,将要处理的事件添加到事件队列中 | Observable: 可发送无限个Next事件 Observer: 可接受无限个Next事件 |
onNext() |
Complete | 事件队列完结,意味着不再发出Next事件 | 该事件唯一 当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件 |
onCompleted() |
Error | 事件队列异常,意味着处理事件的过程中发生异常、队列终止,不允许再有事件发出 | 该事件唯一 当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件 |
onError() |
注:onComplete和onError唯一并且互斥
3.2 基本流程
3.3 基本使用
3.3.1 添加依赖
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'io.reactivex.rxjava2:rxjava:2.0.7'
3.3.2 创建被观察者&产生事件
//通过create创建被观察者和创建事件序列
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
//定义需要发送的事件序列
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//ObservableEmitter是事件发射器,做了两件事:定义发射事件 & 向观察者发送事件
e.onNext("菜品1");
e.onNext("菜品2");
e.onNext("菜品3");
}
});
3.3.3 创建观察者&定义响应事件的行为
//创建观察者对象
Observer<String> observer = new Observer<String>() {
//在接受事件前调用
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始订阅subscribe");
}
//接受被观察者发送的Next事件并响应,
@Override
public void onNext(String s) {
Log.d(TAG, "接收到Next事件:" + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "接收到onError事件:");
}
@Override
public void onComplete() {
Log.d(TAG, "接收到onError事件:");
}
};
注 :除了使用Observer 接口创建观察者,还可以使用Subscriber 抽象类,用法一致,本质上,在订阅subscribe的过程中,Observer总是会先被转换成Subscriber再使用
Subscriber 抽象类对Observer接口进行了扩展,增加了两个方法:
(1) onStart():在还未响应事件前调用,用于做一些初始化工作
(2) unsubscribe():用于取消订阅。在该方法被调用后,观察者将不再接收 & 响应事件 调用该方法前,先使用 isUnsubscribed() 判断状态,确定被观察者Observable是否还持有观察者Subscriber的引用,如果引用不能及时释放,就会出现内存泄露
3.3.4 通过订阅连接观察者和被观察者
observable.subscribe(observer);
3.3.5 链式调用
将前面的三个步骤串起来
//通过create创建被观察者和创建事件序列
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
//定义需要发送的事件序列
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//ObservableEmitter是事件发射器,做了两件事:定义发射事件 & 向观察者发送事件
e.onNext("菜品1");
e.onNext("菜品2");
e.onNext("菜品3");
}
}).subscribe(new Observer<String>() {
//在接受事件前调用
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始订阅subscribe");
}
//接受被观察者发送的Next事件并响应,
@Override
public void onNext(String s) {
Log.d(TAG, "接收到Next事件:" + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "接收到onError事件:");
}
@Override
public void onComplete() {
Log.d(TAG, "接收到onError事件:");
}
});
事件接受序列