什么是RxJava(ReactiveX.io链式编程)
定义:一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。
总结:RxJava 是一个 基于事件流、实现异步操作的库
理解:RXJava是一个响应式编程框架 ,采用观察者设计模式,观察者模式本身的目的就是『后台处理,前台回调』的异步机制优点:
由于 RxJava是基于事件流的链式调用,所以使得 RxJava:
逻辑简洁
实现优雅
使用简单
- 作用:
实现异步操作
类似于 Android中的 AsyncTask 、Handler作用
RxJava 有3个基本概念及原理
1.Observable(被观察者)
2.Observer(观察者)
3.subscribe(订阅)事件。
注意
1)RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。
2)RxJava 规定,onNext() 接收被观察者发送的消息、可以执行多次;当不会再有新的 onNext () 发出时,需要触发 onCompleted () 方法作为标志。onError():事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
3)在一个正确运行的事件序列中, onCompleted() 和 onError () 有且只有一个,并且是事件序列中的最后一个。
4)需要注意的是,onCompleted()和 onError () 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
依赖库
//RxJava
implementation 'io.reactivex.rxjava2:rxjava:2.2.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
implementation 'com.squareup.retrofit2:retrofit:2.5.0'//retrofit 库
implementation 'com.squareup.retrofit2:converter-gson:2.5.0'//转换器,请求结果转换成Model
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.5.0'//配合Rxjava 使用
简单使用
public static void baseRx(){
//1.创建被观察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) {
emitter.onNext("1111");
emitter.onNext("2222");
emitter.onNext("3333");
emitter.onNext("4444");
//emitter.onError(new Throwable("abc"));
//emitter.onComplete();
}
});
//2.创建观察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {//关闭线程
Log.e(TAG, "onSubscribe: " );
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext: "+ s );
}
@Override
public void onError(Throwable e) {//失败
Log.e(TAG, "onError: "+e.getMessage() );
}
@Override
public void onComplete() {//成功
Log.e(TAG, "onComplete: " );
}
};
//3.被观察者订阅观察者
observable.subscribe(observer);
//线程切换
observable
//被订阅者在子线程中
.subscribeOn(Schedulers.io())
//订阅者在主线程中
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
//观察中可以重复指定线程
observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())//主
.observeOn(Schedulers.io())//子
.observeOn(AndroidSchedulers.mainThread())//主
.subscribe(observer);
}
Android功能使用
final Retrofit homeRetrofit = new Retrofit.Builder()
.baseUrl(ApiServer.homeUrl)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
final ApiServer server = homeRetrofit.create(ApiServer.class);
final Observable<HomeBean> home = server.getHome("" + count);
home.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<HomeBean>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(HomeBean homeBean) {
List<HomeBean.ResultsBean> results = homeBean.getResults();
homeList.addAll(results);
srl.finishRefresh();
srl.finishLoadMore();
adapter.notifyDataSetChanged();
}
@Override
public void onError(Throwable e) {
Log.e("TAG", "onError()" + e.getMessage());
}
@Override
public void onComplete() {
}
});
其他操作符使用( 查看操作符)
- 创建操作符
//遍历输出
public static void rxFrom(){
Integer[] a = {1,2,3,4,5};
// Observable.fromArray(1,2,3,4)
//Observable.fromArray("a","b","c")
Observable.fromArray(a).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: "+integer);
}
});
}
//数组合并输出
public static void rxJust(){
Integer[] a = {1,2,3};
Integer[] b = {9,8,7};
Observable.just(a,b).subscribe(new Consumer<Integer[]>() {
@Override
public void accept(Integer[] integers) throws Exception {
for (Integer i: integers) {
Log.e(TAG, "accept: "+i);
}
}
});
}
//范围输出
public static void rxRange(){
Observable.range(0,20).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: "+integer );
}
});
}
//定时器
public static void rxInterval(){
Observable.interval(1,1,TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "accept: "+aLong );
}
});
}
//闪屏
private void rxjavaInterval() {
final Long time = 5L;
subscribe = Observable.interval(1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e("TAG", "倒计时:" + aLong);
if (aLong < time && !subscribe.isDisposed()) {
tv.setText("记录改变生活" + (time - aLong - 1));
} else {
Intent intent = new Intent(WelcomActivity.this, MainActivity.class);
startActivity(intent);
finish();
}
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
subscribe.dispose();
subscribe = null;
}
- 过滤操作符
//过滤输出
public static void rxFilter(){
Integer[] a = {1,2,3,4,5};
Observable.fromArray(a).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
if (integer>3){
return true;
}
return false;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: "+integer );
}
});
}
- 变换操作符
①Map:通过指定一个Fun函数将Observeble转换成一个新的Observable对象并发射,观察者收到新的observable处理。
public static void rxMap(){
Integer[] a = {1,2,3,4,5};
Observable.fromArray(a).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) {
return integer+"abc";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
Log.e(TAG, "accept: "+s );
}
});
}