RxJava通过责任链的方式,将各个 操作符
节点串连起来。当调用订阅subscribe
方法时,链上节点都会依赖订阅上一个节点。那线程切换是如何实现的?
subscribeOn
操作符是如何让上游节点工作在指定线程上?
observeOn
操作符是如何让下游节点工作在指定线程上?
subscribeOn
操作符关键代码:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
///scheduler 指定工作线程
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
.......
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
observer.onSubscribe(parent);
///scheduler.scheduleDirect 线程启动
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
........
}
///关键在SubscribeTask中的`run`方法
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
///在线程运行的代码块中进行了**订阅**
source.subscribe(parent);
}
}
subscribeOn分析
Observable.just("goods")
.subscribeOn(Schedulers.io())
.subscribe()
等价于
Schedulers.io().createWorker().schedule {
Observable.just("goods").subscribe()
}
observeOn分析
Observable.just("goods")
.observeOn(Schedulers.io())
.subscribe()
等价于
Observable.just("goods").subscribe {
Schedulers.io().createWorker().schedule {
///do somethind
}
}