Rxjava 框架结构
RxJava 的整体结构是一条链,其中:
- 链的上游:生产者 Observable
- 链的最下游:观察者 Observer
- 链的中间:各个中介节点,既是下游的 Observable,又是上游的 Observer
我们从一段最简单的 RxJava 代码来展开,即 Single。
首先说明一下:本片博客所看Rxjava2的源码版本为 [rxjava:2.2.9]
Single 分析
Single.just("1")
.subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe: ");
}
@Override
public void onSuccess(String s) {
Log.e(TAG, "onSuccess: s=" + s);
}
@Override
public void onError(Throwable e) {
String errMsg = e.getMessage();
if (TextUtils.isEmpty(errMsg)) {
errMsg = e.getClass().getName();
}
Log.e(TAG, "onError: e=" + errMsg);
}
});
RxJava 这种连着调用的写法是可以拆分的,那我们简单的拆分下,方便看:
Single<String> stringSingle = Single.just("1"); // 创建一个新的上游 Single 对象
SingleObserver<String> observer = new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
Log.e(TAG, "onSubscribe: ");
}
@Override
public void onSuccess(String s) {
Log.e(TAG, "onSuccess: s=" + s);
}
@Override
public void onError(Throwable e) {
......
Log.e(TAG, "onError: e=" + errMsg);
}
};
stringSingle
.subscribe(observer);
看下 just()方法做了什么事情
// Single.java#just() 源码
public static <T> Single<T> just(final T item) {
ObjectHelper.requireNonNull(item, "item is null");// 验空,不为空返回自己,为空抛异常
return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
}
// 这个方法就是判空处理,如果 object 为空,抛异常程序终止;否则返回 object 自己
public static <T> T requireNonNull(T object, String message) {
if (object == null) {
throw new NullPointerException(message);
}
return object;
}
可以看到 just()就两行代码,一个验空方法和一个钩子方法,下面简单瞄一眼 onAssembly(),默认情况下不用关注它,不影响主流程:
// RxJavaPlugins.java # onAssembly()
public static <T> Single<T> onAssembly(@NonNull Single<T> source) {
// 这是一个钩子方法,默认情况下,就是我们什么也不做的话,默认 onSingleAssembly == null
// 即 我们传进来的什么,就返回什么
Function<? super Single, ? extends Single> f = onSingleAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
那么就剩下一个了,就是 RxJavaPlugins.onAssembly(new SingleJust<T>(item))
方法的参数,创建的一个新的对象
// SingleJust.java
public final class SingleJust<T> extends Single<T> {
final T value;
// 这个 T 就是上游生产的事件,这里保存下来了
public SingleJust(T value) {
this.value = value;
}
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
// 直接调用 onSubscribe()和 onSuccess() 方法
// 因为 Single.just(T) 方法时没有延时,没有后续事件的上游被观察者,不会执行失败,一旦开始订阅(subscribe())就会立马执行完毕
// 注意这里是不会回调 onError()方法的,因为 Single过于简单,根本不会失败
observer.onSubscribe(Disposables.disposed());
observer.onSuccess(value);
}
}
SingleJust.java
实际上就把上游的值,给保存了下来,然后有一个 subscribeActual()
方法,执行了方法参数 observer 的 onSubscribe()和 onSuccess()方法。
嗯,到目前 Single.just("1")
这句代码就干了两件事,new 了一个 SingleJust 对象,并且把字符串 "1"
给存下来了。
继续往下看订阅代码 stringSingle .subscribe(observer);
subscribe 源码
// Single.java#subscribe()
// 实际上这里进了 SingleJust 里边去了,just()方法创建的 SingleJust 对象
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(SingleObserver<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null"); // 判空
observer = RxJavaPlugins.onSubscribe(this, observer); // 下钩子,默认返回自己
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
try {
subscribeActual(observer); // 关键代码在这里,调用了实际的 subscribe 方法,把下游的 observer 传了进去
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
NullPointerException npe = new NullPointerException("subscribeActual failed");
npe.initCause(ex);
throw npe;
}
}
Single.just("1").subscribe() 这里就调用了上一步创建的 SingleJust 对象的 subscribeActual(observer)方法,就把上游生产的事件成功发送到了下游的 observer 了
subscribe()方法代码不多,并且有两个眼熟的判空代码,和一个钩子方法,都不用管,重点就是 subscribeActual(observer);
这一句,实际上是执行到了上一步我们创建的SingleJust
对象的.subscribeActual(observer)方法里边去了,只是简单的把下游的观察者对象 observer 对象作为参数传了进去。我们再来瞄一下 subscribeActual(observer);
// SingleJust.java#subscribeActual()
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposables.disposed());
observer.onSuccess(value);
}
纳尼?直接就调用了传进来的 observer 的两个方法,即下游的 onSubscribe()和 onSuccess()方法就正常执行了,这个 Single 太简单了,简单到就不会执行下游的 onError()方法,下游一开始订阅(.subscrive()),事件就从上游直接传到了下游;
再来看下 Disposables.disposed()
是怎么 disposed 的,
// Disposables.java#disposed()
@NonNull
public static Disposable disposed() {
return EmptyDisposable.INSTANCE;
}
// 好继续往下跟踪,是一个枚举类
public enum EmptyDisposable implements QueueDisposable<Object> {
INSTANCE,
NEVER;
@Override
public void dispose() {
// no-op
}
@Override
public boolean isDisposed() {
// 始终返回 true,即一个 disposed 的状态
return this == INSTANCE;
}
// ……
}
当你调用 mDisposable.dispose();
切断上下游的事件时,什么作用都没有,因为这已经是一个被丢弃了的状态了,只是 Single 这里是这么简单处理的,因为用 Single 来发或生产事件是没有延时,也没有后续事件的,订阅的一瞬间就执行完毕了。我们甚至没看到下游观察者的 onError() 就没被调用。
来对 Single 做个小总结:
- 链的上游:生产者 Single
- 链的下游:观察者 SingleObserver
- Single.just(T) 会创建一个新的上游对象 SingleJust,并把我们的上游事件保存下来
- 一旦开始订阅(.subscribe),在 SingleJust 内部就会把事件直接交给 下游的 observer 处理
我们来看一个比较常用的操作符, map
Single.just(1)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return String.valueOf(integer);
}
})
.subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe: ");
}
@Override
public void onSuccess(String s) {
Log.e(TAG, "onSuccess: s=" + s);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: e=" + e.getMessage());
}
});
这个简单小实例中是把 int事件->String事件
,看看是事件是怎么转换的;
// Single.java#map()
public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper));
}
// 哇 好眼熟,一个判空,一个钩子,不管,我们只看 创建的 SingleMap里边做了什么
public final class SingleMap<T, R> extends Single<R> {
final SingleSource<? extends T> source; // 上游的事件生产者
final Function<? super T, ? extends R> mapper; // map 操作符,事件变换函数
public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
protected void subscribeActual(final SingleObserver<? super R> t) {
// 同样的,这里的执行时机是下游的观察者订阅了事件的一瞬间
// 这个 source 在这个例子中,就是上游新建的 SingleJust对象,把上游和下游连接了起来
// 这个t 就是下游我们写的观察者,订阅的时候传了进来
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
}
static final class MapSingleObserver<T, R> implements SingleObserver<T> {
// ......
}
}
// SingleMap 的内部类
static final class MapSingleObserver<T, R> implements SingleObserver<T> {
final SingleObserver<? super R> t; // 下游的 observer
final Function<? super T, ? extends R> mapper; // 事件变换函数
MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
this.t = t;
this.mapper = mapper;
}
@Override
public void onSubscribe(Disposable d) {
// 这里调用时机,还记得么?就是新创建的上游 SingleJust.java#subscribeActual()里调用的
t.onSubscribe(d);
}
@Override
public void onSuccess(T value) {
R v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}
t.onSuccess(v);
}
@Override
public void onError(Throwable e) {
t.onError(e);
}
}
MapSingleObserver
个人理解是装饰者模式的一种体现,它是对下游 observer 的一种装饰,即对被装饰对象.subscribe(new SingleObserver<String>() {})匿名内部类的
功能的一种增强。
画张图描述一下 不加 map 操作符时,代码的执行流程:
再看下 加了 map
操作符之后的流程,稍微复杂了点
当加了一个 map
操作符后,会创建一个新的上游对象 SingleMap
,然后新对象包裹旧对象(SingleJust
),同时会创建一个新的下游观察者 MapSingleObserver
,同样包裹真实的本例中下游观察者SingleObserver
;一旦订阅发生(.subscribe()
),事件的执行流行为:
SingleMap 会把事件往上传给 SingleJust,SingleJust(MapSingleObserver 回调 onSubscribe 和 onSuccess),然后往下传 MapSingleObserver(回调给真实的 observer.onSubscribe() 和 变换后回调给 observer.onSuccess ) 到此结束。
最后的总结开篇已经总结过了,至此我们已经明白了几件事:
- 链的上游是如何发送数据的
- 链的下游是如何接收数据的
- 如何将上下游进行关联起来的
- 简单的操作符
map
是如何工作的
事实上只有当下游发生订阅事件(调用.subscribe()) 时,上游才会发事件;同时会调用onSubscribe(Disposable)方法,如果链的中间有节点,那么会创建一个新的上游对象,然后包裹住老的上游对象,每个中间节点作用既是它上游的 observer,又是它下游的 Observable。
下一篇会介绍下,线程切换和事件中断丢弃相关源码。