lift()方法是RxJava中所有操作符的基础,可以通过它做各种各样的变化。弄清楚它的原理,也方便我们理解其他操作符。首先先看几个相关接口。
Func1 接口
public interface Func1<T, R> extends Function {
R call(T t);
}
Func1接口会按照泛型参数的顺序传入T,并返回R。
Operator 接口
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>
按照Func1接口的定义,Operator接口会传入一个Subscriber<? super R>参数,并返回一个Subscriber<? super T>。
关于Operator和lift()中泛型顺序的问题
也许有人(is me)第一眼看到Observable<T>,Operator<R, T>,Func1<T, R>这几个类的泛型参数,头都大了,关键是Operator的泛型参数顺序为什么是R, T,而不是T, R?
其实这里不需要关心顺序是什么,只需要记住Operator<R, T>是按照泛型参数的顺序,传入一个Subscriber<R>参数,并返回一个Subscriber<T>,写成Operator<A, B>或者Operator<M, N>是没有任何区别的。
lift()调用流程
首先需要记住lift()方法是在一个已有Observable上调用的。
lift()方法核心代码:
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
Subscriber<? super T> st = operator.call(o);
st.onStart();
// 这里的onSubscribe是调用lift方法的Observable中的onSubscribe
onSubscribe.call(st);
}
});
}
根据代码的调用流程来分析:
1、假设已有一个Observable<T>,调用lift()方法,生成一个Observable<R>,此时就有了两个Observable和两个OnSubscribe对象。
2、然后调用Observable<R>的subscribe()方法,传入一个Subscriber<R>对象,此时触发Observable<R>.onSubscribe.call()方法,也就是上面lift()方法中的call()方法。
3、在该方法中会调用onSubscribe.call()方法,注意这个onSubscribe是Observable<T>中的那个OnSubscribe<T>对象,它需要传入一个Subscriber<T>对象,这个对象是通过operator.call()方法生成的。正是这个Operator对象将两个Subscriber对象关联起来,OnSubscribe<T>在执行Subscriber<T>.onNext(T t)方法的时候也会执行Subscriber<R>.onNext(R r),而这里从T变成R,正好用到了传到Operator中的参数Func1<T, R>。
4、如果具体化一点,上面的Observable<T>就是事件源,对它进行lift()变换得到新的Observable<R>,这个新的Observable的回调已经固定,相当于是一个模板(也就是上面lift()方法中的call()方法)。这时调用subscribe(),传入的Subscriber<R>是用户定义的事件监听者,但它监听的是新的Observable<R>,这个Observable的回调是固定的,它并不能产生新事件,所以得靠事件源Observable<T>。这个时候Operator生成一个中间的Subscriber<T>对象,该对象的作用就是接收事件源的事件,并将事件转给用户定义的Subscriber。这个Subscriber<T>并没有消耗事件,而是起着一个代理的作用。所以Operator可以看做是一个生成代理的工具类。在这个转发过程中有一个数据类型的变化过程,也是通过Operator的转换器Func1实现的,想怎样转换数据,也是用户定义后传到Operator中的。
小结
1、我们需要把Observable的调用看做一条流。
2、对于Observable<T> -> Observable<R>这个变化,订阅者为Subscriber<R>,在subscriber()方法调用后,流的顺序为倒序的,即从Observable<R> -> Observable<T>,因为我们始终需要调用最开始的事件源。为了满足这个需求,会通过Operator<R, T>这个代理工具生成一个代理Subscriber<T>,这也解释了为什么在声明Operator时泛型参数的顺序写为R, T,正好可以和这一变化对应起来,用相同的泛型参数更便于理解。这样准备工作就都做好了。
3、Observable<T>开始向Subscriber<T>发送事件,发送的参数类型为T,这时候通过转换器Func1将T变成R,这样就能顺利的通过代理Subscriber<T>将事件发送给Subscriber<R>了。
4、所以流的路线为Observable<R> -> Observable<T> -> Subscriber<T> -> Subscriber<R>。一条线分成两部分,前半部分为准备工作,后半部分为执行操作。
下图是lift()的过程,其中虚线箭头代表生成,实线箭头代表调用。也可以参考 扔物线 - 给 Android 开发者的 RxJava 详解 中的配图。

map()方法
map()方法是RxJava中使用lift()最简单的方法,如果上面lift()方法过于抽象,可以通过该方法来加深理解。
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
public final class OperatorMap<T, R> implements Operator<R, T> {
private final Func1<? super T, ? extends R> transformer;
public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
try {
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwOrReport(e, this, t);
}
}
};
}
}
看到OperatorMap.call()方法,它直接生成一个新的Subscriber,通过上面的分析可以知道,这是一个代理Subscriber,所以它的onNext()等方法都只是直接调用了外部传进来的Subscriber。
举个例子:
Observable.just(1.34f, 8.3453f, -534.34f, 392.99f)
.map(new Func1<Float, Integer>() {
@Override
public Integer call(Float aFloat) {
return Math.round(aFloat);
}
})
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return Integer.toBinaryString(integer);
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
log("2 map onNext->" + s);
}
});
// outputs
// 2 map onNext->1
// 2 map onNext->1000
// 2 map onNext->11111111111111111111110111101010
// 2 map onNext->110001001
该例子是一个Float->Integer->String的转换。我们按上面的流程来分析。
1、生成一个Observable<Float>。
2、调用map()生成一个Observable<Integer>。
3、再调用map()生成一个Observable<String>。
4、subscribe()一个Subscriber<String>。至此流的前半部分完成。
5、执行开始,Observable<String>发送事件,先生成一个Subscriber<Integer>传给Observable<Integer>(Observable<Integer>.onSubscribe.call())。
6、Observable<Integer>开始发送事件,同样的生成一个Subscriber<Float>传给Observable<Float>(Observable<Float>.onSubscribe.call())。
7、真正发送事件开始,Observable<Float>调用Subscriber<Float>.onNext(Float)等方法,同时Subscriber<Integer>.onNext(Integer)被调用,同时Subscriber<String>.onNext(String)被调用,事件发送完成。
8、虽然是流的模型,但其实是一堆内部类和外部类的嵌套关系。