RxJava源码分析(1)
Rxjava相信大家都不陌生,是现在很流行的一种解决异步通信的框架,分析源码,不会对RxJava2的源码逐字逐句的阅读,只寻找关键处,我们平时接触得到的那些代码进行分析。
分析的源码版本为:2.0.1
我们的目的:
- 知道源头(Observable)是如何将数据发送出去的。
- 知道终点(Observer)是如何接收到数据的。
- 何时将源头和终点关联起来的
- 知道线程调度是怎么实现的
- 知道操作符是怎么实现的
本文先达到目的1 ,2 ,3。
我个人认为主要还是适配器模式的体现,我们接触的就只有Observable和Observer,其实内部有大量的中间对象在适配:将它们两联系起来,加入一些额外功能,例如考虑dispose和hook等。
首先看最基础的应用
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d(TAG,"currentThread="+Thread.currentThread().getName());
e.onNext("11");
e.onComplete();
e.onError(new Throwable("cuowu"));
}
});
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG,"Disposable="+d.isDisposed());
}
@Override
public void onNext(String value) {
Log.d(TAG,"currentThread="+Thread.currentThread().getName());
Log.e(TAG,"onNext:"+value);
}
@Override
public void onError(Throwable e) {
Log.e(TAG,"onError:"+e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete");
}
});
这是我们最普通的使用方式。
再根据使用方法分析源码
从create方法开始分析,首先看里面的参数ObservableOnSubscribe:
public interface ObservableOnSubscribe<T> {
void subscribe(ObservableEmitter<T> e) throws Exception;
}
可以看到它里面就一个方法subscribe,这个方法正是我们重写的那个方法。再看它里面的参数ObservableEmitter
public interface ObservableEmitter<T> extends Emitter<T> {
void setDisposable(Disposable d);
void setCancellable(Cancellable c);
boolean isDisposed();
ObservableEmitter<T> serialize();
}
这几个方法和dispose有关,我们可以暂时不予考虑,接着看看它的父类Emitter
public interface Emitter<T> {
void onNext(T value);
void onError(Throwable error);
void onComplete();
}
终于看到我们熟悉的方法了,这就是我们在示例中调用到的几个方法。参数分析完毕,就该来看看具体的流程了
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//判null方法我们可以忽略
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
RxJavaPlugins.onAssembly()是一个hook方法,暂时先不去考虑它,也就是说我们需要关注的就是new ObservableCreate<T>(source)这个方法,ObservableCreate是一个继承自Observable的类,它的构造方法如下,可以看到,将传入的参数ObservableOnSubscribe保存起来。
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
//在这里就是将ObservableOnSubscribe保存起来
this.source = source;
}
至此,整个创建流程结束。再分析订阅的流程:
public final void subscribe(Observer<? super T> observer) {
//判null操作
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//hook方法
observer = RxJavaPlugins.onSubscribe(this, observer);
//判null操作
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//真正执行订阅的方法
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
从名字可以看出来,真正订阅的地方,但是这里我们需要判断Observable是哪一个了,还记得我们创建返回的是哪个Observable么?就是这个new ObservableCreate<T>(source),下面就该看它的subscribeActual方法了。
protected void subscribeActual(Observer<? super T> observer) {
//在这里创建的CreateEmitter实现了Disposable接口,暂时可以看成是一个Disposable,且通过传入的observer将
//observer保存起来
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//首先调用onSubscribe方法,传入Disposable
observer.onSubscribe(parent);
try {
//ObservableOnSubscribe这个源头和下游真正产生联系的方法,这时候源头才开始发送数据
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
source.subscribe(parent)我们知道我们重写了一下3个方法:
e.onNext("11");
e.onComplete();
e.onError(new Throwable("cuowu"));
这里的e就是parent即CreateEmitter,分别看它的onXXX()方法:
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
//在这里判断是否Disposed,当调用了onError和onComplete会设为false,所以执行完这两个方法中任何一个后,源
//头将与下游断开连接,所以onError和onComplete是互斥的。
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
//通过这个可以知道,当先调用了onComplete方法再调用onError方法就会报错。
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
到这里一个最简单的流程结束。
总结
- 在subscribeActual()方法中,源头和终点关联起来。
- source.subscribe(parent);这句代码执行时,才开始从发送ObservableOnSubscribe中利用ObservableEmitter发送数据给Observer。即数据是从源头push给终点的。
- CreateEmitter 中,只有Observable和Observer的关系没有被dispose,才会回调Observer的onXXXX()方法
- Observer的onComplete()和onError() 互斥只能执行一次,因为CreateEmitter在回调他们两中任意一个后,都会自动dispose()。根据上一点,验证此结论。
- 先error后complete,complete不显示。 反之会crash
在这里结束了一个最基本使用流程的源码分析,接下来,我们将分析一下常用的操作符以及线程转换。