RxJava之concatMap系列转换操作符源码介绍

转载请以链接形式标明出处:
本文出自:103style的博客

转换相关的操作符 以及 官方介绍

RxJavaconcatMap 系列 转换操作符 官方介绍 :Transforming Observables

以下介绍我们就直接具体实现,中间流程请参考 RxJava之create操作符源码解析

concatMap

  • 官方示例:
    Observable.range(0, 5)
            .concatMap(i -> {
                long delay = Math.round(Math.random() * 2);
    
                return Observable.timer(delay, TimeUnit.SECONDS).map(n -> i);
            })
            .blockingSubscribe(System.out::print);
    
    输出:
    01234
    
  • 返回对象的 ObservableConcatMapsubscribeActual 方法:
    单参数的concatMap操作符默认的 delayErrorsErrorMode.IMMEDIATE
    public void subscribeActual(Observer<? super U> observer) {
        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, observer, mapper)) {
            return;
        }
        if (delayErrors == ErrorMode.IMMEDIATE) {
            SerializedObserver<U> serial = new SerializedObserver<U>(observer);
            source.subscribe(new SourceObserver<T, U>(serial, mapper, bufferSize));
        } else {
            source.subscribe(new ConcatMapDelayErrorObserver<T, U>(observer, mapper, bufferSize, delayErrors == ErrorMode.END));
        }
    }
    
  • 继续看 SourceObserveronNext(T t)
    public void onNext(T t) {
        if (done) {
            return;
        }
        if (fusionMode == QueueDisposable.NONE) {
            queue.offer(t);
        }
        drain();
    }
    
    public void onComplete() {
        if (done) {
            return;
        }
        done = true;
        drain();
    }
    
    void drain() {
        ...
        for (;;) {
            ...
            if (!active) {
                ...
                if (!empty) {
                    ObservableSource<? extends U> o;
                    try {
                        //1.0
                        o = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        dispose();
                        queue.clear();
                        downstream.onError(ex);
                        return;
                    }
                    active = true;
                    o.subscribe(inner);//2.0
                }
            }
            ...
        }
    }
    
    • (1.0) 在这里我们看到通过concatMap操作符传入Functionapply重新构建了一个 ObservableSource对象。
    • (2.0) 然后新建的 ObservableSource对象来 subscribe(observer)

concatMapXXX

concatMapCompletableconcatMapCompletableDelayErrorconcatMapDelayErrorconcatMapEagerconcatMapEagerDelayErrorconcatMapIterableconcatMapMaybeconcatMapMaybeDelayErrorconcatMapSingleconcatMapSingleDelayError
实现逻辑和concatMap类似,就不再赘述了。

官方示例:

  • concatMapCompletable

    Observable<Integer> source = Observable.just(2, 1, 3);
    Completable completable = source.concatMapCompletable(x -> {
        return Completable.timer(x, TimeUnit.SECONDS)
                .doOnComplete(() -> System.out.println("Info: Processing of item \"" + x + "\" completed"));
    });
    completable.doOnComplete(() -> System.out.println("Info: Processing of all items completed"))
            .blockingAwait();
    

    输出:

    Info: Processing of item "2" completed
    Info: Processing of item "1" completed
    Info: Processing of item "3" completed
    Info: Processing of all items completed
    
  • concatMapCompletableDelayError

    Observable<Integer> source = Observable.just(2, 1, 3);
    Completable completable = source.concatMapCompletableDelayError(x -> {
        if (x.equals(2)) {
            return Completable.error(new IOException("Processing of item \"" + x + "\" failed!"));
        } else {
            return Completable.timer(1, TimeUnit.SECONDS)
                    .doOnComplete(() -> System.out.println("Info: Processing of item \"" + x + "\" completed"));
        }
    });
    
    completable.doOnError(error -> System.out.println("Error: " + error.getMessage()))
            .onErrorComplete()
            .blockingAwait();
    

    输出:

    Info: Processing of item "1" completed
    Info: Processing of item "3" completed
    Error: Processing of item "2" failed!
    
  • concatMapDelayError

    Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS)
            .concatMapDelayError(x -> {
                if (x.equals(1L))
                    return Observable.error(new IOException("Something went wrong!"));
                else return Observable.just(x, x * x);
            })
            .blockingSubscribe(
                    x -> System.out.println("onNext: " + x),
                    error -> System.out.println("onError: " + error.getMessage()));
    

    输出:

    onNext: 2
    onNext: 4
    onNext: 3
    onNext: 9
    onError: Something went wrong!
    
  • concatMapEager

    Observable.range(0, 5)
            .concatMapEager(i -> {
                long delay = Math.round(Math.random() * 3);
    
                return Observable.timer(delay, TimeUnit.SECONDS)
                        .map(n -> i)
                        .doOnNext(x -> System.out.println("Info: Finished processing item " + x));
            })
            .blockingSubscribe(i -> System.out.println("onNext: " + i));
    

    输出:

    Info: Finished processing item 2
    Info: Finished processing item 3
    Info: Finished processing item 1
    Info: Finished processing item 0
    Info: Finished processing item 4
    onNext: 0
    onNext: 1
    onNext: 2
    onNext: 3
    onNext: 4
    
  • concatMapEagerDelayError

    Observable<Integer> source = Observable.create(emitter -> {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onError(new Error("Fatal error!"));
    });
    
    source.doOnError(error -> System.out.println("Info: Error from main source " + error.getMessage()))
            .concatMapEagerDelayError(x -> {
                return Observable.timer(1, TimeUnit.SECONDS).map(n -> x)
                        .doOnSubscribe(it -> System.out.println("Info: Processing of item \"" + x + "\" started"));
            }, true)
            .blockingSubscribe(
                    x -> System.out.println("onNext: " + x),
                    error -> System.out.println("onError: " + error.getMessage()));
    

    输出:

    Info: Processing of item "1" started
    Info: Processing of item "2" started
    Info: Error from main source Fatal error!
    onNext: 1
    onNext: 2
    onError: Fatal error!
    
  • concatMapIterable

    Observable.just("A", "B", "C")
            .concatMapIterable(item -> Arrays.asList(item, item, item))
            .subscribe(System.out::print);
    

    输出:

    AAABBBCCC
    
  • concatMapMaybe

    Observable.just("5", "3,14", "2.71", "FF")
            .concatMapMaybe(v -> {
                return Maybe.fromCallable(() -> Double.parseDouble(v))
                        .doOnError(e -> System.out.println("Info: The value \"" + v + "\" could not be parsed."))
                        // Ignore values that can not be parsed.
                        .onErrorComplete();
            })
            .subscribe(x -> System.out.println("onNext: " + x));
    

    输出:

    onNext: 5.0
    Info: The value "3,14" could not be parsed.
    onNext: 2.71
    Info: The value "FF" could not be parsed.
    
  • concatMapMaybeDelayError

    DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("dd.MM.uuuu");
    Observable.just("04.03.2018", "12-08-2018", "06.10.2018", "01.12.2018")
            .concatMapMaybeDelayError(date -> {
                return Maybe.fromCallable(() -> LocalDate.parse(date, dateFormatter));
            })
            .subscribe(
                    localDate -> System.out.println("onNext: " + localDate),
                    error -> System.out.println("onError: " + error.getMessage()));
    

    输出:

    onNext: 2018-03-04
    onNext: 2018-10-06
    onNext: 2018-12-01
    onError: Text '12-08-2018' could not be parsed at index 2
    
  • concatMapSingle

    Observable.just("5", "3,14", "2.71", "FF")
            .concatMapSingle(v -> {
                return Single.fromCallable(() -> Double.parseDouble(v))
                        .doOnError(e -> System.out.println("Info: The value \"" + v + "\" could not be parsed."))
    
                        // Return a default value if the given value can not be parsed.
                        .onErrorReturnItem(42.0);
            })
            .subscribe(x -> System.out.println("onNext: " + x));
    

    输出:

    onNext: 5.0
    Info: The value "3,14" could not be parsed.
    onNext: 42.0
    onNext: 2.71
    Info: The value "FF" could not be parsed.
    onNext: 42.0
    
  • concatMapSingleDelayError

    DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("dd.MM.uuuu");
    Observable.just("24.03.2018", "12-08-2018", "06.10.2018", "01.12.2018")
            .concatMapSingleDelayError(date -> {
                return Single.fromCallable(() -> LocalDate.parse(date, dateFormatter));
            })
            .subscribe(
                    localDate -> System.out.println("onNext: " + localDate),
                    error -> System.out.println("onError: " + error.getMessage()));
    

    输出:

    onNext: 2018-03-24
    onNext: 2018-10-06
    onNext: 2018-12-01
    onError: Text '12-08-2018' could not be parsed at index 2
    
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,864评论 6 494
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,175评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,401评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,170评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,276评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,364评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,401评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,179评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,604评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,902评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,070评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,751评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,380评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,077评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,312评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,924评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,957评论 2 351

推荐阅读更多精彩内容