【译】对RxJava中.repeatWhen()和.retryWhen()操作符的思考

第一次见到.repeatWhen().retryWhen()这两个操作符的时候就非常困惑了。不得不说,它们绝对是“最令人困惑弹珠图”的有力角逐者。

然而它们都是非常有用的操作符:允许你有条件的重新订阅已经结束的Observable。我最近研究了它们的工作原理,现在我希望尝试着去解释它们(因为,我也是耗费了一些精力才参透它们)。

Repeat与Retry的对比

首先,来了解一下.repeat().retry()之间最直观的区别是什么?这个问题并不难:区别就在于什么样的终止事件会触发重订阅。

.repeat()接收到.onCompleted()事件后触发重订阅。

.retry()接收到.onError()事件后触发重订阅。

然而,这种简单的叙述尚不能令人满意。试想如果你要实现一个延迟数秒的重订阅该如何去做?或者想通过观察错误来决定是否应该重订阅呢?这种情况下就需要.repeatWhen().retryWhen()的介入了,因为它们允许你为重试提供自定义逻辑。

Notification Handler

你可以通过一个叫做notificationHandler的函数来实现重试逻辑。这是.retryWhen()的方法签名(译者注:方法签名,指方法名称、参数类型和参数数量等):

retryWhen(Func1<? super Observable<? extends java.lang.Throwable>,? extends Observable<?>> notificationHandler) 

签名很长,甚至不能一口气读完。我发现它很难理解的原因是因为存在一大堆的泛型约定。

简化后,它包括三个部分:

  1. Func1像个工厂类,用来实现你自己的重试逻辑。
  2. 输入的是一个Observable<Throwable>
  3. 输出的是一个Observable<?>

首先,让我们来看一下最后一部分。被返回的Observable<?>所要发送的事件决定了重订阅是否会发生。如果发送的是onCompleted或者onError事件,将不会触发重订阅。相对的,如果它发送onNext事件,则触发重订阅(不管onNext实际上是什么事件)。这就是为什么使用了通配符作为泛型类型:这仅仅是个通知(next, error或者completed),一个很重要的通知而已。

source每次一调用onError(Throwable)Observable<Throwable>都会被作为输入传入方法中。换句话说就是,它的每一次调用你都需要决定是否需要重订阅。

当订阅发生的时候,工厂Func1被调用,从而准备重试逻辑。那样的话,当onError被调用后,你已经定义的重试逻辑就能够处理它了。

这里有个例子展示了我们应该在哪些场景下订阅source,比如,只有在ThrowableIOException的情况下请求重订阅,否则不(重订阅)。

source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
          @Override public Observable<?> call(Observable<? extends Throwable> errors) {

            return errors.flatMap(new Func1<Throwable, Observable<?>>() {
              @Override public Observable<?> call(Throwable error) {

                // For IOExceptions, we  retry
                if (error instanceof IOException) {
                  return Observable.just(null);
                }

                // For anything else, don't retry
                return Observable.error(error);
              }
            });
          }  
        })

由于每一个error都被flatmap过,因此我们不能通过直接调用.onNext(null)触发重订阅或者.onError(error)来避免重订阅。

经验之谈

这里有一些关于.repeatWhen().retryWhen()的要点,我们应该牢记于心。

  • .repeatWhen().retryWhen()非常相似,只不过不再响应onError作为重试条件,而是onCompleted。因为onCompleted没有类型,所有输入变为Observable<Void>

  • 每一次事件流的订阅notificationHandler(也就是Func1)只会调用一次。这也是讲得通的,因为你有一个可观测的Observable<Throwable>,它能够发送任意数量的error。

  • 输入的Observable必须作为输出Observable的源。你必须对Observable<Throwable>做出反应,然后基于它发送事件;你不能只返回一个通用泛型流。

换言之就是,你不能做类似的操作:

 .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
              @Override public Observable<?> call(Observable<? extends Throwable> errors) {

                return Observable.just(null);}
            })

因为它不仅不能奏效,而且还会打断你的链式结构。你应该做的是,而且至少应该做的是,把输入作为结果返回,就像这样:

.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
              @Override public Observable<?> call(Observable<? extends Throwable> errors) {

                return errors;
              }
            })

(顺便提一下,这在逻辑上与单纯使用.retry()操作符的效果是一样哒)

  • 输入Observable只在终止事件发生的时候才会触发(对于.repeatWhen()来说是onCompleted,而对于.retryWhen()来说是onError)。它不会从源中接收到任何onNext的通知,所以你不能通过观察被发送的事件来决定重订阅。如果你真的需要这样做,你应该添加像.takeUntil()这样的操作符,来拦截事件流。

使用方式

现在,假设你已大概了解了.repeatWhen().retryWhen(),那么你能将一些什么样的精简逻辑放入到notificationHandler中呢?

使用.repeatWhen() + .delay()定期轮询数据:

source.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
              @Override public Observable<?> call(Observable<? extends Void> completed) {

                return completed.delay(5, TimeUnit.SECONDS);
              }
            })

直到notificationHandler发送onNext()才会重订阅到source。因为在发送onNext()之前delay了一段时间,所以优雅的实现了延迟重订阅,从而避免了不间断的数据轮询。

非此即彼,使用.flatMap() + .timer()实现延迟重订阅:
(译者注:在RxJava 1.0.0及其之后的版本,官方已不再提倡使用.timer()操作符,因为.interval()具有同样的功能)

source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
              @Override public Observable<?> call(Observable<? extends Throwable> errors) {

                return errors.flatMap(new Func1<Throwable, Observable<?>>() {
                  @Override public Observable<?> call(Throwable error) {

                    return Observable.timer(5, TimeUnit.SECONDS);
                  }
                });
              }
            })

当需要与其他逻辑协同的时候,这种替代方案就变得非常有用了,比如。。。

使用.zip() + .range()实现有限次数的重订阅

source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
              @Override public Observable<?> call(Observable<? extends Throwable> errors) {

                return errors.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
                  @Override public Integer call(Throwable throwable, Integer i) {

                    return i;
                  }
                });
              }
            })

最后的结果就是每个error都与range中一个输出配对出现,就像这样:

zip(error1, 1) -> onNext(1)  <-- Resubscribe  
zip(error2, 2) -> onNext(2)  <-- Resubscribe  
zip(error3, 3) -> onNext(3)  <-- Resubscribe  
onCompleted()                <-- No resubscription  

因为当第四次error出现的时候,range(1,3)中的数字已经耗尽了,所以它隐式调用了onCompleted(),从而导致整个zip的结束。防止了进一步的重试。

将可变延迟策略与次数限制的重试机制结合起来

source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
              @Override public Observable<?> call(Observable<? extends Throwable> errors) {

                return errors.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
                  @Override public Integer call(Throwable throwable, Integer i) {

                    return i;
                  }
                }).flatMap(new Func1<Integer, Observable<? extends Long>>() {
                  @Override public Observable<? extends Long> call(Integer retryCount) {
                    
                    return Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS);
                  }
                });
              }
            })

在这种用例的比较上,我认为.flatMap()+.timer()的组合比单纯使用.delay()更可取,因为我们可以通过重试次数来修改延迟时间。重试三次,并且每一次的重试时间都是5 ^ retryCount,仅仅通过一些操作符的组合就帮助我们实现了指数退避算法(译者注:可参考二进制指数退避算法)。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容