本文参考自 Asynchronous Android Programming – the Good, the Bad, and the Ugly
有关 RxJava 2 的例子 RxJava2-Android-Samples
有关 RxSearchObservable 的参考博文 Implement Search Using RxJava Operators
编写 Android 代码时,很多情况下你需要进行网络调用,目前有几种不同的异步编程方式可以满足我们的需求。我们对这几种方式进行了比较,通过代码示例,将选出我们认为在 Android 中进行网络调用的最佳方法。
一、The Ugly:在 UI 线程上进行网络调用
进行网络调用的最简单方法是直接在 UI 线程上进行调用:
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
// Insert network call here!
}
这可能适用于 Android 早期版本上的短网络调用,但这种方式是非常 ugly (丑陋的)
。当你在 UI 线程上运行耗时的代码段时,它同时会阻塞其他操作。对于用户来说,应用程序似乎停止运行了。如果用户在 UI 线程被阻塞时执行操作,并且应用在 5 秒内没有响应,则 Android 系统将显示 “应用程序无响应 (ANR)”
对话框:
如果你尝试在较新版本的 Android(3.0
及以上版本)上以这种方式进行网络调用,应用程序会崩溃,因为这些新版本对于 UI 线程的使用要求更为严格。
二、The Bad Part 1:使用 Runnable 和 Handler
为了避免 Android 应用程序停止运行甚至崩溃,我们需要在后台执行网络调用。一种方法是使用 Runnable
,它包含需要在后台运行的代码,并且在工作线程上运行该代码。
Runnable runnable = new Runnable() {
public void run() {
// Insert network call here!
}
};
Thread mythread = new Thread(runnable);
mythread.start();
现在可以放心地进行网络调用了,但如果你想在这之后更新用户界面,你是无法在 Runnable
中执行此操作的,因为它是在 UI 线程以外的线程上运行。但是,你可以在 UI 线程上创建一个 Handler
,并在 Runnable
中使用它来触发 Handler
的 handleMessage()
回调。
private Handler handler = new Handler() {
@Override
public void handleMessage(Message msg) {
// Update user interface
}
};
Runnable runnable = new Runnable() {
public void run() {
// Insert network call here!
handler.sendEmptyMessage(0);
}
};
Thread mythread = new Thread(runnable);
mythread.start();
Runnable
和 Handler
的主要风险是内存泄漏。如果 Activity 创建了一个 Handler
,此时在工作线程中执行耗时任务,并在 Runnable
仍在工作时离开 Activity,Handler
会持有创建它的 Activity 的引用,该引用将阻止 Activity 被垃圾回收。为了避免这种情况,需要静态创建 Handler
,因为它不包含对 Activity 的引用,并且使用 WeakReference
持有 Activity 的引用,这样就可以调用其方法来更新 UI 而不会发生内存泄漏。还有一个问题,就是如果同时执行多个更新用户界面的网络调用,则无法保证 Handler
按照你想要的顺序接收回调。
以下是优化过的 Handler:
private final MyHandler mHandler = new MyHandler(this);
private static class MyHandler extends Handler {
private final WeakReference<MainActivity> mActivity;
private MyHandler(MainActivity activity) {
mActivity = new WeakReference<MainActivity>(activity);
}
@Override
public void handleMessage(Message msg) {
MainActivity activity = mActivity.get();
if (activity != null) {
// Update user interface
}
}
}
Runnable runnable = new Runnable() {
public void run() {
// Insert network call here!
mHandler.sendEmptyMessage(0);
}
};
Thread mythread = new Thread(runnable);
mythread.start();
更多有关 Handler 导致内存泄漏的介绍请看这里 Android Handler 导致内存泄漏原因及解决方案
三、The Bad Part 2:使用 AsyncTask
在后台进行网络调用的另一种不太理想的方法是使用 AsyncTask
。它们的用法相当简单:创建 AsyncTask
的子类,覆盖 doInBackground()
方法,可能还有 onPreExecute()
,onProgressUpdate()
和 onPostExecute()
方法。
private class NetworkCallTask extends AsyncTask<String, Void, Integer> {
@Override
protected Integer doInBackground(String... strings) {
// Insert network call here!
return null;
}
@Override
protected void onPreExecute() {
// Perform setup - runs on user interface thread
super.onPreExecute();
}
@Override
protected void onProgressUpdate(Void... values) {
// Update user with progress bar or similar - runs on user interface thread
super.onProgressUpdate(values);
}
@Override
protected void onPostExecute(Integer integer) {
// Update user interface
super.onPostExecute(integer);
}
}
new NetworkCallTask().execute();
onPreExecute()
在 UI 线程上运行,用于在启动后台任务之前执行任何需要的设置,例如显示进度条。接下来,在后台调用 doInBackground()
来执行网络任务。此时,如果从 doInBackground()
内部调用 publishProgress()
方法,则将在 UI 线程上调用 onProgressUpdate()
方法,执行更新进度条等与用户相关的任务。最后,一旦后台任务完成,将在 UI 线程上调用 onPostExecute()
,执行诸如取消进度条和显示网络调用结果等操作。
这比在 UI 上进行网络调用要好得多,但仍有一些问题需要避免:如果要取消 AsyncTask
,需要等到 doInBackground()
方法完成后才能进行。这意味着即使你在异步任务上调用 cancel()
方法,它也会等到后台工作完成后再取消任务,除非你在 doInBackground()
方法中不断地检查 isCancelled()
的结果。
AsyncTask
的另一个问题是:如果你同时运行多个 AsyncTask
,你无法保证它们会以怎样的顺序完成,导致需要复杂的逻辑代码来检查。更糟糕的是你假设 A AsyncTask
将在 B AsyncTask
之前完成,直到遇到一个边缘情况使 A AsyncTask
调用变慢,使得它们以错误的顺序完成并且产生不希望的结果。
由于 AsyncTask
可以部分在 UI 线程上运行,部分在后台线程上运行,因此它们可能导致内存泄漏。因为 AsyncTask
经常在 onPostExecute()
方法中更新 UI,因此很容易使 AsyncTask
成为 Activity 的内部类,并且可以访问 Activity 的 UI 元素。这会创建一个对 Activity 的隐式引用,当 AsyncTask
仍在运行时,该引用会阻止对 Activity 进行垃圾回收,即使 Activity 已被销毁。
四、The Good:使用 RxJava
最后,终于到了我们最喜欢的后台网络调用方法:RxJava 库,一个 ReactiveX 的 Java 实现,这里我们使用的是专门为 Android 开发所设计的基于 RxJava 2 的 RxAndroid 库以及开源的网络请求库 OkHttp 3。使用 RxJava 的 Observable
,可以创建一个覆写了 onNext()
,onError()
和 onComplete()
方法的 Observer
。Observable
会将数据项发射给 Observer
,导致调用 onNext()
方法并访问所发射的数据项。如果 Observable
已发射完数据项,则调用 onCompleted()
。如果在任何时候抛出异常,则会利用该异常调用 onError()
,从而轻松处理错误。
以下代码是从网络加载一张图片并设置给 ImageView 显示出来:
private CompositeDisposable mCompositeDisposable = new CompositeDisposable();
private void setBitmapFromNet() {
getObservable()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
}
private Observable<Bitmap> getObservable() {
return Observable.create(new ObservableOnSubscribe<Bitmap>() {
@Override
public void subscribe(final ObservableEmitter<Bitmap> emitter) throws Exception {
if (!emitter.isDisposed()) {
Request request = new Request.Builder()
.url("https://img3.doubanio.com/img/musician/large/35000.jpg")
.build();
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
Log.d(TAG, "onFailure: " + e.getMessage());
}
@Override
public void onResponse(Call call, Response response) throws IOException {
InputStream inputStream = response.body().byteStream();
emitter.onNext(BitmapFactory.decodeStream(inputStream));
emitter.onComplete();
}
});
}
}
});
}
private Observer<Bitmap> getObserver() {
return new Observer<Bitmap>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: 1 ");
mCompositeDisposable.add(d);
}
@Override
public void onNext(Bitmap bitmap) {
Log.d(TAG, "onNext: 2 ");
mImageView.setImageBitmap(bitmap);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: 3 " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: 3 ");
}
};
}
Observable
还允许使用操作符来修改 Observable
。例如,subscribeOn()
声明 Observable
在哪个线程上执行工作,以及 observeOn()
声明观察者使用的线程。有许多操作符可用于修改 Observable
,例如 map
,它将发射的数据项转换为其它类型的数据项。
-
Map
— 映射,通过对序列的每一项都应用一个函数变换Observable
发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项。
/*
* Here we are getting ApiUser Object from api server
* then we are converting it into User Object because
* may be our database support User Not ApiUser Object
* Here we are using Map Operator to do that
*/
private void doSomeWork() {
getObservable()
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.map(new Function<List<ApiUser>, List<User>>() {
@Override
public List<User> apply(List<ApiUser> apiUsers) {
return Utils.convertApiUserListToUserList(apiUsers);
}
})
.subscribe(getObserver());
}
private Observable<List<ApiUser>> getObservable() {
return Observable.create(new ObservableOnSubscribe<List<ApiUser>>() {
@Override
public void subscribe(ObservableEmitter<List<ApiUser>> e) {
if (!e.isDisposed()) {
e.onNext(Utils.getApiUserList());
e.onComplete();
}
}
});
}
private Observer<List<User>> getObserver() {
return new Observer<List<User>>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(List<User> userList) {
textView.append(" onNext");
textView.append(AppConstant.LINE_SEPARATOR);
for (User user : userList) {
textView.append(" firstname : " + user.firstname);
textView.append(AppConstant.LINE_SEPARATOR);
}
Log.d(TAG, " onNext : " + userList.size());
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
另一个有用的操作符是 merge
,它使用多个 Observable
,并将所有 Observable
中的数据项发送到同一个 Observer
,为每个 Observable
发射的每个数据项调用 onNext()
。如果在此过程中 Observable
出现错误,则调用 onError()
并且不再发射任何数据项。如果不想这样,可以使用 mergeDelayError
,它与 merge
相同,但如果有错误,它会等到所有 Observable
完成或出现多次错误之后才会触发 onError()
。
-
Merge
— 将两个Observable
发射的数据组合并成一个。
/*
* Using merge operator to combine Observable : merge does not maintain
* the order of Observable.
* It will emit all the 7 values may not be in order
* Ex - "A1", "B1", "A2", "A3", "A4", "B2", "B3" - may be anything
*/
private void doSomeWork() {
final String[] aStrings = {"A1", "A2", "A3", "A4"};
final String[] bStrings = {"B1", "B2", "B3"};
final Observable<String> aObservable = Observable.fromArray(aStrings);
final Observable<String> bObservable = Observable.fromArray(bStrings);
Observable.merge(aObservable, bObservable)
.subscribe(getObserver());
}
private Observer<String> getObserver() {
return new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(String value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext : value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
也可以组合操作符,对发射对象进行各种修改。组合运算符没有限制,使得使用 RxJava 完成复杂任务变得更加简单。以下是一个响应式搜索框的实现,用到了 Debounce
、Filter
、DistinctUntilChanged
和 SwitchMap
:
Debounce
— 只有在空闲了一段时间后才发射数据,通俗的说,就是如果一段时间没有操作,就执行一次操作,俗称去抖。Debounce
操作符与时间常数一起使用。当用户在非常短的时间内键入 “a”,“ab”,“abc” 时,会导致多次网络调用,但是用户最终只对搜索 “abc” 的结果感兴趣。所以,你必须丢弃 “a” 和 “ab” 的结果。理想情况下,当用户在非常短的时间内输入“a” 和 “ab” 时,不应该发生网络调用。所以,Debounce
操作符就是为此而生的。它使Observable
等待所提供的时间,如果在该时间内进行搜索,它将忽略之前的搜索并使用最新的搜索进行查询,然后再次等待相同的时间。如果在给定的时间内没有任何新输入,它将继续使用最新的搜索项进行搜索。Filter
:过滤器运算符用于过滤不需要的字符串,如空字符串,以避免不必要的网络调用。DistinctUntilChanged
—DistinctUntilChanged
操作符用于避免重复的网络调用。假设最后一个正在进行的搜索是 “abc”,用户删除了 “c” 并再次键入 “c”。所以它又是 “abc”。因此,如果已经在搜索 “abc” 了,则不会再次进行搜索。因此,distinctUntilChanged
会去除Observable
发射的重复连续项。SwitchMap
—SwitchMap
操作符用于避免那些结果对于用户不再需要的网络调用。假设最后一个搜索是 “ab”,并且正在进行 “ab” 的网络调用,此时用户键入 “abc”。那用户就不再对 “ab” 的结果感兴趣了,只对 “abc” 的结果感兴趣。SwitchMap
就是为此而生的,它仅提供最后一次搜索的结果并忽略其余的搜索。
public class RxSearchObservable {
public static Observable<String> fromView(SearchView searchView) {
final PublishSubject<String> subject = PublishSubject.create();
searchView.setOnQueryTextListener(new SearchView.OnQueryTextListener() {
@Override
public boolean onQueryTextSubmit(String s) {
subject.onComplete();
return true;
}
@Override
public boolean onQueryTextChange(String text) {
subject.onNext(text);
return true;
}
});
return subject;
}
}
private void setUpSearchObservable() {
RxSearchObservable.fromView(searchView)
.debounce(300, TimeUnit.MILLISECONDS)
.filter(new Predicate<String>() {
@Override
public boolean test(String text) {
if (text.isEmpty()) {
textViewResult.setText("");
return false;
} else {
return true;
}
}
})
.distinctUntilChanged()
.switchMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String query) {
return dataFromNetwork(query);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String result) {
textViewResult.setText(result);
}
});
}
使用 Observable
时需要注意的地方是确保在 Activity 的 onDestroy()
方法中取消订阅,可以使用 CompositeDisposable
来统一管理。这是一种防止内存泄漏的简单方法,并且是有意义的,因为它遵循 Android 的生命周期。
private final CompositeDisposable disposables = new CompositeDisposable();
@Override
protected void onDestroy() {
super.onDestroy();
disposables.clear(); // do not send event after activity has been destroyed
}
/*
* Example to understand how to use disposables.
* disposables is cleared in onDestroy of this activity.
*/
void doSomeWork() {
disposables.add(sampleObservable()
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<String>() {
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onNext(String value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext value : " + value);
}
}));
}
static Observable<String> sampleObservable() {
return Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() {
// Do some long running operation
SystemClock.sleep(2000);
return Observable.just("one", "two", "three", "four", "five");
}
});
}