当知道了RxJava是如何创建被观察者,接下来就是更重要的一步了,Rxjava是如何订阅的呢?
还是先看下使用过程:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("我是RxJava");
e.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("Rxjava被订阅");
}
@Override
public void onNext(String s) {
System.out.println("Observer接收消息: " + s);
}
@Override
public void onError(Throwable e) {
System.out.println("Observer出现错误: " + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("Rxjava 完成");
}
});
当我们点进去看subscribe方法的代码:
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
// 还是一样的判断是否为空
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//和创建时是一样的,为我们提供hook点
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
// 需要重点关注
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
当点进去查看时,发现这个方法是一个抽象类的抽象方法:
protected abstract void subscribeActual(Observer<? super T> observer);
还记得我们创建时返回的是什么吗?
ObservableCreate,查看代码也知道ObservableCreate继承了Observable自然也实现了subscribeActual方法,这就知道了需要查看的代码:ObservableCreate#subscribeActual()
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
我们先看下observer.onSubscribe(parent):在这里就调用了我们new出来匿名内部类的onSubscribe()方法;
至于CreateEmitter,就是将我们的观察者又包装了一层
看下CreateEmitter的继承的类型
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
}
创建出来了发射器,这还没有完,继续回到subscribeActual():
Snipaste_2020-06-01_22-20-35.png
可以看出把创建出来的发射器交给了我们的被观察者
直到这我们才完成了订阅过程,看看订阅过程的时序图:
RxJava订阅.png
仅仅这样是不行的还有一个RxJava中最重要的东西,就是线程的切换