前言
理解线程切换原理有什么意义?
1、可以清楚的知道这个线程切换操作影响到哪些代码的执行线程,不会影响到哪些代码的执行线程
2、灵活运用线程切换来实现复杂的应用场景
3、有利于在发生线程相关的问题时进行调试
不指定线程
在Observable中根据id去获取Drawable 对象,将获取到Drawable 对象发送给下游的Observer。
Observable.create(ObservableOnSubscribe<Drawable> { e->
run {
val drawable = theme.getDrawable(drawableRes)
e.onNext(drawable)
}
}).subscribe(Consumer { s-> run {
imageView!!.setImageDrawable(s)
}})
RxJava遵循的是线程不变换的原则,在那个线程调用了subscribe(),就在哪个线程生产事件e.next()
上面代码片段中没有涉及到线程切换,下面我们首先谈谈
subscribeOn() 和 observeOn()
Observable.create(ObservableOnSubscribe<Drawable> { e->
run {
val drawable = theme.getDrawable(drawableRes)
e.onNext(drawable)
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(Consumer { s-> run {
imageView!!.setImageDrawable(s)
}})
subscribeOn()------->指定上面的代码执行线程
即:代码片段:
Observable.create(ObservableOnSubscribe<Drawable> { e->
run {
val drawable = theme.getDrawable(drawableRes)
e.onNext(drawable)
}
});
observeOn()------->指定下面的代码执行线程
即:代码片段:
Observable.subscribe(Consumer { s-> run {
imageView!!.setImageDrawable(s)
}})
How?先一个一个来说:
1、subscribeOn(Schedulers.io())
在Schedulers有很多策略模式:比如 Schedulers.io()Schedulers.newThread(),Schedulers.computation()等等还有其他。
先看一下这块的关系:
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
追到:Schedulers类
IO = RxJavaPlugins.initIoScheduler(new IOTask());
走进IOTask:
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
IOTask是一个实现Callable接口的静态类不类,这个看着比较眼熟:
static class WorkThread implements Callable<String>{
@Override
public String call() throws Exception {
System.out.println("WorkThread");
return "sucess";
}
}
一个有返回值的任务,启动方式给之前的线程有点不一样,与AsnyTask的用法完全一样,AnsyTask目前我们先不分析,后面会和大家一一讲到。
WorkThread workThread =new WorkThread();
FutureTask task =new FutureTask(workThread);
new Thread(task).start();
我们接着看:
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
从上述可以总结出,Scheduler 中策略模式对应的关系,如下所示:
Schedulers.newThread()代码和Schedulers.IO()区别不是不大,就不贴出来了,有意向的老铁可以去看这块的代码。
接着看Observable.subscribeOn(Schedulers.io())
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
}
传入ObservableSubscribeOn(自定义资源)对象前面文章有讲过
ObservableSubscribeOn类
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
Scheduler类
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
上面代码片段中创建了Worker对象,看看到底干了什么,通过Work对象我们来到了NewThreadWorker类中
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
}
看到ScheduledExecutorService对象有什么感想,线程池 对的没错,从Scheduler对象中可以看出,NewThreadWorker ,DisposeTask拿到了线程池对象,创建DisposeTask对象,DisposeTask是一个线程,给线程添加了Disposable中断功能,通传进来的线程进行层层封装之后调用, w.schedule(task, delay, unit);方法提交到线程池让线程执行,所有在上面调用subscribeOn(Schedulers.io())将自定义资源执行的线程为子线程,中去对应的资源文件。最后补充完整这张图:
2、.observeOn(AndroidSchedulers.mainThread())
在observeOn()中传入AndroidSchedulers.mainThread(),给前面也是一样的AndroidSchedulers类
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
再看看ObservableObserveOn对象需要特别注意的是
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
将传递Observer进行包装得到ObserveOnObserver对象,
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
在ObserveOnObserver对象中有worker 对象,同样的Work是一个抽象类,派生出了HandlerWorker对象:HandlerWorker
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this;
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
HandlerWorker代码片段中出现了handler对象,调用 handler.sendMessageDelayed(message, unit.toMillis(delay));方法将线程池中执行完之后的数据进行发送,但是有个问题,只是看到数据在发送但是没有看到那个地方在接收数据,于是回想自定义资源中持有包装后的Observer对象,那我们这包装后的Observer对象就是ObserveOnObserver对象,我在ObserverOnObserver对象中找到接受的数据的方法:
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
最后调用drainNormal()中 a.onNext(v);将拿到数据传递到下游,整个两次线程切换的大致流程图如图所示:
总结:
总的来说,RxJava的处理顺序像一条流水线,这不仅仅是代码写起来像一条链上,当你切换流水的流向(线程),整条链都改变了方向,并不会进行分流。
1、observeOn() 只是在收到 onNext() 等消息的时候改变了从下一个开始的操作符的线程运行环境。
2、subscribeOn() 线程切换是在 subscribe() 订阅的时候切换,他会切换他下面订阅的操作符的运行环境,因为订阅的过程是自下而上的,所以第一个出现的 subscribeOn() 操作符反而是最后一次运行的。
不得不说 Handler 在安卓中的地位真的是很牛逼。