RxJava+Retrofit Samples解析

扔物线大神写的示例代码:RxJavaSamples
apk下载地址:RxJavaSamples.apk
光看代码虽然看得懂,但是要自己写起来又感觉力不从心,所以决定将代码解析记录一下,加深印象。

1.基本使用

RxJava和Retrofit结合使用最基本的格式:用 subscribeOn()observeOn()来控制线程,并通过 subscribe()来触发网络请求的开始。代码大致形式:

api.getData()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer);

其中的observer是(后面例子中的observer基本相同不再列举):

Observer<List<Images>> observer = new Observer<List<Images>>() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
              //失败处理
            swipeRefreshLayout.setRefreshing(false);
            Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show();
        }

        @Override
        public void onNext(List<ZhuangbiImage> images) {
             //成功处理
            swipeRefreshLayout.setRefreshing(false);
            adapter.setImages(images);
        }
    };

2.转换(map)

有些服务端的接口设计,会在返回的数据外层包裹一些额外信息,这些信息对于调试很有用,但本地显示是用不到的。使用 map()可以把外层的格式剥掉,只留下本地会用到的核心格式。当然,map() 也可以用于基于其他各种需求的格式转换。代码大致形式:

api.getData()
    .map(response->response.data)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer);

3.压合(zip)

有的时候,app 中会需要同时访问不同接口,然后将结果糅合后转为统一的格式后输出(例如将第三方广告 API 的广告夹杂进自家平台返回的数据 List 中)。这种并行的异步处理比较麻烦,不过用了 zip() 之后就会简单得多。代码大致形式:

Observable.zip(api.getData(),adApi.getAds(),zipFunc())
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer);

4.一次性 token(flatMap)

出于安全性、性能等方面的考虑,多数服务器会有一些接口需要传入 token 才能正确返回结果,而 token 是需要从另一个接口获取的,这就需要使用两步连续的请求才能获取数据(①token -> ②目标数据)。使用 flatMap() 可以用较为清晰的代码实现这种连续请求,避免 Callback 嵌套的结构。代码大致形式:

api.getToken().flatMap(token -> api.getData(token))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer);

5.非一次性 token(retryWhen)

有的 token 并非一次性的,而是可以多次使用,直到它超时或被销毁(多数 token 都是这样的)。这样的 token 处理起来比较麻烦:需要把它保存起来,并且在发现它失效的时候要能够自动重新获取新的 token 并继续访问之前由于 token 失效而失败的请求。如果项目中有多处的接口请求都需要这样的自动修复机制,使用传统的 Callback 形式需要写出非常复杂的代码。而使用 RxJava ,可以用 retryWhen() 来轻松地处理这样的问题。代码大致形式:

api.getData(token)
    .retryWhen(observable ->
        observable.flatMap( ->
            api.getToken()
                .doOnNext(->updateToken())))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer);

上面几个例子都比较简单,而这个有点小复杂,所以详细记录一下,具体代码如下:

 subscription = Observable.just(null)
                .flatMap(new Func1<Object, Observable<FakeThing>>() {
                    @Override
                    public Observable<FakeThing> call(Object o) {
                        //判断cachedToken是否为空
                        //是的话就发送一个error的observable
                        //否的话就用该token去取得数据
                        return cachedFakeToken.token == null
                                ? Observable.<FakeThing>error(new NullPointerException("Token is null!"))
                                : fakeApi.getFakeData(cachedFakeToken);
                    }
                })
                .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                    @Override
                    public Observable<?> call(Observable<? extends Throwable> observable) {
                        return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                            @Override
                            //这里接收到一个error的Observable
                            public Observable<?> call(Throwable throwable) {
                                if (throwable instanceof IllegalArgumentException || throwable instanceof NullPointerException) {
                                    //执行获取token,并且再次执行上面的通过token获取数据操作
                                    return fakeApi.getFakeToken("fake_auth_code")
                                            .doOnNext(new Action1<FakeToken>() {
                                                @Override
                                                public void call(FakeToken fakeToken) {
                                                    tokenUpdated = true;
                                                    cachedFakeToken.token = fakeToken.token;
                                                    cachedFakeToken.expired = fakeToken.expired;
                                                }
                                            });
                                }
                                //其他错误
                                return Observable.just(throwable);
                            }
                        });
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(observer);

6.缓存(BehaviorSubject)

RxJava 中有一个较少被人用到的类叫做 Subject,它是一种『既是 Observable,又是 Observer』的东西,因此可以被用作中间件来做数据传递。例如,可以用它的子类 BehaviorSubject 来制作缓存。代码大致形式:

api.getData().subscribe(behaviorSubject); // 判断cache为空则获取数据,网络数据会被缓存
behaviorSubject.subscribe(observer);// 之前的缓存将直接送达observer

具体例子:

BehaviorSubject<List<Item>> cache;
public Subscription subscribeData(@NonNull Observer<List<Item>> observer) {
         //判断内存缓存是否为空
        if (cache == null) {
            cache = BehaviorSubject.create();
            Observable.create(new Observable.OnSubscribe<List<Item>>() {
                @Override
                public void call(Subscriber< ? super List<Item>> subscriber) {
                    List<Item> items = Database.getInstance().readItems();
                    //判断硬盘缓存是否为空
                    if (items == null) {
                        //从网络读取数据
                        loadFromNetwork();
                    } else {
                        //发送硬盘数据
                        subscriber.onNext(items);
                    }
                }
            }).subscribeOn(Schedulers.io())
              .subscribe(cache);
        } 
        return cache.observeOn(AndroidSchedulers.mainThread()).subscribe(observer);
    }

  subscription = subscribeData(new Observer<List<Item>>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                        swipeRefreshLayout.setRefreshing(false);
                        Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show();
                    }

                    @Override
                    public void onNext(List<Item> items) {
                        swipeRefreshLayout.setRefreshing(false);
                        adapter.setItems(items);
                    }
                });
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,570评论 7 62
  • 文章转自:http://gank.io/post/560e15be2dca930e00da1083作者:扔物线在正...
    xpengb阅读 7,061评论 9 73
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 173,392评论 25 708
  • __IPHONE_OS_VERSION_MIN_REQUIRED:当前系统支持的最小版本 __IPHONE_OS_...
    不敢告诉她阅读 360评论 0 0
  • 咿呀学语 小脚蹒跚 被子就是一座山 翻滚过去 所有的欣喜瞬间绽放 青春来临 步履矫健 学业就是一座山 高考结束 应...
    小团子妈妈阅读 180评论 0 0