RxJava订阅过程和线程切换原理
1 - Observable.just("hello world").subscribe(observer)
这是RxJava中的生产/消费模式中最简单的一种,就是生产发送“hello world"在用observer去监听消费数据,那么具体内部RxJava是如何实现的呢?
public abstract class Observable<T> implements ObservableSource<T> {
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
// 1.创建实际的Observable类ObservableJust,并把数据传进去
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
// subscribe操作
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
// 2.Observable的subscribeActual是abstract,实现在ObservableJust中
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
}
// Observable.just("Hello world"),创建的Observable实现类
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
// subscribe时具体实现逻辑
@Override
protected void subscribeActual(Observer<? super T> observer) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
// 3.先调用observer.onSubscribe()方法
observer.onSubscribe(sd);
// 4.执行后续的调用
sd.run();
}
@Override
public T call() {
return value;
}
}
public static final class ScalarDisposable<T> extends AtomicInteger implements QueueDisposable<T>, Runnable {
@Override
public void run() {
// 5.如果时START,设置为ON_NEXT
if (get() == START && compareAndSet(START, ON_NEXT)) {
// 6.调用observer.onNext()方法
observer.onNext(value);
if (get() == ON_NEXT) {
// 7.设置为ON_COMPLETE
lazySet(ON_COMPLETE);
// 8.调用observer.onComplete()方法
observer.onComplete();
}
}
}
}
上面代码的注释可以清晰的了解到整个Observable.just("Hello world").subscribe(observer)的调用过程,首先是在Observable.just("Hello world")中创建实际的Observable对象ObservableJust实例,然后在subscribe(observer)时,调用ObservableJust的subscribeActual方法,在subscribeActual中先调用obsever.onSubscribe(),再调用ScalarDisposable的run()方法,run()方法中处理了onNext()/onComplete()逻辑。
2 - Observable.just("hello world").subscribeOn(Schedulers.IO).subscribe(observer),subscribeOn(scheduler)的线程切换分析,数据生产方的线程切换
public abstract class Observable<T> implements ObservableSource<T> {
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
// 1.创建ObservableSubscribeOn,即把原始Observable转为ObservableSubscribeOn对象
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
}
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
// 2.把原始Observable保存为source
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
// 3.把原始observer转成另一个代理类,代理类中对onSubscribe和dispose做了处理
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
// 4.调用Observer.onSubscribe()方法
s.onSubscribe(parent);
// 5.此处是关键,创建SubscribeTask,它是个Runnable,给scheduler去调用,即进行线程切换,
// 在SubscribeTask中包含了orignalObservable.subscribe(orignalObserver)逻辑,
// 这样就使订阅逻辑执行在scheduler线程中。此处的scheduler.scheduleDirect()逻辑后面在分析,
// 只要理解为把一个Runnable放在执行线程执行即可。
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
final class SubscribeTask implements Runnable {
@Override
public void run() {
// 5.source即为原始Observable,parent为原始Observer的代理类(SubscribeOnObserver),
//即这里实际就是执行原始未切换线程逻辑下的Observable.subscribe(observer),
// 注意因为上面已经调用了originObserver.onSubscribe(parent);
// 所以在代理类onSubscribe方法中没有调用originObserver.onSubscribe()
source.subscribe(parent);
}
}
}
从上面代码注释可以了解到,RxJava subscribe(发送端) 切换线程subscribe(Schedulers.IO)逻辑会把原始Observable转为另一个订阅时线程切换的Observable(ObservableSubscribeOn),在ObservableSubscribeOn.subscribeActual()中把原始observer转为一个代理对象parent,调用originObserver.onSubscribe()方法,并把未切换线程的subscribe逻辑包装为Runnable,再把Runnable给Scheduler去调用执行,从而达到切换线程执行subscribe逻辑。
3 - Observable.just("hello world").observeOn(Schedulers.IO).subscribe(observer), observeOn(scheduler)的线程切换分析,数据消费方线程切换
public abstract class Observable<T> implements ObservableSource<T> {
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
// 1.创建ObservableObserveOn,即把原始Observable转为ObservableObserveOn对象
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
}
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
// 2.创建ObserveOnObserver,即把原始observer转为ObserveOnObserver对象,
// ObserveOnObserver中会把observer相关的回调通过worker切换到指定线程去调用
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
// 3.切换线程调用
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
// 4.切换到worker线程调用,里面具体调用哪个方法执行逻辑较多,总体来说到这里就达到了observer回调切换线程的目的
worker.schedule(this);
}
}
}
}
从上面代码注释可以了解到,RxJava observer(接收端) 切换线程observerOn(Schedulers.IO)逻辑会把原始Observable转为另一个订阅时线程切换的Observable(ObservableObserveOn),在ObservableObserveOn.subscribeActual()中把原始observer转为一个代理对象ObserveOnObserver,在ObserveOnObserver中对observer相关的回调做线程切换处理,从而达到observer回调切换线程的目的。
4 - 生产端和消费端都切换线程的分析, Observable.just("hello world").subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(observer)
public abstract class Observable<T> implements ObservableSource<T> {
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
// 1.创建实际的Observable类ObservableJust,并把数据传进去
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
// 2.创建ObservableSubscribeOn,即把ObservableJust转为ObservableSubscribeOn对象
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
// 3.创建ObservableObserveOn,即把ObservableSubscribeOn转为ObservableObserveOn对象
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
}
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
@Override
protected void subscribeActual(Observer<? super T> s) {
// 4.这是ObservableJust执行subscribe的逻辑。记:TAG-4
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();
}
}
public static final class ScalarDisposable<T> extends AtomicInteger implements QueueDisposable<T>, Runnable {
public void run() {
// 5.ObservableJust发送端逻辑,这里会调线程切换转化后的Observer,记后面的TAG-10位置
if (get() == START && compareAndSet(START, ON_NEXT)) {
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
}
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
// 6.把原始Observable保存为source
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
// 注意此处的s不是原始的observer,是在ObservableObserveOn中转化过的observer,可以见下面分析
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
//7.把observable.subscribe(observer)包装进Runnable中,并给scheduler去执行,留意此处在哪调用的,后面会分析。记:TAG-7,这里会调用到第六步TAG-8
//注意注意注意!!!:这里达到发送端切换线程目的
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
final class SubscribeTask implements Runnable {
@Override
public void run() {
// 8.在io线程中执行observable.subscribe(observer),注意此处的source是ObservableJust,它会调用TAG-4。记:TAG-8
source.subscribe(parent);
}
}
}
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
// 9.创建ObserveOnObserver,即把原始observer转为ObserveOnObserver对象,并调用到上面第7步的流程,即TAG-7的地方
// 注意注意注意!!!:ObserveOnObserver会把原始observer的回调放在指定线程去回调,达到接收端切换线程目的
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
// 10.observer切换线程的地方。记TAG-10
schedule();
}
}
}
从上面代码注释可以了解到,其实RxJava对于发送端和接收端切换线程的逻辑就是把对应的Observable转为另一个Observable,且对于observeOn(scheduler)会把observer转为切换线程调用的oberver,当subscribe时,就会从转化后的Observable一级一级调用到原始的Observable方法,当然中间做了subscribe的切换线程操作,在原始Observable上再调用转化后的可线程切换的Observer的回调,在切换线程Observer中对原始Observer回调进行线程切换后调用。
RxJava源码分析系列文章主题目录:
- 1. RxJava源码分析-----初始篇
- 2. RxJava源码分析之 --- 订阅过程和线程切换
- RxJava源码分析之 --- 操作符
- RxJava源码分析之 --- Backpressure
- RxJava源码分析之 --- hook