线程调度源码分析
1:subscribeOn
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onComplete();
}
//只是在Observable和Observer之间增加了一句线程调度代码
}).subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
subscribeOn(Schedulers.io())这句其实就是创建了一个Observable。
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
然后传进了一个参数Schedulers,Schedulers类里有各种线程的Scheduler的具体实现
public final class Schedulers {
@NonNull
static final Scheduler SINGLE;
@NonNull
static final Scheduler COMPUTATION;
@NonNull
static final Scheduler IO;
@NonNull
static final Scheduler TRAMPOLINE;
@NonNull
static final Scheduler NEW_THREAD;
static final class SingleHolder {
static final Scheduler DEFAULT = new SingleScheduler();
}
static final class ComputationHolder {
static final Scheduler DEFAULT = new ComputationScheduler();
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
}
先看下ObservableSubscribeOn这个类的实现
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
//下游订阅的时候。将Observer包装成SubscribeOnObserver
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
//实现线程切换的代码,SubscribeTask是一个Runnable
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;//下游Observer
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//此时调用上游的订阅方法。即在scheduler的线程里运行
source.subscribe(parent);
}
}
}
追一下scheduler.scheduleDirect
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//createWorker由自己的实现类具体实现
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
DisposeTask也是一个Runnable
static final class DisposeTask implements Runnable, Disposable {
final Runnable decoratedRun;
final Worker w;
Thread runner;
DisposeTask(Runnable decoratedRun, Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
runner = Thread.currentThread();
try {
//手动调用Runnable的run方法
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();
} else {
w.dispose();
}
}
@Override
public boolean isDisposed() {
return w.isDisposed();
}
}
看下IOScheduler 的worker 创建EventLoopWorker
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker {
private final ThreadWorker threadWorker;
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
最终调用到NewThreadWorker的scheduleActual
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
//executor为根据RxThreadFactory创建的ScheduledExecutorService
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
到此经过几次Runnable的封装,最后其实就是在子线程里调用的runnable的run方法。
根据之前提到的,订阅开始后。反向调用Observable的subscribeActual方法,那么不管中间切换了几次subscribeOn线程,那么真正执行代码的线程就是第一次切换的那个线程。可以做个测试
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.e(MainActivity.class.getSimpleName(),Thread.currentThread().getName()+"--源subscribe---"+Thread.currentThread().getId());
e.onNext("1");
e.onComplete();
}
}).subscribeOn(Schedulers.computation()).map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
Log.e(MainActivity.class.getSimpleName(),Thread.currentThread().getName()+"--map---"+Thread.currentThread().getId());
return Integer.parseInt(s);
}
}).subscribeOn(Schedulers.io()).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.e(MainActivity.class.getSimpleName(),Thread.currentThread().getName()+"--订阅subscribe---"+Thread.currentThread().getId());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
打印如下
2:observeOn
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
//同样创建Observable
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
看ObservableObserveOn的subscribeActual实现
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//创建线程worker
Scheduler.Worker w = scheduler.createWorker();
//调用上游数据源的订阅方法之后调用了ObserveOnObserver的onXXX
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
ObserveOnObserver里的代码比较多以onNext为例
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
//这里ObserveOnObserver实现了Runnable
worker.schedule(this);
}
}
最终也是调用的 executor.submit执行ObserveOnObserver里的run方法.
这里总结下因为observeOn的线程切换是在数据向下游传递的切换的,所以每次切换均生效。
同样将上面例子改为observeOn
bservable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.e(MainActivity.class.getSimpleName(),Thread.currentThread().getName()+"--源subscribe---"+Thread.currentThread().getId());
e.onNext("1");
e.onComplete();
}
}).observeOn(Schedulers.computation()).map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
Log.e(MainActivity.class.getSimpleName(),Thread.currentThread().getName()+"--map---"+Thread.currentThread().getId());
return Integer.parseInt(s);
}
}).observeOn(Schedulers.io()).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.e(MainActivity.class.getSimpleName(),Thread.currentThread().getName()+"--订阅subscribe---"+Thread.currentThread().getId());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
结果如下