为什么写这篇文章呢?
很多人在写Rxjava源码分析,笔者也看了很多,感觉讲得套路都一样,看完还是一脸懵逼,尤其是事件怎么变换,印象深刻的是很多人对BackPressure的翻译,背压,真是要命,看了使用,理解以后,我想为什么不叫BackupPressure,其实是备份压力,也就是发送事件的缓存压力,写这篇文章并不是说自己理解的多好,权当拿出来分享。
从一个简单的例子开始,例子借用曾辉大神的,看过他的课,很不错。
从网络下载一张图片,转成Bitmap,再画上水印,最后显示出来,线程切换先不说,用了Thread,尽量简化过程,便于分析
new Thread(new Runnable() {
@Override
public void run() {
Observable.just("https://timgsa.baidu.com/timg?image&quality=80&size=b9999_10000&sec=1555994563474&di=8ca112bd67ccbd6de75727fe064f8187&imgtype=0&src=http%3A%2F%2Fpic29.nipic.com%2F20130601%2F12122227_123051482000_2.jpg")
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(String urlPath) throws Exception {
// 第五步
URL url = new URL(urlPath);
HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
InputStream inputStream = urlConnection.getInputStream();
Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
return bitmap;
}
})
.map(new Function<Bitmap, Bitmap>() {
@Override
public Bitmap apply(@NonNull Bitmap bitmap) throws Exception {
bitmap = createWatermark(bitmap, "RxJava2.0");
return bitmap;
}
})
.subscribe(new Consumer<Bitmap>() {
@Override
public void onNext(final Bitmap bitmap) {
// 第七步
Log.e("TAG", "item = " + bitmap);
runOnUiThread(new Runnable() {
@Override
public void run() {
mImage.setImageBitmap(bitmap);
}
});
}
});
}
}).start();
为了分析,我们把代码做了修改,如下,分步,取消了Rxjava引以为傲的流式
new Thread(new Runnable() {
@Override
public void run() {
Observable<String> justObservable=Observable.just("https://timgsa.baidu.com/timg?image&quality=80&size=b9999_10000&sec=1555994563474&di=8ca112bd67ccbd6de75727fe064f8187&imgtype=0&src=http%3A%2F%2Fpic29.nipic.com%2F20130601%2F12122227_123051482000_2.jpg");
Observable<Bitmap> ObversableMap1=justObservable.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(String urlPath) throws Exception {
// 第五步
URL url = new URL(urlPath);
HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
InputStream inputStream = urlConnection.getInputStream();
Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
return bitmap;
}
});
Observable<Bitmap> ObversableMap2 =ObversableMap1.map(new Function<Bitmap, Bitmap>() {
@Override
public Bitmap apply(@NonNull Bitmap bitmap) throws Exception {
bitmap = createWatermark(bitmap, "RxJava2.0");
return bitmap;
}
});
Consumer<Bitmap> consumer=new Consumer<Bitmap>() {
@Override
public void onNext(final Bitmap bitmap) {
// 第七步
Log.e("TAG", "item = " + bitmap);
runOnUiThread(new Runnable() {
@Override
public void run() {
mImage.setImageBitmap(bitmap);
}
});
}
};
ObversableMap2 .subscribe(consumer);
}
}).start();
好啦,开始分析了,源码是简化过的
其实Rxjava代码块主要分两部分,第一是构建逻辑块的过程和执行逻辑块的过程
划重点哈,构建过程和执行过程
构建过程
先上图:
看完图后,分析一下,先不说怎么看源码,构建的过程中,自己先想想,为什么能不停地使用map等操作符一直连续调用,最后都通过subscribe关联起来,
稍微一细想就知道,无论怎么变换,从头到位构建过程中,都产生的是一个Observable,比如map操作符,其实map就调用了一句话
public <R> Observable<R> map(Function<T, R> function) {
return onAssembly(new ObservableMap<>(this,function));
}
那OnAssembly干了什么呢,就只是把产生的obvservable返回
private static <T> Observable<T> onAssembly(Observable<T> source) {
// 留出来了
return source;
}
注意了,调用OnAssembly创建新的observable时,传入了上级的Observable和要做的function
就这样一层层构建下去,直到最后的observable。示例代码中,最后的MapObservable2,持有上级MapObservable1和自己要做的事funMap2,MapObservable1持有最上层的JustObservaable和他自己要做的事funMap1
执行过程
那么接下来,当调用MapObservable2的subscribe时发生了什么呢?注意下图箭头,是从下往上看,其实执行流程是从下往上,再从上往下执行的
执行逻辑块的过程 从下往上看
当调用所有的Observable时,其实最后都调用了它们的subscribeActual,subscribeActual其实就一句话,执行上级的subcribe,同时把自己的的observer传给上级。
public class ObservableMap<T,R> extends Observable<R> {
final Observable<T> source;// 前面的 Observable
final Function<T, R> function;// 当前转换
public ObservableMap(Observable<T> source, Function<T, R> function) {
this.source = source;
this.function = function;
}
@Override
protected void subscribeActual(Observer<R> observer) {
// 第一步
// 对 observer 包裹了一层
source.subscribe(new MapObserver(observer,function));
}
private class MapObserver<T> implements Observer<T>{
final Observer<R> observer;
final Function<T, R> function;
public MapObserver(Observer<R> source, Function<T, R> function) {
this.observer = source;
this.function = function;
}
@Override
public void onSubscribe() {
//observer.onSubscribe();
}
@Override
public void onNext(@NonNull T item) {
// item 是 String xxxUrl
// 要去转换 String -> Bitmap
// 4.第四步 function.apply
try {
R applyR = function.apply(item);
// 6. 第六步,调用 onNext
// 把 Bitmap 传出去
observer.onNext(applyR);
} catch (Exception e) {
e.printStackTrace();
observer.onError(e);
}
}
@Override
public void onError(@NonNull Throwable e) {
observer.onError(e);
}
@Override
public void onComplete() {
observer.onComplete();
}
}
}
直到顶层的JustObservable时,他的subscribeActual其实就时开始调用下级的observer,如下图,开始了执行阶段从上往下的过程,下图的observer是MapObserverable1的Observer,这样一层一层调用下去直到结束
public class ObservableJust<T> extends Observable<T> {
private T item;
public ObservableJust(T item) {
this.item = item;
}
@Override
protected void subscribeActual(Observer<T> observer) {
try {
// 3.第三步 observer -> MapObserver.onNext(String)
observer.onNext(item);
observer.onComplete();
}catch (Exception e){
observer.onError(e);
}
}
}