1. 导言
Android应用在运行时,所有UI相关的操作,必须放在主线程里,子线程(或者称之为工作线程)中如果进行UI操作会抛出异常。Android的UI模型被设计为单线程模型,即所有的UI操作都在同一个线程里(MainThread),这样做的好处,一是实现里边比较简单,不需要考虑线程安全的问题;二是对UI的所有操作都省去了加锁同步(否则会造成视图的不确定性)的过程,提高了效率,使得视图相关的操作都能尽快得到相应。
在Android异步消息处理机制源码剖析里,我们详细分析了Android异步消息处理机制,知道通过这一套机制可以完成线程间的通信,但如果涉及到相对比较复杂的逻辑,我们自己用这套机制实现起来还是比较麻烦,所以Google给我们提供了一个对异步消息机制封装好的工具类,即AsyncTask。具体用法这里不多做赘述,不了解的同学可以自行查阅资料,下面详细剖析下,AsyncTask是怎么实现的(基于Android 8.0的源码,各版本可能有所不同,比如线程池的核心线程数的设置等,感兴趣的同学可以自行查阅)。
2. 成员变量
private static final String LOG_TAG = "AsyncTask";
//获取CPU的核心数
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
// We want at least 2 threads and at most 4 threads in the core pool,
// preferring to have 1 less than the CPU count to avoid saturating
// the CPU with background work
//静态常量域,设置线程池的核心线程数,从这里可以看出,核心线程数介于2~4之间
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
//静态常量域,设置线程池最大线程数,与CPU核心线程数有关,cpu核心数的2倍+1
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
//静态常量域,设置线程池中的线程存活时间
private static final int KEEP_ALIVE_SECONDS = 30;
//静态常量域,线程工厂类
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
//线程计数器
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
//给新创建的线程绑定特定标记
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
//静态常量域,阻塞队列采用链式有界队列,队列大小为128
private static final BlockingQueue < Runnable > sPoolWorkQueue =
new LinkedBlockingQueue < Runnable > (128);
/**
* An {@link Executor} that can be used to execute tasks in parallel.
*/
//静态常量域,全局线程池
public static final Executor THREAD_POOL_EXECUTOR;
//静态代码块
static {
//初始化线程池
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE,
KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
//允许核心线程超时
threadPoolExecutor.allowCoreThreadTimeOut(true);
//把创建的线程池,赋值给常量THREAD_POOL_EXECUTOR
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
//标识是否取消Task的执行
private final AtomicBoolean mCancelled = new AtomicBoolean();
//标识Task是否开始执行
private final AtomicBoolean mTaskInvoked = new AtomicBoolean();
- 线程池的核心线程数介于2~4之间;线程池最大线程数,与CPU核心线程数有关,cpu核心数的2倍+1;线程保活时间是30s;阻塞队列采用链式有界队列,上限128;
- 在AsyncTask类被加载的时候,根据上述设置的参数,初始化了一个线程池THREAD_POOL_EXECUTOR,它其实就是Task任务的真正执行者;
3. SerialExecutor
AsyncTask中定义了SerialExecutor来串行的调度任务,来保证任务虽然可以异步提交,但是得到的是同步执行。
//静态常量域,类加载时把SERIAL_EXECUTOR初始化为SerialExecutor的实例
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
//静态域,volatile修饰,保证线程间变量的可见证,默认Executor的对象为初始化过的SERIAL_EXECUTOR,即串行调度器
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
//静态内部类,实现了Executor接口,需实现execute(Runnable command)方法
private static class SerialExecutor implements Executor {
//定义了一个双端队列,用于入队和出队Runnable对象
final ArrayDeque < Runnable > mTasks = new ArrayDeque < Runnable > ();
//记录当前被执行的Runnable对象
Runnable mActive;
//加锁同步,同一时刻只能有一个线程调用该方法
public synchronized void execute(final Runnable r) {
//execute方法将根据传入的Runnale构建的新的Runnalbe对象进行入队,
//不直接将传入的Runnable入队的原因是要在这控制任务的串行执行
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
//在r.run()执行完或者抛出异常时,才会调用scheduleNext()方法
scheduleNext();
}
}
});
//如果aActive等于空的时候,会调用scheduleNext()方法,真正调度任务的执行
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
//从队列中进行出队操作,如果出队成功,把出队的Runnable交给初始化的线程池来执行
//mActivit记录的是出队的Runnable对象
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
/** @hide */
//可以调用这个方法设置Executor,但这个方式hide方法,用户正常情况下无法调用,所以只能用默认的串行调度器(SerialExecutor)
public static void setDefaultExecutor(Executor exec) {
sDefaultExecutor = exec;
}
- 默认的Executor对象是SerialExecutor的实例,对提交的任务先进行入队,然后一个一个的从队列中取出Runnable对象,然后再提交给线程池THREAD_POOL_EXECUTOR来执行;
- 我们无法设置Executor对象,因为setter方法是hide方法,系统可以调用;
- 在Runnable执行的过程中,如果遇到异常,或者执行完,可以调度下一个Runnable对象进行执行,所以这里注意,AsyncTask当有Task遇到死循环的时候,会阻塞任务的执行;
- AsyncTask中的异步任务,只会执行一次,在调度执行的时候,会将对象出队,所以用AsyncTask执行的异步任务不具备多次执行的能力;
4. AsyancTask中的Handler
//发布结果的Message的what值
private static final int MESSAGE_POST_RESULT = 0x1;
//发布阶段性Message的what值
private static final int MESSAGE_POST_PROGRESS = 0x2;
//静态域,绑定了主线程的Looper和MessageQueue的Handler对象
private static InternalHandler sHandler;
//常量域,标识当前AsyncTask的mHandler,一般与sHandler是同一个,下文分析
private final Handler mHandler;
//静态内部类
private static class InternalHandler extends Handler {
public InternalHandler(Looper looper) {
super(looper);
}
//处理消息,在Looper所在线程处理(一般是主线程)
@SuppressWarnings({ "unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
//数据通过Message的obj域进行传递
AsyncTaskResult < ?>result = (AsyncTaskResult < ?>) msg.obj;
switch (msg.what) {
//根据what值,看是发布最后执行结果,还是发布阶段性的结果,执行不同的操作
case MESSAGE_POST_RESULT:
// There is only one result
//mTask对象是一个AsyacTask对象,即执行本对象的finish方法。最终结果的返回值只有一个,而非数组
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
//如果是发布阶段性结果,执行本对象的onProgressUpdate方法。
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
//AsyncTaskResult对象记录结果和AsyancTask对象,将任务的结果和任务本身绑定在一起
@SuppressWarnings({"RawUseOfParameterizedType"})
private static class AsyncTaskResult<Data> {
final AsyncTask mTask;
final Data[] mData;
AsyncTaskResult(AsyncTask task, Data... data) {
mTask = task;
mData = data;
}
}
- AsyncTask中有两个Handler对象,一个是全局的sHandler,它绑定了主线程的Looper和MessageQueue,下文我们将会分析;另一个Hander对象是私有成员变量mHandler,它是AscyncTask对象自身的Handler对象,但是一般两者是一个对象;
- sHandler对象是静态内部类InternalHandler对象的实例,它覆写了消息处理方法handleMessage(),从源码里可以知道, result.mTask.finish(result.mData[0]);和 result.mTask.onProgressUpdate(result.mData);的执行,与Handler所在的线程是同一个,由于sHandler是在主线程,所以finish方法和onProgressUpdate方法是在主线程执行的,这也就是为什么我们可以在onProgressUpdate方法和onPostExecute方法中去更新UI,另外两个抽象方法onPreExecute和doInBackgroud方法的执行,在接下来的源码进行分析;
5. 构造方法
/**
* Creates a new asynchronous task. This constructor must be invoked on the UI thread.
*/
//该方法是开发者唯一可以调用的构造方法
public AsyncTask() {
this((Looper) null);
}
/**
* Creates a new asynchronous task. This constructor must be invoked on the UI thread.
* @hide
*/
//hide方法,可以把传递Looper参数用以初始化Handler方法,不过只能系统来调用
public AsyncTask(@Nullable Handler handler) {
this(handler != null ? handler.getLooper() : null);
}
/**
* Creates a new asynchronous task. This constructor must be invoked on the UI thread.
* @hide
*/
//hide方法,开发者调用的第一个构造方法,最后会调用到这里,但是callbackLooper==null
public AsyncTask(@Nullable Looper callbackLooper) {
//应用里创建的AsyncTask的mHandler = sHandler,是绑定了主线程Looper和MessageQueue的Handler
mHandler = callbackLooper == null ||
callbackLooper == Looper.getMainLooper() ?
getMainHandler() : new Handler(callbackLooper);
mWorker = new WorkerRunnable < Params, Result > () {
public Result call() throws Exception {
mTaskInvoked.set(true);
Result result = null;
try {
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
result = doInBackground(mParams);
Binder.flushPendingCommands();
} catch(Throwable tr) {
mCancelled.set(true);
throw tr;
} finally {
postResult(result);
}
return result;
}
};
mFuture = new FutureTask < Result > (mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch(InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch(ExecutionException e) {
throw new RuntimeException("An error occurred
while executing doInBackground()", e.getCause());
} catch(CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
//单例,初始化sHandler
private static Handler getMainHandler() {
synchronized(AsyncTask.class) {
if (sHandler == null) {
sHandler = new InternalHandler(Looper.getMainLooper());
}
return sHandler;
}
}
- 我们开发的应用程序只能调用到无参的构造方法,另外两个有参构造方法属于hide方法,应用程序无法调用;
- 应用程序里的mHander == sHandler,sHandler是InternalHandler的单例对象,它绑定了主线程的Looper和MessageQueue,所以用mHandler发送和接收消息,都是在主线程中;
- 构造方法还初始化了一个WorkRunnable对象和FutureTask对象,我们接下来对它们进行分析;
6. WorkRunnable和FutureTask对象
在AsyncTask里,定义了一个WordRunnable变量和一个FutureTask变量,并在构造函数里对它们进行了初始化。
private final WorkerRunnable <Params,Result> mWorker;
private final FutureTask <Result> mFuture;
public AsyncTask(@Nullable Looper callbackLooper) {
...
//初始化mWorker,实现匿名内部抽象类,覆写call()方法
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
//标志Task任务开始被执行
mTaskInvoked.set(true);
Result result = null;
try {
//将执行当前call方法所在线程的优先级为后台线程
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
//在子线程中执行doInBackgroud()方法,并将结果放在result变量中
result = doInBackground(mParams);
Binder.flushPendingCommands();
}
catch (Throwable tr) {
//如果遇到异常,将mCancelled变量设置为true
mCancelled.set(true);
throw tr;
}
finally {
//doInBackgroud()方法执行完毕或者遇到异常,执行postResult(result)方法去post执行结果
postResult(result);
}
return result;
}
};
//初始化Future对象,需要把一个实现Callable接口的对象作为参数,这里传的是初始化的mWorker
mFuture = new FutureTask<Result>(mWorker) {
//当Runnable对象执行完或者被取消,执行done()方法
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
}
catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
}
catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
}
catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
}
;
}
//WorkRunnable内部记录传递的参数,但是没有实现接口Callable中的call方法,故为抽象类
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
Params[] mParams;
}
- 由于AsyncTask能够取消任务,所以AsyncTask使用了FutureTask以及与其相关的Callable。
- FutureTask、Callable在Java的并发编程中是比较常见的,可以用来获取任务执行完之后的返回值,也可以取消线程池中的某个任务。Callable是一个接口,其内部定义了call方法,在call方法内需要编写代码执行具体的任务,在这一点上Callable接口与Runnable接口很类似,不过不同的是Runnable的run方法没有返回值,Callable的call方法可以指定返回值。FutureTask类同时实现了Callable接口和Runnable接口,FutureTask的构造函数中需要传入一个Callable对象以对其进行实例化。Executor的execute方法接收一个Runnable对象,由于FutureTask实现了Runnable接口,所以可以把一个FutureTask对象传递给Executor的execute方法去执行。当任务执行完毕的时候会执行FutureTask的done方法。在任务执行的过程中,我们也可以随时调用FutureTask的cancel方法取消执行任务,任务取消后也会执行FutureTask的done方法。我们也可以通过FutureTask的get方法阻塞式地等待任务的返回值(即Callable的call方法的返回值),如果任务执行完了就立即返回执行的结果,否则就阻塞式等待call方法的完成。
- mWorker其实是一个Callable类型的对象。实例化mWorker,实现了Callable接口的call方法。call方法是在线程池的某个线程中执行的,而不是运行在主线程中。在线程池的工作线程中执行doInBackground方法,执行实际的任务,并返回结果。当doInBackground执行完毕后,将执行完的结果传递给postResult方法。postResult方法我们后面会再讲解。
- mFuture是一个FutureTask类型的对象,用mWorker作为参数实例化了mFuture。在这里,其实现了FutureTask的done方法,我们之前提到,当FutureTask的任务执行完成或任务取消的时候会执行FutureTask的done方法。done方法中调用了postResultIfNotInvoked()方法,我们后面再讲解。
7. Status
//在初始化的时候就初始化为等待状态
private volatile Status mStatus = Status.PENDING;
public enum Status {
/**
* Indicates that the task has not been executed yet.
*/
PENDING,//等待状态
/**
* Indicates that the task is running.
*/
RUNNING,//运行状态
/**
* Indicates that {@link AsyncTask#onPostExecute} has finished.
*/
FINISHED,//结束状态
}
- AsyncTask定义了三种状态,等待、运行和结束状态,并在初始化的时候把mStatus变量置为等待状态,在执行的时候把mStatus置为运行状态,并在任务执行完或者取消执行的时候置为结束状态;
8.execute()方法和executeOnExecutor()方法
AsyncTask的执行有3个方法:
@MainThread
//静态方法,在不需要与主线程交互的情况下,可直接执行一个Runnable对象
public static void execute(Runnable runnable) {
sDefaultExecutor.execute(runnable);
}
@MainThread
//传递执行所需参数,实际调用的是executeOnExecutor方法,Executor采用的是默认的SerialExecutor,串行调度器
public final AsyncTask <Params,Progress,Result> execute(Params...params) {
return executeOnExecutor(sDefaultExecutor, params);
}
@MainThread
public final AsyncTask <Params,Progress,Result> executeOnExecutor(Executor exec, Params...params) {
//执行前先对mStatus状态进行判断,如果不是等待状态,抛出相应异常
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:" + " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:" + " the task has already been executed " + "(a task can be executed only once)");
}
}
//把mStatus置为运行状态
mStatus = Status.RUNNING;
//执行onPreExecute方法,由于当前方法是在主线程,所以它也是在主线程得到执行
onPreExecute();
mWorker.mParams = params;
//用传入的Executor去执行mFuture(FutureTask实现了Runnable接口)
exec.execute(mFuture);
return this;
}
- AsyncTask提供了静态方法execute(Runnable runnable)方法,让我们可以用它直接去开启一个后台异步任务;
- 调用 execute(Params...params)方法时,会先执行onPreExecute()方法,然后用系统实现的SerialExecutor去执行任务,在工作线程中执行实现了Callable接口的WorkRunnable对象的call方法,该方法上述分析过了,方法内部会执行doInBackground()方法。所以,doInBackgroud方法禁止执行与UI相关的操作;
- 调用execute(Params...params)方法,整个执行过程是:
- 通过SerialExecutor内部的execute(final Runnable r)方法进行任务的调度,前边分析过,会将Runnable进行入队(此时入队的是一个FutureTask对象),并在mActive为空的情况下出队非空的Runnable进行执行;所以多个AsyncTask同时被执行时,会依次入队,但是执行的是通过SerialExecutor进行串行执行;
- 当出队的非空Runnable得到执行时,由于入队的其实是FutureTask对象,会去调用WorkRunnable中实现的call方法,call方法内部会执行doInBackground()方法;
- call方法执行完毕后调用postResult()方法将执行的结果发布回主线程;
- 虽然我们不能设置默认的Executor的对象sDefaultExecutor的值(它的setter方法是hide方法),但是可以直接调用executeOnExecutor(Executor exec,Params... params)方法来用自己实现的Executor来进行线程执行的调度,而不是采用系统默认实现的串行调度。但是不建议自己去改,如果AscyncTask的任务执行改成并发的话,如果同时有多个任务提交执行,系统的压力会比较大,这也是为什么守护线程的核心数只是0~4之间,也是为什么AsyncTask由原来的串行改为并行又改回串行执行的原因吧(不同系统实现不同,感兴趣的同学可以找不同版本的源码看看)。
- 在任务执行之前,会对mStatus进行判断,如果不是等待(Status.PENDING)状态,会抛异常,而且状态的转变是不可逆的;
9. 任务执行结果的更新
执行结果的发布,是先经过WorkRunnable和FutureTask的处理,通过Handler将包含了执行结果的Message发送到主线程,最后在Handler的handleMessage中调用onProgressUpdate()方法\onCancelled()方法\onPostExecute()方法,下面来梳理源码。
public AsyncTask(@Nullable Looper callbackLooper) {
...
mWorker = new WorkerRunnable <Params,Result> () {
public Result call() throws Exception {
...
try {
...
result = doInBackground(mParams);
...
} catch(Throwable tr) {
....
} finally {
//任务执行完会先调用postResult方法
postResult(result);
}
return result;
}
};
mFuture = new FutureTask < Result > (mWorker) {
@Override
////任务执行完或者被取消,会执行done方法
protected void done() {
Result result = null; //执行结果
try {
//get方法会阻塞的等待任务(WordRunnable)执行完后返回执行结果
postResultIfNotInvoked(get());
} catch(InterruptedException e) {
...
} catch(ExecutionException e) {
...
} catch(CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
//任务执行完毕,把结果post到主线程进行处理
private Result postResult(Result result) {
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult < Result > (this, result));
message.sendToTarget();
return result;
}
private void postResultIfNotInvoked(Result result) {
final boolean wasTaskInvoked = mTaskInvoked.get();
//只有AsyncTask还没有开始执行就被取消或其他原因结束后才会执行postResult,此时的result为null
if (!wasTaskInvoked) {
postResult(result);
}
}
- getHandler获取到的是InternalHandler的单例,绑定了主线程的Looper和MessageQueue;
- 在AsyncTask还没有开始执行就被取消或其他原因结束的情况下,会把null对象post到主线程处理,所以在onPostExecute和onCalled方法中,注意要对Result对象进行判空处理;
- 通过上述分析可以知道,如果WorkRunnable被执行完毕,会调用postResult方法将执行结果post到主线程进行处理,前边分析过InternalHandler的源码,知道会根据Message的what值进行不同的处理:
private static class InternalHandler extends Handler {
...
@Override
public void handleMessage(Message msg) {
AsyncTaskResult < ?>result = (AsyncTaskResult < ?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
private void finish(Result result) {
if (isCancelled()) {
onCancelled(result);
} else {
onPostExecute(result);
}
mStatus = Status.FINISHED;
}
- 如果是发布结果,还会根据是否取消而进行不同的处理,如果取消执行,则调用onCancelled()方法,否则执行onPostExecute()方法。这两个方法默认都是空实现,我们可以在子类中进行覆写;
- 如果是发布阶段性执行结果,则执行调用onProgressUpdate(),同样,它默认也是空实现;
- handler处理是在主线程中,所以onCancelled(),onPostExecute(),onProgressUpdate()的执行都是在主线程中;
- 在发布执行结果后,会将执行状态变为Satus.FINISH,AsyncTask执行结束,不可以再进行执行;
10.总结
AsyncTask的原理:
- 内部维护了一个线程池负责实际任务的执行,用Callable和FutureTask实现任务执行的可取消和有返回值,线程的调用,用系统默认实现的串行调度器SerialExecutor来执行。即由SerialExecutor来把FutureTask对象交给线程池来执行。
- 在发布结果时,首先由实现了Callable接口的WorkRunnable进行处理,然后分情况发布执行结果和阶段性结果,用绑定了主线程的Looper的InternalHandler把Message发送到主线程,然后再分情况进行处理;
注意事项: - onPreExecute()、onPostExecute()、onProgressUpdate()、onCancelled()的默认实现都为空,且运行都是在主线程,所以在这几个方法中不要执行耗时的操作;
- doInBackground()方法的执行是在子线程中,所以不能执行UI相关的操作;
- AsyncTask的执行不可逆,只能执行一次,所以需要多次执行的子线程任务,不适合用AsyncTask来实现。
如果有理解不正确的地方,欢迎指正交流。