引言
该篇文章主要是关于RxJava的基本使用的代码讲解。
1.两种使用方式
RxJava的两种使用方式分别为:
- 分步骤使用
- 基于事件流的链式调用
2. 第一种方式:分步骤使用
/**
* RxJava的使用方式1:分步骤实现
*/
private void useMethod1() {
//1. 创建被观察者对象
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
// create() 是 RxJava 最基本的创造事件序列的方法
// 此处传入了一个 OnSubscribe 对象参数
// 当 Observable 被订阅时,OnSubscribe 的 call() 方法会自动被调用,即事件序列就会依照设定依次被触发
// 即观察者会依次调用对应事件的复写方法从而响应事件
// 从而实现被观察者调用了观察者的回调方法 & 由被观察者向观察者的事件传递,即观察者模式
// 2. 在复写的subscribe()里定义需要发送的事件
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
});
//创建观察者
//方式1:采用Observer接口
Observer<Integer> observer = new Observer<Integer>() {
//观察者接收事件前,默认最先调用复写onSubscribe()
@Override
public void onSubscribe(Disposable d) {
Logger.d("开始采用subscribe连接");
}
//当被观察者产生Next事件&观察者接受到时,会调用该复写方法进行相应
@Override
public void onNext(Integer value) {
Logger.d("对Next事件作出响应" + value);
}
//当被观察者产生Error事件&观察者接受到时,会调用该复写方法进行相应
@Override
public void onError(Throwable e) {
Logger.d("对Error事件作出响应");
}
//当被观察者产生Complete事件&观察者接受到时,会调用该复写方法进行相应
@Override
public void onComplete() {
Logger.d("对Complete事件作出响应");
}
};
observable.subscribe(observer);
}
3. 第二种方式:基于事件流的链式调用
/**
* RxJava的使用方式2:基于事件流的链式调用
*/
private void useMethod2(){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
}).subscribe(new Observer<Integer>() {
//观察者接收事件前,默认最先调用复写onSubscribe()
@Override
public void onSubscribe(Disposable d) {
Logger.d("开始采用subscribe连接");
}
//当被观察者产生Next事件&观察者接受到时,会调用该复写方法进行相应
@Override
public void onNext(Integer value) {
Logger.d("对Next事件作出响应" + value);
}
//当被观察者产生Error事件&观察者接受到时,会调用该复写方法进行相应
@Override
public void onError(Throwable e) {
Logger.d("对Error事件作出响应");
}
//当被观察者产生Complete事件&观察者接受到时,会调用该复写方法进行相应
@Override
public void onComplete() {
Logger.d("对Complete事件作出响应");
}
});
}
- log日志:
03-04 20:44:37.893 26201-26201/com.gjj.frame E/gjj: 开始采用subscribe连接
03-04 20:44:37.893 26201-26201/com.gjj.frame E/gjj: 对Next事件作出响应1
03-04 20:44:37.893 26201-26201/com.gjj.frame E/gjj: 对Next事件作出响应2
03-04 20:44:37.893 26201-26201/com.gjj.frame E/gjj: 对Next事件作出响应3
03-04 20:44:37.893 26201-26201/com.gjj.frame E/gjj: 对Complete事件作出响应
接下来我们看看观察者Observer的重写方法onSubscribe方法中的Disposable的作用,我们可以利用Disposable断开观察者和被观察者的连接,修改Observer的重写方法如下所示:
//创建观察者
//方式1:采用Observer接口
Observer<Integer> observer = new Observer<Integer>() {
//观察者接收事件前,默认最先调用复写onSubscribe()
@Override
public void onSubscribe(Disposable d) {
Logger.e("开始采用subscribe连接");
mDisposable = d;
}
//当被观察者产生Next事件&观察者接受到时,会调用该复写方法进行相应
@Override
public void onNext(Integer value) {
Logger.e("对Next事件作出响应" + value);
if (value == 2) {
// 设置在接收到第二个事件后切断观察者和被观察者的连接
mDisposable.dispose();
Logger.e("已经切断了连接:" + mDisposable.isDisposed());
}
}
//当被观察者产生Error事件&观察者接受到时,会调用该复写方法进行相应
@Override
public void onError(Throwable e) {
Logger.e("对Error事件作出响应");
}
//当被观察者产生Complete事件&观察者接受到时,会调用该复写方法进行相应
@Override
public void onComplete() {
Logger.e("对Complete事件作出响应");
}
};
observable.subscribe(observer);
}
对应的log日志:
03-04 20:49:02.103 30444-30444/com.gjj.frame E/gjj: 开始采用subscribe连接
03-04 20:49:02.103 30444-30444/com.gjj.frame E/gjj: 对Next事件作出响应1
03-04 20:49:02.103 30444-30444/com.gjj.frame E/gjj: 对Next事件作出响应2
03-04 20:49:02.103 30444-30444/com.gjj.frame E/gjj: 已经切断了连接:true
参考文章