之前在使用Retrofit
进行网络请求时,里面用到了RxJava
,但是对其却不甚理解,只是照搬网上所查,对其进行简单的使用。现在在空余时间进行重新学习,并记录下来,期望能帮助后学者。
那RxJava
是什么呢?它是一个基于事件流,实现异步操作的库
,顾名思义我们用它来进行异步操作
。它被推崇的原因在与简洁
,即使在程序逻辑变的越来越复杂的情况下,依然能够保持简洁。RxJava
的异步实现是通过观察者模式
实现的。
RxJava
有四个基本概念:Observable
(可观察者,即被观察者)、 Observer
(观察者)、 subscribe
(订阅)、事件。Observable
和 Observer
通过 subscribe()
方法实现订阅关系,从而 Observable
可以在需要的时候发出事件来通知Observer
。下面我们来进行简单的使用。
添加依赖
compile 'io.reactivex:rxandroid:1.2.1'
compile 'io.reactivex:rxjava:1.2.0'
代码所在网址:
https://github.com/ReactiveX/RxJava
https://github.com/ReactiveX/RxAndroid
首先,创建被观察者 Observable
,它决定什么时候触发事件以及触发怎样的事件。
创建:create(OnSubscribe)
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello RxJava!");
subscriber.onCompleted();
}
});
创建:just(T...)
,将传入的参数依次发送出来
Observable<String> Observable1 = Observable.just("Hello RxJava!");
创建:from(T[]) / from(Iterable<? extends T>)
,将传入的数组或者集合拆分成具体对象后,依次发送出来
String[] strs = new String[]{"Hello", "RxJava", "!"};
Observable<String> Observable2 = Observable.from(strs);
List<String> strList = Arrays.asList(strs);
Observable<String> Observable3 = Observable.from(strList);
注意:上面创建Observable
的三种方法是等价的。
其次,创建观察者 Observer
,它决定触发的事件将会有什么行为。
创建:Observer
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {//事件队列完结,RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
Log.e("LHC", "RxJava-->onCompleted");
}
@Override
public void onError(Throwable e) {//事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
e.printStackTrace();
Log.e("LHC", "RxJava-->onError");
}
@Override
public void onNext(String s) {//发送事件
Log.e("LHC", "RxJava-->onNext:" + s);
tvRxJava.setText(s);
}
};
注意:在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
创建:Subscriber
,Subscriber是实现了Observer的抽象类,对Observer进行了一点扩展
/**
* 创建观察者二
* 除了使用Observer来创建观察者外,还可以使用Subscriber,Subscriber是实现了Observer的抽象类,对Observer进行了一点扩展
* 但是他们的使用方法是一样的。额外增加了onStart()和unsubscribe()方法。
* unsubscribe():取消订阅
* onStart():它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作
*/
subscriber = new Subscriber<String>() {
@Override
public void onStart() {
super.onStart();
//这是Subscriber增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作
}
@Override
public void onCompleted() {
Log.e("LHC", "RxJava-->onCompleted");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
Log.e("LHC", "RxJava-->onError");
}
@Override
public void onNext(String s) {
Log.e("LHC", "RxJava-->onNext:" + s);
tvRxJava.setText(s);
}
};
最后,订阅subscribe
/**
* observable.subscribe(observer) 这种关联也是将观察者observer转换成Subscription,然后下面的处理和调用
* observable.subscribe(subscriber) 的处理是一样的
*/
observable.subscribe(observer);//订阅,将观察者和被观察者关联
observable.subscribe(subscriber);//订阅,将观察者和被观察者关联
除了上面两种订阅方式外,subscribe
还支持不完整定义的回调,RxJava
会自动根据定义创建出 Subscriber
,代码如下:
Observable<String> observable = Observable.just("Hello", ", RxJava!!");
Action1<String> onNextAction = new Action1<String>() {
@Override
public void call(String s) {
Log.e("LHC", "Action1-->onNextAction:"+ s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
@Override
public void call(Throwable e) {
Log.e("LHC", "Action1-->onErrorAction");
e.printStackTrace();
}
};
Action0 onCompleted = new Action0() {
@Override
public void call() {
Log.e("LHC", "Action1-->onCompleted");
}
};
observable.subscribe(onNextAction);// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction, onErrorAction);// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction, onCompleted);// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
注意:
1.Action0
,它只有一个方法 call(),这个方法是无参无返回值的;由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。
2.Action1
, 也是一个接口,它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;与 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,因此 Action1 可以将 onNext(obj) 和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调。
这样RxJava
的简单使用流程就完成了。上面为了介绍我们是分步骤实现的,下面我们使用链式调用来实现:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello RxJava!");
subscriber.onCompleted();
}
}).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.e("LHC", "RxJava-->onCompleted");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
Log.e("LHC", "RxJava-->onError");
}
@Override
public void onNext(String s) {
Log.e("LHC", "RxJava-->onNext:" + s);
tvRxJava.setText(s);
}
});
当然也可以写成:
Observable.just("Hello", ", RxJava!!").subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("LHC", "Action1:"+ s);
}
});
这样代码足够简单了吧...
RxJava的使用之变换
RxJava的使用之Scheduler