前言
RxJava系列文章
RxJava系列文章(一) - 网络图片加载水印一般写法
RxJava系列文章(二) - 网络图片添加水印RxJava写法
RxJava系列文章(三) - 线程调度切换源码分析
RxJava系列文章(四) - 普通观察者与RxJava的观察者
RxJava系列文章(五)- CopyOnWriteArrayList与ConcurrentLinkedQueue
RxJava系列文章(六)- new Handler()与new Handler(Looper.getMainLooper())区别
RxJava系列文章(七) - 你是否了解RxJava
RxJava系列文章(八) - RxPermission
1. 概述
这篇文章主要记录下 RxJava中线程调度切换源码分析:
.subscribeOn(Schedulers.io());
.observeOn(AndroidSchedulers.mainThread());
其实所有第三方框架都有共性,都是基于最原始的框架的封装,其本质是不会变的,比如:
1>:ButterKnife - findViewById;
2>:OkHttp -> socket + okio;
3>:RxJava -> 线程池 + handler;
2. 解析子线程切换:subscribeOn(Schedulers.io())
这句代码意思就是:它上边所有操作都是在子线程中执行的
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
scheduler.scheduleDirect()调用了scheduler.io() ,io其实就是:
IO = RxJavaPlugins.initIoScheduler(new IOTask());
-> DEFAULT = new IoScheduler();-> 创建一个线程池的封装对象
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// createWorker() -> IOScheduler 的 createWorker() -> EventLoopWorker
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
// w.schedule -> EventLoopWorker的 schedule
w.schedule(task, delay, unit);
return task;
}
execute 指的是 schedule;
submit 指的是 schedule;
对上游处理是包裹
3. 解析主线程切换:observeOn(AndroidSchedulers.mainThread())
// MainThreadScheduler 策略
Scheduler.Worker w = scheduler.createWorker();
// 调用上游的 subscribe(),对下游的 observer 进行代理包裹 ObserveOnObserver
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
对下游的处理:worker.schedule(this);
schedule()方法如下:
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
// 这里只是把消息发送出去,但是消息并没有执行
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
return scheduled;
}