RxJava的错误处理一-retry

RxJava的错误处理主要分为两类,retry系列以及onErrorRetrun系列。retry系列是当错误的时候,重新subscribe。onErrorReturn系列则是当出错了返回数据到onNext中。本文介绍下retry系列相关用法。

retry

retry系列的操作符主要有retry()
, retry(long)
retry(Func2)
retry(n)当发生onError的时候会重试n次,例如如下代码:

@Test
public void testRetry() {
    final AtomicInteger atomicInteger = new AtomicInteger(0);
    Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext(String.valueOf(System.currentTimeMillis()));
            subscriber.onError(new Error("error"));
        }
    })
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            atomicInteger.incrementAndGet();
        }
    })
    .retry(2)
    .toBlocking()
    .subscribe(new TestSubscriber<String>());
    Assert.assertTrue(atomicInteger.intValue() == 3);
}

初始化atomicInteger为0,在doOnSubscribe加一,重试次数为2次,所以最终相当于onSubscribe执行了3次。

retryWhen

另外一个方法retryWhen的方法是根据得到的Throwable生成新的Observerable, 示例代码如下:

@Test
public void testRetryWhen() {
    final AtomicInteger atomicInteger = new AtomicInteger(3);
    Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext(String.valueOf(System.currentTimeMillis()));
            subscriber.onError(new Error(String.valueOf(atomicInteger.decrementAndGet())));
        }
    })
    .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
        @Override
        public Observable<?> call(Observable<? extends Throwable> observable) {
            return observable.takeWhile(new Func1<Throwable, Boolean>() {
                @Override
                public Boolean call(Throwable throwable) {
                    return Integer.parseInt(throwable.getMessage()) > 0;
                }
            })
            .flatMap(new Func1<Throwable, Observable<?>>() {
                @Override
                public Observable<?> call(Throwable throwable) {
                    return Observable.timer(1, TimeUnit.SECONDS);
                }
            });
        }
    })
    .toBlocking()
    .subscribe(new TestSubscriber<String>());
    Assert.assertEquals(atomicInteger.intValue(), 0);
}

这里接受到throwableObserverable后,用takeWhile来判断thrwoable的属性,这里用一个AtomicInteger,设置最大重试次数为3,每次减1,当等于0则不再重试,再现实生活中,也可以判断Exception的类型等方式判断是否需要重试。接着用flatMap返回Observerable.timer来延迟重试到1秒以后。

参考

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 0. 概述 感谢:对RxJava中.repeatWhen()和.retryWhen()操作符的思考[http://...
    CokeNello阅读 753评论 3 1
  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,656评论 7 62
  • 文章转自:http://gank.io/post/560e15be2dca930e00da1083作者:扔物线在正...
    xpengb阅读 7,076评论 9 73
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,253评论 19 139
  • 早上睁开眼睛,看见手机屏幕上一条短信,“你的套餐语音通话时长已用完,流量剩余2.89G,请补充交费”,我去,脑残的...
    忧郁的老虎阅读 937评论 3 7