0 处识
没有高大上的理由,就是厌烦了AsyncTask和Handler
感觉代码臃肿、开发效率低、内存泄露,但是当年就是这样过来的,后来看到RxJava优雅的切换线程,震惊了!为了这么优雅的方式就要深入研究他一下(当时不知道有那么多操作符,而且也不知道处理复杂的业务逻辑也可以那么优雅)
- 线程切换
当年就是被
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
这两句给吸引的,subscribeOn和observeOn来切换线程。
subscribeOn:影响被观察者的执行线程,当使用多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用
observeOn:影响观察者的执行线程,如果想要多次改变线程,可以多次使用 observeOn
现在整理了一下代码
private static <T> Observable.Transformer<T, T> createIOToMainThreadScheduler() {
return tObservable -> tObservable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
不用每次都麻烦的调用这两行。
- Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程
- Schedulers.newThread(): 总是启用新线程,并在新线程执行操作
- Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler
- Schedulers.computation(): CPU 密集型计算所使用的 Scheduler
- AndroidSchedulers.mainThread(): 在 Android 主线程运行
- 内存泄露
虽然很方便使用了RxJava的线程切换发现还是会内存泄露,不过可以用一下三种方法解决
- CompositeSubscription
可以把页面中的请求都添加到CompositeSubscription中,然后在onDestory的时候,统一取消 - Rxlifecycle
第三方库取消订阅,把请求和组件的生命周期绑定到了一起,这样组件生命周期结束请求也结束了 - 自己动手
@Override
public void onDestroyView() {
super.onDestroyView();
unsubscribe();
}
protected void unsubscribe() {
if (subscription != null && !subscription.isUnsubscribed()) {
subscription.unsubscribe();
}
}
比如项目中突然有个逻辑用到了Rxjava,没有生命周期也不方便建立一个CompositeSubscription就自己动手。
- 操作符
再到后来慢慢熟悉,发现有很多丰富的操作符可以使用,并且都是可以解决很多负责的现实问题的,比如create、from、filter等简单的,下面说一下项目中用到比较多的
- interval:创建一个按照给定的时间间隔发射从0开始的整数序列的Observable<Long>
在注册、手机号登陆的时候,倒计时会用到 - concat:接收若干个Observables,发射有序数据流
在取数据缓存的时候用到,内存缓存、本地缓存、网络,哪一层有数据立即返回
Observable.concat(memorySource, diskSource, networkSource)
.takeFirst(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return s != null;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
}
});
- zip: 使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果
这个用的多,因为一般页面初始时都不止一个网络请求,我们希望多个网络请求结束后在一次性展示页面,所以这个可以很好的解决这个问题 - flatMap: 上面zip操作符是多个没有关系的Observable的组合,flatMap是可以把多个有先后顺序的Observable组合在一起的,比如先注册,注册成功了去登录,登录成功了调用用户信息 这个需求就可以用flatMap把他们组合在一起
- zipWith:
/**
* 重试机制
* 重复次数:5
* 间隔时间:interval的次方数,比如interval=2,第一次2s, 第二次4s, 第三次8s, 第四次16s, 第五次32s 结束
* 条件:没网络不重试,自定义逻辑不重试,网络连接并且网络不稳定才重试
*/
public static class RetryWhenProcess implements Func1<Observable<? extends Throwable>, Observable<?>> {
private long mInterval;
public RetryWhenProcess(long interval) {
mInterval = interval;
}
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
return observable.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
//没有网络,直接返回,不重试了
if (throwable instanceof UnknownHostException || !NetWorkUtils.isNetworkAvailable()) {
return Observable.error(throwable);
}
return Observable.just(throwable).zipWith(Observable.range(1, 5), new Func2<Throwable, Integer, Integer>() {
@Override
public Integer call(Throwable throwable, Integer i) {
return i;
}
}).flatMap(new Func1<Integer, Observable<? extends Long>>() {
@Override
public Observable<? extends Long> call(Integer retryCount) {
return Observable.timer((long) Math.pow(mInterval, retryCount), TimeUnit.SECONDS);
}
});
}
});
}
});
}
}
- sample: 定期扫描源Observable产生的结果,在指定的间隔周期内进行采样
当时在项目中解决了背压问题,上游发送的流过快,下游就会throw error,用sample采样来减慢上游发送流的速度 - throttleFirst: 功能防抖,一个btn连续快速点击多次,如果不作处理,会连续调用click事件
- debuounce:联想搜索功能,在一定的时间不发送变化,在执行搜索动作
- ...
操作符太多了,而且还没有用一遍,后来工作中发现可悲的是遇到一个复杂的业务逻辑不知道有没有操作符能解决这个问题,所以就推荐一个地址Rxjava操作符
总结
用了很长一段时间了,发现好久没有用Handler、AsyncTask了,为什么呢?RxJava简洁、提升了开发效率、响应式编程、代码优雅等优点,需要研究一下源码看看底层代码是如何的,结果代码太庞大了,不过可以带着疑问去一段一段的读,不能像读Retrofit一样一口气读完,比如整个流是从哪里开始的(subscribe()开始的)?怎么切换的线程(handler)?底层是什么样的线程(比如Schedulers.io()是一个无数量上限的线程池,可以重用空闲的线程)?等问题
Rxjava已经升级到2.x,请看Demo
MVP和RxJava
现在一般使用RxJava+Retrofit+OkHttp组合做为应用框架的根基,但是为什么是他们呢?他们是如何组合在一起的?为什么和MVP是好基友?