工作流程简述
AsyncTask 涉及的知识点有 Handler,Thread,Callable,FutureTask等。
大体流程图如下:
简单介绍下:
AsyncTask 的构造器创建了一个 mFuture(FutureTask) 和 mWorker(WorkerRunnable),mFuture 重写了 done 方法,mWorker 实现了 call 方法,将 mWorker 传入 mFuture。
构建完 AsyncTask 对象,就可以执行 execute 方法,execute 方法中调用了 executeOnExecutor,并传入了默认的 sDefaultExecutor ,sDefaultExecutor 默认是模拟的单线程(下面会详细说怎么模拟的),AsyncTask 的任务只会一个一个执行。也可以传入自定义的线程,并发执行任务。在 executeOnExecutor 方法中,执行下载前的准备方法 onPreExecute(); 。然后 Executor 执行 mFuture(FutureTask),此时会调用 mFuture 的 run();
。run()
方法中会调用 mWorker 中的 call();
,此时在子线程中会调用 doInBackground(),我们的耗时任务就是在这里实现的。耗时任务结束后,拿到返回值调用 postResult()
,这里创建了一个 Message (what = MESSAGE_POST_RESULT)带上结果 result 发送给了处理消息的 InternalHandler 对象。sHandler 收到消息之后如果 what 是 MESSAGE_POST_RESULT,则判断任务是否被取消 isCancelled()
。如果取消了则调用 onCancelled(result) 没有取消则调用 onPostExecute(result)。
在 doInBackground()
中,我们还可以调用 publishProcess()
方法,在主线程中刷新进度,方法中创建了一个 Message (what = MESSAGE_POST_PROGRESS)带上进度值 value,同样发送给 sHandler 处理,sHandler 处理消息,如果 what 是 MESSAGE_POST_PROGRESS 则调用 onProgressUpdate(Progress... values) 方法。
onPreExecute();
,onCancelled(result)
,onPostExecute(result)
,onProgressUpdate(Progress... values)
这些方法都是在主线程中执行,只有 doInBackground()
是子线程中执行。
AsyncTask 概念
AsyncTask ,(以下翻译自官方文档)能够正确,容易的使用 UI 线程。这个类允许你执行后台操作并将结果呈现在 UI 线程上,不用你去操作 threads 和 handlers。
AsyncTask 被设计为一个 Thread 和 Handler 的辅助类,并不是构建线程的框架。理想情况下,AsyncTask 被用于短时间操作(大部分是几秒钟的),如果你需要保持线程长时间运行,非常推荐你使用 java.util.concurrent 包的一些 APIs,如 Executor,ThreadPoolExecutor 和 FutureTask。
异步任务由在后台线程运行,在 UI 线程呈现结果的计算定义。异步任务由三个普通类型,Params,Progress,Result 和四个步骤,分别是 onPreExecute, doInBackground, onProgressUpdate and onPostExecute 来定义。
代码示例:
private class DownloadFilesTask extends AsyncTask<URL, Integer, Long> {
@Override
protected void onPreExecute() {
//准备工作,在主线程。
}
protected Long doInBackground(URL... urls) {
//耗时操作,在子线程。
int count = urls.length;
long totalSize = 0;
for (int i = 0; i < count; i++) {
totalSize += Downloader.downloadFile(urls[i]);
publishProgress((int) ((i / (float) count) * 100));
// Escape early if cancel() is called
if (isCancelled()) break;
}
return totalSize;
}
@Override
protected void onProgressUpdate(Integer... progress) {
//显示进度,在主线程
setProgressPercent(progress[0]);
}
@Override
protected void onCancelled(Float result) {
//任务被取消会调用这个方法,不会调用 onPostExecute ,在主线程。
showDialog("Cancelled " + result + " bytes");
}
@Override
protected void onPostExecute(Long result) {
//任务完成,没有被取消,调用这个方法,在主线程。
showDialog("Downloaded " + result + " bytes");
}
}
//使用
new DownloadFilesTask().execute(url1, url2, url3);
接下来结合这段代码示例,前面的工作流程图和源代码,看下 AsyncTask 如何在 UI 线程和子线程之间切换,如何使用三个范型,四个步骤如何调用。
先看 AsyncTask 的构造方法:
public AsyncTask() {
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
Result result = doInBackground(mParams);
Binder.flushPendingCommands();
return postResult(result);
}
};
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
构造方法中创建了一个 mWorker(WorkerRunnable),它实现的是 Callable 接口,和 Runnable 接口的区别是,实现方法有返回值 V call() throws Exception;
,call() 方法的代码后面会做说明。
同时还创建了一个 mFuture(FutureTask),并将 mWorker 做为参数传入。mFuture 重写了 done() 方法,这里也是后面调用到再做说明。
AsyncTask 创建出来后,需要调用 public final AsyncTask<Params, Progress, Result> execute(Params... params)
方法来这执行这个任务。
这个方法里调用的是 executeOnExecutor(sDefaultExecutor, params);
,传入的是 sDefaultExecutor 和 params 对应的参数。我们先看 sDefaultExecutor 到底是什么?
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
sDefaultExecutor 的值是一个内部类 SerialExecutor 的对象 SERIAL_EXECUTOR,这个类实现 Executor 接口。为什么说它是模拟的单线程呢?一开始我以为它是一个单线程的线程池,看了代码之后发现并不是,任务是 THREAD_POOL_EXECUTOR 这个自定义线程池执行的。任务维护在 ArrayDeque 这个队列中,SERIAL_EXECUTOR 执行 execute 方法就会创建一个任务放入这个队列。当 mActive == null
的时候,说明之前队列中还没有任务,然后执行 scheduleNext();
,从队列中取出任务赋值给 mActive,并由 THREAD_POOL_EXECUTOR 来执行任务,这时会执行任务的 run() 方法,而 run() 方法中又会执行传进来的那个任务 final Runnable r
,执行完后,同样调用 scheduleNext();
再去取下一个任务,如此循环,直到队列中没有任务为止。
THREAD_POOL_EXECUTOR 这个自定义线程的创建过程看如下代码:
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = CPU_COUNT + 1;
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE = 1;
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
public static final Executor THREAD_POOL_EXECUTOR
= new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
介绍完 sDefaultExecutor 后,回到主线,看方法 public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec, Params... params)
。
@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;
onPreExecute();
mWorker.mParams = params;
exec.execute(mFuture);
return this;
}
方法中,先是校验任务状态,如果是 RUNNING 已运行或是 FINISHED 已完成都会抛出异常。然后,修改任务状态为 RUNNING。之后再执行 onPreExecute(); 方法,此时重要的四个步骤的第一步就被执行了。之后将传入的参数 params 交给 mWorker,执行 mFuture。
此时,和我们之前介绍的 sDefaultExecutor 执行过程 和 AsyncTask 的构造方法就要联系起来了。调用 exec.execute(mFuture);
方法将 mFuture 包装成任务放入队列(前面说过的),随后被执行 mFuture.run()。因为 FutureTask 实现了 Runnable 接口,所以会有对应的 run() 方法。
我们找到 FutureTask 类来看下它的 run() 方法是如何实现的。
public class FutureTask<V> implements RunnableFuture<V> {
...
public void run() {
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = v;
U.putOrderedInt(this, STATE, NORMAL); // final state
finishCompletion();
}
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (U.compareAndSwapObject(this, WAITERS, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
...
}
代码中的 callable 就是前面构造器中传入 mFuture 的 mWorker。Callable<V> c = callable; result = c.call();
可以看到 mWorker 被调用了,随后又调用 set(result);
,将结果赋值给
outcome,调用 finishCompletion()
,最后调用 done()。done 方法中主要是调用 postResultIfNotInvoked(get());
来校验(mTaskInvoked标识位)如果任务没有执行,也保证执行postResult(result);
方法,把结果返回给主线程。
private void postResultIfNotInvoked(Result result) {
final boolean wasTaskInvoked = mTaskInvoked.get();
if (!wasTaskInvoked) {
postResult(result);
}
}
此时 mWorker 的 call() 和 mFuture 的 done() 方法都被调用了。
我们在翻回去看 mWorker 的 call() 方法。
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
Result result = doInBackground(mParams);
Binder.flushPendingCommands();
return postResult(result);
}
};
首先设置任务被调用的标识为 true,设置线程优先级了 THREAD_PRIORITY_BACKGROUND,再调用方法 doInBackground(mParams),这是四个步骤的第二步方法被调用,耗时任务开始执行。获取到返回的结果后,调用 postResult(result);
方法。
对 Binder.flushPendingCommands();
作用并不清楚。我们先看 postResult。
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
private static Handler getHandler() {
synchronized (AsyncTask.class) {
if (sHandler == null) {
sHandler = new InternalHandler();
}
return sHandler;
}
}
创建一个 Message,what 是 MESSAGE_POST_RESULT,用返回值创建一个 AsyncTaskResult 对象做为message 的 obj,并发送出去,交给 sHandler 来处理,我们看 Handler 处理消息的代码。
private void finish(Result result) {
if (isCancelled()) {
onCancelled(result);
} else {
onPostExecute(result);
}
mStatus = Status.FINISHED;
}
private static class InternalHandler extends Handler {
public InternalHandler() {
super(Looper.getMainLooper());
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
handleMessage 方法中处理收到的消息,如果消息是 MESSAGE_POST_RESULT,调用 finish 方法。 finish 方法中根据任务是否被取消,来执行不同的方法,如果取消则执行 onCancelled(result);,没有取消执行 onPostExecute(result);,此时 AsyncTask 的第四步骤就走完了。整个任务执行完毕。可以看到 onCancelled 和 onPostExecute 只会执行一个。
如果是 MESSAGE_POST_PROGRESS 的消息,则执行 onProgressUpdate(result.mData) 方法,这个是第三个步骤,这个消息是怎么来的呢?
@WorkerThread
protected final void publishProgress(Progress... values) {
if (!isCancelled()) {
getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
new AsyncTaskResult<Progress>(this, values)).sendToTarget();
}
}
原来我们可以在工作线程中调用这个方法,也就是在 doInBackground 中使用。如果任务没有取消,就会创建一个 what 是 MESSAGE_POST_PROGRESS,将进度值封装成一个 AsyncTaskResult 对象做为 obj 的 Message 给 Handler 处理。
最后看下,任务是怎么被取消的,调用如下方法:
public final boolean cancel(boolean mayInterruptIfRunning) {
mCancelled.set(true);
return mFuture.cancel(mayInterruptIfRunning);
}
修改是否取消的标识位,调用 mFuture 的 cancel 方法。mFuture 的 cancel 方法中,如果有当前任务的线程则会调用这个线程的 interrupt 的方法并也会调用 finishCompletion();
,判断任务是否调用,发送 postResult。
总结
AsyncTask 不仅能帮我们简单的完成异步任务的操作,还就如何更好更准确的使用 Handler 和 Thread 做出了示范,从其中能学到很多有用的知识。