前言
通过前面一篇文章,我们应该能大志理解RxJava的原理了吧。
既然RxJava被称为强大的异步处理库,对线程的控制也就不可或缺,今天我们就来谈谈RxJava的线程调度。
RxJava内置线程选项
RxJava中已经内置了很多线程选项供我们选择,其中下面四个已经能满足我们日常的开发。
在RxJava内部使用的是线程池来维护这些线程, 所有效率也比较高。
线程 | 含义及作用 |
---|---|
Schedulers.io() | 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作 |
Schedulers.computation() | 代表CPU计算密集型的操作, 例如需要大量计算的操作 |
Schedulers.newThread() | 代表一个常规的新线程 |
AndroidSchedulers.mainThread() | 代表Android的主线程 |
RxJava线程调度解析
我们第一篇文章所用到的例子都没有用到涉及到对线程的处理,所以上游和下游的工作都是在同一个线程中的。
当我们在主线程中去创建一个上游Observable来发送事件, 则这个上游默认就在主线程发送事件。
当我们在主线程去创建一个下游Observer来接收事件, 则这个下游默认就在主线程中接收事件。
我们来实践一下看看是不是跟我们说的一样:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
String s = "1";
Log.d(TAG, "The observable thread is: " + Thread.currentThread().getName());
Log.d(TAG, s + " send");
e.onNext(s);
}
});
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "The observer thread is: " + Thread.currentThread().getName());
Log.d(TAG, s + " receive");
}
};
observable.subscribe(consumer);
输出结果:可以看出:
上游Observable指定了3次线程,可是线程保留在第一次指定的newThread中;
下游Observer同样指定了3次线程,每次指定都进行了线程切换,最后以主线程main为指定线程。
实战
说了这么多,栗子呢!!!
我们来模仿一下从通过url从服务器获取图片(子线程耗时操作),再设置到imageView上(主线程更新布局)的例子。
Observable.create(new ObservableOnSubscribe<Drawable>() {
@Override
public void subscribe(ObservableEmitter<Drawable> e) throws Exception {
try {
Drawable drawable = getDrawable(url); // 通过url从服务器去获取drawable图片
if (!e.isDisposed()) {
if (drawable != null) {
e.onNext(drawable);
}
e.onComplete();
}
} catch (Exception e1) {
if (!e.isDisposed()) {
e.onError(e1);
}
}
}
}).subscribeOn(Schedulers.io()) // 子线程做耗时操作获取图片
.observeOn(AndroidSchedulers.mainThread()) // 主线程更新UI
.subscribe(new Consumer<Drawable>() {
@Override
public void accept(Drawable drawable) throws Exception {
mImageView.setImageDrawable(drawable); // 给imageView设置图片
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
DebugUtil.exception(throwable);
}
});
这就是一个Rxjava工作中线程切换的例子啦。
在这里有一个地方要解释一下,我们在调用observable的发送事件前,都加了一个isDisposed()的判断,这其实跟我们上一篇文章提到的Disposable对象有关,我们需要在Activity退出的时候调用dispose()方法,使得下游收不到事件,这样就不会去更新UI了,不然就会出现APP Crash的问题了。