RxJava2 复杂异步场景之——Token前置请求

RxJava的用武之地

Rxjava这个库和其他常见库不太一样,一般的库例如Glide,ButterKnife都是为了解决实际问题出现的,一定程度上是刚需。Glide库如果不用他,那么应用自己就要处理图片下载、压缩、内存管理、多级缓存等等复杂的逻辑。这类问题复杂而常见,而像Glide这类的轮子,Api的设计都比较友好,一个简单的api调用就能完成一个原本很复杂的功能,简直不要太爽。

Glide.with(context)
    .load(url)//图片加载
    .crossFade()//动画设置
    .placeholder(R.drawable.place_image)//占位图
    .error(R.drawable.error_image)//失败占位图
    .override(width,height)//图片裁剪
    .thumbnail(thumbnailRequest)//配置缩略图
    .diskCacheStrategy(DiskCacheStrategy.SOURCE)//缓存策略
    .into(imageView);

而Rxjava,你刚开始看起来,都不知道他是干什么的。“异步处理”?不是一般都使用观察者模式吗?AsyncTask,Handler也可以,要rxjava干嘛?如果你有兴趣研究过一点rxjava,会发现网上的教程都会说:"zip map flatmap debounce等操作符把异步回调变得‘简洁’‘优雅’",然后对比一下原来的代码和使用rxjava后的代码,最后感叹一下rxjava设计的鬼才和功能的强大。我自己在初次接触rxjava时也感觉,这些rxjava的优点描述比较空洞,这项技术的意义大于实用。
实际情况是这样么?在具体开发中,异步调用给我们的最大困扰是:异步回调的时间并不可控。当有多个异步回调时,这些调用相互联系和依赖,搞清楚每个回调何时返回是个重要的问题。在每个关键时间节点对‘分散的callback’做正确的事,有过类似编程经验的人都知道,是非常痛苦的事,如果还想代码容易看懂,简直是疯了。


image

rxjava号称异步调用的终极解决方案,能否解决以上困扰?随着学习和应用的深入,体会会更明显。以下会用一个稍复杂的例子,实操一个复杂异步场景,看看rxjava处理的怎么样。

典型复杂异步场景 -- Token的前置校验

经常遇到这种需求,接口的请求依赖token信息。一个请求需要先请求token(token如果存在缓存则使用缓存),依赖这个token才能进行正常网络请求。这个token有一定的时效性,在时效性内可以使用缓存,过期后需要重新请求token并重新发起一次请求。这个流程可以归纳如下图:


token前置请求.png

光看这些需求,是不是觉得已经够你喝一壶了,别忙,还有些潜在的逻辑这个图没有表现出来:
1 高并发网络请求时,如果token正在请求,需要对请求阻塞(token请求过程中,不再接受新的token请求)
2 阻塞的同时,要把这些请求记录下来,token请求成功后,再‘依次’发送这些阻塞的请求。
3 token失效情况下,网络请求限制重试次数。(防止递归调用)
4 token请求本身,重试策略需单独配置。

不使用rxjava,我们如何实现上述需求:

1、网络请求前,对token是否有缓存判断,如果没有先请求token,并把这个请求阻塞且缓存
2、token请求过程中,如果有新的token请求进来,加入阻塞队列
3、token请求后,通知阻塞的队列(广播等方式),依次进行阻塞的请求
4、对两种次数限制,分别做逻辑判断

以上就是传统实现方法,就不贴代码了,这样实现有以下特点:
1、要时刻维护一个阻塞队列 (注意其添加和清空的时机)
2、token请求结束后,有一个回调机制通知阻塞队列,(这个回调需要注册和反注册)
3、两处的次数限制,次数维护的变量,不好维护(一般动态秘钥为了便于使用会做成单例,单例内的变量类似static,维护较复杂)
4、请求重试的逻辑不好实现,

我们可以看到这里涉及到很多静态变量的维护,广播等异步回调的处理,这种情况一多,编程者会变得很被动。而且token的异步请求和真正的网络异步请求杂糅在一起,增大了问题的复杂性。

我们来看下rxjava如何处理:

一些代码网络请求部分与前一篇博客《基于RxJava Retrofit的网络框架》相关。

先看看完整的请求过程
public static <R> Observable send(final MapiHttpRequest request, final MapiTypeReference<R> t){
    return Observable.defer(new Callable<ObservableSource<String>>() {
                @Override
                public ObservableSource<String> call() throws Exception {
                    //传入token缓存
                    return Observable.just(Store.sToken);
                }
            }).flatMap(new Function<String, ObservableSource<R>>() {
                @Override
                public ObservableSource<R> apply(String key) throws Exception {
                    if(TextUtils.isEmpty(key) && !request.skipCheckKeyValid()){
                        //token没有缓存,需要请求Token
                        return Observable.<R>error(new KeyNotValidThrowable());
                    } else {
                        //Token存在缓存,直接请求
                        return sendRequestInternal(request,t);
                    }
                }
            })
            //进入失败重试流程
            .retryWhen(new Function<Observable<? extends Throwable>, ObservableSource<String>>() {
                private int retryCount = 0;
                @Override
                public ObservableSource<String> apply(Observable<? extends Throwable> throwableObservable) throws Exception {
                    return throwableObservable.flatMap(new Function<Throwable, ObservableSource<String>>() {
                        @Override
                        public ObservableSource<String> apply(Throwable throwable) throws Exception {
                            if (throwable instanceof KeyNotValidThrowable){
                                //同一Request,有过一次KeyNotValidThrowable,则不再重试
                                if (retryCount > 0){
                                    return Observable.error(throwable);
                                } else {
                                //token缓存不在,进入TokenLoader请求token
                                    retryCount++;
                                    return TokenLoader.getInstance().getNetTokenLocked();
                                }
                            } else if (throwable instanceof ApiException){
                                  //token过期的情况,重新获取token,并重试
                                  ApiException apiException = (ApiException)throwable;
                                  if (apiException.getCode() == MapiResultCode.SECRETKEY_EXPIRED.value()){
                                      if (retryCount > 0){
                                          return Observable.error(throwable);
                                      } else {
                                          //token缓存失效,进入TokenLoader请求token
                                          retryCount++;
                                          return DynamicKeyLoader.getInstance().getNetTokenLocked();
                                      }
                                  }
                            }
                            //其他类型错误,直接抛出,不再重试
                            return Observable.error(throwable);
                        }
                    });
                }
            });
}

也许你第一次看也挺晕,别怕,你顺着注释捋捋逻辑,是不是感觉代码的实现好像画了一个时序图。
除了注释以外,几点说明:
1、defer操作符的作用是在retry时,会重新创建新的Observable,否则会使用上次的Observable,不会重新获取Store.sToken
2、retryWhen操作符,与sendRequestInternal内部统一配置的retryWhen并不冲突,相当于二次retry
3、retryWhen中如果抛出error ,则不再重试;
4、重试请求,通过返回getNetTokenLocked这个subject实现。(下面详述)

阶段总结:

整体的流程被压缩到了一个函数中,rxjava本身的retrywhen和subject机制,已经替我们完成了这么几点:
1、自动重试的注册和反注册,subject被回调完直接失效,再次请求要重新注册。
2、高并发request,维护队列,通过mTokenObservable的回调自动解决了这个问题
3、retry次数的维护,由于每次request的retry都是重新创建的内部类,所以变量的维护变的简单。
4、重试的逻辑被retry操作符自动实现了,只要重写retry的返回值就可以控制重试的策略。

TokenLoader:Token的获取过程
public class TokenLoader {

    public static final String TAG = TokenLoader.class.getSimpleName();

    private AtomicBoolean mRefreshing = new AtomicBoolean(false);
    private PublishSubject<String> mPublishSubject;
    private Observable<String> mTokenObservable;

    private TokenLoader() {
        final TokenRequest request = new TokenRequest(CarOperateApplication.getInstance());
        mTokenObservable = Observable
                  .defer(new Callable<ObservableSource<TokenRequest>>() {
                      @Override
                      public ObservableSource<TokenRequest> call() throws Exception {
                          return Observable.just(request);
                      }
                  })
                  .flatMap(new Function<TokenRequest, ObservableSource<MapiHttpResponse<Boolean>>>() {
                      @Override
                      public ObservableSource<MapiHttpResponse<Boolean>> apply(RefreshKeyRequest refreshKeyRequest) throws Exception {
                          //Token请求接口
                          return ApiHelper.sendDynamicKey(refreshKeyRequest,new MapiTypeReference<MapiHttpResponse<Boolean>>(){});
                      }
                  })
                  .retryWhen(new Function<Observable<Throwable>, ObservableSource<TokenRequest>>() {
                      private int retryCount = 0;
                      @Override
                      public ObservableSource<TokenRequest> apply(Observable<Throwable> throwableObservable) throws Exception {
                          return throwableObservable.flatMap(new Function<Throwable, ObservableSource<TokenRequest>>() {
                              @Override
                              public ObservableSource<RefreshKeyRequest> apply(Throwable throwable) throws Exception {
                                  retryCount++;
                                  if (retryCount == 3){
                                      //失败次数达到阈值,更改请求策略
                                      request.setFlag(0);
                                      return Observable.just(request);
                                  } else if (retryCount > 3){
                                      //失败次数超过阈值,抛出失败,放弃请求
                                      mRefreshing.set(false);
                                      return Observable.error(throwable);
                                  } else {
                                      //再次请求token
                                      return Observable.just(request);
                                  }
                              }
                          });

                      }
                  })
    //                      .delay(6000, TimeUnit.MILLISECONDS) //模拟token请求延迟
                  .map(new Function<MapiHttpResponse<Boolean>,String>() {
                      @Override
                      public String apply(MapiHttpResponse<Boolean> response) throws Exception {
                          //成功,保存token缓存
                          if (response.getContent().booleanValue() == true){
                              setCacheToken(response.getToken());
                          } else if (response.getContent().booleanValue() == false){
                              setCacheToken(UcarK.getSign());
                          }
                          //请求完成标识
                          mRefreshing.set(false);
                          return getCacheToken();
                      }
                  });
    }

    public static TokenLoader getInstance() {
        return Holder.INSTANCE;
    }

    private static class Holder {
        private static final TokenLoader INSTANCE = new TokenLoader();
    }

    public String getCacheToken() {
        return Store.sToken;
    }

    public void setCacheToken(String key){
        Store.sToken = key;
    }

    /**
     *
     * @return
     */
    public Observable<String> getNetTokenLocked() {
        if (mRefreshing.compareAndSet(false, true)) {
            Log.d(TAG, "没有请求,发起一次新的Token请求");
            startTokenRequest();
        } else {
            Log.d(TAG, "已经有请求,直接返回等待");
        }
        return mPublishSubject;
    }

    private void startTokenRequest() {
        mPublishSubject = PublishSubject.create();
        mTokenObservable.subscribe(mPublishSubject);
    }

}

还是读注释,除了注释以外,几点说明:
1、mRefreshing的作用是在token请求过程中,不再允许新的token请求,
变量采用原子类,而非boolean;这样在多线程环境下,原子类的方法是线程安全的。
compareAndSet(boolean expect, boolean update)这个方法两个作用
1)比较expect和mRefresh是否一致
2)将mRefreshing置为update

2、startTokenRequest()方法开启token请求,注意Observable在subscribe时才正式开始

3、这里使用了PublishSubject较为关键,在rxjava中Subject既是observable,又是observer,在TokenLoader中,mPublishSubject是mTokenObservable的观察者,token请求的会由mPublishSubject响应,同时mPublishSubject也作为Observable返回给TokenLoader的调用者作为retryWhen的返回值返回。(所以这里PublishSubject的泛型与send()方法中Observable的泛型应该是一致的)

4、对于mRefreshing是true的情况,直接返回mPublishSubject,这样每个阻塞的请求retryWhen都会等待mPublishSubject的返回值,回调通知的顺序与加入阻塞的顺序是队列关系(先请求的接口,先回调),满足我们的需求。

最后:
感觉怎么样,是豁然开朗还是越陷越深,不管那样都没有关系,你需要的是了解还存在另一种处理异步任务的方法。在你下一次遇到同样让你头疼的问题时,你可以把这篇文章拿起来再看看,也许你的头疼会好一点了。。。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352

推荐阅读更多精彩内容