最近我准备在年后离职,所以就看了看RxJava的源码,相信我会加入到年后的求职大军中23333。其实现在突然离开苏州还有点不舍,从18年来到苏州我在这里呆了2-3年了,去过很多地方,有很多的朋友都在这里,也对这个公司很熟悉。现在开始做年后的面试准备,我会和大家一起看看源码,今天我们就从RxJava开始。之前RxJava用过很多次但是没有怎么看过它的源码,今天就好好看。
首先我们要弄清楚RxJava中的几个类:
- Observable:被订阅者,是事件的来源,通过Emitter发射数据给Observer。
- Observer:订阅者,通过注册(onSubscribe)过程传给被订阅者,订阅者监听开始订阅,监听订阅过程中会把Disposable传给订阅者,然后在被订阅者中的发射器(Emitter)发射数据给订阅者(Observer)。
- Disposable:释放器,通常有两种方式会返回Disposable,一个是在Observer的onSubscribe方法回调回来,第二个是在subscribe订阅方法传consumer的时候会返回。
- Emitter:发射器,在发射器中会接收下游的订阅者(Observer),然后在发射器相应的方法把数据传给订阅者(Observer)。
- Scheduler:调度器,用于切换线程,不同的调度器(Scheduler)可以将代码放入到不同线程去执行和观察。
- Consumer:消费器,消费器其实是Observer的一种变体,Observer的每一个方法都会对应一个Consumer,比如Observer的onNext、onError、onComplete、onSubscribe都会对应一个Consumer。
1、操作符create源码分析
下面是一个简单的实例代码:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@io.reactivex.rxjava3.annotations.NonNull ObservableEmitter<Integer> emitter) throws Throwable {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@io.reactivex.rxjava3.annotations.NonNull Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(@io.reactivex.rxjava3.annotations.NonNull Integer s) {
Log.d(TAG, "onNext: " + s);
}
@Override
public void onError(@io.reactivex.rxjava3.annotations.NonNull Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
Observable.create()方法的实现是这样子的
public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
里面new了一个ObservableCreate对象传到了RxJavaPlugins.onAssembly()中,而RxJavaPlugins.onAssembly()返回了什么呢?我们看一下下面的代码:
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
这段代码中因为我们没有初始化f所以它就是null,所以RxJavaPlugins.onAssembly()返回的就是传入的对象的本身,返回的就是source。
2、subscribe()操作符的源码分析
以下代码有省略,我只放出来关键部分的代码
public abstract class Observable<@NonNull T> implements ObservableSource<T> {
public final void subscribe(@NonNull Observer<? super T> observer) {
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer);
}
protected abstract void subscribeActual(@NonNull Observer<? super T> observer);
}
其中RxJavaPlugins.onSubscribe()和上面的RxJavaPlugins.onAssembly()一样,返回的就是我们传入的observer。subscribeActual()调用的就是我们上游的 Observable.create()创建的ObservableCreate对象,它是Observable的子类。大家记住这个subscribeActual()方法它是之后我们要讲的其它操作符的关键。我们再看看subscribeActual()具体实现中做了什么?
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<>(observer);
observer.onSubscribe(parent);
source.subscribe(parent);
}
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
}
}
subscribeActual()实际上它创建了一个发射器new CreateEmitter<>(observer)并且把我们在下游创建的Observer对面给传入进去了。而这个source就是我们一开始Observable.create()时我们自己创建的匿名对象,如下代码所示:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@io.reactivex.rxjava3.annotations.NonNull ObservableEmitter<Integer> emitter) throws Throwable {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
})
实际上source.subscribe(parent)就会调用到我们new的匿名对象里面,我们调用发射器的时候如emitter.onNext(1)就会执行发射器的中的onNext(),下面代码中的observer就是我们在subscribe()订阅时我们创建的Observer(观察者)匿名对象
@Override
public void onNext(T t) {
if (!isDisposed()) {
observer.onNext(t);
}
}
小总结:
- 当我们Observable.create()时就会new ObservableCreate<>(source)这个source就是我们创建的匿名对象,ObservableCreate作为它的成员变量保存起来。
- 我们去调用subscribe()方法时,会执行上游的Observable中的subscribeActual()方法,而我们上游创建的是ObservableCreate对象,所以接下来它会执行ObservableCreate类中的subscribeActual(observer)方法。
-subscribeActual()方法会把我们创建的Observer对象传入到CreateEmitter(发射器)中。并且执行source.subscribe(parent)。此时就会调用我们Observable.create()时创建的匿名对象中的subscribe()方法。- 当我们调用emitter.onNext(1)就会调用我们创建的Observer中的onNext()方法
3、map操作符源码解读
我们先看一段示例代码:
Observable<Integer> createObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@io.reactivex.rxjava3.annotations.NonNull ObservableEmitter<Integer> emitter) throws Throwable {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
Observable<String> mapObservable = createObservable.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Throwable {
return String.valueOf(integer);
}
});
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(@io.reactivex.rxjava3.annotations.NonNull Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(@io.reactivex.rxjava3.annotations.NonNull String s) {
Log.d(TAG, "onNext: " + s);
}
@Override
public void onError(@io.reactivex.rxjava3.annotations.NonNull Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
};
mapObservable.subscribe(observer);
通过上面我们知道create()操作符会创建一个ObservableCreate对象,同理map操作符会创建一个ObservableMap,他们都是被观察者都继承了Observable,他们能完成不同的功能其实就是通过subscribeActual()方法实现的。下面我们将具体来看一下。
public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
}
在调用map()时会创建一个ObservableMap对象。这个this就是createObservable对象,mapper就是我们创建的匿名对象。同理当我们调用subscribe()订阅方法的时候就会执行ObservableMap类中的subscribeActual()方法。
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);//上游的被观察者对象,如ObservableCreate
this.function = function;//我们在map()方法中传入的匿名对象
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));//创建一个观察者对象,再订阅上游的被观察者
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;//我们在map()方法中传入的匿名对象
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {//当执行了onComplete()或者onError()就不再执行
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
//调用了我们创建的匿名对象中的applp()方法
v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//downstream:下游的Observer对象
downstream.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
}
}
在subscribeActual()方法中创建了一个观察者对象(MapObserver),同时去订阅上游的被观察者(ObservableCreate)。
小总结
- map操作符在subscribeActual()中创建了一个观察者对象MapObserver,通过subscribe()去订阅上游的被观察者。
- 当上游的被观察者ObservableCreate发生变化之后(如调用了emitter.onNext(1))就会执行MapObserver类中的onNext()方法。
- 然后再调用我们传入的Function对象中的apply()方法。并将返回(v)值通过onNext()传入到下游的Observer(我们创建的匿名对象)中的onNext()方法中。
4、subscribeOn操作符(线程切换)
如下示例代码,我们通过subscribeOn()操作符去切换线程,从而使被观察者在哪个线程去执行。
Observable<Integer> createObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@io.reactivex.rxjava3.annotations.NonNull ObservableEmitter<Integer> emitter) throws Throwable {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
Observable<Integer> observableSubscribeOn = createObservable.subscribeOn(Schedulers.io());
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(@io.reactivex.rxjava3.annotations.NonNull Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(@io.reactivex.rxjava3.annotations.NonNull Integer s) {
Log.d(TAG, "onNext: " + s);
}
@Override
public void onError(@io.reactivex.rxjava3.annotations.NonNull Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
};
observableSubscribeOn.subscribe(observer);
下面我们看一下subscribeOn()的具体实现:
public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
}
subscribeOn()中创建了一个ObservableSubscribeOn(被观察者)的对象
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
//创建一个新的观察者(SubscribeOnObserver)并将下游的观察者observer传入进去
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
//调用下游的observer的onSubscribe方法,这里我们可以看见,下游的onSubscribe还是在当前线程执行的
observer.onSubscribe(parent);
//这句代码的本质就是将一个实现了Runnable接口的SubscribeTask放到线程池去执行
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
final Observer<? super T> downstream;
final AtomicReference<Disposable> upstream;
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
this.upstream = new AtomicReference<>();
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.upstream, d);
}
@Override
public void onNext(T t) {
downstream.onNext(t);
}
@Override
public void onError(Throwable t) {
downstream.onError(t);
}
@Override
public void onComplete() {
downstream.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(upstream);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
}
当执行了subscribeActual()方法时里面包装了一个SubscribeOnObserver(观察者),并将下游的observer作为成员变量。并创建一个SubscribeTask对象,将这个对象交由scheduler去执行。scheduler就是Schedulers.io()。SubscribeTask 本身就一个实现了 Runnable接口的类,当线程开始实行的时候就会执行run()方法中的 source.subscribe(parent),所以我们才可以将其放到IO线程去执行。剩下的onNext(),__ onError()__,onComplete()类似都是调用下游的observer(观察者对象)。
从上面的源代码中我们明白了,不管subscribeOn()执行了多少次只会以第一次为准。
下面我们在看一下scheduler
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
当我们在调用Schedulers.io()时返回是一个IO的成员变成,它其实是在静态代码块中进行初始化的。
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
//初始化IO
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
下面的initIoScheduler()就是返回一个Scheduler,callRequireNonNull(defaultScheduler);
方法的实质就是获取Scheduler,最后通过new了一个IoScheduler对象。
@NonNull
public static Scheduler initIoScheduler(@NonNull Supplier<Scheduler> defaultScheduler) {
Objects.requireNonNull(defaultScheduler, "Scheduler Supplier can't be null");
Function<? super Supplier<Scheduler>, ? extends Scheduler> f = onInitIoHandler;
if (f == null) {
return callRequireNonNull(defaultScheduler);
}
return applyRequireNonNull(f, defaultScheduler);
}
static final class IOTask implements Supplier<Scheduler> {
@Override
public Scheduler get() {
return IoHolder.DEFAULT;
}
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
小总结
- subscribeOn(Schedulers.io())的实质就是将被观察者(Observable)放入到线程池中去执行订阅source.subscribe(parent)
observeOn操作符源码解读
Observable<Integer> observableObserveOn = observableSubscribeOn.observeOn(AndroidSchedulers.mainThread());
通过上面我们关于subscribeOn()源码解读我们知道其实质就是将subscribe()放入到子线程去执行,所以也能猜到observeOn()就是将onNext()放入到主线程去执行,下面我们来看一下源码。
public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
Objects.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
}
在observeOn()里面创建了一个ObservableObserveOn对象,它同样也是一个被观察者继承至Observable,并且显示了Runnable接口。
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//通过我们传入的scheduler,获取Worker
Scheduler.Worker w = scheduler.createWorker();
//订阅是在当前线程执行的
source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
}
}
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
}
}
ObserveOnObserver收到上游的被观察者信息,调用onNext(),__ onError()__等方法时,就会将其放入到线程中去执行。关于drainFused()和drainNormal()的本质就下游的onNext()方法放入到主线程去执行(因为这两个方法行数有点多,我没有放入到上面的代码中去)。
小总结
- 我们看见observeOn()操作符和前面的一样都是将其封装成一个继承至Observable的ObservableObserveOn对象。
- 在其内部通过subscribeActual()订阅上游的被观察者,并且将下游的Observer(观察者)和Scheduler.Worker包装到它的静态内部类中(ObserveOnObserver)。
- 当上游的被观察发生变化的时候就会调用schedule()方法,将下游的onNext(),onError()等回调放入到observeOn()中传入的线程中去执行。
最后
RxJava的本质就是观察者模式,不同的操作符都是返回一个被观察者(Observable),并且将下游的观察者封装到另一个Observer中(如:将下游的Observer封装到MapObserver中),操作符能完成特定的功能是因为特定的Observable和Observer在subscribeActual(),onNext(),onSubscribe(),onError()等方法中完成了具体的实现。