使用
AsyncTask的使用不是本文的重点,我相信读者也不是来了解这个的,所以就先简单的介绍下它的使用,帮助大家回忆。
AsyncTask的实现:
public class DownloadTask extends AsyncTask<Object, Integer, Boolean> {
/**
* 在执行任务之前调用,运行在主线程,可用来做布局更新等
*/
@Override
protected void onPreExecute() {
super.onPreExecute();
}
/**
* 执行后台任务,运行在子线程
*/
@Override
protected Boolean doInBackground(Object... objects) {
return false;
}
/**
* 更新进度,运行在主线程,在doInBackground方法中调用publishProgress(Integer... values)会自动调用该方法
*/
@Override
protected void onProgressUpdate(Integer... values) {
super.onProgressUpdate(values);
}
/**
* 后台任务结束后调用,运行在主线程,参数aBoolean是doInBackground()方法的返回值
*/
@Override
protected void onPostExecute(Boolean aBoolean) {
super.onPostExecute(aBoolean);
}
/**
* 主动调用cancel()方法,或者doInBackground()方法抛出异常
*/
@Override
protected void onCancelled() {
super.onCancelled();
}
}
调用:
DownloadTask downloadTask = new DownloadTask();
downloadTask.execute(object1, object2);
//这里的参数object1, object2最终会传给doInBackground(Object... objects);
源码思路
先说下整体的实现思路:创建线程池执行耗时任务,通过handler将进度、结果等送回主线程。
细节方面,我们需要研究下,该线程池是怎样的线程池,它的执行逻辑是怎样的:核心线程数,最大线程数,等待队列,拒绝策略等细节。顺便再了解下上面代码中的onPreExecute(),onProgressUpdate()等生命周期是在什么时候执行的。
开始研究
我们就从我们使用的地方开始下手:
DownloadTask downloadTask = new DownloadTask();
downloadTask.execute(object1, object2);
1.下面是其构造方法:
public AsyncTask(@Nullable Looper callbackLooper) {
mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
? getMainHandler()
: new Handler(callbackLooper);
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
//设置标志,表示工作任务开始执行
mTaskInvoked.set(true);
Result result = null;
try {
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
result = doInBackground(mParams);
Binder.flushPendingCommands();
} catch (Throwable tr) {
mCancelled.set(true);
throw tr;
} finally {
postResult(result);
}
return result;
}
};
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
结构很简单,一共初始化了三个全局变量mHandler,mWorker,mFuture。
mHandler,从上面的代码看,是创建了一个绑定callbackLooper的handler,所以讲道理如果callbackLooper是另一个子线程的Looper对象,它也可是实现两个子线程之间的数据交互。但是,由于该构造方法被@hide隐藏了,我们使用的默认无参构造器,最终会使callbackLooper为null,也就是说mHandler赋值为getMainHandler()绑定了主线程。
下面是该handler实现的handleMessage方法,一个是处理结果,一个是处理进度的,这里就先不展开细说了,后边分析到工作线程执行到各个阶段时再细说。
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
mWorker,WorkerRunnable是抽象类,implements自Callable接口,对线程池框架比较熟悉的可以知道它将在子线程中调用执行,了解到这点就够了。
在mWorker实现的call()方法中最主要的一行代码doInBackground(mParams);它在这里执行了我们的工作任务,然后在catch异常的时候设置了标志位:mCancelled.set(true),最终在finally中执行了postResult()方法。postResult()的逻辑:
private Result postResult(Result result) {
//getHandler()会返回mHandler,也就是上面介绍的绑定了主线程Looper对象的Handler
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
这就和我们上面讲到的handleMessage对Message的处理联系了起来,最终会执行finish()方法,并在这里调用我们实现的onCancelled()或者onPostExecute()生命周期:
private void finish(Result result) {
//isCancelled()方法会判断是否是取消导致,
//比如上面说的执行mWorker.call()时catch异常和我们做活动调用AsyncTask的cancel()方法会使isCancelled()返回ture
if (isCancelled()) {
onCancelled(result);
} else {
onPostExecute(result);
}
mStatus = Status.FINISHED;
}
WorkerRunnable除了implements Callable接口还添加了个mParams数组,也就是doInBackground(mParams),执行时会传入的参数
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
Params[] mParams;
}
mFuture FutureTask本篇不做深入的了解,它的目的是获取子线程运算结果和提供子线程的中断方法。上面代码对于mFuture的初始化可以看到它将mWorker封装在了内部,所以以后的操作对象都变为了mFuture。mWork只是提供了call()方法(Callable接口)和参数mParams。
2.execute:
构造方法就是初始化了三个变量,mHandler,mWorker,mFuture。接下来我们看下execute()执行了哪些步奏:downloadTask.execute(object1, object2);
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
//内部调用了executeOnExecutor方法
return executeOnExecutor(sDefaultExecutor, params);
}
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;
onPreExecute();
//在这里将mWorker的mParams变量赋值
mWorker.mParams = params;
exec.execute(mFuture);
return this;
}
可以看到execute()就是调用了executeOnExecutor()方法。executeOnExecutor()的前部分就是判断了当前AsyncTask的状态只有在Status.PENDING的状态才可以正常运行,并且在运行后立即将状态改为了Status.RUNNING,也就是说同一个AsyncTask对象只能execute()一次。
接着就是调用了onPreExecute();也就是我们执行任务前在主线程做一些初始化的操作,比如加载界面。
最后最重要的工作exec.execute(mFuture)来执行我们的工作任务。毋庸置疑,该方法最终会执行到mWorker的call()方法。我们现在要研究的就是这个Executor对象(即调用executeOnExecutor()时传入的sDefaultExecutor),它是怎样的线程池,我们传入的FutureTask对象将被如何调用。
sDefaultExecutor
终于我们要开始研究AsyncTask的核心内容了sDefaultExecutor(其实不是)。为什么说不是呢,因为sDefaultExecutor只是一个任务安排者,并不真正的创建线程执行任务。看下sDefaultExecutor的源码:
/**
*sDefaultExecutor最终会指向SerialExecutor对象
**/
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
//SerialExecutor并没有任何地方创建线程,execute也只是将任务放进ArrayDeque队列中
//之前我们传入的是FutureTask对象,而这里需要的参数是Runnable对象是因为,
//FutureTask也实现了Runnable接口,并将Callable接口(即mWorker)的call()方法在run()方法中调用
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
//THREAD_POOL_EXECUTOR才是最终执行任务的线程池
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
ok,我们先看下它是怎样安排任务的。1.将任务runnable稍作包装提交到mTasks中并插入到队列尾部。2.判断mActive是否为null,如果不为null则执行scheduleNext()。
第一次运行AsyncTask时肯定为null,所以会立即执行scheduleNext(),可以看到mTasks.poll()会从队列首部取出任务赋值给mActive,并且通过真正的线程池THREAD_POOL_EXECUTOR去执行。
当任务完成后会在finally中调用scheduleNext()方法去查看mTasks队列中是否有剩余任务,如果有则安排,没有则mActive置为null。保证下次创建新的AsyncTask时,mActive为正确值(因为sDefaultExecutor为static变量,全局只有一个sDefaultExecutor对象),这也表示如果短时间内创建多个AsyncTask,并且每个AsyncTask执行时间还比较长,那后执行AsyncTask.execute()方法的任务将需要等待前面的任务执行完成才能开始。也就是说AsyncTask其实是串行执行的。
若想并行执行也是有方法的,直接调用executeOnExecutor(THREAD_POOL_EXECUTOR, params);跳过sDefaultExecutor的代理。
THREAD_POOL_EXECUTOR
终于到了研究线程池的这一步了
/**
* An {@link Executor} that can be used to execute tasks in parallel.
*/
public static final Executor THREAD_POOL_EXECUTOR;
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), sThreadFactory);
threadPoolExecutor.setRejectedExecutionHandler(sRunOnSerialPolicy);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
核心线程数:CORE_POOL_SIZE = 1,最大线程数:MAXIMUM_POOL_SIZE = 20,空闲线程持续时间KEEP_ALIVE_SECONDS = 3(秒),SynchronousQueue这是等待队列,线程工厂: sThreadFactory,最后设置了拒绝策略sRunOnSerialPolicy。
SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列。也就是说最多同时执行20个线程任务。再有新的任务则会执行拒绝策略sRunOnSerialPolicy,我们看下sRunOnSerialPolicy的源码:
private static final int BACKUP_POOL_SIZE = 5;
private static final RejectedExecutionHandler sRunOnSerialPolicy =
new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
android.util.Log.w(LOG_TAG, "Exceeded ThreadPoolExecutor pool size");
// As a last ditch fallback, run it on an executor with an unbounded queue.
// Create this executor lazily, hopefully almost never.
synchronized (this) {
if (sBackupExecutor == null) {
sBackupExecutorQueue = new LinkedBlockingQueue<Runnable>();
sBackupExecutor = new ThreadPoolExecutor(
BACKUP_POOL_SIZE, BACKUP_POOL_SIZE, KEEP_ALIVE_SECONDS,
TimeUnit.SECONDS, sBackupExecutorQueue, sThreadFactory);
sBackupExecutor.allowCoreThreadTimeOut(true);
}
}
sBackupExecutor.execute(r);
}
};
很简单,又创建了个备用线程池,只有BACKUP_POOL_SIZE个核心线程(5个),然后核心线程也会超时,毕竟备用线程池,逻辑上来讲需要也不需要长时间保留。sBackupExecutorQueue的容量是Integer.Max,所以说若短时间执行大量任务,则会将后续的任务都添加在sBackupExecutorQueue中,然后等待备用线程的调用,这时候,主线程池的即使结束了任务,也不会去执行新的任务,sBackupExecutorQueue中的任务将由备用线程池的5个线程去执行。这点上没有注意到可能会影响AsyncTask的使用效率:
总结:
整体来说AsyncTask思路很简单,就是创建线程池,在子线程执行任务前调用onPreExecute(),在子线程中调用doInBackground(),然后执行过程中调用publishProgress()通过handler发送进度到主线程执行onProgressUpdate(),最后执行完成后再通过handler发送Result到主线程执行onPostExecute()或者onCancelled()
需要多注意的就是对于线程池要有一定的了解,否则读起源码来将有些困难。