Rxjava:flatmap源码分析:onComplete没有运行

之前在学习Rxjava时,写demo。发现在FlatMap运算中,即使上游发送了onComplete,在下游也无法执行onComplete:


        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
//                        Log.i(TAG, "!!!!onNext 1 before");
//                        e.onNext(1);
//                        Log.i(TAG, "!!!!onNext 1 after");

                        Log.i(TAG, "!!!!onNext 2 before");
                        e.onNext(2);
                        Log.i(TAG, "!!!!onNext 2 after");

                        // 位置1:如果注释以下代码,在位置10出将无法触发
                        Log.i(TAG, "!!!!onComplete before");
                        e.onComplete();
                        Log.i(TAG, "!!!!onComplete after");
                    }
                })
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(final Integer integer) throws Exception {
                        Log.i(TAG, "@@@@flatMap " + integer);
                        Observable<String> ob = Observable
                                .create(new ObservableOnSubscribe<String>() {
                                    @Override
                                    public void subscribe(final ObservableEmitter<String> e) throws Exception {
                                        String value = "####flatMap: Observable2,create: " + integer;
                                        Log.i(TAG,value);
                                        Log.i(TAG, "####flatMap: Observable2, next " + integer + " before");
                                        e.onNext(value);
                                        Log.i(TAG, "####flatMap: Observable2, next " + integer + " after");

                                        if(integer == 2){
                                            // 位置2:如果注释以下代码,在位置10出将无法触发
//                                            Log.i(TAG, "####flatMap: Observable2, onComplete 2 before");
//                                            e.onComplete();
//                                            Log.i(TAG, "####flatMap: Observable2, onComplete  2 after");
                                        }
                                    }
                                });
                        return ob;
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.i(TAG, "XXXXsubscribe:" + s);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.i(TAG, "TTTTerror");
                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        //位置10:触发onComplete .如果注释位置1和位置2中任何一处,都将无法触发这里
                        Log.i(TAG, "VVVVcomplete");
                    }
                }
                );

Log.i(TAG, "VVVVcomplete"); 没有执行

为什么???

分析Flatmap运算符源码,主要是ObservableFlatMap 这个类:

关键类:MergeObserver,InnerObserver
关键方法:mergeObserver.onNext(),merge.onComplete(),mergeObserver.drain(),merge.insertInnser(),merge.removeInner();
innerObserver.onComplete(),

mergeObserver.drain()关键代码:
  • 当innerObserver中触发了onComplete,将移除innerObserver


    QQ图片20171230235242.png
  • 当mergeObserver.onComplete或onError被触发,done置为true,并且没有innerObserver时,才能触发child.onComplete()


    QQ图片20171230235250.png
innserObserver.onComplete(),onError()
QQ图片20171230235254.png
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,565评论 7 62
  • 转载自:https://xiaobailong24.me/2017/03/18/Android-RxJava2.x...
    Young1657阅读 2,033评论 1 9
  • 最近项目里面有用到Rxjava框架,感觉很强大的巨作,所以在网上搜了很多相关文章,发现一片文章很不错,今天把这篇文...
    Scus阅读 6,894评论 2 50
  • 前言我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard...
    占导zqq阅读 9,193评论 6 151
  • 厦门是一个富有文艺气息的海滨城市,7月份的时候去了趟那里,姜母鸭,芒果绵绵冰,各种海鲜,回忆的我都流口水啦…… 海...
    柒柒猫1343阅读 371评论 0 2