我之前写过RxJava1.x中Map操作的原理,然而时隔半年,我已经全然忘记RxJava1.x是怎么实现的了,唯一的印象是挺复杂的。
当时可能也是浅尝辄止,对其理解不是那么深,过一段时间决定就忘记了。所以把这些现在理解的东西记录下来是很有必要的。
所以,蹬蹬瞪蹬……,RxJava2版本的Map原理闪亮登场。
本文是基于RxJava浅析——事件如何从上游传递到下游。建议先看下这篇。
先上代码,没错,本文就是要分析下这段代码的执行过程。跟上一篇分析区别就在与create()与subscribe()之间多了一个map操作。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return String.valueOf(integer);
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError:", e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
}
});
我们知道Observable.create()会返回一个ObservableCreate对象。ObservableCreate是Observable的子类。记住这是我们第一个碰到的具体的Observable实现类。
那么核心代码就在这里了:
//Observable的map方法。ObservableCreate.map使用的是父类(Observable)的实现
//不是源码,去掉了一些空检查等
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
return new ObservableMap<T, R>(this, mapper);
}
//附上Observable.create()方法对比一下
//代码位置:io.reactivex.Observable.java
//不是源码,是我简化后的代码。源码还有空检查,Hook方法的回调等。这些不影响整个逻辑,所以可以先忽略。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
return new ObservableCreate<T>(source);
}
呀呀呀,这段代码是不是很熟悉,跟create()方法一样的套路啊。只是这个返回的是一个ObservableMap,这个ObservableMap也是Observable的子类,这是我们碰到的第二个具体的Observable实现类。
来看看构造方法中看了啥。
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);//其实就是在父类中保存source,不用在乎这个super。
this.function = function;//我们外部实现的Function接口。
}
呀呀呀,也只是保存了一下source和function。套路还是一样的。注意注意,这个source应该是create()方法返回的ObservableCreate对象哦~
奥,解释一下这个Function也是一个接口。
public interface Function<T, R> {
/**
* Apply some calculation to the input value and return some other value.
* @param t the input value
* @return the output value
* @throws Exception on error
*/
R apply(@NonNull T t) throws Exception;
}
这个接口中只有一个apply()方法,接收T类型参数,返回R类型参数。在我们的例子里就是接收Integer类型的参数,返回String类型的参数。妥妥的,就是一个map操作嘛。把Integer类型转换成了String类型。怎么转换?当然是我们自己实现Function接口。在例子里只是简单地把Integer转换成了String而已。
所以到此为止,啥也没发生。
关键当然还是把上下游连接起来的subscribe(Observer)方法。
map操作时候返回的是ObservableMap对象,所以subscribe(Observer)方法最后会调到ObservableMap.subscribeActual()方法。这个还不清楚的,参见“RxJava浅析——事件如何从上游传递到下游”
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function)); //Point 0
}
这货干了两件事情:
- 创建了
MapObserver。 - 调用了
source.subscribe()方法,并把创建出来的MapObserver传入。
先来讲第一件事情,这是我们除了自己创建的Observer之外,第一次遇到RxJava2内部创建的Observer对象。MapObserver是ObservableMap的静态内部类。
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);//Point 1
this.mapper = mapper;//Point 2
}
}
可以看到这个MapObserver继承BasicFuseableObserver, 可以猜到其父类肯定是继承Observer.
public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R>
这个QueueDisposable可以就理解成Disposable。我们暂时不扯那么远。
回到Point 1,这句话就是把我们外部的Observer给保存下来。
Point 2,这句话就是把我们的function保存下来。
重点来了!
看Point 0处的代码,这个source是我们的ObservableCreate,这里的入参是刚刚创建的ObserverMap。所以RxJava内部又发生了我们在“RxJava浅析——事件如何从上游传递到下游”所分析的事情。
还记得么?这个subscribe()方法最终调到ObservableCreate.subscribeActual()方法。再来看一遍:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);//Point 3
try {
source.subscribe(parent); //Point 4
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
这里就很熟悉啦。只是这里的observer是RxJava内部帮我们创建的ObserverMap。
Point 3 会调到ObserverMap.onSubscribe()
//代码在BasicFuseableObserver中,非源码。去掉其他的一些代码
@Override
public final void onSubscribe(Disposable s) {
this.s = s;
actual.onSubscribe(this);
}
这个actual就是我们在创建ObserverMap时传入的,就是我们外部实现的Observer。可以把this传出去,是因为ObserverMap本身也是Disposable接口的实现类。当然入参s(实际上是CreateEmitter)也会保存下来。
Point 4中会调到我们外部实现的ObservableOnSubscribe接口的subscribe()。即执行事件发送。再提一下,这个source是在创建ObservableCreate时保存的。
所以呢,当我们使用CreateEmitter发送数据时: e.onNext(1)
实际上也是先调到CreateEmitter的onNext(1),他调到他自己保存的Observer的onNext(1),在我们的例子里这个就是ObserverMap。所以我们来看ObserverMap的onNext方法。
//代码位置:ObserverMap的onNext方法。
final Function<? super T, ? extends U> mapper;
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
关键点在于mapper.apply(t),这句话把T类型的对象转换成了U类型的对象,也就是我们例子中把Integer类型的对象转换成了String类型。requireNonNull只是对转换后的对象进行了空检查。因为这个转换是我们外部提供的实现,并不能保证非空。
最后再调用actual.onNext(v)就是调到了我们提供的Observer的onNext方法,此时类型已经发生了转换。
以上就是map操作的原理啦。对onError和onComplete事件其实没有起什么作用。
最后用一个图来解释:

没有Map操作的时候
- 事件流向:CreateEmitter->Observer。
- 创建流的过程如图中数字所示。
增加Map操作的时候事件流向
- 事件流向:CreateEmitter->ObserverMap->Observer。
- 创建流的过程如图中数字所示。