使用
提供了简单使用线程的方式,必须使用AsyncTask子类,然后调用execute执行任务即可。
AsyncTask<Params, Progress, Result> {
//在任务执行前调用,在UI线程中运行
protected void onPreExecute() {
}
//抽象方法,在工作线程中执行
protected abstract Result doInBackground(Params... params);
//在UI线程中运行,在任务执行完成后,调用,并返回结果
protected void onPostExecute(Result result) {
}
//在UI线程中运行,每次调用返回任务进度
protected void onProgressUpdate(Progress... values) {
}
//用于提交任务进度,可以在doInBackground中调用
protected final void publishProgress(Progress... values) {
if (!isCancelled()) {
getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
new AsyncTaskResult<Progress>(this, values)).sendToTarget();
}
}
}
源码分析
主要采用 线程池 + handler 的机制, 看一下源码分析
public abstract class AsyncTask<Params, Progress, Result> {
private static final String LOG_TAG = "AsyncTask";
//如果看过java 线程池实现的与源码,看到地下这几个参数,应该会很熟悉,这就是线程池子的基本参数
//核心保留的线程数量就一个
private static final int CORE_POOL_SIZE = 1;
//运行中最大的线程数量
private static final int MAXIMUM_POOL_SIZE = 20;
//???
private static final int BACKUP_POOL_SIZE = 5;
//线程空闲之后的存活时间
private static final int KEEP_ALIVE_SECONDS = 3;
//这个工厂方法就是用来创建线程的
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
//不要小看这个原子性的操作,实际上包含很多并发的知识点
private final AtomicInteger mCount = new AtomicInteger(1);
//创建一个线程
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
//执行被拒绝的时候,使用的线程池
private static ThreadPoolExecutor sBackupExecutor;
//这是一个任务队列,存储一些,待执行的任务
private static LinkedBlockingQueue<Runnable> sBackupExecutorQueue;
//这是当任务不能被执行时,调用的处理方法
private static final RejectedExecutionHandler sRunOnSerialPolicy =
new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
android.util.Log.w(LOG_TAG, "Exceeded ThreadPoolExecutor pool size");
// As a last ditch fallback, run it on an executor with an unbounded queue.
// Create this executor lazily, hopefully almost never.
synchronized (this) {
if (sBackupExecutor == null) {
//创建一个任务队列
sBackupExecutorQueue = new LinkedBlockingQueue<Runnable>();
//创建一个线程池,用于处理被拒绝的任务
sBackupExecutor = new ThreadPoolExecutor(
BACKUP_POOL_SIZE, BACKUP_POOL_SIZE, KEEP_ALIVE_SECONDS,
TimeUnit.SECONDS, sBackupExecutorQueue, sThreadFactory);
//核心线程不超时, 会一直运行,不会终止,即使没有任务
sBackupExecutor.allowCoreThreadTimeOut(true);
}
}
//执行任务
sBackupExecutor.execute(r);
}
};
//执行任务的(并行)线程池
public static final Executor THREAD_POOL_EXECUTOR;
static {
//创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), sThreadFactory);
//设置一下,任务被拒绝的时候,调用的方法,就是我们上面的sRunOnSerialPolicy 这个东西
threadPoolExecutor.setRejectedExecutionHandler(sRunOnSerialPolicy);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
//串行执行的线程池
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
//来分析一下,串行线程池的实现
private static class SerialExecutor implements Executor {
//一个简单的任务队列
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
//当前活跃的任务
Runnable mActive;
public synchronized void execute(final Runnable r) {
//在队列尾部添加任务
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
//运行后,执行下一条
scheduleNext();
}
}
});
//当前如果没有活跃的任务,触发一下任务
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
//从队列中获取一个任务
if ((mActive = mTasks.poll()) != null) {
//从这里也能看出来实际上还是借用并行的线程池去运行
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
//这两个应该是消息post出去的code,用来标记的
private static final int MESSAGE_POST_RESULT = 0x1;
private static final int MESSAGE_POST_PROGRESS = 0x2;
//默认使用的线程池是串行的
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
//这个是用于处理从子线程中抛出的数据
private static InternalHandler sHandler;
//很明了先直接继承了Handler类
private static class InternalHandler extends Handler {
public InternalHandler(Looper looper) {
super(looper);
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
//获取抛出的数据,到这里就是UI主线程了
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
//开始分类处理不同的数据, 任务结果 或者是 任务进度
switch (msg.what) {
case MESSAGE_POST_RESULT:
//调用AsyncTask的finihs接口表示任务结束了
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
//这里是进度的回调
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
//看一下抛到主线程的数据结构
@SuppressWarnings({"RawUseOfParameterizedType"})
private static class AsyncTaskResult<Data> {
//包含两个成员变量,一个是任务实例的引用, 一个是抛出的数据
final AsyncTask mTask;
final Data[] mData;
AsyncTaskResult(AsyncTask task, Data... data) {
mTask = task;
mData = data;
}
}
//枚举变量,标识任务的状态
public enum Status {
/**
* 指示任务还没执行
*/
PENDING,
/**
* 指示任务开始运行
*/
RUNNING,
/**
* 指示任务执行完成
*/
FINISHED,
}
//看一下构造函数
public AsyncTask(@Nullable Looper callbackLooper) {
//从这里可以看到,实际上如果没有指定特定线程的Loop, 使用的是主线程的Loop
mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
? getMainHandler()
: new Handler(callbackLooper);
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Result result = null;
try {
//设置进程优先级,没看懂, 但是不影响接下来的阅读
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//执行一下,重载的任务代码
result = doInBackground(mParams);
//这个也是看不明白,看资料这个代码有可能堵塞线程
Binder.flushPendingCommands();
} catch (Throwable tr) {
mCancelled.set(true);
throw tr;
} finally {
//将结果抛出
postResult(result);
}
return result;
}
};
//创建一个Future处理一下相应的错误
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
//触发任务
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
//调用下面的方法executeOnExecutor
return executeOnExecutor(sDefaultExecutor, params);
}
//这里算是比较核心的代码了,
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;
onPreExecute();
mWorker.mParams = params;
//看一下这里,使用默认的串行线程池去执行任务
exec.execute(mFuture);
return this;
}
}
总结
高并发的时候,不适合用,毕竟是串行执行的。串行的设计可以借用一下。