RxJava2 出来好一阵子了,现在只是基本会用,但是还不知道人家源码怎么写的,这几天断断续续的看大神的源码,又自己思考了下,所以留下这篇文章帮助大家理解RxJava2源码
最简单的示例代码如下:
Observable.create(
//===============这里是第一部分A===============
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("你好世界");
e.onComplete();
}
}
//===============这里是第一部分A-结束===============
).subscribe(
//===============这里是第二部分B===============
new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
}
//===============这里是第二部分B===============
);
个人觉得为了好理解源码,我把以上代码分为了A和B二个重要的部分,分别是A整体,B整体。
当我们点击Observable.create()方法查看源码:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
我们看到传入了ObservableOnSubscribe这么个东东,也就是传入我们A部分的代码,那这个是啥?点击ObservableOnSubscribe查看
public interface ObservableOnSubscribe<T> {
void subscribe(ObservableEmitter<T> e) throws Exception;
}
一看接口来着,里面还有一个ObservableEmitter,继续点击看看
public interface ObservableEmitter<T> extends Emitter<T> {
void setDisposable(Disposable d);
void setCancellable(Cancellable c);
boolean isDisposed();
ObservableEmitter<T> serialize();
}
又看到Emitter继续点击
public interface Emitter<T> {
void onNext(T value);
void Error(Throwable error);
void onComplete();
}
一看这个就有点熟悉了,一看不就是我们A那部分的里面的么?
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("你好世界");
e.onComplete();
}
}
所以我就猜想就是ObservableEmitter<String> e 接口调用了接口的onNext,onComplete()
现在我们回到Observable.create()方法里面来,我们点击ObservableCreate<T>
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
他传入了一个source也就是我们的A,他传递A干嘛?
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
//传递A进来赋给了ObservableOnSubscribe<T> source
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
//下面这部分暂时先不看,后面马上会讲到
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
我们一看他又继承了Observable<T>,Observable<T>就是我们点击进来的那个类,现在我们可以查看subscribe(B),我下面的A,B已经分别代表了第一部分,第二部分
Observable.create(A).subscribe(B)
此时传递的是B,B是什么,B代表的是我们开头定义的那一大堆代码,进入subscribe(B)源码
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//关键部分代码
subscribeActual(observer);
} catch (NullPointerException e) {
throw e;
} catch (Throwable e) {
................
}
我们看到它调用了 subscribeActual(observer);这个方法,这个方式是一个抽象类,所以它必须实现,他的实现在哪?它就在刚刚我叫你们暂时先不看的,它的实现到那里去了,因为ObservableCreate<T>继承了Observable<T>,那我们回到那个方法:
@Override
protected void subscribeActual(Observer<? super T> observer) {
//将B的实现类传递给CreateEmitter
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//此时observer调用B里面的onSubscribe,所以CreateEmitter这个玩意也是Disposable
observer.onSubscribe(parent);
try {
//这里有个soruce,source是什么?不就是我们的A么,
//调用A里面的subscribe并且传递parent,此时是不是我们会执行e.onNext("你好世界")
//这个东东,所以这一切都连接起来了
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
//实现了ObservableEmitter这个接口,ObservableEmitter是什么?
//就是onNext(T t) onError(Throwable t) onComplete() ,
//别忘了当时我们创建Observable.create(A),A里面就含有ObservableEmitter<String>
/**
*new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("你好世界");
e.onComplete();
}
}
*
**/
static final class CreateEmitter<T> extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
//将B里面实现赋值给observer
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t); //赋值后的 observer,调用B 里面的onNext(t)方法
}
}
@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
//...........很多代码省略
public interface Observer<T> {
void onSubscribe(Disposable d);//这里就是上面代码 observer.onSubscribe(parent);调用过的地方
void onNext(T value);
void onError(Throwable e);
void onComplete();
}
参考文章:http://blog.csdn.net/zxt0601/article/details/61614799
本人也是初学者,嘻嘻,欢迎相互交流