lift()是RxJava中变换思想的核心,接受一个Operator参数,返回一个新的Observable。
Operator
/**
* Operator function for lifting into an Observable.
* @param <T> the upstream's value type (input)
* @param <R> the downstream's value type (output)
*/
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
// cover for generics insanity
}
Operator接口继承了Func1接口,接受一个Subscriber,接受的Subscriber是下游调用Observable#subscriber时传递的Subscriber,也就是订阅了lift()返回的Observable的Subscriber。Operator#call返回的Subscriber则是用于订阅上游的Observable使用的。 数据的变换主要就发生在Operator#call中
lift()
// lift()核心代码
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
create(new Observable.OnSubscribe {
@Override
public void call(Subscriber subscriber) {
Subscriber parent = operator.call(subscriber);
parent.onStart();
onSubscribe.call(parent);
}
}
}
在lift()中的主要操作就是返回一个新的Observable,该Observable在被订阅时会通过Operator#call获取一个subscriber用于订阅调用lift()的Observable。
例如编写一个将Observable<Object>变成Observable<Integer>的Operator
public class O2IOperator implements Observable.Operator<Integer, Object> {
@Override
public Subscriber<? super Object> call(final Subscriber<? super Integer> child) {
Subscriber<Object> parent = new Subscriber<Object>() {
@Override
public void onCompleted() {
child.onCompleted();
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onNext(Object o) {
try {
int n = (int) o;
} catch (ClassCastException e) {
unsubscribe();
onError(e);
}
}
};
child.add(parent);
return parent;
}
}
其中,只是在onNext()中将Object转形成int类型的值,如果不能成功转型,将向下游抛出异常(child#onError)并且取消订阅上游的数据。还有就是将parent添加到child的Subscription中,这样当child取消订阅时,也将同时调用parent#unsubscribe退订上游数据,这在上篇文章《读RxJava源码:理解subscribe原理》中有提及。
总结
lift()函数通过接收的Operator参数完成了Observable之间的链式订阅。
通过如下代码观察完整的调用过程
Observable<Object> o1 = Observable.just((Object) 1);
Observable<Integer> o2 = o1.lift(new O2IOperator());
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
}
};
o2.subscribe(subscriber);
- 创建了一个Observable<Object> o1
- 通过lift()创建了一个Observable<Integer> o2
- 创建一个Subscriber用于订阅 o2
- subscriber 订阅 o2
- o2 在被订阅时通过O2IOperator#call取得一个subscriber用于订阅 o1
- o1 在被订阅时将1传递给从O2IOperator#call取得的subscriber
- 从02IOperator#call取得的subscriber收到数据处理后传递给 subscriber
- over