一,merge操作符
可以合并多个Observable的输出,它们的数据可能会交错发射(concat可以保持顺序)。如果某个原始Observable出现onError,merge后的Observable就会onError
二,mergeDelayError
在合并和交错输出上和merge一样,但是mergeDelayError的原始observable出现onError时,错误通知会被保留,直到所有数据发射完毕后才执行onError。
如果有多个原始observable出现了onError,这些onError通知会被合并成一个CompositeException ,保留在它的 Listexceptions异常列表里。如果只有一个原始observable出现了onError,则会直接使用这个onError通知,而不会生成CompositeException。
见MergeSubscriber里的代码
private void reportError() {
ArrayList list = new ArrayList(this.errors);
if(list.size() == 1) {
this.child.onError((Throwable)list.get(0));
} else {
this.child.onError(new CompositeException(list));
}
}
如果要有针对性地处理错误,需要将异常信息和A,B原始Observable对应起来,可以在原始Observable后面加上doOnError方法来处理错误。mergeDelayError(A.doOnError,B)
或者在doOnError里把异常信息保存到全局变量里,然后推迟到在最终出口处的onError里处理异常。
如果只根据异常类型来进行处理,不考虑来自哪个原始Observable,
可以
public void onError(Throwable e) {
if (e instanceof CompositeException) {
CompositeException compositeException = (CompositeException) e;
e = compositeException.getExceptions().get(0);
}
super.onError(e);
}
mergeDelayError与线程切换
Observable.mergeDelayError(Observable.error(new Exception("错误")),Observable.just("数据"))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
这样先mergeDelayError再指定线程的话,mergeDelayError没有起到延迟通知onError的作用,第一个observable出现错误的时候,整个合并的observable也onError了,第二个observable无法输出。
但是如果改成每个observable单独subscribeOn和observeOn,然后再mergeDelayError,就正常进行了。
Observable.mergeDelayError(Observable.error(new Exception("错误"))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()),
Observable.just("数据").subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()))
.subscribe(new Subscriber<String>() {