map变换
从Obserable事件发出开始,每一次map操作符的调用,就是将上一个的Obserable事件用ObserableMap包装了一次,而ObserableMap继承自抽象类Obserable,本质是静态代理模式。
map变换的精华也就在ObservableMap这个类中:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
public void subscribeActual(Observer<? super U> t) {
this.source.subscribe(new ObservableMap.MapObserver(t, this.function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
public void onNext(T t) {
if(!this.done) {
if(this.sourceMode != 0) {
this.actual.onNext((Object)null);
} else {
Object v;
try {
v = ObjectHelper.requireNonNull(this.mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable var4) {
this.fail(var4);
return;
}
this.actual.onNext(v);
}
}
}
public int requestFusion(int mode) {
return this.transitiveBoundaryFusion(mode);
}
@Nullable
public U poll() throws Exception {
T t = this.qs.poll();
return t != null?ObjectHelper.requireNonNull(this.mapper.apply(t), "The mapper function returned a null value."):null;
}
}
}
这段实例代码来自https://www.jianshu.com/p/b3b0170152ff:
Observable.just("http://img.taopic.com/uploads/allimg/130331/240460-13033106243430.jpg")
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(String urlPath) throws Exception {
URL url = new URL(urlPath);
HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
InputStream inputStream = urlConnection.getInputStream();
Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
return bitmap;
}
})
.map(new Function<Bitmap, Bitmap>() {
@Override
public Bitmap apply(@NonNull Bitmap bitmap) throws Exception {
bitmap = createWatermark(bitmap, "RxJava2.0");
return bitmap;
}
})
.map(new Function<Bitmap, Bitmap>() {
@Override
public Bitmap apply(Bitmap bitmap) throws Exception {
return bitmap;
}
})
.subscribe(new Consumer<Bitmap>() {
@Override
public void onNext(final Bitmap bitmap) {
mImage.setImageBitmap(bitmap);
}
});
上图的流程:自上而下发射事件(map变换),再自下而上订阅事件(subscribe),最后自上而下正真执行事件function(onNext方法)。
首先,由Obserable.just生成一个事件源,然后通过map变换,将事件源向下发射,这个过程是怎样的呢?让我们看下
Obserable类中的map方法
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
RxJavaPlugins.onAssembly方法我们不需关注,可以直接简化成return new ObservableMap<T, R>(this, mapper);
参数this就是Obserable.just生成的最初的事件源
ObservableMap构造方法如下:
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
ObserverableMap继承自AbstractObservableWithUpstream:
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
protected final ObservableSource<T> source;
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
public final ObservableSource<T> source() {
return this.source;
}
}
结合起来看,就是ObserverableMap中保存了上游的Observerable事件源,以及变换方法(就是map方法中传入的function)。
多次map变换,也就是重复以上这一过程,不断的将上游的Observerable事件源包装为新的ObserverableMap。
最终,事件的执行需要我们subscribe消费者:Observable.subscribe(subscriber)
。
此时的Observable其实是ObservableMap,subscibe其实调用的是ObservableMap中的subscribeActual方法。为什么是这样调用的,回到ObservableMap父类Observable中看下就清楚了:
@SchedulerSupport("none")
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
this.subscribeActual(observer);
} catch (NullPointerException var4) {
throw var4;
} catch (Throwable var5) {
Exceptions.throwIfFatal(var5);
RxJavaPlugins.onError(var5);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(var5);
throw npe;
}
}
protected abstract void subscribeActual(Observer<? super T> var1);
回到ObservableMap的subscribeActual方法中,回想下java的多态,继续看:
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
拆开来看其实分为两步:
1.Observer observer = new MapObserver<T, U>(t, function)
这里,生成Observer的代理类MapObserver,同时将之前ObservableMap中保存的变换方法传进去。
2.source.subscribe(observer )
source为之前ObservableMap中保存的上游Observable事件,将下游的Observer交给上游处理。
总体上来看,就是自下而上,每遇到ObservableMap就重复一下这个订阅过程。一直到ObservableJust,即我们的事件源,看下ObservableJust,它也是Observable的子类,同时也重写了subscribeActual方法:
protected void subscribeActual(Observer<? super T> s) {
ScalarDisposable<T> sd = new ScalarDisposable(s, this.value);
s.onSubscribe(sd);
sd.run();
}
ScalarDisposable又是一个静态代理,它代理了那个代理了好多层的Observer
,sd.run()方法里面调用了代理了好多层的Observer
的onNext代理方法。
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
public void onNext(T t) {
if(!this.done) {
if(this.sourceMode != 0) {
this.actual.onNext((Object)null);
} else {
Object v;
try {
v = ObjectHelper.requireNonNull(this.mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable var4) {
this.fail(var4);
return;
}
this.actual.onNext(v);
}
}
}
public int requestFusion(int mode) {
return this.transitiveBoundaryFusion(mode);
}
@Nullable
public U poll() throws Exception {
T t = this.qs.poll();
return t != null?ObjectHelper.requireNonNull(this.mapper.apply(t), "The mapper function returned a null value."):null;
}
}
onNext方法内部,
首先this.mapper.apply(t),
执行了变换方法。
继续又调用了下游Observer的onNext方法。
这样就一层一层自上而下的执行onNext方法,将事件串成流那样执行了。
总结
整体流程分析下来,我们发现ObservableMap这个类太关键了,它持有者上游的Obserable事件源
以及变换方法
,同时它的subscribeActual方法中,将下游的Observer代理成MapObserver。MapObserver是ObservableMap的内部类,代理方法onNext里,执行了变换方法
。onNext的执行是在ObserableJust的subscribeActual触发的。