前言
RxJava框架运用了装饰设计模式(ps.叫静态代理也行,关于两种设计模式的区别)
其中几乎所有的操作符本质是都是new Obvservable(OnSubscribe<T> f)
,所以当你进行
Obvservable.create()
.map()
.lift
.subscribe()
等各种连点操作后,最原始的Obvservable
对象早就被封装了好几层了。。。当然最终调用subscrib()
方法的也肯定是最终被封装后的Obvservable
对象.
map源码分析
map几乎算所有操作符中最简单的一个了, 看完map再看lift, 鹅妹子嘤~~~
Obvservable 中
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return create(new OnSubscribeMap<T, R>(this, func));
}
其中create是所有操作符中都常用的一个静态方法,本质就是new Obvservable(OnSubscribe<T> f)
,所以重点还是在于这个传入的OnSubscribe
对象:
重点在于将原始Subscriber封装后的MapSubscriber的onNext方法,其他方法可忽略。
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
//原先的Observable
final Observable<T> source;
//自定义的转换方法
final Func1<? super T, ? extends R> transformer;
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
@Override
public void call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
//将新包装后的MapSubscriber的subscription事件交给原先的Subscriber处理
o.add(parent);
// 下面一句话本质上调用的是source.onSubscribe.call(parent)
source.unsafeSubscribe(parent);
}
static final class MapSubscriber<T, R> extends Subscriber<T> {
final Subscriber<? super R> actual;
final Func1<? super T, ? extends R> mapper;
boolean done;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void onNext(T t) {
R result;
try {
//重点在这里
result = mapper.call(t);
} catch (Throwable ex) {
...
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
@Override
public void onError(Throwable e) {
...
actual.onError(e);
}
@Override
public void onCompleted() {
...
actual.onCompleted();
}
...
}
}