RxJava2源码分析
RxJava的鼎鼎大名相信Android开发的同学都非常熟悉了,其实不仅仅有RxJava,还有RxJs,RxKotlin等等一系列。可以说Rx并不是一种局限于Android的框架,Rx是一种思想,我们深入了解了RxJava,同样会加深我们对其他Rx系列的认知。
使用方法
我们来看一个常见的例子:
Observable.create(ObservableOnSubscribe<Int> { e ->
e.onNext(1)
e.onComplete()
}).map {
Log.d("map-thread : ", Thread.currentThread().name)
Log.d("--", "------------------------------")
"result : $it"
}.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribeWith(object : Observer<String> {
override fun onSubscribe(d: Disposable) {
Log.d("onSubscribe-thread : ", Thread.currentThread().name)
Log.d("--", "------------------------------")
}
override fun onNext(s: String) {
Log.d("onNext-thread : ", Thread.currentThread().name)
Log.d("onNext-result", s)
Log.d("--", "------------------------------")
}
override fun onError(e: Throwable) {
Log.d("onError-thread : ", Thread.currentThread().name)
Log.d("onError-message : ", e.message)
Log.d("--", "------------------------------")
Log.d("--", "------------------------------")
Log.d("--", "------------------------------")
}
override fun onComplete() {
Log.d("onComplete-thread : ", Thread.currentThread().name)
Log.d("--", "------------------------------")
Log.d("--", "------------------------------")
Log.d("--", "------------------------------")
}
})
这是一个使用Kotlin写的例子,对Kotlin不熟悉的同学无需关注代码细节,大致能看懂什么意思就行。首先发送一个数字1,然后通过 map 操作符把数字1变成 result : 1 ,将之前的操作切换到IO线程,将之后的操作切换到主线程。
整个RxJava的使用可以分为三个部分:
- 创建发送数据的原始Observable
- 使用Rxjava的操作符对发送事件做相应变换
- 使用subscribeOn和observeOn做线程切换
接下来我会对这三个部分做详细的说明。
创建发送数据的原始Observable
创建发送数据的原始Observable采用的是 Observable.create() ,当然使用 Observable.just(1) 、 Observable.from(list) 等等这些方法也可以创建发送数据的Observable,其实这些操作符的本质都是创建了一个新的Observable,在订阅的时候发送了一些数据。我们着重看一下 Observable.create() :
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//确保source非空
ObjectHelper.requireNonNull(source, "source is null");
//返回一个ObservableCreate对象
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
第一句很好理解,确保传入的 ObservableOnSubscribe 对象非空,第二句本质是直接返回了 new ObservableCreate<T>(source)
执行的结果,我们以后在看到 RxJavaPlugins.onAssembly(obj)
类似的代码时,直接可以理解为返回了一个 obj 。所以这个方法实际上做了一件事:创建了一个 ObservableCreate 类型的对象并返回。
这里我们先了解一下在订阅时,也就是调用 subscribeWith(observer) 时具体是做了什么:
public final <E extends Observer<? super T>> E subscribeWith(E observer) {
subscribe(observer);
return observer;
}
public final void subscribe(Observer<? super T> observer) {
//确保observer非空
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//这里一般都是直接返回传入的observer
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//不同类型的Observable的具体订阅的方法,所有的数据发送操作都在这个方法中
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
先看第一个方法 subscribeWith(E observer) ,在这个方法内部调用了 subscribe(observer) 这个方法。 subscribe(observer) 这个方法的核心是 subscribeActual(observer)
这一句。subscribeActual(observer) 这个方法是Observable内的一个抽象方法,Observable经过create操作符后会变成一个 ObservableCreate,经过map操作符会变成一个 ObservableMap 其他的操作符都是一个套路。经过操作符转化而来的Observable都是Observable的子类,在这些子类的内部都会实现 subscribeActual(observer) 这个抽象方法,并在这个方法内做发送事件的相关操作。
经过上面的分析,我们知道了 subscribeActual(observer) 这个方法是不同Observable的关键,有了这个前置知识,我们接着上面的进度看一下 ObservableCreate 这个Observable子类中的 subscribeActual(observer) 方法:
@Override
protected void subscribeActual(Observer<? super T> observer) {
//对observer的包装
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
//这是关键
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
关键是 source.subscribe(parent)
这一句。这个source就是 ObservableOnSubscribe 的实例,是在 Observable.create(source) 中传入的参数。 ObservableOnSubscribe 是一个接口,在这个接口的 subscribe 方法中我们定义了数据的发送方式。这个 ObservableOnSubscribe 可以理解为一个 数据发送的剧本 ,这个剧本具体的实现细节写在了 subscribe方法内。在最上面的例子中,我们通过 e.onNext(1);e.onComplete()
发送了一个数字1,紧接着通知事件已经发送完毕。
使用RxJava的操作符对发送事件做相应变换
由于RxJava的事件操作符太多,我们这里只讲map操作符的源码分析:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
//确保mapper非空
ObjectHelper.requireNonNull(mapper, "mapper is null");
//创建并返回一个ObservableMap对象
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
是不是很熟悉,map操作符和create操作符是一个套路,事实上所有的操作符都是一个套路: 创建并返回了一个对原有Observable的包装类 ,那我们来看一下这个 ObservableMap :
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
//对原始observer的包装类
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
U v;
try {
//在这里将T类型的原始数据t变换为类型U的新数据v
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//调用原始observer的onNext
actual.onNext(v);
}
//忽略无关代码......
}
ObservableMap 内部有有两个值得我们注意的点:一个是我们上面提过的 subscribeActual 方法;另一个是它的内部类 MapObserver。
subscribeActual 中依然是 source.subscribe(new MapObserver<T, U>(t, function))
这句熟悉的代码,值得注意的是这句代码并没有传入原始的observer,而是传入了 MapObserver 对象。简而言之, MapObserver 是一个包装类,它的内部包含我们原始的observer: actual 和我们变换的具体操作: mapper 。新的 MapObserver 包装类在它的 onNext 方法中做了两步操作:
- 通过 mapper.apply(t) 实现数据的类型变换,获取新的变换后的数据
- 将新类型的数据传入,通过 actual.onNext(v) 实现原始observer接收变换后新数据的功能
其实在不了解RxJava的源码之前,我们常常会惊叹于RxJava的神奇:明明我发送的是数字1,我要接收的却是字符串类型的数据,这中间仅仅是通过了一个map操作符,简直太神奇了!在我们了解了map操作符的源码之后,就会知道:我们在订阅的时候map操作符将原始的observer包装成了一个 MapObserver 对象,在这个 MapObserver 内部的 onNext 方法中首先会将原始数据 1 通过我们传入的变换规则变换为 result : 1,在变换之后才会调用我们编写的原始observer来处理新的 String 类型的数据。
使用subscribeOn和observeOn做线程切换
在分析线程切换之前,我们先明确一些前置知识:从主线程切换到子线程的操作很简单,新开一条线程,在新的线程执行即可。从子线程切换到主线程,在Android开发中,所有的第三方框架使用的都是Handler机制。所以不要把这些第三方框架想的特别难,他们的本质都是Android中的基础知识。
我们以 subscribeOn(Schedulers.io())
这个例子来分析,首先传入的是 Schedulers.io() ,直接说结论吧,它就是一个 IoScheduler 类型的单例对象。继续看代码:
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
我们都很熟悉了,本质是创建并返回了一个 ObservableSubscribeOn 类型的对象。我们继续看 ObservableSubscribeOn 的内部:
public void subscribeActual(final Observer<? super T> s) {
//创建一个对原始observer的包装对象
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
//这里是关键
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
//忽略无关代码......
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//这一句我们非常熟悉了,在这里实现了事件的订阅
source.subscribe(parent);
}
}
这里跟 ObservableMap 中的 subscribeActual 也是一个套路:
- 创建一个下一级observer的包装类 SubscribeOnObserver
- 通过 source.subscribe(parent) 实现事件的订阅
通过看 SubscribeOnObserver 这个包装类中的 onNext 方法也可以明白:它的内部直接调用的就是 actual.onNext(t) ,没有像map一样做数据的变换,这很好理解,因为 subscribeOn(schedule) 本身就只是为了切换线程,并不做其他多余的操作,所以这个包装类中的 onNext 才会直接调用下一级observer的onNext。
可能有的小伙伴要有问题了:你说通过 source.subscribe(parent) 实现事件的订阅,但 subscribeActual 中并没有你说的代码啊?其实在这里 scheduler.scheduleDirect(new SubscribeTask(parent))
这里传入了一个 SubscribeTask 对象,这个对象其实是个 Runnable ,在它的 run() 方法中调用了 source.subscribe(parent)
。到目前为止,我们唯一陌生的就是 scheduleDirect 这个方法了,这个方法定义在 Scheduler 这个线程调度类中,它实际调用的是:
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//createWorker是一个抽象方法,不同的线程调度器有不同的实现
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
//事件调度处理
w.schedule(task, delay, unit);
return task;
}
scheduleDirect 这个方法做了两件事:创建一个 Worker 工作类,调用工作类的 schedule 来进行事件调度。不同的线程调度器会有不同的处理,对于 IoSchedule 这个调度器来说,它最终是执行这个方法:
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
//忽略无关代码......
Future<?> f;
try {
if (delayTime <= 0) {
//通过线程池来开启一个子线程执行
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
通过上面的分析我们可以知道, subscribeOn(Schedulers.io()) 本质上是通过线程池开启了一个线程,在这个新的线程中还是通过调用 source.subscribe(observer) 来订阅事件。
subscribeOn(Schedulers.io()) 分析过了,我们再来看一下 subscribeOn(AndroidSchedulers.mainThread()) 这个线程切换。
AndroidSchedulers.mainThread() 返回的是一个 HandlerScheduler 类型的线程调度器,它的 scheduleDirect 方法定义如下:
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
//本质是通过handler机制实现线程切换
handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
return scheduled;
}
从它的方法内部可以看到,它是通过 handler.postDelayed()
来实现切换到主线程的功能的,这个handler是定义在主线程的handler。其他类型的线程调度器就不再分析了,本质都是大同小异。
接下来我们来看一下 observeOn(AndroidSchedulers.mainThread()) ,额,其实它的套路也是一样的,创建并返回了一个 ObservableObserveOn ,我们主要关注它的 subscribeActual 方法:
protected void subscribeActual(Observer<? super T> observer) {
//忽略无关代码......
Scheduler.Worker w = scheduler.createWorker();
//订阅包装类ObserveOnObserver
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
//下一级观察者
final Observer<? super T> actual;
//对应Scheduler里的Worker
final Scheduler.Worker worker;
//上一级Observable发送的数据队列
SimpleQueue<T> queue;
Disposable s;
//如果onError了,保存对应的异常
Throwable error;
//是否完成
volatile boolean done;
//是否取消
volatile boolean cancelled;
// 代表同步发送 异步发送
int sourceMode;
....
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
//忽略无关代码......
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
//执行过error / complete ,直接返回
if (done) {
return;
}
//如果数据源类型不是异步的, 默认不是
if (sourceMode != QueueDisposable.ASYNC) {
//将上一级Observable发送的数据加入队列中
queue.offer(t);
}
//这句代码是线程调度的核心,进入相应Worker线程,在相应线程发送数据队列中的数据
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
//这句代码是线程调度的核心,进入相应Worker线程,在相应线程发送数据队列中的数据
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
//开始调度
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
@Override
public void run() {
//默认是false
if (outputFused) {
drainFused();
} else {
//取出队列中的数据并发送
drainNormal();
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
//如果已经结束或队列为空,跳出循环
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
//从队列里取出一个值
v = q.poll();
} catch (Throwable ex) {
//异常处理并跳出函数
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
//调用下一级的observer的onNext方法发送事件
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
//如果已经disposed
if (cancelled) {
queue.clear();
return true;
}
// 如果已经结束
if (d) {
Throwable e = error;
//如果是延迟发送错误
if (delayError) {
//如果空
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
//停止worker(线程)
worker.dispose();
return true;
}
} else {
//发送错误
if (e != null) {
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
//发送complete
if (empty) {
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
}
可以看到 subscribeOn 与 observeOn 不同的是二者线程切换的地方。 subscribeOn 是将 source.subscribe(observer) 整个部分切换到了相应的线程,而 observeOn 则是在 ObserveOnObserver 这个包装类中的 onNext 、 onError 方法中做了线程的切换,具体是在 onNext 等方法中创建了相关的 Worker 工作类,并调用了这个类的 schedule 方法,这个跟我们讲 subscribeOn 是一样的。
到此为止,RxJava2的源码就基本分析完了。最后根据最初的例子,用一幅图来展示RxJava的流程: