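Note: this article covers only the wrappers; for basic usage, please refer to the official documentation or other articles.
0. Dependencies
RxJava 2.0 is out, but I haven't had time to look at it yet, so this article sticks with the versions currently used in our project.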
compile 'io.reactivex:rxjava:1.2.4'
compile 'io.reactivex:rxandroid:1.2.1'
compile 'com.artemzin.rxjava:proguard-rules:1.2.7.0'
compile 'com.trello:rxlifecycle:1.0'
compile 'com.trello:rxlifecycle-components:1.0'
compile 'com.jakewharton.rxbinding:rxbinding:1.0.0'
The following are the examples used most often in our project:
1. View-related
Click events
- Usage
Rx.clicks(mBtn, this, v -> doAction());
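- Features
- Filters out repeated clicks within 500 ms;
- Binds to the lifecycle of the current Activity or Fragment to avoid memory leaks;
- Wrapper
// View click events; repeated clicks within 500 ms are dropped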
public static void clicks(View view, BaseActivity activity, final Action1<Void> onNext) {
RxView.clicks(view)
.throttleFirst(500, TimeUnit.MILLISECONDS)
.compose(activity.bindToLifecycle())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(onNext, e -> e.printStackTrace());
}
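To make the filtering rule concrete, here is a minimal plain-Java sketch of what a throttleFirst-style window does to a series of click timestamps. It is a model for illustration only, not RxJava code: `ThrottleFirstSketch` and `filter` are made-up names, and the timestamps are assumed to be in ascending order.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of throttleFirst-style filtering; not RxJava code.
class ThrottleFirstSketch {
    // Returns the click timestamps (in ms) that would be let through.
    static List<Long> filter(List<Long> clickTimesMs, long windowMs) {
        List<Long> passed = new ArrayList<>();
        boolean open = false;  // whether any click has been accepted yet
        long windowStart = 0;  // timestamp of the last accepted click
        for (long t : clickTimesMs) {
            // Accept the first click, then any click at least windowMs after the last accepted one.
            if (!open || t - windowStart >= windowMs) {
                passed.add(t);
                windowStart = t;
                open = true;
            }
        }
        return passed;
    }
}
```

With a 500 ms window, clicks at 0, 100, 499, 500, 620 and 1200 ms come out as 0, 500 and 1200: the first click of each window wins, which is the behavior you want for a button.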
Text changes in an input field
- Usage
Rx.afterTextChangeEvents(this, mEditText, event -> doAction());
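- Features
- Emits at most one event per 500 ms (the most recent one), which avoids a flood of requests when every text change triggers a network call;
- Binds to the lifecycle of the current Activity or Fragment to avoid memory leaks;
- Wrapper
// TextView watcher, sampled every 500 ms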
public static void afterTextChangeEvents(BaseActivity activity, TextView textView, Action1<TextViewAfterTextChangeEvent> onNext) {
RxTextView.afterTextChangeEvents(textView)
.throttleLast(500, TimeUnit.MILLISECONDS)
.compose(activity.bindToLifecycle())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(onNext, e -> e.printStackTrace());
}
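For contrast with the click case, here is a rough plain-Java model of throttleLast-style sampling: within each 500 ms window only the last event survives. Again this is a sketch under assumptions, not RxJava code. `ThrottleLastSketch` and `sample` are made-up names, timestamps are measured from the moment of subscription, and the real operator delivers the surviving event at the end of its window rather than immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Plain-Java model of throttleLast-style sampling; not RxJava code.
class ThrottleLastSketch {
    // eventTimesMs: ascending timestamps measured from subscription.
    // Returns the last event of every window that saw at least one event.
    static List<Long> sample(List<Long> eventTimesMs, long windowMs) {
        TreeMap<Long, Long> lastPerWindow = new TreeMap<>();
        for (long t : eventTimesMs) {
            // Key = window index; a later event in the same window overwrites the earlier one.
            lastPerWindow.put(t / windowMs, t);
        }
        return new ArrayList<>(lastPerWindow.values());
    }
}
```

Keystrokes at 50, 120, 480, 700, 1600 and 1650 ms reduce to 480, 700 and 1650, so a request fires with the newest text of each window instead of once per keystroke.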
2. Network requests (the key part!)
- Usage
API.uploadImg(picInfo) // uploading an image as an example
.compose(RxTransformers.doApi(this))
.subscribe((Result result) -> {
//to do
});
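Hard to get more concise than that!
- Features
- Keeps the original chained-call style;
- subscribe() can be given only onNext, with onError and onCompleted optional; combined with lambdas, this keeps the calling code minimal. (Previously, passing only onNext without onError meant that a network failure or an exception thrown inside onNext raised OnErrorNotImplementedException);
- Thread switching;
- Binds to the current page's lifecycle: for a request started while the page is resumed, unfinished requests are stopped at onPause (a response that has already arrived is dropped without being processed);
- Unified preprocessing of request results:
5.1 Network error handling and reporting;
5.2 Displaying API error messages;
5.3 Showing and hiding the loading UI;
5.4 Unified handling of an expired token;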
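- Wrapper
To understand how the wrapper below works, first read through the source code of the compose() and lift() operators.
API.uploadImg(picInfo) is a Retrofit-based wrapper; it is omitted here.
The custom Transformer passed to the compose() operator: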
public class RxTransformers {
public static <T> Observable.Transformer<T, T> io_main() {
return (Observable<T> observable) -> observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
public static <T> Observable.Transformer<T, T> doApi(BaseActivity activity, HttpResultInterceptor.Type type) {
return (Observable<T> observable) -> observable
.compose(io_main()) // thread switching
.compose(activity.bindToLifecycle()) // lifecycle binding
.lift(HttpResultInterceptor.get(activity, type)); // request-result preprocessing
}
public static <T> Observable.Transformer<T, T> doApi(BaseActivity activity) {
return doApi(activity, HttpResultInterceptor.Type.ALL);
}
}
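If compose() still feels abstract, the idea is simply that a Transformer is a function from one stream to another, and compose() applies it to the whole stream. The sketch below shows that shape in plain Java. `MiniSource`, `ComposeSketch` and `trimAndUpper` are hypothetical stand-ins invented for this illustration; they are not RxJava classes and ignore threading entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical miniature stand-in for Observable, just enough to show compose().
class MiniSource<T> {
    private final List<T> items;

    MiniSource(List<T> items) { this.items = items; }

    <R> MiniSource<R> map(Function<T, R> f) {
        List<R> out = new ArrayList<>();
        for (T item : items) out.add(f.apply(item));
        return new MiniSource<>(out);
    }

    // compose() hands the whole source to the transformer and returns whatever it builds.
    <R> MiniSource<R> compose(Function<MiniSource<T>, MiniSource<R>> transformer) {
        return transformer.apply(this);
    }

    List<T> toList() { return new ArrayList<>(items); }
}

class ComposeSketch {
    // A reusable chain of steps, analogous in shape to io_main() or doApi().
    static Function<MiniSource<String>, MiniSource<String>> trimAndUpper() {
        return source -> source.map(String::trim).map(s -> s.toUpperCase());
    }
}
```

Calling `new MiniSource<String>(Arrays.asList(" a ", "b ")).compose(ComposeSketch.trimAndUpper())` applies both steps in one call, which is why the call site in the article stays a single chained line.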
Thread switching and lifecycle binding need little explanation; the key part is the custom Operator passed to lift():
lift(HttpResultInterceptor.get(activity, type));
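Before reading the real operator, it may help to see the lift() mechanism with everything else stripped away: an operator receives the downstream (child) subscriber and returns a wrapper that the upstream talks to instead. The following is a plain-Java sketch under that assumption. All the `Mini*` types and `LiftSketch.logging` are invented for illustration and only mirror the shape of RxJava 1.x; there is no unsubscription, backpressure or threading here.

```java
import java.util.List;

// Hypothetical miniature types; they mirror the shape of RxJava 1.x but are not its classes.
interface MiniSubscriber<T> {
    void onNext(T item);
    void onError(Throwable e);
    void onCompleted();
}

interface MiniOperator<T> {
    // Given the downstream (child) subscriber, return the one the upstream will call.
    MiniSubscriber<T> call(MiniSubscriber<T> child);
}

class MiniObservable<T> {
    interface OnSubscribe<T> { void call(MiniSubscriber<T> subscriber); }

    private final OnSubscribe<T> onSubscribe;

    MiniObservable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }

    // lift() builds a new source whose subscribe step wraps the child first.
    MiniObservable<T> lift(MiniOperator<T> operator) {
        return new MiniObservable<T>(child -> onSubscribe.call(operator.call(child)));
    }

    void subscribe(MiniSubscriber<T> subscriber) { onSubscribe.call(subscriber); }
}

class LiftSketch {
    // An interceptor in the spirit of HttpResultInterceptor: it records each
    // callback in a log before forwarding it to the child.
    static <T> MiniOperator<T> logging(List<String> log) {
        return child -> new MiniSubscriber<T>() {
            @Override public void onNext(T item) {
                log.add("pre:" + item);
                try {
                    child.onNext(item);
                } catch (RuntimeException e) {
                    onError(e); // a failure in the child's onNext is routed to onError
                }
            }
            @Override public void onError(Throwable e) {
                log.add("error:" + e.getMessage());
                child.onError(e);
            }
            @Override public void onCompleted() {
                log.add("done");
                child.onCompleted();
            }
        };
    }
}
```

Because the wrapper sees every callback before the child does, it is a natural place for cross-cutting work such as showing a loading indicator or checking a status code.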
Request-result preprocessing
/**
* Created by Jessewo on 2017/7/25.
* <p>
* 1. Chained API calls
* 2. Lambda expressions (the first-level subscriber may implement only onNext(); onError/onCompleted are optional)
*/
public class HttpResultInterceptor {
private static final String TAG = "HttpResultInterceptor";
public enum Type {
/**
* 1. filter exception and show exception msg when onError<br/>
* 2. show loading when onStart, and hide when onComplete or onError<br/>
* 3. show error message when onNext
*/
ALL,
/**
* 1. filter exception and show exception msg when onError<br/>
* 2. show error message when onNext
*/
ERROR_MSG,
/**
* 1. filter exception and show exception msg when onError<br/>
* 2. show loading when onStart, and hide when onComplete or onError
*/
LOADING,
/**
* 1. filter exception and show exception msg when onError
*/
NONE
}
// Generic signature instead of a raw Operator, so callers keep type checking
public static <T> Observable.Operator<T, T> get(HttpResultHandler handler, Type type) {
return new OperatorHttpResult<>(handler, type);
}
private static class OperatorHttpResult<T> implements Observable.Operator<T, T>, Subscription {
private SoftReference<HttpResultHandler> mHandler;
private Type mType;
OperatorHttpResult(HttpResultHandler httpResultHandler) {
// Default to ALL; a null type would throw a NullPointerException in the subscriber's switch
this(httpResultHandler, Type.ALL);
}
OperatorHttpResult(HttpResultHandler httpResultHandler, Type type) {
mHandler = new SoftReference<>(httpResultHandler);
mType = type;
}
@Override
public Subscriber<? super T> call(Subscriber<? super T> subscriber) {
HttpResultSubscriber<? super T> parent = new HttpResultSubscriber<>(subscriber, mHandler, mType);
// the parent subscriber controls its unsubscribe behavior independently
parent.add(this);
return parent;
}
@Override
public void unsubscribe() {
mHandler.clear();
}
@Override
public boolean isUnsubscribed() {
return mHandler.get() == null;
}
}
/**
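* The exception-handling rules differ slightly from SafeSubscriber's:<br/>
* 1. any exception thrown in onNext is caught and forwarded to onError (same);<br/>
* 2. any exception thrown in onCompleted is rethrown (same);<br/>
* 3. onError rethrows every exception except OnErrorNotImplementedException (different)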
*/
private static class HttpResultSubscriber<T> extends Subscriber<T> {
private Subscriber<? super T> mChild;
private SoftReference<HttpResultHandler> mHandler;
private boolean showProgress;
private boolean showError;
private boolean done;
HttpResultSubscriber(Subscriber<? super T> child,
SoftReference<HttpResultHandler> preHandler,
Type type) {
mChild = child;
mHandler = preHandler;
switch (type) {
case ERROR_MSG:
showProgress = false;
showError = true;
break;
case LOADING:
showProgress = true;
showError = false;
break;
case NONE:
showProgress = false;
showError = false;
break;
default://all
showProgress = true;
showError = true;
break;
}
}
@Override
public void onStart() {
// runs on the thread that calls subscribe(), which is the main thread when called from UI code
showProgress();
}
@Override
public void onCompleted() {
if (done || isUnsubscribed() || mChild.isUnsubscribed())
return;
done = true;
try {
dismissProgress();
mChild.onCompleted();
} finally {
try {
unsubscribe();
} catch (Throwable e) {
RxJavaHooks.onError(e);
throw new OnCompletedFailedException(e.getMessage(), e);
}
}
}
@Override
public void onError(Throwable e) {
if (done || isUnsubscribed() || mChild.isUnsubscribed())
return;
done = true;
try {
dismissProgress();
// Unwrap to the underlying cause, but only if there is one
if (e instanceof OnErrorNotImplementedException && e.getCause() != null) {
e = e.getCause();
}
e.printStackTrace();
String error = App.getInstance().getString(R.string.error_network);
if (e instanceof SocketTimeoutException) {
ToastUtil.show(error + "(" + NETWORK_ERROR_TIMEOUT + ")");
} else if (e instanceof ConnectException) {
ToastUtil.show(error + "(" + NETWORK_ERROR_INTERRUPTION + ")");
} else if (e instanceof UnknownHostException
|| (!TextUtils.isEmpty(e.getMessage()) && e.getMessage().contains("No address associated with hostname"))) {
ToastUtil.show(error + "(" + NETWORK_ERROR_UNKNOWN_HOST + ")");
} else {
ToastUtil.show(error + "(" + NETWORK_ERROR_UNKNOWN + ")");
Analytics.getInstance().onError(e); // report unexpected exceptions to backend monitoring
}
mChild.onError(e);
} catch (OnErrorNotImplementedException e2) {
//ignore
LOG.d(TAG, "onError: OnErrorNotImplementedException");
} finally {
try {
unsubscribe();
} catch (Throwable e3) {
RxJavaHooks.onError(e3);
throw new OnErrorFailedException(e3.getMessage(), e3);
}
}
}
@Override
public void onNext(T t) {
if (done || isUnsubscribed() || mChild.isUnsubscribed())
return;
try {
if (showError) {
if (t instanceof Result) {
Result result = (Result) t;
checkResult(result);
} else if (t instanceof MixResult) {
MixResult mixResult = (MixResult) t;
Result result1 = mixResult.getResult1();
Result result2 = mixResult.getResult2();
if (result1 != null && result2 != null) {
checkResult(result1);
checkResult(result2);
}
}
}
mChild.onNext(t);
} catch (Throwable e) {
onError(e);
}
}
private void checkResult(Result result) {
int status = result.getStatus();
switch (status) {
case API.SUCCESS_CODE:
case API.SKIP_CODE:
break;
case API.RELOGIN_CODE:
// token expired
String msg = result.getMsg();
HttpResultHandler preHandler = mHandler.get();
if (preHandler != null) {
preHandler.onTokenInvalid(msg);
} else {
ToastUtil.show(msg);
}
break;
default:
ErrorMsg.getInstance().show(result);
break;
}
}
private void showProgress() {
if (showProgress && mHandler.get() != null)
mHandler.get().showProgress();
}
private void dismissProgress() {
if (showProgress && mHandler.get() != null)
mHandler.get().dismissProgress();
}
}
// BaseActivity or BaseFragment must implement this interface
public interface HttpResultHandler {
void showProgress();
void dismissProgress();
void onTokenInvalid(String msg);
// can be extended with other features
// void showMessage(String msg);
// void lowerVersion();
}
}
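One part of onError() that is easy to pull out and unit test without Android is the mapping from exception type to error code. Below is a minimal sketch of that idea. `NetworkErrorSketch`, `classify` and `shouldReport` are names made up for this example, and the numeric values of the `NETWORK_ERROR_*` constants are assumptions, since the article does not show where they are defined.

```java
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;

class NetworkErrorSketch {
    // Hypothetical values; the real constants are not shown in the article.
    static final int NETWORK_ERROR_TIMEOUT = 1;
    static final int NETWORK_ERROR_INTERRUPTION = 2;
    static final int NETWORK_ERROR_UNKNOWN_HOST = 3;
    static final int NETWORK_ERROR_UNKNOWN = 4;

    // Follows the same if/else order as HttpResultSubscriber.onError().
    static int classify(Throwable e) {
        if (e instanceof SocketTimeoutException) return NETWORK_ERROR_TIMEOUT;
        if (e instanceof ConnectException) return NETWORK_ERROR_INTERRUPTION;
        String message = e.getMessage();
        if (e instanceof UnknownHostException
                || (message != null && message.contains("No address associated with hostname"))) {
            return NETWORK_ERROR_UNKNOWN_HOST;
        }
        return NETWORK_ERROR_UNKNOWN;
    }

    // Only errors outside the known categories are reported to monitoring.
    static boolean shouldReport(Throwable e) {
        return classify(e) == NETWORK_ERROR_UNKNOWN;
    }
}
```

With this split, the subscriber would only need to show a toast for the returned code and call the analytics hook when `shouldReport` is true.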