前言:学习了这么多天的RxJava系列文章,虽然会用了,但是确不懂的具体是怎么回事,所以说会用的话还是不行,要去了解下其内部的源码,原理过程,这样对自己水平才能提高。以下带领大家进入RxJava源码分析。
1.实列分析
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到的整数是" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
打印结果:
04-27 01:38:32.713 14274-14274/? D/MainTwoActivity: 开始采用subscribe连接
04-27 01:38:32.713 14274-14274/? D/MainTwoActivity: 接收到的整数是1
04-27 01:38:32.713 14274-14274/? D/MainTwoActivity: 接收到的整数是2
04-27 01:38:32.713 14274-14274/? D/MainTwoActivity: 接收到的整数是3
04-27 01:38:32.713 14274-14274/? D/MainTwoActivity: 对Complete事件作出响应
看以上打印结果,我们发现Observer接口中的onSubscribe()方法是最先调用的,其次才收到onNext()事件,最后是onComplete()事件.
1.我们先从源码Observable.create点进去看:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
先抛开其他代码吧,我们发现这边new ObservableCreate<T>(source) new了一个ObservableCreate对象把我们刚才外部传进来的接口传入进去了,接着我们在一起来看ObservableCreate这个类:
//我截取了少部分代码用于讲解
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
}
我们发现这个时候只是把传进来的ObservableOnSubscribe<T> source存起来了,并且这个类是继承extends Observable<T>是Observable的子类,这个create()事件,到这里暂时没什么事情做了,接着我们继续往下看subscribe这个方法,点进去看源码:
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
以上方法传了一个观察者(Observer)进来,接下来我们主要看方法中的 subscribeActual(observer)这个代码,点击进去发现这个方法是一个抽象方法:
protected abstract void subscribeActual(Observer<? super T> observer);
有抽象方法,所以我们就应该想到肯定是Observable的子类去实现这个抽象方法的,刚才我们第一步看Observable.create()源码的时候不是返回了一个ObservableCreate类吗,我们前面提到了,ObservableCreate这个类是Observable的子类,所以我们直接去看ObservableCreate中的subscribeActual实现方法:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
看上面subscribeActual方法中的实现:首先构造类一个CreateEmitter类把观察者(Observer)传进去,这个类就是我们demo中的ObservableOnSubscribe接口方法subscribe中的ObservableEmitter<Integer> e参数,接着调用传进去来的observer的onSubscribe(parent)方法,所以观察者的onSubscribe方法是最先调用的,这个时候其实真正的事件还没开始发送呢这个方法就先调用了,这个方法把CreateEmitter对象传出去了,这个对象具体有什么作用呢,接下来我们一起看看这个类的源码:
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
我们发现这个类有观察者的引用,然后还有onNext(),onError(),onComplete(),setDisposable等方法,对了方法其实就是为了调用观察者(Observer)的几个回调方法,你们发现没没个方法中都 if (!isDisposed())这个判断其实这个就是可以取消这个事件发送,比如说我发了一个事件,后面不突然不想发了,那么我就可以通过setDisposable这个方法来取消事件的发送。
接下来我们继续看subscribeActual方法中的source.subscribe(parent)代码,source是什么,就是上层传进来的那个对象ObservableOnSubscribe,调用这个对象的subscribe方法,所以这个时候才是走到了demo中的subscribe这个 回调方法,可以看到我们在demo中的这个回调方法中写了:
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete()
这个几个方法正是CreateEmitter类中的几个方法,所以等这几个方法发出之后,CreateEmitter中的对应方法酒会调用Observer的方法回调出去,因此就像我们demo中打印的结果一样。
至此,基本订阅流程我们就理清楚了。我们从Observable#subscribe(Observer) 开始,将 Observer 传给 Observable,而 Observable 又会在 onNext(T) 方法中激活 Observer 的 onNext(T) 方法。我们在示例只涉及了少量的 Observable/Observer,事实上,我们在 RxJava 中运用的操作符都会在内部创建一个 Observable 和 Observer,虽然Observable#subscribeActual(Observer) 中都有自己特定的实现,但是它们大部分都是做两个操作,一是将「下游」传来的 Observer 根据需求进行封装;二就是让「上游」的 Observable subscribe() 该 Observer。以上情况都是只有一个Observable,如果存在很多Observable的情况呢又是怎样的情况呢?
2.多个Observable的情况:
多个Observable的情况其实经过我们看前面的分析我们大概就知道: RxJava2.x 的基本订阅流程是从 Observable#subscribe(Observer) 开始的,而该方法会触发「上游」 Observable 的 Observable#subscribeActual(Observer) 方法,而在该「上游」 Observable 中又会触发「上游的上游」Observable 的 Observable#subscribeActual(Observer) 方法。我们不妨用以下述源码举例:
//第一个Observable
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
}
})
//第二个Observable
.flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String s) throws Exception {
return Observable.just(s);
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
我们前面已经看了Observable.create的源码了。接下来我们看看flaMap的源码:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
if (this instanceof ScalarCallable) {
@SuppressWarnings("unchecked")
T v = ((ScalarCallable<T>)this).call();
if (v == null) {
return empty();
}
return ObservableScalarXMap.scalarXMap(v, mapper);
}
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
接着我们点击new ObservableFlatMap类:
public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final boolean delayErrors;
final int maxConcurrency;
final int bufferSize;
public ObservableFlatMap(ObservableSource<T> source,
Function<? super T, ? extends ObservableSource<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
super(source);
this.mapper = mapper;
this.delayErrors = delayErrors;
this.maxConcurrency = maxConcurrency;
this.bufferSize = bufferSize;
}
@Override
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
发现flaMap也是是Observable子类,重写了subscribeActual方法,该方法中也是会反问上游的subscribe方法,因此,我们讨论出,Observable#subscribe(Observer) 会调用 Observable#subscribeActual(Observer) ,该方法是一个抽象方法,由子类覆写,所以展现了 Observable 的多态性,而且如何激活上游 Observable 的 subscribe(Observer)/subscribeActual(Observer) 方法的关键点也在此。实现方式就在于 Observable#subscribeActual(Observer) 方法虽然是一个抽象方法,但是它的子类实现中都包含有一句 source.subscribe(Observer),其中 source 就是上游 Observable(实际上是 ObservableSource,但是我们此处不妨就理解成 Observable,毕竟我们对这个对象更熟悉一些,Observable 是 ObservableSource 接口的实现),所以就可以理解在每一个 Observable 的 subscribeActual(Observer) 方法中它都会调用上游的 subscribe(Observer)/subscribeActual(Observer) 方法,直至到达第一个 Observable 的 subscribe(Observer)/subscribeActual(Observer) 中。所以不管有几个Observable,原理流程都是一样的。