注:本文摘自《RxJava Essentials》翻译中文版电子书
StrictMode
StrictMode是一种模式,为了获得更多出现在代码中的关于公共问题的信息。
StrictMode帮助我们侦测敏感的活动,如我们无意的在主线程执行磁盘访问或者网络调用。
为了在我们的App中激活StrictMode,我们只需要在MainActivity中添加几行代码,即onCreate()方法中这样:
@Override
public void onCreate() {
super.onCreate();
if (BuildConfig.DEBUG) {
StrictMode.setThreadPolicy(new StrictMode.ThreadPolicy.Builder().detectAll().penaltyLog().build());
StrictMode.setVmPolicy(new StrictMode.VmPolicy.Builder().detectAll().penaltyLog().build());
}
}
我们并不想它总是激活着,因此我们只在debug构建时使用。这种配置将报告每一种关于主线程用法的违规做法,并且这些做法都可能与内存泄露有关:Activities、BroadcastReceivers、Sqlite等对象。
选择了penaltyLog(),当违规做法发生时,StrictMode将会在logcat打印一条信息。
避免阻塞I/O的操作
我们激活StrictMode后,我们开始收到了关于我们的App错误操作磁盘I/O的不良信息。比如:
D/StrictMode StrictMode policy violation; ~duration=998 ms: android.os.StrictMode$StrictModeDiskReadViolation: policy=31 violation=2
at android.os.StrictMode$AndroidBlockGuardPolicy.onReadFromDisk (StrictMode.java:1135)
at libcore.io.BlockGuardOs.open(BlockGuardOs.java:106) at libcore.io.IoBridge.open(IoBridge.java:393)
at java.io.FileOutputStream.<init>(FileOutputStream.java:88)
at android.app.ContextImpl.openFileOutput(ContextImpl.java:918)
at android.content.ContextWrapper.openFileOutput(ContextWrapper. java:185)
at com.packtpub.apps.rxjava_essentials.Utils.storeBitmap (Utils.java:30)
上一条信息告诉我们Utils.storeBitmap()函数执行完耗时998ms:在UI线程上近1秒的不必要的工作和App上近1秒不必要的迟钝。这是因为我们以阻塞的方式访问磁盘。
Schedulers
Schedulers 是调度器。RxJava提供了5种调度器:
- .io()
- .computation()
- .immediate()
- .newThread()
- .trampoline()
Schedulers.io()
这个调度器时用于I/O操作。它基于根据需要,增长或缩减来自适应的线程池。我们将使用它来修复我们之前看到的StrictMode违规做法。由于它专用于I/O操作,所以并不是RxJava的默认方法;正确的使用它是由开发者决定的。
重点需要注意的是线程池是无限制的,大量的I/O调度操作将创建许多个线程并占用内存。一如既往的是,我们需要在性能和简捷两者之间找到一个有效的平衡点。
Schedulers.computation()
这个是计算工作默认的调度器,它与I/O操作无关。它也是许多RxJava方法的默认调度器:buffer(),debounce(),delay(),interval(),sample(),skip()。
Schedulers.immediate()
这个调度器允许你立即在当前线程执行你指定的工作。它是timeout(),timeInterval(),以及timestamp()方法默认的调度器。
Schedulers.newThread()
这个调度器正如它所看起来的那样:它为指定任务启动一个新的线程。
Schedulers.trampoline()
当我们想在当前线程执行一个任务时,并不是立即,我们可以用.trampoline()将它入队。这个调度器将会处理它的队列并且按序运行队列中每一个任务。它是repeat()和retry()方法默认的调度器。
非阻塞I/O操作
假设blockingStoreBitmap方法是一个操作IO的方法,会阻塞线程,如何用RxJava解决:
public static void storeBitmap(Context context, Bitmap bitmap, String filename) {
Schedulers.io().createWorker().schedule(() -> {
blockingStoreBitmap(context, bitmap, filename);
});
}
每次我们调用storeBitmap(),RxJava处理创建所有它需要从I / O线程池一个特定的I/ O线程执行我们的任务。所有要执行的操作都避免在UI线程执行并且我们的App比之前要快上1秒
SubscribeOn and ObserveOn
RxJava提供了subscribeOn()方法来用于每个Observable对象:
getApps()
.onBackpressureBuffer()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(...);
observeOn()方法将会在指定的调度器上返回结果:如例子中的UI线程。onBackpressureBuffer()方法将告诉Observable发射的数据如果比观察者消费的数据要更快的话,它必须把它们存储在缓存中并提供一个合适的时间给它们。
执行网络任务
作为网络访问的第一个案例,我们将创建下面这样一个场景:
- 加载一个进度条。
- 用一个按钮开始文件下载。
- 下载过程中更新进度条。
- 下载完后开始视频播放。
创建mDownloadProgress,用来管理进度的更新,它和download函数协同工作。
private PublishSubject<Integer> mDownloadProgress = PublishSubject.create();
private Observable<Boolean> obserbableDownload(String source, String destination) {
return Observable.create(subscriber -> {
try {
boolean result = downloadFile(source, destination);
if (result) {
subscriber.onNext(true);
subscriber.onCompleted();
} else {
subscriber.onError(new Throwable("Download failed."));
}
} catch (Exception e) {
subscriber.onError(e);
}
});
}
现在我们需要触发下载操作,点击下载按钮:
@OnClick(R.id.button_download)
void download() {
mButton.setText(getString(R.string.downloading));
mButton.setClickable(false);
mDownloadProgress.distinct()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
App.L.debug("Completed");
}
@Override
public void onError(Throwable e) {
App.L.error(e.toString());
}
@Override
public void onNext(Integer progress) {
mArcProgress.setProgress(progress);
}
});
String destination = "sdcardsoftboy.avi";
obserbableDownload("http://archive.blender.org/fileadmin/movies/softboy.avi", destination)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(success -> {
resetDownloadButton();
Intent intent = new Intent(android.content.Intent.ACTION_VIEW);
File file = new File(destination);
intent.setDataAndType(Uri.fromFile(file),"video/avi");
intent.addFlags(Intent.FLAG_ACTIVITY_NEW_TASK);
startActivity(intent);
}, error -> {
Toast.makeText(getActivity(), "Something went south", Toast.LENGTH_SHORT).show();
resetDownloadButton();
});
}
downloadFile:
private boolean downloadFile(String source, String destination) {
boolean result = false;
InputStream input = null;
OutputStream output = null;
HttpURLConnection connection = null;
try {
URL url = new URL(source);
connection = (HttpURLConnection) url.openConnection();
connection.connect();
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
return false;
}
int fileLength = connection.getContentLength();
input = connection.getInputStream();
output = new FileOutputStream(destination);
byte data[] = new byte[4096];
long total = 0;
int count;
while ((count = input.read(data)) != -1) {
total += count;
if (fileLength > 0) {
int percentage = (int) (total * 100 / fileLength);
mDownloadProgress.onNext(percentage);
}
output.write(data, 0, count);
}
mDownloadProgress.onCompleted();
result = true;
} catch (Exception e) {
mDownloadProgress.onError(e);
} finally {
try {
if (output != null) {
output.close();
}
if (input != null) {
input.close();
}
} catch (IOException e) {
mDownloadProgress.onError(e);
}
if (connection != null) {
connection.disconnect();
mDownloadProgress.onCompleted();
}
}
return result;
}