扔物线大神写的示例代码:RxJavaSamples
apk下载地址:RxJavaSamples.apk
光看代码虽然看得懂,但是要自己写起来又感觉力不从心,所以决定将代码解析记录一下,加深印象。
1.基本使用
RxJava和Retrofit结合使用最基本的格式:用 subscribeOn()
和observeOn()
来控制线程,并通过 subscribe()
来触发网络请求的开始。代码大致形式:
api.getData()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
其中的observer是(后面例子中的observer基本相同不再列举):
Observer<List<Images>> observer = new Observer<List<Images>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
//失败处理
swipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(List<ZhuangbiImage> images) {
//成功处理
swipeRefreshLayout.setRefreshing(false);
adapter.setImages(images);
}
};
2.转换(map)
有些服务端的接口设计,会在返回的数据外层包裹一些额外信息,这些信息对于调试很有用,但本地显示是用不到的。使用 map()
可以把外层的格式剥掉,只留下本地会用到的核心格式。当然,map()
也可以用于基于其他各种需求的格式转换。代码大致形式:
api.getData()
.map(response->response.data)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
3.压合(zip)
有的时候,app 中会需要同时访问不同接口,然后将结果糅合后转为统一的格式后输出(例如将第三方广告 API 的广告夹杂进自家平台返回的数据 List 中)。这种并行的异步处理比较麻烦,不过用了 zip()
之后就会简单得多。代码大致形式:
Observable.zip(api.getData(),adApi.getAds(),zipFunc())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
4.一次性 token(flatMap)
出于安全性、性能等方面的考虑,多数服务器会有一些接口需要传入 token 才能正确返回结果,而 token 是需要从另一个接口获取的,这就需要使用两步连续的请求才能获取数据(①token -> ②目标数据)。使用 flatMap()
可以用较为清晰的代码实现这种连续请求,避免 Callback 嵌套的结构。代码大致形式:
api.getToken().flatMap(token -> api.getData(token))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
5.非一次性 token(retryWhen)
有的 token 并非一次性的,而是可以多次使用,直到它超时或被销毁(多数 token 都是这样的)。这样的 token 处理起来比较麻烦:需要把它保存起来,并且在发现它失效的时候要能够自动重新获取新的 token 并继续访问之前由于 token 失效而失败的请求。如果项目中有多处的接口请求都需要这样的自动修复机制,使用传统的 Callback 形式需要写出非常复杂的代码。而使用 RxJava ,可以用 retryWhen() 来轻松地处理这样的问题。代码大致形式:
api.getData(token)
.retryWhen(observable ->
observable.flatMap( ->
api.getToken()
.doOnNext(->updateToken())))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
上面几个例子都比较简单,而这个有点小复杂,所以详细记录一下,具体代码如下:
subscription = Observable.just(null)
.flatMap(new Func1<Object, Observable<FakeThing>>() {
@Override
public Observable<FakeThing> call(Object o) {
//判断cachedToken是否为空
//是的话就发送一个error的observable
//否的话就用该token去取得数据
return cachedFakeToken.token == null
? Observable.<FakeThing>error(new NullPointerException("Token is null!"))
: fakeApi.getFakeData(cachedFakeToken);
}
})
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
//这里接收到一个error的Observable
public Observable<?> call(Throwable throwable) {
if (throwable instanceof IllegalArgumentException || throwable instanceof NullPointerException) {
//执行获取token,并且再次执行上面的通过token获取数据操作
return fakeApi.getFakeToken("fake_auth_code")
.doOnNext(new Action1<FakeToken>() {
@Override
public void call(FakeToken fakeToken) {
tokenUpdated = true;
cachedFakeToken.token = fakeToken.token;
cachedFakeToken.expired = fakeToken.expired;
}
});
}
//其他错误
return Observable.just(throwable);
}
});
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
6.缓存(BehaviorSubject)
RxJava 中有一个较少被人用到的类叫做 Subject
,它是一种『既是 Observable,又是 Observer』的东西,因此可以被用作中间件来做数据传递。例如,可以用它的子类 BehaviorSubject
来制作缓存。代码大致形式:
api.getData().subscribe(behaviorSubject); // 判断cache为空则获取数据,网络数据会被缓存
behaviorSubject.subscribe(observer);// 之前的缓存将直接送达observer
具体例子:
BehaviorSubject<List<Item>> cache;
public Subscription subscribeData(@NonNull Observer<List<Item>> observer) {
//判断内存缓存是否为空
if (cache == null) {
cache = BehaviorSubject.create();
Observable.create(new Observable.OnSubscribe<List<Item>>() {
@Override
public void call(Subscriber< ? super List<Item>> subscriber) {
List<Item> items = Database.getInstance().readItems();
//判断硬盘缓存是否为空
if (items == null) {
//从网络读取数据
loadFromNetwork();
} else {
//发送硬盘数据
subscriber.onNext(items);
}
}
}).subscribeOn(Schedulers.io())
.subscribe(cache);
}
return cache.observeOn(AndroidSchedulers.mainThread()).subscribe(observer);
}
subscription = subscribeData(new Observer<List<Item>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
swipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(List<Item> items) {
swipeRefreshLayout.setRefreshing(false);
adapter.setItems(items);
}
});