RxJava的作用
这里引用github上的说明
RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
RxJava是基于观察者模式
- 简单的说一下什么是观察者模式
Observable==>被观察者
Observer ==> 观察者
subscribe ==> 订阅关系
观察者模式需要理解的是,其实并不是观察者一直在观察着被观察者,然后发现被观察者的动作.
事实上是被观察者对象发生了任何动作之后,通过接口反馈给观察者.
这跟订报纸很相像:订报纸的人
(观察者) 花了100块订阅
(订阅关系)了报社
(被观察者)一年的报纸.
然后那,报社每次有新报纸了之后,主动给订报纸的人把报纸送过去.这就是一个简单的观察者模式.
报社
/**
* 这里假设被观察者是报社
*/
public class 报社 {
private List<订阅关系> 订阅组 = new ArrayList<>();
public void 订阅报纸(订阅关系 订阅关系){
订阅组.add(订阅关系);
}
public void 有新版报纸了(){
for (订阅关系 订阅关系 : 订阅组) {
订阅关系.送报纸();
}
}
}
订报纸的人A
/**
* 这里是订阅报纸的人A
*/
public class 订报纸的人A implements 订阅关系 {
private String TAG = "订报纸的人A";
@Override
public void 送报纸() {
Log.e(TAG, "收到新版报纸: ");
}
}
订报纸的人B
/**
* 这里是订阅报纸的人B
*/
public class 订报纸的人B implements 订阅关系 {
private String TAG = "订报纸的人B";
@Override
public void 送报纸() {
Log.e(TAG, "收到新版报纸: ");
}
}
订阅关系
/**
* 花100块/年在报社定订阅报纸
*/
public interface 订阅关系 {
void 送报纸();
}
订阅报纸
var 订报纸的人A = 订报纸的人A()
var 订报纸的人B = 订报纸的人B()
val 报社 = 报社()
报社.订阅报纸(订报纸的人A)
报社.订阅报纸(订报纸的人B)
报社.有新版报纸了()
打印结果.png
RxJava的简单使用
-
创建被观察者对象
//被观察者 Observable observable=Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("第一周的报纸"); emitter.onNext("第二周的报纸"); emitter.onNext("第三周的报纸"); emitter.onComplete(); } });
- Observable==>被观察者;
- ObservableOnSubscribe<T> 这里可以理解为计划表;
- 重写subscribe方法,里边是计划表的具体内容;
- ObservableEmitter<String>对象的Emitter是发射器的意思;
- ObservableEmitter有三种发射的方法,分别是onNext、onError、onComplete()
- onNext方法可以无限调用,表示下一个发射的意思.
- onComplete可以重复调用,但是Observer(观察者)只会接收一次,接收到既结束;
- onError不可以重复调用,在事件处理过程中出异常时,onError() 会被触发.
- onCompleted() 和 onError() 二者是互斥的,Observer(观察者)只能接收到一个.
-
创建观察者对象
//观察者 Observer<String> observer=new Observer<String>() { @Override public void onSubscribe(Disposable d) { mDisposable=d; Log.e(TAG,"onSubscribe"); } @Override public void onNext(String value) { if ("第二周的报纸".equals(value)){ mDisposable.dispose(); return; } Log.e(TAG,"onNext:"+value); } @Override public void onError(Throwable e) { Log.e(TAG,"onError="+e.getMessage()); } @Override public void onComplete() { Log.e(TAG,"onComplete()"); } };
- onSubscribe 当订阅成功的时候会调用
- onNext对应的Observable onNext方法(),Observable 中调用几次,不出意外这里就会接收到几次.
- onComplete订阅时间完成,这里只能接收到一次.结束本次订阅.
- onError出现异常时调用.结束本次订阅.
- Disposable 有两个方法 mDisposable.dispose() mDisposable.isDisposed
- dispose()主动解除订阅
- isDisposed():查询是否解除订阅 true 代表 已经解除订阅
RxJava如不及时取消订阅,当Activity或者Fragment销毁的时候可能会造成内存泄漏.
取消订阅之后,将不会在接受到后续的onNext事件.
-
建立订阅关系
observable.subscribe(observer);
Log.png
- 这里我只收到了前两周的报纸.因为我们在收到第二周的报纸之后取消了订阅.
我们注销掉 mDisposable.dispose();这行代码之后.
销掉 mDisposable.dispose()之后的Log.png - 这次调用了onComplete.完成了整个订阅事件.