concat操作符可以接收若干个Observables,并且保证发射的数据是有序的。
官方文档:Returns an Observable that emits the items emitted by three Observables, one after the other, without interleaving them.
onNext、onComplete的触发顺序
//关键代码示例
Observable.concat(firstObservable, secondObservable)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(concatObserver);
以上面的代码为例,总结一下onNext、onComplete的执行顺序。
1、concatObserver按顺序接收到firstObservable的onNext传递的数据,secondObservable的onNext传递的数据,最后再触发onComplete。
2、firstObservable必须要执行emitter.onComplete后,secondObservable的emitter.onNext才能传递到concatObserver的onNext方法。
3、firstObservable和secondObservable必须都要调用emitter.onComplete才能执行concatObserver的onComplete方法。
4、firstObservable、secondObservable在emitter.onComplete方法后调用的emitter.onNext并不会抵达concatObserver的onNext方法。emitter.onError方法后的emitter.onNext方法同上。但不要再emitter.onComplete后调用emitter.onError,否则出现io.reactivex.exceptions.UndeliverableException
onNext、onError的触发顺序
1、firstObservable执行emitter.onError后,secondObservable的emitter.onNext不会触发,且secondObservable的subscribe都没有触发。
onNext、onError的坑
一般情况,eg不切换线程,secondObservable必须等firstObservable的onComplete之后才会触发。但是在开发中遇到一个场景,firstObservable查询db缓存正常,触发emitter.onNext,emitter.onComplete方法。但是secondObservable因网络异常立即返回了Exception触发emitter.onError。这时concatObserver竟然没有触发onNext,只触发了一次onError。
各种排查后,看到stackoverflow上一个提问,ReactiveX concat doesn't produce onNext from first observable if second fails immediately恍然大悟。
根据observerOn默认方法的javadoc说明,onError事件可能插队到onNext之前执行。说明如下:
Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly asynchronous.
可以用observerOn的一个重载方法,增加一个delayError参数为true。
indicates if the onError notification may not cut ahead of onNext notification on the other side of the scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received from upstream
我理解的是firstObservable在其observeOn的线程准备触发concat的observer;secondObservable在其observeOn线程触发;二者最终都需要在concat的observeOn上运行。在这个过程中,如果firstObservable和secondObservable还有concat的Observer都不在一个线程,就可能出现时序问题,导致onError截断到onNext之前。
所以另一种方案也生效:
Observable.concat(
getContentFromCache.subscribeOn(dbScheduler).observeOn(AndroidSchedulers.mainThread()),
getContentFromNetwork.subscibeOn(networkScheduler).observeOn(AndroidSchedulers.mainThread())
)
.subscribe(subscriber);
参考文档:
1、Rxjava2中Concat操作符onNext,OnError,OnComplte的执行顺序
2、 ReactiveX concat doesn't produce onNext from first observable if second fails immediately