一、最基本的使用格式:
用subscribeOn()和observerOn()来控制线程,并通过subscribe()来触发网络请求的开始。代码大致形式:
disposable = api.getData()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
样例:
1、导入库Rxjava和Retrofit库:
//okHttp log拦截器
implementation 'com.squareup.okhttp3:logging-interceptor:3.4.1'
implementation 'com.squareup.retrofit2:retrofit:2.5.0'
implementation 'com.squareup.retrofit2:converter-gson:2.5.0'
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.5.0'
implementation 'io.reactivex.rxjava2:rxjava:2.2.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
2、第二步:网络请求接口api转成接口interface
public interface ZhuangbiApi {
@GET("search")
Observable<List<ZhuangbiImage>> search(@Query("q") String query);
}
3、第三步:api接口创建实现类
public class NetWork {
private static ZhuangbiApi zhuangbiApi;
private static Converter.Factory gsonConverterFactory = GsonConverterFactory.create();
private static CallAdapter.Factory rxJavaCallAdapterFactory = RxJava2CallAdapterFactory.create();
public static OkHttpClient getOkHttpClient() {
//新建log拦截器
HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor(new HttpLoggingInterceptor.Logger() {
@Override
public void log(String message) {
Log.d("zcb", "OkHttp====Message:" + message);
}
});
//日志显示级别
loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
//定制OkHttp
OkHttpClient.Builder httpClientBuilder = new OkHttpClient
.Builder();
//OkHttp进行添加拦截器loggingInterceptor
httpClientBuilder.addInterceptor(loggingInterceptor);
return httpClientBuilder.build();
}
public static ZhuangbiApi getZhuangbiApi() {
if (zhuangbiApi == null) {
Retrofit retrofit = new Retrofit.Builder()
.client(getOkHttpClient())
.baseUrl("http://www.zhuangbi.info/")
.addConverterFactory(gsonConverterFactory)
.addCallAdapterFactory(rxJavaCallAdapterFactory)
.build();
zhuangbiApi = retrofit.create(ZhuangbiApi.class);
}
return zhuangbiApi;
}
}
4、第四步使用
disposable = Network.getZhuangbiApi()
.search(key)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<ZhuangbiImage>>() {
@Override
public void accept(@NonNull List<ZhuangbiImage> images) throws Exception {
swipeRefreshLayout.setRefreshing(false);
adapter.setImages(images);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
swipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show();
}
});
二、转换(map):
有些服务端的接口设计,会在返回的数据外层包裹一些额外信息,这些信息对于调试很有用,但本地显示是用不到的。使用map()可以把外层的格式剥掉,只留下本地会用到的核心格式。(当然,map()也可以用于基于其他各种需求的格式转换)代码大致形式:
disposable = api.getData()
.map(response->response.data)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
使用案例:
需实现Function接口重写apply方法进行转换
public class GankBeautyResultToItemsMapper implements Function<GankBeautyResult, List<Item>> {
private static GankBeautyResultToItemsMapper INSTANCE = new GankBeautyResultToItemsMapper();
private GankBeautyResultToItemsMapper() {
}
public static GankBeautyResultToItemsMapper getInstance() {
return INSTANCE;
}
@Override
public List<Item> apply(GankBeautyResult gankBeautyResult) {
List<GankBeauty> gankBeauties = gankBeautyResult.beauties;
List<Item> items = new ArrayList<>(gankBeauties.size());
SimpleDateFormat inputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS'Z'");
SimpleDateFormat outputFormat = new SimpleDateFormat("yy/MM/dd HH:mm:ss");
for (GankBeauty gankBeauty : gankBeauties) {
Item item = new Item();
try {
Date date = inputFormat.parse(gankBeauty.createdAt);
item.description = outputFormat.format(date);
} catch (ParseException e) {
e.printStackTrace();
item.description = "unknown date";
}
item.imageUrl = gankBeauty.url;
items.add(item);
}
return items;
}
}
disposable = Network.getGankApi()
.getBeauties(10, page)
.map(GankBeautyResultToItemsMapper.getInstance())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<Item>>() {
@Override
public void accept(@NonNull List<Item> items) throws Exception {
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
}
});
三、压合(zip):
有的时候,app中会需要同时访问不同接口,然后将结果糅合后转为统一的格式后输出(例如将第三方广告的API的广告夹杂自家平台返回的数据List中)。这种并行的异步处理比较麻烦,不过用了zip()之后就会简单得多。代码大致形式:
Observable.zip( api.getData(),adApi.getAds(),zipFunc())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
使用案例:
disposable = Observable.zip(Network.getGankApi().getBeauties(200, 1).map(GankBeautyResultToItemsMapper.getInstance()),
Network.getZhuangbiApi().search("装逼"),
new BiFunction<List<Item>, List<ZhuangbiImage>, List<Item>>() {
@Override
public List<Item> apply(List<Item> gankItems, List<ZhuangbiImage> zhuangbiImages) {
List<Item> items = new ArrayList<Item>();
for (int i = 0; i < gankItems.size() / 2 && i < zhuangbiImages.size(); i++) {
items.add(gankItems.get(i * 2));
items.add(gankItems.get(i * 2 + 1));
Item zhuangbiItem = new Item();
ZhuangbiImage zhuangbiImage = zhuangbiImages.get(i);
zhuangbiItem.description = zhuangbiImage.description;
zhuangbiItem.imageUrl = zhuangbiImage.image_url;
items.add(zhuangbiItem);
}
return items;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<Item>>() {
@Override
public void accept(@NonNull List<Item> items) throws Exception {
swipeRefreshLayout.setRefreshing(false);
adapter.setItems(items);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
swipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show();
}
});
四、Token(flatMap):
出于安全性、性能等方面的考虑,多数服务器会有一些接口需要传入 token 才能正确返回结果,而 token 是需要从另一个接口获取的,这就需要使用两步连续的请求才能获取数据(①token -> ②目标数据)。使用 flatMap() 可以用较为清晰的代码实现这种连续请求,避免 Callback 嵌套的结构。代码大致形式:
disposable = api.getToken()
.flatMap(token->api.getData(token))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
使用案例:
final FakeApi fakeApi = Network.getFakeApi();
disposable = fakeApi.getFakeToken("fake_auth_code")
.flatMap(new Function<FakeToken, Observable<FakeThing>>() {
@Override
public Observable<FakeThing> apply(FakeToken fakeToken) {
return fakeApi.getFakeData(fakeToken);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<FakeThing>() {
@Override
public void accept(FakeThing fakeData) {
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
}
});
五、Token_高级(retryWhen):
有的 token 并非一次性的,而是可以多次使用,直到它超时或被销毁(多数 token 都是这样的)。这样的 token 处理起来比较麻烦:需要把它保存起来,并且在发现它失效的时候要能够自动重新获取新的 token 并继续访问之前由于 token 失效而失败的请求。如果项目中有多处的接口请求都需要这样的自动修复机制,使用传统的 Callback 形式需要写出非常复杂的代码。而使用 RxJava ,可以用 retryWhen() 来轻松地处理这样的问题。代码大致形式:
disposable = api.getData(token)
.retryWhen(observable->
observable.flatMap(->
api.getToken()
.doOnNext(->updateToken())))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
使用案例:
final FakeApi fakeApi = Network.getFakeApi();
disposable = Observable.just(1)
.flatMap(new Function<Object, Observable<FakeThing>>() {
@Override
public Observable<FakeThing> apply(Object o) {
return cachedFakeToken.token == null
? Observable.<FakeThing>error(new NullPointerException("Token is null!"))
: fakeApi.getFakeData(cachedFakeToken);
}
})
.retryWhen(new Function<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> apply(Observable<? extends Throwable> observable) {
return observable.flatMap(new Function<Throwable, Observable<?>>() {
@Override
public Observable<?> apply(Throwable throwable) {
if (throwable instanceof IllegalArgumentException || throwable instanceof NullPointerException) {
return fakeApi.getFakeToken("fake_auth_code")
.doOnNext(new Consumer<FakeToken>() {
@Override
public void accept(FakeToken fakeToken) {
tokenUpdated = true;
cachedFakeToken.token = fakeToken.token;
cachedFakeToken.expired = fakeToken.expired;
}
});
}
return Observable.error(throwable);
}
});
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<FakeThing>() {
@Override
public void accept(FakeThing fakeData) {
swipeRefreshLayout.setRefreshing(false);
String token = cachedFakeToken.token;
if (tokenUpdated) {
token += "(" + getString(R.string.updated) + ")";
}
tokenTv.setText(getString(R.string.got_token_and_data, token, fakeData.id, fakeData.name));
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
swipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show();
}
});
五、缓存(BehaviorSubject):
RxJava 中有一个较少被人用到的类叫做 Subject,它是一种『既是 Observable,又是 Observer』的东西,因此可以被用作中间件来做数据传递。例如,可以用它的子类 BehaviorSubject 来制作缓存。代码大致形式:
disposable = api.getData()
.subscribe(behaviorSubject);// 网络数据会被缓存
behaviorSubject.subscribe(observer); // 之前的缓存将直接送达 observer。
public Disposable subscribeData(@NonNull Consumer<List<Item>> onNext, @NonNull Consumer<Throwable> onError) {
if (cache == null) {
cache = BehaviorSubject.create();
Observable.create(new ObservableOnSubscribe<List<Item>>() {
@Override
public void subscribe(ObservableEmitter<List<Item>> e) throws Exception {
List<Item> items = Database.getInstance().readItems();
if (items == null) {
setDataSource(DATA_SOURCE_NETWORK);
loadFromNetwork();
} else {
setDataSource(DATA_SOURCE_DISK);
e.onNext(items);
}
}
})
.subscribeOn(Schedulers.io())
.subscribe(cache);
} else {
setDataSource(DATA_SOURCE_MEMORY);
}
return cache.doOnError(new Consumer<Throwable>() {
@Override
public void accept(@io.reactivex.annotations.NonNull Throwable throwable) throws Exception {
cache = null;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(onNext, onError);
}
disposable = Data.getInstance()
.subscribeData(new Consumer<List<Item>>() {
@Override
public void accept(@NonNull List<Item> items) throws Exception {
swipeRefreshLayout.setRefreshing(false);
int loadingTime = (int) (System.currentTimeMillis() - startingTime);
loadingTimeTv.setText(getString(R.string.loading_time_and_source, loadingTime, Data.getInstance().getDataSourceText()));
adapter.setItems(items);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
throwable.printStackTrace();
swipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show();
}
});
详细代码请参考凯哥RxJavaSamples案例