近段时间发现公司项目出现一个如下的错误
RxCachedThreadScheduler-2
java.io.InterruptedIOException
interrupted
io.reactivex.exceptions.UndeliverableException:java.io.InterruptedIOException: interrupted
经过Google发现针对该问题的如下总结解释,分享出来供大家交流
我想分享我以前没有注意到的 UndeliverableException 的经验。
TLDR
即使在处理 ObservableSource、SingleSource 的 onError() 函数时,我们也需要处理 RxJavaPlugins.setErrorHandler()
今天,从 Firebase Testlab 收到一个关于UndeliverableException的错误,如下所示
io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling |
com.testapp: Waiting for the face detection model to be downloaded. Please wait.
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
错误很明显,但我无法快速找出问题,因为每次我重新运行测试流程时,总是出现在异常另一个测试中,但我认为在 handleException() 中“我已经处理了错误”。
observable.subscribe({
TODO()
}, {
handleException(it)
}, {
TODO()
})
但是在访问错误处理文档后才明白了原因。 文档描述了可抛出的错误在 observable 处理/终止后不会被静默处理
在stream中,如果在流终止或disposed后发生错误,仍然可以获得异常。 那我们应该怎么处理?让我用简单的例子来展示它
Observable.create<String> {
it.onNext("test")
it.onComplete()
}.subscribe({
println(it)
}, {
println(it)
}, {
println("onCompleted")
})
它按预期打印“test”和“onCompleted”。 但是如果我们在调用 onComplete() 函数之后调用 onError() 会发生什么?
Observable.create<String> {
it.onNext("test")
it.onComplete()
it.onError(IOException()) // added this line
}.subscribe({
println(it)
}, {
println(it)
}, {
println("onCompleted")
})
UndeliverableException thrown 🎉🎉🎉🎉
到目前为止一切顺利,让我们看看 CreateEmitter 源代码以了解它发生的原因。
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
...
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
tryOnError implementation
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
所以在 tryOnError()函数中,如果没有被处理,尝试调用观察者的 onError()函数。 在另一种情况下,它调用 RxJavaPlugins.onError 导致抛出 UndeliverableException。
在分享解决方法之前,我还想展示 RxJavaPlugins.onError 函数的实现
因此,如果在调用emitter的disposed之后出现error,那么onError()函数将会被调用,并且如果 throwable 不是 bug,则 throwable 用 UndeliverableException 包装并调用当前线程的**uncaughtException() **函数。
PS:isBug()函数对于以下异常类型返回 true;
OnErrorNotImplementedException、MissingBackpressureException、IllegalStateException、NullPointerException、IllegalArgumentException、CompositeException
那么我们应该如何处理这种问题?
我们需要重写RxJavaPlugins.setErrorHandler()函数来处理这种异常。
RxJavaPlugins.setErrorHandler {
TODO("handle exception")
}
如果您不想处理且忽略它,您还可以使用库提供的 emptyConsumer()函数。
通过库的文档,您可以处理如下错误