转载请以链接形式标明出处:
本文出自:103style的博客
转换相关的操作符 以及 官方介绍
RxJava 之 concatMap 系列 转换操作符 官方介绍 :Transforming Observables
concatMapconcatMapCompletableconcatMapCompletableDelayErrorconcatMapDelayErrorconcatMapEagerconcatMapEagerDelayErrorconcatMapIterableconcatMapMaybeconcatMapMaybeDelayErrorconcatMapSingleconcatMapSingleDelayError
以下介绍我们就直接具体实现,中间流程请参考 RxJava之create操作符源码解析。
concatMap
- 官方示例:
输出:Observable.range(0, 5) .concatMap(i -> { long delay = Math.round(Math.random() * 2); return Observable.timer(delay, TimeUnit.SECONDS).map(n -> i); }) .blockingSubscribe(System.out::print);01234 - 返回对象的
ObservableConcatMap的subscribeActual方法:
单参数的concatMap操作符默认的delayErrors为ErrorMode.IMMEDIATE。public void subscribeActual(Observer<? super U> observer) { if (ObservableScalarXMap.tryScalarXMapSubscribe(source, observer, mapper)) { return; } if (delayErrors == ErrorMode.IMMEDIATE) { SerializedObserver<U> serial = new SerializedObserver<U>(observer); source.subscribe(new SourceObserver<T, U>(serial, mapper, bufferSize)); } else { source.subscribe(new ConcatMapDelayErrorObserver<T, U>(observer, mapper, bufferSize, delayErrors == ErrorMode.END)); } } - 继续看
SourceObserver的onNext(T t):public void onNext(T t) { if (done) { return; } if (fusionMode == QueueDisposable.NONE) { queue.offer(t); } drain(); } public void onComplete() { if (done) { return; } done = true; drain(); } void drain() { ... for (;;) { ... if (!active) { ... if (!empty) { ObservableSource<? extends U> o; try { //1.0 o = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); dispose(); queue.clear(); downstream.onError(ex); return; } active = true; o.subscribe(inner);//2.0 } } ... } }-
(1.0)在这里我们看到通过concatMap操作符传入Function的apply重新构建了一个ObservableSource对象。 -
(2.0)然后新建的ObservableSource对象来subscribe(observer)。
-
concatMapXXX
concatMapCompletable、concatMapCompletableDelayError、concatMapDelayError、concatMapEager、concatMapEagerDelayError、concatMapIterable、concatMapMaybe、concatMapMaybeDelayError、concatMapSingle、concatMapSingleDelayError
实现逻辑和concatMap类似,就不再赘述了。
官方示例:
-
concatMapCompletableObservable<Integer> source = Observable.just(2, 1, 3); Completable completable = source.concatMapCompletable(x -> { return Completable.timer(x, TimeUnit.SECONDS) .doOnComplete(() -> System.out.println("Info: Processing of item \"" + x + "\" completed")); }); completable.doOnComplete(() -> System.out.println("Info: Processing of all items completed")) .blockingAwait();输出:
Info: Processing of item "2" completed Info: Processing of item "1" completed Info: Processing of item "3" completed Info: Processing of all items completed -
concatMapCompletableDelayErrorObservable<Integer> source = Observable.just(2, 1, 3); Completable completable = source.concatMapCompletableDelayError(x -> { if (x.equals(2)) { return Completable.error(new IOException("Processing of item \"" + x + "\" failed!")); } else { return Completable.timer(1, TimeUnit.SECONDS) .doOnComplete(() -> System.out.println("Info: Processing of item \"" + x + "\" completed")); } }); completable.doOnError(error -> System.out.println("Error: " + error.getMessage())) .onErrorComplete() .blockingAwait();输出:
Info: Processing of item "1" completed Info: Processing of item "3" completed Error: Processing of item "2" failed! -
concatMapDelayErrorObservable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS) .concatMapDelayError(x -> { if (x.equals(1L)) return Observable.error(new IOException("Something went wrong!")); else return Observable.just(x, x * x); }) .blockingSubscribe( x -> System.out.println("onNext: " + x), error -> System.out.println("onError: " + error.getMessage()));输出:
onNext: 2 onNext: 4 onNext: 3 onNext: 9 onError: Something went wrong! -
concatMapEagerObservable.range(0, 5) .concatMapEager(i -> { long delay = Math.round(Math.random() * 3); return Observable.timer(delay, TimeUnit.SECONDS) .map(n -> i) .doOnNext(x -> System.out.println("Info: Finished processing item " + x)); }) .blockingSubscribe(i -> System.out.println("onNext: " + i));输出:
Info: Finished processing item 2 Info: Finished processing item 3 Info: Finished processing item 1 Info: Finished processing item 0 Info: Finished processing item 4 onNext: 0 onNext: 1 onNext: 2 onNext: 3 onNext: 4 -
concatMapEagerDelayErrorObservable<Integer> source = Observable.create(emitter -> { emitter.onNext(1); emitter.onNext(2); emitter.onError(new Error("Fatal error!")); }); source.doOnError(error -> System.out.println("Info: Error from main source " + error.getMessage())) .concatMapEagerDelayError(x -> { return Observable.timer(1, TimeUnit.SECONDS).map(n -> x) .doOnSubscribe(it -> System.out.println("Info: Processing of item \"" + x + "\" started")); }, true) .blockingSubscribe( x -> System.out.println("onNext: " + x), error -> System.out.println("onError: " + error.getMessage()));输出:
Info: Processing of item "1" started Info: Processing of item "2" started Info: Error from main source Fatal error! onNext: 1 onNext: 2 onError: Fatal error! -
concatMapIterableObservable.just("A", "B", "C") .concatMapIterable(item -> Arrays.asList(item, item, item)) .subscribe(System.out::print);输出:
AAABBBCCC -
concatMapMaybeObservable.just("5", "3,14", "2.71", "FF") .concatMapMaybe(v -> { return Maybe.fromCallable(() -> Double.parseDouble(v)) .doOnError(e -> System.out.println("Info: The value \"" + v + "\" could not be parsed.")) // Ignore values that can not be parsed. .onErrorComplete(); }) .subscribe(x -> System.out.println("onNext: " + x));输出:
onNext: 5.0 Info: The value "3,14" could not be parsed. onNext: 2.71 Info: The value "FF" could not be parsed. -
concatMapMaybeDelayErrorDateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("dd.MM.uuuu"); Observable.just("04.03.2018", "12-08-2018", "06.10.2018", "01.12.2018") .concatMapMaybeDelayError(date -> { return Maybe.fromCallable(() -> LocalDate.parse(date, dateFormatter)); }) .subscribe( localDate -> System.out.println("onNext: " + localDate), error -> System.out.println("onError: " + error.getMessage()));输出:
onNext: 2018-03-04 onNext: 2018-10-06 onNext: 2018-12-01 onError: Text '12-08-2018' could not be parsed at index 2 -
concatMapSingleObservable.just("5", "3,14", "2.71", "FF") .concatMapSingle(v -> { return Single.fromCallable(() -> Double.parseDouble(v)) .doOnError(e -> System.out.println("Info: The value \"" + v + "\" could not be parsed.")) // Return a default value if the given value can not be parsed. .onErrorReturnItem(42.0); }) .subscribe(x -> System.out.println("onNext: " + x));输出:
onNext: 5.0 Info: The value "3,14" could not be parsed. onNext: 42.0 onNext: 2.71 Info: The value "FF" could not be parsed. onNext: 42.0 -
concatMapSingleDelayErrorDateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("dd.MM.uuuu"); Observable.just("24.03.2018", "12-08-2018", "06.10.2018", "01.12.2018") .concatMapSingleDelayError(date -> { return Single.fromCallable(() -> LocalDate.parse(date, dateFormatter)); }) .subscribe( localDate -> System.out.println("onNext: " + localDate), error -> System.out.println("onError: " + error.getMessage()));输出:
onNext: 2018-03-24 onNext: 2018-10-06 onNext: 2018-12-01 onError: Text '12-08-2018' could not be parsed at index 2