Rxjava可以非常方便的完成线程的切换,链式调用这种艺术般的设计深受开发者的喜爱。本节通过源码来深入了解一下这其中的原理。
网上有很多介绍的文章,但大部分都有些晦涩难懂。本文旨在由浅入深,一步步深入“虎穴”。
最基本调用
先看一下最简单的调用方式:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
System.out.println("subscribe---" + Thread.currentThread().getName());
System.out.println("发送数据:hello \n");
e.onNext("hello");
e.onComplete();
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe---" + Thread.currentThread().getName() + '\n');
}
@Override
public void onNext(String s) {
System.out.println("onNext---" + Thread.currentThread().getName() + '\n');
System.out.println("接收数据:" + s);
}
@Override
public void onComplete() {
System.out.println("onComplete---" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
});
可以说没有添加任何逻辑,只有发送与接收。这里把每一步的所在线程,发送和接收的数据,打印了出来。
运行结果:
onSubscribe---main
subscribe---main
发送数据:hello
onNext---main
接收数据:hello
onComplete---main
注意一下subscribe
方法,代码里出现了两个。
第一个是我们发送数据用的,属于ObservableOnSubscribe
接口;
第二个是提交整个链式调用的,属于ObservableSource
接口。
不要搞混了,打印出来的是第一个。
然后注意一下打印出来的顺序,onSubscribe
是先于subscribe
的,稍后我们从源码中查看原因。
整个代码片段用到了两个Rxjava的方法,create
创建最初的数据源;ObservableSource
的subscribe
创建数据接收者。
下面我们先看看create
的实现:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
总共做了3件事:
- 检查source是否为空,合法性检查很常见,后面不再关注;
- 创建一个
ObservableCreate
对象,将source传给它; - 调用
RxJavaPlugins.onAssembly
方法,以ObservableCreate
为参,并最终返回该方法的返回值。
我们先看看RxJavaPlugins.onAssembly
是干嘛的,代码不搞透彻总觉得不够处女座:
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
从类名称也可以看出来,RxJavaPlugins
是一个hook类。说人话就是,我们可以设置一些自己的操作。
比如上述方法中的onObservableAssembly
我们可以设置一下,打印Observable
的类名:
RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
@Override
public Observable apply(Observable observable) throws Exception {
System.out.println(observable);
return observable;
}
});
看看结果:
io.reactivex.internal.operators.observable.ObservableCreate@e2144e4
onSubscribe---main
subscribe---main
发送数据:hello
onNext---main
接收数据:hello
onComplete---main
可以看到打印出来的就是第2步提到的ObservableCreate
类。下面就来看一下这个类的构造方法:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
ObservableCreate
会把传进来的source先保存下来。这里的source
就是我们创建的用来发送数据的ObservableOnSubscribe
。如下:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
System.out.println("subscribe---" + Thread.currentThread().getName());
System.out.println("发送数据:hello \n");
e.onNext("hello");
e.onComplete();
}
})
到此,create
的流程走完了,我们看一下subscribe
,注意是链式调用里面的:
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
看似很长的代码,其实真正相关的就是调用了subscribeActual
方法:
protected abstract void subscribeActual(Observer<? super T> observer);
这是一个抽象方法,具体的实现,我们去子类中看。这里的子类就是我们的ObservableCreate
:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
- 将
observer
包装成CreateEmitter
对象; - 调用
observer.onSubscribe
,CreateEmitter
作为参数; - 调用
source.subscribe
,CreateEmitter
作为参数。
再次说明一下,这里的observer
和source
是我们自己写的!
这里也能看到,onSubscribe
是最先执行的方法。
我们调用e.onNext("hello");
发送数据,这个e
就是CreateEmitter
对象。
然后看一下CreateEmitter
:
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
这里省略部分代码。该类是对observer
的一个封装。
需要注意的是,这里的onNext onComplete
继承自Emitter
接口,而不是Observer
接口。
Rxjava好多接口中存在同名方法,可能产生混乱,要记得区分。
添加subscribeOn
所有代码如下:
RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
@Override
public Observable apply(Observable observable) throws Exception {
System.out.println(observable);
return observable;
}
});
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
System.out.println("subscribe---" + Thread.currentThread().getName());
System.out.println("发送数据:hello \n");
e.onNext("hello");
e.onComplete();
}
})
.subscribeOn(Schedulers.single())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println();
System.out.println("onSubscribe---" + Thread.currentThread().getName() + '\n');
}
@Override
public void onNext(String s) {
System.out.println("onNext---" + Thread.currentThread().getName() + '\n');
System.out.println("接收数据:" + s);
}
@Override
public void onComplete() {
System.out.println("onComplete---" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
});
添加了subscribeOn(Schedulers.single())
,看下打印结果:
io.reactivex.internal.operators.observable.ObservableCreate@e2144e4
io.reactivex.internal.operators.observable.ObservableSubscribeOn@1ee0005
onSubscribe---main
subscribe---RxSingleScheduler-1
发送数据:hello
onNext---RxSingleScheduler-1
接收数据:hello
onComplete---RxSingleScheduler-1
有如下变化:
- 多了一个
ObservableSubscribeOn
对象,该对象的创建过程与ObservableCreate
类似 -
onSubscribe
依旧在主线程调用,其他的都是在子线程
我们依次创建了ObservableCreate
和ObservableSubscribeOn
两个对象,那最终调用的就是ObservableSubscribeOn
的subscribe
方法。
先看一下ObservableSubscribeOn
的构造器:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
除了source
,还多了一个我们传入的Scheduler。
注意这里的source
已经是ObservableCreate
对象了。因为我们是在ObservableCreate
对象上执行的subscribeOn
方法。
我们最终调用的是ObservableSubscribeOn
的subscribe
方法,会执行到:
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
可以看到,还是会先在subscribe
方法调用的线程调用onSubscribe
,然后在scheduler
设置的线程运行SubscribeTask
。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
任务很简单,在scheculer设置的线程执行source
的subscribe方法。这样就完成了线程的切换。
这里还涉及到一个很重要的SubscribeOnObserver
类:
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
该类的作用:
1、保证只有一次onSubscribe
回调,如下:
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
ObservableCreate
调用的是这个类中的onSubscribe
,该回调并不会继续向下传递,保证只回调一次。
2、及时dispose
3、其他
数据流向图:
添加observeOn
在subscribeOn
后面添加一句observeOn(Schedulers.computation())
,看下输出:
io.reactivex.internal.operators.observable.ObservableCreate@e2144e4
io.reactivex.internal.operators.observable.ObservableSubscribeOn@1ee0005
io.reactivex.internal.operators.observable.ObservableObserveOn@25618e91
onSubscribe---main
subscribe---RxSingleScheduler-1
发送数据:hello
onNext---RxComputationThreadPool-1
接收数据:hello
onComplete---RxComputationThreadPool-1
可以看到,又多了一个ObservableObserveOn
类。
看一下构造器:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
同样传入source
和scheduler
。
-
delayError
——true,则接收到onError
事件后,立即停止onNext
分发,并向下传递onError
;false,则等待队列中的onNext
分发完毕后,再分发onError
事件。 -
bufferSize
——缓冲区大小,默认是128
我们最终调用的是ObservableObserveOn
的subscribe
方法,会执行到:
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
若传入的是Schedulers.trampoline()
,则不作任何处理,直接把下游的observer
传递给上游的source
;否则使用ObserveOnObserver
包装下游的observer
,并将ObserveOnObserver
传递给上游的source
。
先看下ObserveOnObserver
的onSubscribe
方法:
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
...
queue = new SpscLinkedArrayQueue<T>(bufferSize);
//向下传递
actual.onSubscribe(this);
}
}
这里省略了一些代码,先不去管它。可以看到会创建一个SpscLinkedArrayQueue
对象,初始大小为我们设置的bufferSize
。实际上,它的结构是这样的:
可以看到是一个链式的数组,可以无限扩容,每次扩容都会new一个新的数组,旧数组的最后一个元素指向新数组。并且数组的容量是bufferSize+1。SpscLinkedArrayQueue
在设置数组容量时,也会做限制,如下:
int p2capacity = Pow2.roundToPowerOfTwo(Math.max(8, bufferSize));
最小是8,且一定是2的n次方。这是为了方便查找Index,即将求余数简化为n&(bufferSize-1)
,提高效率。
看一下onNext
:
@Override
public void onNext(T t) {
if (done) {
//一旦收到onError或onComplete,立即停止接收新的onNext事件
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
先不用考虑sourceMode,正常会将事件放入队列中,然后执行schedule
:
void schedule() {
if (getAndIncrement() == 0) {//每次调用使计数+1。只有为0时,才会执行worker.schedule,防止重复调用
worker.schedule(this);
}
}
在worker线程执行run方法
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
正常执行到drainNormal
:
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {//队列为空时,最多自旋两次尝试取数据(missed控制)
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {//循环取数据
boolean d = done;
T v;
try {
//从队列取数据
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {//数据去完,跳出该层循环
break;
}
//向下游分发数据
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
这样就完成了线程切换。
missed作用:若当前计数为10,则第一次调用,missed和当前计数变为9;第二次调用计数归0,完成两次自旋后跳出循环,防止cpu空转。这时因为置0了,所以若有新的onNext
事件,schedule
方法又可以重新调用worker.schedule
,开启循环。
作者这种优化思路,非常值得借鉴。
至此,针对上面的例子,有如下结论:
- 每次链式调用都会创建一个新的
Observable
对象 -
onSubscribe
在当前线程调用 -
subscribeOn
多次调用时,数据在第一次调用的线程中发送 -
observeOn
多次调用时,数据在最后一次调用的线程中向下分发
上游是下游的source,下游是上游的observer,可以理解为一个双向链表结构。
subscribeOn
控制上游所在的线程,observeOn
控制下游所在的线程,这样就好理解多了。
多次调用observer
先看下代码吧。ObserveOnObserver#onSubscribe
被我们省略的部分代码:
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
//qd就是上一层的ObserveOnObserver对象
QueueDisposable<T> qd = (QueueDisposable<T>) s;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
...
if (m == QueueDisposable.ASYNC) {
//ASYNC
sourceMode = m;
//可以理解为就是上游的队列
queue = qd;
actual.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
@Override
public int requestFusion(int mode) {
if ((mode & ASYNC) != 0) {
//设置上游的标志位
outputFused = true;
return ASYNC;
}
return NONE;
}
下游的observeOn会拿到上游的队列。
然后在onNext
中:
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
//只有第一次的observeOn会执行入队逻辑
queue.offer(t);
}
schedule();
}
run
方法:
@Override
public void run() {
if (outputFused) {
//非最后一个observeOn调用
drainFused();
} else {
//最后一个observeOn调用
drainNormal();
}
}
drainNormal
已经介绍过了,就是从队列中取数据,向下分发。看下drainFused
:
void drainFused() {
int missed = 1;
for (;;) {
if (cancelled) {
return;
}
boolean d = done;
Throwable ex = error;
if (!delayError && d && ex != null) {
actual.onError(error);
worker.dispose();
return;
}
//向下分发null
actual.onNext(null);
if (d) {
ex = error;
if (ex != null) {
actual.onError(ex);
} else {
actual.onComplete();
}
worker.dispose();
return;
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
可以看到只是向下分发了null。这很好理解,因为最下层的observeOn
可以直接从最上层的observeOn
的队列中取得本次数据。
通过学习源码可以发现,作者在可靠性和性能上做了很多的工作,这都是值得我们学习的地方。
说在最后
Rxjava无疑是一个非常优秀的链式调用框架,能极大减少开发量。不过也有一些弊端不可忽视:
- jar包有2MB
- 调用过程产生大量的临时对象,使用不当会产生严重的内存问题
总而言之,因地制宜,量体裁衣。