RxJava2线程切换简单用例:
通过subscribeOn()方法指定被观察者的工作线程,以及observeOn()指定观察者的工作线程。
Observable.create(ObservableOnSubscribe<Int> { e ->
Log.i("RxJava", "subscribe运行在" + Thread.currentThread())
e.onNext(0)
e.onComplete()
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : Observer<Int> {
override fun onSubscribe(d: Disposable) {
Log.i("RxJava", "onSubscribe运行在" + Thread.currentThread())
}
override fun onNext(integer: Int) {
Log.i("RxJava", "onNext运行在" + Thread.currentThread())
}
override fun onError(e: Throwable) {}
override fun onComplete() {
Log.i("RxJava", "onComplete运行在" + Thread.currentThread())
}
})
}
运行结果:Observable#subscribeOn
指定了被观察者Observable的工作线程,需要的参数类型为Scheduler
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
//传递Observable自身和scheduler作为参数,创建ObservableSubscribeOn对象并返回
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
- 在上一篇源码分析一中可以知道,Observable#subscribeOn最后会返回ObservableSubscribeOn的实例。
- 在用例当中,调用了Schedulers.io()作为Scheduler参数,传递给Observable#subscribeOn方法。
Schedulers.io()
//Schedulers.java
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
Schedulers.io()拿到的是IoScheduler的对象,RxJava的套路就是调用A类的B方法,最后返回一个BA对象。需要了解IoScheduler是怎么得到的,可以去查询一下源码,这里比较简单,不展开分析了。
IoScheduler
public final class IoScheduler extends Scheduler {
//RxThreadFactory继承了ThreadFactory接口
static final RxThreadFactory WORKER_THREAD_FACTORY;
static {
...
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
...
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
NONE.shutdown();
}
static final class CachedWorkerPool implements Runnable {
private final long keepAliveTime;
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
final CompositeDisposable allWorkers;
private final ScheduledExecutorService evictorService;//线程池
private final Future<?> evictorTask;
private final ThreadFactory threadFactory;
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;
ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
evictorService = evictor;
evictorTask = task;
}
}
public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
}
1.IoScheduler被创建时调用了无参构造方法,内部再调用了带有ThreadFactory的重载构造方法。
2.WORKER_THREAD_FACTORY变量在静态代码块中被初始化,类型为RxThreadFactory。
再回到Observable#subscribeOn(Scheduler scheduler)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
//创建ObservableSubscribeOn对象
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
/**
* ObservableSubscribeOn对象内部保存了传递进来的Scheduler对象。
* ObservableSubscribeOn是Observable的子类。
**/
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
}
所以Observable调用subscribeOn(Scheduler scheduler)方法后,会创建ObservableSubscribeOn的对象,并且把上一步传递进来的ObservableCreate对象和IOScheduler 对象保存起来。
Observable#observeOn
这个方法指定了Observer的工作线程,需要的参数类型为Scheduler
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
在用例当中,调用了AndroidSchedulers.mainThread()作为Scheduler参数,传递给Observable#subscribeOn方法。
AndroidSchedulers.mainThread()
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
AndroidSchedulers.mainThread()方法最后得到的是HandlerScheduler对象(按照RxJava套路,这里应该返回一个叫做MainThreadScheduler对象,但是这里居然不按套路来)。
再回到Observable#observeOn(Scheduler scheduler)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
...
//创建一个ObservableObserveOn对象并把scheduler保存起来
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
}
observeOn方法和subscribeOn方法基本也是一样的逻辑,创建ObservableObserveOn对象,并且把上一步传递过来的ObservableSubscribeOn对象和HandlerScheduler对象保存起来。
Observable#subscribe()
Observable对象实例为ObservableObserveOn,而根据RxJava的Observable#subscribe(observer)方法的套路,所以会调用ObservableObserveOn的subscribeActual方法。
@Override
protected void subscribeActual(Observer<? super T> observer) {
//scheduler为HandlerScheduler的类型
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
//用ObserveOnObserver订阅上游数据源。这样当数据从上游发送下来,会由ObserveOnObserver对应的onXXX()处理
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
source对象是在ObservableObserveOn创建时传递进来的,实例为ObservableSubscribeOn对象。所以会调用ObservableSubscribeOn的subscribeActual()方法。
ObservableSubscribeOn#subscribeActual()
@Override
public void subscribeActual(final Observer<? super T> s) {
//创建SubscribeOnObserver实例
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//s为ObserveOnObserver对象,回调ObserveOnObserver的onSubscribe方法
//最后回调observer的onSubscribe
s.onSubscribe(parent);
//SubscribeTask继承了Runnable
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
//SubscribeTask中的source为ObservableCreate对象
//run方法的执行会回调ObservableOnSubscribe的subscribe方法
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
- 把ObserveOnObserver对象传递给SubscribeOnObserver的构造方法,创建SubscribeOnObserver对象。
- 回调ObserveOnObserver的onSubscribe()方法,最终回调到observer的onSubscribe()方法。
- 创建SubscribeTask对象,调用scheduler对象的scheduleDirect()方法,然后把返回值传递给SubscribeOnObserver对象。
Scheduler#scheduleDirect(Runnable)
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//createWorker方法在Scheduler类中为抽象方法,Scheduler的实例为IoScheduler
//调用IoScheduler的createWorker()方法, 返回的实例为EventLoopWorker
final Worker w = createWorker();
//decoratedRun就是run对象
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//把Worker和Runnable传递给DisposeTask,创建DisposeTask的实例
DisposeTask task = new DisposeTask(decoratedRun, w);
//调用EventLoopWorker的schedule()方法
w.schedule(task, delay, unit);
return task;
}
//IoScheduler.java
@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
//IoScheduler$EventLoopWorker.java
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
- scheduleDirect方法首先调用createWorker方法,createWorker方法在Scheduler类中为抽象方法,而在IOScheduler中createWorker()方法返回EventLoopWorker对象。
- 把Worker和Runnable传递给DisposeTask,创建DisposeTask的实例,DisposeTask继承了Runnable和Dispose接口。
- 调用EventLoopWorker的schedule()方法,然后会调用到IoScheduler$ThreadWorker的schedule()方法,ThreadWorker继承自NewThreadWorker类,进入NewThreadWorker的scheduleActual()方法。
NewThreadWorker#scheduleActual()
private final ScheduledExecutorService executor;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//把Runnable封装到ScheduledRunnable对象中
//ScheduledRunnable继承了Runnable和Callable和Disposable接口
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
...
//把sr提交给线程池executor执行
//sr封装了ObservableSubscribeOn的内部类SubscribeTask对象
//最后会执行SubscribeTask对象的run方法
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
...
}
return sr;
}
//SchedulerPoolFactory#create()
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (PURGE_ENABLED && exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
return exec;
}
- NewThreadWorker的scheduleActual方法中的executor对象是通过Executors.newScheduledThreadPool()方法创建出来的定长线程池。
- 通过线程池executor执行Runnable包装后的Callable对象。
- Runnable对象是ObservableSubscribeOn的subscribeActual方法中创建的SubscribeTask对象。
- 所以Observable被观察者的工作线程是Executors来启动的,回调到SubscribeTask的run方法中。
SubscribeTask#run()
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//source为ObservableCreate
//parent为SubscribeOnObserver
source.subscribe(parent);
}
}
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.i("RxJava: subscribe运行在", "" + Thread.currentThread());
e.onNext(0);
}
})
- run方法这里的工作线程为子线程。
- 在ObservableSubscribeOn对象中,source为ObservableCreate对象,source.subscribe(parent)的执行会回调最终会使用例子中的ObservableOnSubscribe#subscribe()。
- ObservableOnSubscribe#subscribe()的执行会调用e.onNext()方法, e的实例对象为ObservableSubscribeOn的内部类SubscribeOnObserver对象,SubscribeOnObserver封装了ObservableObserveOn对象。
- 最后会调用ObservableObserveOn子类的ObserveOnObserver的onNext()
ObservableObserveOn$ObserveOnObserver#onNext()
@Override
public void onNext(T t) {
...
if (sourceMode != QueueDisposable.ASYNC) {
//把被观察者发送过来的数据设置到queue队列中
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
//Scheduler.java
@NonNull
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
- ObserveOnObserver的onNext方法中,先把被观察者发送过来的数据设置到queue队列中。
- 执行schedule(),调用HandlerScheduler子类HandlerWorker的schedule()方法。
HandlerScheduler$HandlerWorker#schedule()
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
...
//run对象为ObservableObserveOn.ObserveOnObserver
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
- 变量run为ObservableObserveOn的子类ObserveOnObserver对象。
- handler为主线程的handler,所以run对象的run方法会执行在主线程中。
- HandlerWorker的schedule就是通过主线程的handler发送message,实现线程的切换。
ObservableObserveOn$ObserveOnObserver#run()
@Override
public void run() {
...
drainNormal();
...
}
void drainNormal() {
...
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
...
try {
//不断从队列q中取出数据
v = q.poll();
} catch (Throwable ex) {
...
}
...
回调使用例子中的observer的onNext方法。
a.onNext(v);
}
...
}
}
1.ObserveOnObserver的run方法运行在主线程中,先调用drainNormal方法。
- drainNormal方法中,先从队列q中取出上游事件发送下来的数据,然后调用变量a的onNext方法,变量a是使用例子中的observer变量。
总结:
1、Schedulers.io()会得到IOScheduler对象。
2、subscribeOn(Schedulers.io())方法会通过IOScheduler创建EventLoopWorker对象,在通过EventLoopWorker对象来获取ThreadWorker对象,然后调用ThreadWorker对象的schedule()方法。
3、ThreadWorker的构造方法中会同时创建executor变量,executor为定长线程池,schedule()中会通过executor线程池来执行observable的subscribe()方法,工作线程为子线程。
4、AndroidSchedulers.mainThread()会得到HandlerScheduler对象,同时获取主线程的handler对象。
5.、事件流下游的ObservableObserveOn子类ObserveOnObserver获取到上游observable发送的数据后,会先放入到数据队列中。
6、observeOn(AndroidSchedulers.mainThread())方法会通过HandlerScheduler来创建HandlerWorker对象,然后调用HandlerWorker的schedule(Runnable)方法。
7、HandlerWorker的构造方法中会获取到HandlerScheduler传递的handler对象,然后HandlerWorker的schedule(Runnable)方法,会通过主线程的handler来包装ObserveOnObserver成为message并发送给主线程的消息队列,通过此操作来把观察者的工作线程切换到主线程。
8、ObservableObserveOn子类ObserveOnObserver的run方法就会运行在主线程,然后取出数据队列q中的数据,调用observer的onNext()方法完成整个事件流的发送和线程的切换。