最近找实习面试中经常会被问到关于AsyncTask的一些内部机制的问题,之前也早有学习,但是还不够系统,没有形成一个体系,现在我们来完整的彻底的梳理一下吧,扫除一些知识盲点。
相信大多数Android开发者都接触过AsyncTask这个类,AsyncTask主要是用来处理一些后台的任务,说到底就是一个异步处理工具类,随着Android开发配套的开源项目越来越完善越来越好,比如网络请求时有了Volley配套OKHttp等非常好用的库,但是在一些例如和UI做交互的情况下还是需要AsyncTask来帮忙的。
那么AsyncTask到底是什么呢,阅读源码我们可以发现,AsyncTask就是一个Handler和线程池的封装,线程池用来异步处理后台任务,handler用来发送消息进行UI方面的交互。好,既然说到了源码,那么我们先来看看AsyncTask的源码。(基于API 23)
先看看AsyncTask的构造方法:
private final WorkerRunnable<Params, Result> mWorker;
private final FutureTask<Result> mFuture;
/**
* 创建一个新的异步任务,这个构造方法只能在主线程调用。
*/
public AsyncTask() {
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
Result result = doInBackground(mParams);
Binder.flushPendingCommands();
return postResult(result);
}
};
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
首先new一个mWorker,这是一个WorkerRunnable<Params,Result>,抽象类,实现了Callable的接口,本质上就是一个Callable。mFuture是一个FutureTask对象。
可以看到在WorkerRunnable中最主要是做了三步,将代表任务是否调用的原子boolean标记设为true,将这个线程的优先级设为后台线程优先级,并且丢出doInBackground处理的结果。
这个结果被丢到了postResult方法中,我们可以来看看postResult方法部分的代码。
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
可以看到其实这个方法并没有对丢过来的result做什么事,丢进来的result依然原封不动的丢了出去,只是在这个过程中捎带实例化了一下AsyncTaskResult,将丢进来的这个result以及当前
这个AsyncTask一起又抛给了新初始化的AsyncTaskResult, 并且将向handler发送了一个Message。注意这里message的 what是MESSAGE_POST_RESULT, 这里是有用到的,我们后面再讲。
好,我们继续往下走,看看result丢进去的AsyncTaskResult。
@SuppressWarnings({"RawUseOfParameterizedType"})
private static class AsyncTaskResult<Data> {
final AsyncTask mTask;
final Data[] mData;
AsyncTaskResult(AsyncTask task, Data... data) {
mTask = task;
mData = data;
}
}
可以看到AsyncTaskResult只是一个static的内部类,只是起了一个保存当前的AsyncTask对象和后台处理的result data的作用。我们再继续往下看看这里保存的result和task在哪里使用到了。之前
已经提到了AsyncTask本质上就是一个线程池和Handler的封装,现在我们都已经获得了后台处理的结果了,消息也发送了,那么我们跟着result来看看这个重要组成部分吧。
private static class InternalHandler extends Handler {
public InternalHandler() {
super(Looper.getMainLooper());
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
这里就非常清楚了,之前也讲过在post中发送的消息的what内容是MESSAGE_POST_RESULT, 我们看看handler中,result也是消息中传递过来的AsyncTaskResult,里面保存了当前的task和处理的结果,
进入我们预想的case,通过对应的task,处理AsyncTaskResult中的data, 这里是走到了AsyncTask的finish方法。走到这儿可能有点远了,但是不怕,我们继续往下看,其实这份代码写的还是很有条理的。
private void finish(Result result) {
if (isCancelled()) {
onCancelled(result);
} else {
onPostExecute(result);
}
mStatus = Status.FINISHED;
}
注意,这里的finish方法是一个private,只有当前的task能调用,一定程度上保证了安全性。它通过对isCancelled的判断可以进入不同的结果处理流程,实际上onCancelled和onPostExecute就是我们平常使用AsyncTask经常使用的两个方法,到这一步就没什么好说了,task携带着之前一步步传过来的doInBackground的结果,来到了我们最熟悉的几个方法了。只是我们需要留意一下Status这个环节。
mStatus是一个volatile的Status对象,而Status是一个枚举类,只包括了
- PENDING, 等待状态,表明当前这个task还没有执行。
- RUNNING, 运行状态,当前task正在执行中。
- FINISHED, 结束状态,当前task已经运行结束了。
而mStatus在这个task初始时就直接设为了PENDING, 并且只有在finish方法中改变为FINISHED, 这已经是最后的环节了。那么是在哪里变为RUNNING呢,我们现在就来看看整个AsyncTask最核心的部分,线程池这一块。
看完了从task构造方法一路跟随到最后finish,我们现在再跟着我们平时的用法来一起学习线程池一部分。事实上我们在使用时,每次初始化后,会使用execute这个方法来让task跑起来。我们看看这里是发生了什么事。
@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
代码很简单,主要就是携带参数,在主线程中启动起来。注意,这里只能是在UI线程也就是主线程中完成这一步,这也是AsyncTask一个比较大的软肋。
事实上task是通过一个队列完成的。别急,好饭不怕晚,我们先看看这里return 的executeOnExecutor方法干了什么。
@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;
onPreExecute();
mWorker.mParams = params;
exec.execute(mFuture);
return this;
}
这里是不是就和之前一直到finish的分析连接上了呢。这里执行task时,讲道理的话到这一步时当前的task一定还是PENDING状态,所以这里会先检查mStatus,如果不是PENDING就会抛出异常,否则就正常执行,把mStatus改为RUNNING状态。然后就到了我们也非常熟悉的方法了,onPreExecute,这里一般做一些后台任务之前的事情。
终于到了重点了,完成了准备工作后,注意exec.execute(mFuture)这一步,这个exec是什么? 一个Executor,Executor只是一个接口,定义了一个execute的方法,这个exec是在上一步execute时传入的全局sDefaultExecutor。sDefaultExecutor是一个默认线程池。
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
可以看到,sDefaultExecutor实际上是一个SerialExecutor. 但是这里为什么要分开写呢。可以发现SERIAL_EXECUTOR是一个final对象,也就是说sDefaultExecutor默认是使用系统初始化好的SerialExecutor,但是我们也可以手动给这个Task设置一个Executor。
/** @hide */
public static void setDefaultExecutor(Executor exec) {
sDefaultExecutor = exec;
}
这样就清楚了,其实留给开发者的余地还是很大的。在这里还是只研究默认的Executor吧。一样,我们看看源码。
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
哈,可以看到这里的excute方法就是我们前面分析每次提到的excute,到最后就都到了这里。首先我们可以看到在SerialExecutor中维护了一个ArrayDeque, mTasks。还记得之前在executeOnExecutor中看到的exec.execute(mFuture)方法吗?mFuture是一个FutureTask,实现了Future和Runnable,这里就将丢过来的mFutuer封装成了一个Runnable对象,然后在把这个Runnable添加到队列。
接着包含一个scheduleNext() 方法。scheduleNext是先从mTasks队列中取出队首的一个Runnable任务叫做mActive,如果这个Runnable不为空,就将其添加到真正的线程池THREAD_POOL_EXECUTOR之中执行。
需要指出的是,由于第一次execute时,在将Runnable加进队列后,mActive初始化为null,所以会默认走进scheduleNext,这样也保证了一开始的自动启动。
以后的任务就是在try{}finally{}中可以看到,每个Runnable运行完后就进入finally执行队列中的下一个任务。
我们已经发现了这里的每个Runnable都是在THREAD_POOL_EXECUTOR中完成的。这是什么?一个ThreadPoolExecutor. 我们先看看一个ThreadPoolExecutor的构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
我们看看依次需要的参数:
- corePoolSize 核心线程池大小,也就是线程池中可以维持的线程数目,即使这些线城是空闲的,也不会终止(除非设置了allowCoreThreadTimeOut),因此可以理解为常驻线程池的线程数目。
- maximumPoolSize 线程池中允许的最大线程数目。因为一般来说线程数目越多,调度所用的花销越大,所以需要设置一个数目上限。
- keepAliveTime 当线程数目大于核心线程数目时,如果超过这个keepAliveTime时间,那么空闲的线程会被终止。
- unit keepAliveTime的时间单位。
- workQueue 一个保存尚未执行的线程的队列。这个队列只保存由execute方法提交的Runnable任务。
- threadFactory 用来构造线程池的工厂。
我们再看看ThreadPoolExecutor的execute方法。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取ctl的int值。这个int值保存了线程池中任务数目和线程池的状态等信息
int c = ctl.get();
// 当线程池中的任务数量 < 核心线程数目时,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中,然后启动该线程来执行任务。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
我们可以看一下这个线程池调度的大致流程:
- 如果线程池中的线程数目小于corePoolSize,那么就新启动一个线程,并且将这个Runnable作为这个新线程的第一个任务添加到线程中进行执行
- 如果线程池中的线程数目大于等于corePoolSize,就将任务添加到workQueue队列中等待。这种情况下,会两次确认线程池的状态,如果第2次读到的线程池状态和第1次读到的线程池状态不同,就从队列中删除该任务。
- 如果这个队列已经满了,就在线程池中新建一个线程,并将该任务添加到线程中进行执行。如果执行失败,就通过reject()拒绝该任务。
总之可以发现,线程池ThreadPoolExecutor通过workQueue来管理线程和任务,每个线程在启动后,会执行线程池中的任务;当一个任务执行结束后,它会从线程池workQueue中取出任务来继续执行。workQueue是管理线程池任务的队列,当添加到线程池中的任务超过线程池的最大线程数目时,这个任务就会进入阻塞队列进行等待。
好,我们现在来看看android中的THREAD_POOL_EXECUTOR是怎么走的。
public static final Executor THREAD_POOL_EXECUTOR
= new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
我们看看传进去的参数:
//cpu的数量,Runtime获取
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
//核心线程数目
private static final int CORE_POOL_SIZE = CPU_COUNT + 1;
//最大线程数目
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE = 1;
//工作队列,一个阻塞队列,用来保存任务。
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
//创建新线程的一个工厂方法
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
可以看到,在构建THREAD_POOL_EXECUTOR时是基于android平台的特点来的。核心线程数是设为了CPU_COUNT+1。
但是实际上在之前的android API版本中,这些值是直接设为了固定的值。
/*4.3 版本的AsyncTask源码 */
private static final int CORE_POOL_SIZE = 5;
private static final int MAXIMUM_POOL_SIZE = 128;
private static final int KEEP_ALIVE = 1;
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread More ...newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(10);
从4.4版本改成了目前的配置。另外可以看到,4.4之前的sPoolWorkQueue的大小也只有10。这些配置的更改给AsyncTask的动态调度还是带来了优势的。比如在4.3中,核心线程数目只有5,因此如果开了5个线程后,再继续开的话就只能等待了。
这是对线程池的初始化和调度分析了,之后就是线程的执行,并且接着我们最初的流程,最后走到task的finish方法,需要我们在AsyncTask实现时完成onPostExecute就可以了。关于Future部分的分析,我们以后再讲。
实际上,面试时面试官会经常问一个问题就是AsyncTask是可以并行的吗?
经过我们上面的分析,已经可以确定AsyncTask内部有一个线程池来进行线程的调度管理以及执行。那么AsyncTask是可以并行执行的吗?先卖个关子,看看2.3.7的AsyncTask部分的代码:
public final AsyncTask<Params, Progress, Result> More ...execute(Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;
onPreExecute();
Worker.mParams = params;
sExecutor.execute(mFuture);
return this;
}
看到没,在2.3.7上execute(Params... params)这个方法和6.0上有什么区别?
我们在之前已经分析过了,execute(params)是走到了 return executeOnExecutor(sDefaultExecutor, params);
而sDefaultExecutor是一个volatile的对象。
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
我们可以得出结论,在6.0上不是并行的,在2.3上是并行的。实际上阅读多个版本的源码可以发现,这一变化是在3.0上做到的。因此我们可以理解为从3.0之后就变成串行了,1.5-2.3是并行的。最初也是串行的。
事实上,也可以自己写demo测试一下,看看每个task的执行的时间是不是同时开始。
那么面试官又会问了,为什么在目前的版本上要将AsyncTask设计成串行呢?
揣摩设计者的意图其实真的是一个很有意思的事情啊- -23333
我们已经提到了AsyncTask是线程池和Handler的一个封装好的工具,其实做到的功能在handler message和loop也能做到,那么为什么要设计一个这样的工具呢,主要还是方便开发者的使用,尤其是初级开发者。
而可能随着android版本的迭代开发,发现有开发者很少在doInBackground中做线程安全的考虑,既然很少有人会考虑最资源的并发访问的安全性,那么干脆就不开放这个功能,保证每个线程的串行执行。这样就是皆大欢喜。
好了,对AsyncTask的分析就到这儿了,大家应该清楚整个的流程以及其特点了。但是可能大家也看到一些大牛说了,并不推荐使用默认的AsyncTask,因为实在有一些不能忽视的缺点啦- -
首先,默认的AsyncTask的线程池中的核心线程数是有限的,不管是和CPU数目有关还是以前的固定的5, 还是有一个数量的限制,因此不适合大量的后台任务处理,例如瀑布流图片的加载等。
其次,AsyncTask类必须在主线程初始化,必须在主线程创建,因为return executeOnExecutor(sDefaultExecutor, params)这里也只能在UI线程走。
最后,我们也讲到了,AsyncTask在3.0后改成了串行的,因此想真正做一些并行的后台任务,就不太适合了。
总之,大家想要更好的使用AsyncTask,最好自己修改一下再使用啦。目前默认的AsyncTask确实还不是最佳状态。