经过上一节的基础使用,相信大家对RxJava就有一定的了解了,下边让我们继续深入的了解它。
RxJava流程图
在Rxjava中,Scheduler是调度器,相当于线程控制器。多分为以下五个场景场景:
查看源码得知:
@NonNull
static final Scheduler SINGLE;
@NonNull
static final Scheduler COMPUTATION;
@NonNull
static final Scheduler IO;
@NonNull
static final Scheduler TRAMPOLINE;
@NonNull
static final Scheduler NEW_THREAD;
static final class SingleHolder {
static final Scheduler DEFAULT = new SingleScheduler();
}
static final class ComputationHolder {
static final Scheduler DEFAULT = new ComputationScheduler();
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
- Schedulers.io(): I/O 操作(读写、网络信息交互…)所使用的 Scheduler。和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
Observable.just("大宝来了").
subscribeOn(Schedulers.io()).
observeOn(AndroidSchedulers.mainThread()).
subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("just", "onSubscribe ");
}
@Override
public void onNext(String s) {
Log.e("just", "onNext " + s);
}
@Override
public void onError(Throwable e) {
Log.e("just", "onError " + e.getMessage());
}
@Override
public void onComplete() {
Log.e("just", "onComplete ");
}
});
输出结果:
2019-06-18 21:44:36.220 6733-6733/com.click.guide.learningrxjava E/just: onSubscribe
2019-06-18 21:44:36.242 6733-6733/com.click.guide.learningrxjava E/just: onNext 大宝来了
2019-06-18 21:44:36.242 6733-6733/com.click.guide.learningrxjava E/just: onComplete
- Schedulers.newThread(): 开启新线程,并在新线程执行操作
Observable.just("大宝来了").
subscribeOn(Schedulers.newThread()).
observeOn(AndroidSchedulers.mainThread()).
subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("just", "onSubscribe ");
}
@Override
public void onNext(String s) {
Log.e("just", "onNext " + s);
}
@Override
public void onError(Throwable e) {
Log.e("just", "onError " + e.getMessage());
}
@Override
public void onComplete() {
Log.e("just", "onComplete ");
}
});
输出结果:
2019-06-18 21:44:36.220 6733-6733/com.click.guide.learningrxjava E/just: onSubscribe
2019-06-18 21:44:36.242 6733-6733/com.click.guide.learningrxjava E/just: onNext 大宝来了
2019-06-18 21:44:36.242 6733-6733/com.click.guide.learningrxjava E/just: onComplete
- Schedulers.single(): 线程单例,所有的任务都在一个线程中执行,当此线程中有任务在执行的时候其他任务将按照队列先进先出的顺序依次执行。
Observable.just("大宝来了").
subscribeOn(Schedulers.single()).
observeOn(AndroidSchedulers.mainThread()).
subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("just", "onSubscribe ");
}
@Override
public void onNext(String s) {
Log.e("just", "onNext " + s);
}
@Override
public void onError(Throwable e) {
Log.e("just", "onError " + e.getMessage());
}
@Override
public void onComplete() {
Log.e("just", "onComplete ");
}
});
输出结果:
2019-06-18 21:44:36.220 6733-6733/com.click.guide.learningrxjava E/just: onSubscribe
2019-06-18 21:44:36.242 6733-6733/com.click.guide.learningrxjava E/just: onNext 大宝来了
2019-06-18 21:44:36.242 6733-6733/com.click.guide.learningrxjava E/just: onComplete
- Schedulers.computation(): 用于cpu密集型计算任务,不会被I/O等操作限制性能的耗时操作,例如XML,JSON文件的解析,Bitmap图片的压缩取样等,具有固定的线程池,大小为CPU核数,不可以用于IO操作,因为IO操作的等待时间会浪费cpu。
Observable.just("大宝来了").
subscribeOn(Schedulers.computation()).
observeOn(AndroidSchedulers.mainThread()).
subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("just", "onSubscribe ");
}
@Override
public void onNext(String s) {
Log.e("just", "onNext " + s);
}
@Override
public void onError(Throwable e) {
Log.e("just", "onError " + e.getMessage());
}
@Override
public void onComplete() {
Log.e("just", "onComplete ");
}
});
输出结果:
2019-06-18 21:44:36.220 6733-6733/com.click.guide.learningrxjava E/just: onSubscribe
2019-06-18 21:44:36.242 6733-6733/com.click.guide.learningrxjava E/just: onNext 大宝来了
2019-06-18 21:44:36.242 6733-6733/com.click.guide.learningrxjava E/just: onComplete
- Schedulers.trampoline(): 在当前线程立即执行任务,如果当前线程有任务在执行,则会将其暂停下来,等插入进来的任务执行完成之后,再将未完成的任务接着执行。
Observable.just("大宝来了").
subscribeOn(Schedulers.trampoline()).
observeOn(AndroidSchedulers.mainThread()).
subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("just", "onSubscribe ");
}
@Override
public void onNext(String s) {
Log.e("just", "onNext " + s);
}
@Override
public void onError(Throwable e) {
Log.e("just", "onError " + e.getMessage());
}
@Override
public void onComplete() {
Log.e("just", "onComplete ");
}
});
输出结果:
2019-06-18 21:44:36.220 6733-6733/com.click.guide.learningrxjava E/just: onSubscribe
2019-06-18 21:44:36.242 6733-6733/com.click.guide.learningrxjava E/just: onNext 大宝来了
2019-06-18 21:44:36.242 6733-6733/com.click.guide.learningrxjava E/just: onComplete
- 另外, RXAndroid 提供了 AndroidSchedulers.mainThread(),作用在返回数据在 Android 主线程运行。
我们知道,RxJava 在切换线程时会用到了两个方法。
subscribeOn() :影响的是最开始的被观察者所在的线程。当使用多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用。
observeOn():影响跟在后面的操作(指定观察者运行的线程)。如果想要多次改变线程,可以多次使用 observeOn。
今天就介绍到这里,这里边参考了很多牛人的博客,这里很感谢。
参考 调度器Schedulers
参考 详解 RxJava2 的线程切换原理
参考 Android:随笔——RxJava的线程切换