概述
在android里面多线程编程的话肯定是会用到handler的,因为需要通过handler机制来进行线程间的通信,比如我们经常用的耗时操作都是必须在子线程中处理的像网络访问这类的等等,使用handler的话需要编写的代码量就相对较多,使用android给拱了另外一总简便的方式AsyncTask,它里面消息传递也是通过handler实现的,我们不用去创建这个handler而已,下面就记录一下自己学习AsyncTask源码实现的笔记。
它的优点:
- 方便代码量少,可读性更强,内部采用的了线程池机制,能有效的管理线程。
- 可以定义线程池的运行方式,是串行还是多线程同时并行,之前在项目里面就遇到过这类的问题,由于默认是串行的,如果某一个线程卡住了的话就会导致下一个线程一直得不到运行的,比如在listview里面解码图片的时候,如果某一张大图解码耗时的话会导致后面的图片也刷不出来。也跟android版本有关,默认是使用AsyncTask默认的异步线程池THREAD_POOL_EXECUTORAndroid 3.1 之后使用的是?SERIAL_EXECUTOR
- 提供了回调机制,可以很方便的传递在子线程中运行的结果。
AsyncTask 是一个抽象类(public abstract class AsyncTask<Params, Progress, Result>),使用时需要需要你自己写一个类然后去继承它并实现它的抽象方法,并且AsyncTask中定义了3个泛型,这3个类型的是通过你使用的时候传进来的,分别表示的意思是:你传进任务的参数,任务进行的进度、任务执行的结果。
asynctask 中提供了2中线程池:分别是THREAD_POOL_EXECUTOR 和 SERIAL_EXECUTOR分别表示的意思是:
- THREAD_POOL_EXECUTOR 多个任务可以在线程池中异步并发执行。
- SERIAL_EXECUTOR 把多个线程按串行的方式执行,所以是同步执行的。 也就是说,只有当一个线程执行完毕之后,才会执行下个线程。
SERIAL_EXECUTOR 其实就是 自己implements Executor 实现了里面的 execute方法采用了队列的形式,
下面 从源码分析开始。
放一段我们使用的例子,再从例子中切入:
private class MyAsyncTask extends AsyncTask<Void, Void, Bitmap> {
private String mUrl;
private ImageView mImageView;
public MyAsyncTask(String url, ImageView imageView) {
mUrl = url;
mImageView = imageView;
}
@Override
protected Bitmap doInBackground(Void... params) {
return getBitmapFromUrl(mUrl);
}
@Override
protected void onPostExecute(Bitmap bitmap) {
super.onPostExecute(bitmap);
// 处理结果
}
}
从new 这个实例看起,也就是构造函数:
private final WorkerRunnable<Params, Result> mWorker;
private final FutureTask<Result> mFuture;
/**
* Creates a new asynchronous task. This constructor must be invoked on the UI thread.
*/
public AsyncTask() {
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
Result result = doInBackground(mParams);
Binder.flushPendingCommands();
return postResult(result);
}
};
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
//WorkerRunnable 定义,继承Callable 接口
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> { Params[] mParams;}
再看看FutureTask , FutureTask<V> implements RunnableFuture<V>,
而 public interface RunnableFuture<V> extends Runnable, Future<V> ,所以 它同时要实现这2个interface的方法,FutureTask 传进去的参数是 mWorker,所以mWorker的call 就是FutureTask 会掉,在哪呢?在FutureTask 的run方法里面,
FutureTask public void run() {
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call(); // 这里call 回调被调用
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
接着就是 FutureTask的run方法怎么时候会被调用呢,那就是在execute里面了,下面看 execute这个方法
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);// 再去看看这个方法里面
}
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;
onPreExecute();
mWorker.mParams = params;
exec.execute(mFuture); // 这个是时候就调用了 Executor 的execute方法,Executor 是一个interface,谁实现了它就执行真正的实现, 默认是 SerialExecutor,
return this;
}
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
// sDefaultExecutor 默认是串行方式也就是 SerialExecutor,
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
//还有一个 Executor 的实现类
public static final Executor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE,
KEEP_ALIVE,
TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
这个时候走到了 SerialExecutor的execute 方法里面
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run(); //调用了 Runnable 的run 也就是 FutureTask的run ,上面讲的call也就被调用了
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
看下这段代码是如何保证了串行化的
SerialExecutor中有两个很重要的成员变量,一个是ArrayDeque<Runnable>类型的mTasks,它是一个双端队列,存储了我们要执行的任务。还有一个是Runnable类型的mActive,初始值为null。
接下来就到了最为关键的execute方法了。execute方法接收的参数是一个Runnable对象,到这里,有的同学可能感觉到有点奇怪,在executeOnExecutor方法中,我们的Executor的execute方法接收的是一个FutureTask的对象,为什么两个execute方法接收的参数不匹配呢?其实是这样的,FutureTask本身就实现了RunnableFuture接口,RunnableFuture接口又继承了Runnable, Future接口,
所以,FutureTask完全可以当作一个Runnable来用。我们继续去看SerialExecutor的execute方法,首先,新建了一个Runnable任务,在其run方法中有一个try,finally结构,try中直接去调用了传入的Runnable对象的run方法,finally中调用了scheduleNext方法。之后将这个新建立的Runnable任务放入双端队列的尾部。接下来会去判断mActive这个变量是否为null,若为null,执行scheduleNext方法。第一个任务到来时mActive肯定是null,所以肯定会去执行scheduleNext方法。我们再去看一下scheduleNext方法,它会从双端队列的队头取出一个元素,赋给mActive变量,如果此时的mActive变量不为null,则利用线程池THREAD_POOL_EXECUTOR来执行这个任务。
想象一下,如果在第一个任务执行的过程中,又来了第二个任务,会发生什么事情呢?首先,依然是对我们传入的Runnable任务进行重新封装,入队,之后会再去判断mActive是否为null,此时,mActive是不为null的,所以不会再去执行scheduleNext方法。那我们第二个任务就永远得不到执行了吗?其实不是的,我们回到之前的try,finally结构,我们发现,当try中的任务逻辑执行完成之后,会在finally中调用scheduleNext方法,也就是说,当我们第一个任务执行完成之后,会再去调用scheduleNext方法,在scheduleNext方法中,会从双端队列中取出第二个任务,交给线程池去执行,由此,任务的执行变成串行化了。
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
Result result = doInBackground(mParams);
Binder.flushPendingCommands();
return postResult(result);
}
};
下面继续看看 当执行到了 mWorker 的 call 里面 ,这个是就开始 执行 doInBackground这个方法了,注意这个时候已经是在子线程中运行了,并且doInBackground是带有一个返回值的,最后执行 return postResult(result) ,它就是通过handler发了一个消息出去,
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
//看看这消息是谁来接收的
private static class InternalHandler extends Handler {
public InternalHandler() {
super(Looper.getMainLooper());
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
private static Handler getHandler() {
synchronized (AsyncTask.class) {
if (sHandler == null) {
sHandler = new InternalHandler();
}
return sHandler;
}
}
private static class InternalHandler extends Handler {
public InternalHandler() {
super(Looper.getMainLooper()); //主线程的loop,那么接收这个消息的肯定在主线程中执行的了
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
//主动通知主线程更新,我们平时用的通知进度条之类的
@WorkerThread
protected final void publishProgress(Progress... values) {
if (!isCancelled()) {
getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
new AsyncTaskResult<Progress>(this, values)).sendToTarget();
}
}
//再看看 finished方法里面
private void finish(Result result) {
if (isCancelled()) {
onCancelled(result);
} else {
onPostExecute(result);
}
mStatus = Status.FINISHED;
}
以上基本就看出来看我们在应用中使用的额几个回调是怎么会执行的了,
如果我们要使用THREAD_POOL_EXECUTOR的话可以用
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec, Params... params)
exec 设置为AsyncTask.THREAD_POOL_EXECUTOR,
也可以自定义线程池:Executor executor = new ThreadPoolExecutor(10,50,10, TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(100));
private static final int CORE_POOL_SIZE = CPU_COUNT + 1; //实例化线程池时传进去的参数
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
项目当中的使用各自取舍吧。