Rxjava(一)事件变换的研究

为什么写这篇文章呢?
很多人在写Rxjava源码分析,笔者也看了很多,感觉讲得套路都一样,看完还是一脸懵逼,尤其是事件怎么变换,印象深刻的是很多人对BackPressure的翻译,背压,真是要命,看了使用,理解以后,我想为什么不叫BackupPressure,其实是备份压力,也就是发送事件的缓存压力,写这篇文章并不是说自己理解的多好,权当拿出来分享。
从一个简单的例子开始,例子借用曾辉大神的,看过他的课,很不错。
从网络下载一张图片,转成Bitmap,再画上水印,最后显示出来,线程切换先不说,用了Thread,尽量简化过程,便于分析

new Thread(new Runnable() {
            @Override
            public void run() {
                Observable.just("https://timgsa.baidu.com/timg?image&quality=80&size=b9999_10000&sec=1555994563474&di=8ca112bd67ccbd6de75727fe064f8187&imgtype=0&src=http%3A%2F%2Fpic29.nipic.com%2F20130601%2F12122227_123051482000_2.jpg")
                        .map(new Function<String, Bitmap>() {
                            @Override
                            public Bitmap apply(String urlPath) throws Exception {
                                // 第五步
                                URL url = new URL(urlPath);
                                HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
                                InputStream inputStream = urlConnection.getInputStream();
                                Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
                                return bitmap;
                            }
                        })
                        .map(new Function<Bitmap, Bitmap>() {
                            @Override
                            public Bitmap apply(@NonNull Bitmap bitmap) throws Exception {
                                bitmap = createWatermark(bitmap, "RxJava2.0");
                                return bitmap;
                            }
                        })
                        .subscribe(new Consumer<Bitmap>() {
                            @Override
                            public void onNext(final Bitmap bitmap) {
                                // 第七步
                                Log.e("TAG", "item = " + bitmap);
                                runOnUiThread(new Runnable() {
                                    @Override
                                    public void run() {
                                        mImage.setImageBitmap(bitmap);
                                    }
                                });
                            }
                        });
            }
        }).start();

为了分析,我们把代码做了修改,如下,分步,取消了Rxjava引以为傲的流式

new Thread(new Runnable() {
            @Override
            public void run() {
                Observable<String> justObservable=Observable.just("https://timgsa.baidu.com/timg?image&quality=80&size=b9999_10000&sec=1555994563474&di=8ca112bd67ccbd6de75727fe064f8187&imgtype=0&src=http%3A%2F%2Fpic29.nipic.com%2F20130601%2F12122227_123051482000_2.jpg");

                Observable<Bitmap>  ObversableMap1=justObservable.map(new Function<String, Bitmap>() {
                            @Override
                            public Bitmap apply(String urlPath) throws Exception {
                                // 第五步
                                URL url = new URL(urlPath);
                                HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
                                InputStream inputStream = urlConnection.getInputStream();
                                Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
                                return bitmap;
                            }
                        });

                Observable<Bitmap> ObversableMap2 =ObversableMap1.map(new Function<Bitmap, Bitmap>() {
                            @Override
                            public Bitmap apply(@NonNull Bitmap bitmap) throws Exception {
                                bitmap = createWatermark(bitmap, "RxJava2.0");
                                return bitmap;
                            }
                        });

                Consumer<Bitmap> consumer=new Consumer<Bitmap>() {
                    @Override
                    public void onNext(final Bitmap bitmap) {
                        // 第七步
                        Log.e("TAG", "item = " + bitmap);
                        runOnUiThread(new Runnable() {
                            @Override
                            public void run() {
                                mImage.setImageBitmap(bitmap);
                            }
                        });
                    }
                };

                ObversableMap2 .subscribe(consumer);


            }
        }).start();

好啦,开始分析了,源码是简化过的
其实Rxjava代码块主要分两部分,第一是构建逻辑块的过程和执行逻辑块的过程
划重点哈,构建过程和执行过程

构建过程

先上图:


image1.png

看完图后,分析一下,先不说怎么看源码,构建的过程中,自己先想想,为什么能不停地使用map等操作符一直连续调用,最后都通过subscribe关联起来,
稍微一细想就知道,无论怎么变换,从头到位构建过程中,都产生的是一个Observable,比如map操作符,其实map就调用了一句话

public <R> Observable<R> map(Function<T, R> function) {
        return onAssembly(new ObservableMap<>(this,function));
    }

那OnAssembly干了什么呢,就只是把产生的obvservable返回

private static <T> Observable<T> onAssembly(Observable<T> source) {
        // 留出来了
        return source;
    }

注意了,调用OnAssembly创建新的observable时,传入了上级的Observable和要做的function
就这样一层层构建下去,直到最后的observable。示例代码中,最后的MapObservable2,持有上级MapObservable1和自己要做的事funMap2,MapObservable1持有最上层的JustObservaable和他自己要做的事funMap1

执行过程

那么接下来,当调用MapObservable2的subscribe时发生了什么呢?注意下图箭头,是从下往上看,其实执行流程是从下往上,再从上往下执行的
执行逻辑块的过程 从下往上看

image2.png

当调用所有的Observable时,其实最后都调用了它们的subscribeActual,subscribeActual其实就一句话,执行上级的subcribe,同时把自己的的observer传给上级。

public class ObservableMap<T,R> extends Observable<R> {
    final Observable<T> source;// 前面的 Observable
    final Function<T, R> function;// 当前转换
    public ObservableMap(Observable<T> source, Function<T, R> function) {
        this.source = source;
        this.function = function;
    }

    @Override
    protected void subscribeActual(Observer<R> observer) {
        // 第一步
        // 对 observer 包裹了一层
        source.subscribe(new MapObserver(observer,function));
}
 private class MapObserver<T> implements Observer<T>{
        final Observer<R> observer;
        final Function<T, R> function;
        public MapObserver(Observer<R> source, Function<T, R> function) {
            this.observer = source;
            this.function = function;
        }

        @Override
        public void onSubscribe() {
            //observer.onSubscribe();
        }

        @Override
        public void onNext(@NonNull T item) {
            // item 是 String  xxxUrl
            // 要去转换 String -> Bitmap
            // 4.第四步 function.apply
            try {
                R applyR = function.apply(item);
                // 6. 第六步,调用 onNext
                // 把 Bitmap 传出去
                observer.onNext(applyR);
            } catch (Exception e) {
                e.printStackTrace();
                observer.onError(e);
            }
        }

        @Override
        public void onError(@NonNull Throwable e) {
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            observer.onComplete();
        }
    }
}

直到顶层的JustObservable时,他的subscribeActual其实就时开始调用下级的observer,如下图,开始了执行阶段从上往下的过程,下图的observer是MapObserverable1的Observer,这样一层一层调用下去直到结束

public class ObservableJust<T> extends Observable<T> {
    private T item;
    public ObservableJust(T item) {
        this.item = item;
    }

    @Override
    protected void subscribeActual(Observer<T> observer) {

        try {
            // 3.第三步 observer -> MapObserver.onNext(String)
            observer.onNext(item);
            observer.onComplete();
        }catch (Exception e){
            observer.onError(e);
        }
    }
}

总结,到此为止,不知道能不能给大家讲清楚事件的切换过程

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,734评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,931评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,133评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,532评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,585评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,462评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,262评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,153评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,587评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,792评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,919评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,635评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,237评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,855评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,983评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,048评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,864评论 2 354