线程池ThreadPoolExecutor
JDK5带来的一大改进就是Java的并发能力,它提供了三种并发武器:并发框架Executor,并发集合类型如ConcurrentHashMap,并发控制类如CountDownLatch等;圣经《Effective Java》也说,尽量使用Exector而不是直接用Thread类进行并发编程。
AsyncTask内部也使用了线程池处理并发;线程池通过ThreadPoolExector
类构造,这个构造函数参数比较多,它允许开发者对线程池进行定制,我们先看看这每个参数是什么意思,然后看看Android是以何种方式定制的。
ThreadPoolExecutor的其他构造函数最终都会调用如下的构造函数完成对象创建工作:
1234567
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize: 核心线程数目,即使线程池没有任务,核心线程也不会终止(除非设置了allowCoreThreadTimeOut参数)可以理解为“常驻线程”
maximumPoolSize: 线程池中允许的最大线程数目;一般来说,线程越多,线程调度开销越大;因此一般都有这个限制。
keepAliveTime: 当线程池中的线程数目比核心线程多的时候,如果超过这个keepAliveTime的时间,多余的线程会被回收;这些与核心线程相对的线程通常被称为缓存线程
unit: keepAliveTime的时间单位
workQueue: 任务执行前保存任务的队列;这个队列仅保存由execute提交的Runnable任务
threadFactory: 用来构造线程池的工厂;一般都是使用默认的;
handler: 当线程池由于线程数目和队列限制而导致后续任务阻塞的时候,线程池的处理方式。
那么,当一个新的任务到达的时候,线程池中的线程是如何调度的呢?(别慌,讲这么一大段线程池的知识,是为了理解AsyncTask;Be Patient)
如果线程池中线程的数目少于corePoolSize,就算线程池中有其他的没事做的核心线程,线程池还是会重新创建一个核心线程;直到核心线程数目到达corePoolSize(常驻线程就位)
如果线程池中线程的数目大于或者等于corePoolSize,但是工作队列workQueue没有满,那么新的任务会放在队列workQueue中,按照FIFO的原则依次等待执行;(当有核心线程处理完任务空闲出来后,会检查这个工作队列然后取出任务默默执行去)
如果线程池中线程数目大于等于corePoolSize,并且工作队列workQueue满了,但是总线程数目小于maximumPoolSize,那么直接创建一个线程处理被添加的任务。
如果工作队列满了,并且线程池中线程的数目到达了最大数目maximumPoolSize,那么就会用最后一个构造参数handler
处理;**默认的处理方式是直接丢掉任务,然后抛出一个异常。
总结起来,也即是说,当有新的任务要处理时,先看线程池中的线程数量是否大于 corePoolSize,再看缓冲队列 workQueue 是否满,最后看线程池中的线程数量是否大于 maximumPoolSize。另外,当线程池中的线程数量大于 corePoolSize 时,如果里面有线程的空闲时间超过了 keepAliveTime,就将其移除线程池,这样,可以动态地调整线程池中线程的数量。
我们以API 22为例,看一看AsyncTask里面的线程池是以什么参数构造的;AsyncTask里面有“两个”线程池;一个THREAD_POOL_EXECUTOR
一个SERIAL_EXECUTOR
;之所以打引号,是因为其实SERIAL_EXECUTOR
也使用THREAD_POOL_EXECUTOR
实现的,只不过加了一个队列弄成了串行而已,那么这个THREAD_POOL_EXECUTOR
是如何构造的呢?
123456789
private static final int CORE_POOL_SIZE = CPU_COUNT + 1;private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;private static final int KEEP_ALIVE = 1;private static final BlockingQueue<Runnable> sPoolWorkQueue = new LinkedBlockingQueue<Runnable>(128); public static final Executor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
可以看到,AsyncTask里面线程池是一个核心线程数为CPU + 1
,最大线程数为CPU * 2 + 1
,工作队列长度为128的线程池;并且没有传递handler
参数,那么使用的就是默认的Handler(拒绝执行).
那么问题来了:
如果任务过多,那么超过了工作队列以及线程数目的限制导致这个线程池发生阻塞,那么悲剧发生,默认的处理方式会直接抛出一个异常导致进程挂掉。假设你自己写一个异步图片加载的框架,然后用AsyncTask实现的话,当你快速滑动ListView的时候很容易发生这种异常;这也是为什么各大ImageLoader都是自己写线程池和Handlder的原因。
这个线程池是一个静态变量;那么在同一个进程之内,所有地方使用到的AsyncTask默认构造函数构造出来的AsyncTask都使用的是同一个线程池,如果App模块比较多并且不加控制的话,很容易满足第一条的崩溃条件;如果你不幸在不同的AsyncTask的doInBackgroud里面访问了共享资源,那么就会发生各种并发编程问题。
在AsyncTask全部执行完毕之后,进程中还是会常驻corePoolSize个线程;在Android 4.4 (API 19)以下,这个corePoolSize是hardcode的,数值是5;API 19改成了cpu + 1
;也就是说,在Android 4.4以前;如果你执行了超过五个AsyncTask;然后啥也不干了,进程中还是会有5个AsyncTask线程;不信,你看:
Handler
AsyncTask里面的handler很简单,如下(API 22代码):
12345
private static final InternalHandler sHandler = new InternalHandler();public InternalHandler() { super(Looper.getMainLooper());}
注意,这里直接用的主线程的Looper;如果去看API 22以下的代码,会发现它没有这个构造函数,而是使用默认的;默认情况下,Handler会使用当前线程的Looper,如果你的AsyncTask是在子线程创建的,那么很不幸,你的onPreExecute
和onPostExecute
并非在UI线程执行,而是被Handler post到创建它的那个线程执行;如果你在这两个线程更新了UI,那么直接导致崩溃。这也是大家口口相传的AsyncTask必须在主线程创建的原因。
另外,AsyncTask里面的这个Handler是一个静态变量,也就是说它是在类加载的时候创建的;如果在你的APP进程里面,以前从来没有使用过AsyncTask,然后在子线程使用AsyncTask的相关变量,那么导致静态Handler初始化,如果在API 16以下,那么会出现上面同样的问题;这就是AsyncTask必须在主线程初始化 的原因。
事实上,在Android 4.1(API 16)以后,在APP主线程ActivityThread的main函数里面,直接调用了AscynTask.init
函数确保这个类是在主线程初始化的;另外,init这个函数里面获取了InternalHandler
的Looper,由于是在主线程执行的,因此,AsyncTask的Handler用的也是主线程的Looper。这个问题从而得到彻底的解决。
AsyncTask是并行执行的吗?
现在知道AsyncTask内部有一个线程池,那么派发给AsyncTask的任务是并行执行的吗?
答案是不确定。在Android 1.5刚引入的时候,AsyncTask的execute
是串行执行的;到了Android 1.6直到Android 2.3.2,又被修改为并行执行了,这个执行任务的线程池就是THREAD_POOL_EXECUTOR
,因此在一个进程内,所有的AsyncTask都是并行执行的;但是在Android 3.0以后,如果你使用execute
函数直接执行AsyncTask,那么这些任务是串行执行的;(你说蛋疼不)源代码如下:
123
public final AsyncTask<Params, Progress, Result> execute(Params... params) { return executeOnExecutor(sDefaultExecutor, params);}
这个sDefaultExecutor
就是用来执行任务的线程池,那么它的值是什么呢?继续看代码:
1
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
因此结论就来了:Android 3.0以上,AsyncTask默认并不是并行执行的;
为什么默认不并行执行?
也许你不理解,为什么AsyncTask默认把它设计为串行执行的呢?
由于一个进程内所有的AsyncTask都是使用的同一个线程池执行任务;如果同时有几个AsyncTask一起并行执行的话,恰好AysncTask的使用者在doInbackgroud
里面访问了相同的资源,但是自己没有处理同步问题;那么就有可能导致灾难性的后果!
由于开发者通常不会意识到需要对他们创建的所有的AsyncTask对象里面的doInbackgroud
做同步处理,因此,API的设计者为了避免这种无意中访问并发资源的问题,干脆把这个API设置为默认所有串行执行的了。如果你明确知道自己需要并行处理任务,那么你需要使用executeOnExecutor(Executor exec,Params... params)
这个函数来指定你用来执行任务的线程池,同时为自己的行为负责。(处理同步问题)
实际上《Effective Java》里面有一条原则说的就是这种情况:不要在同步块里面调用不可信的外来函数。这里明显违背了这个原则:AsyncTask这个类并不知道使用者会在doInBackgroud
这个函数里面做什么,但是对它的行为做了某种假设。
如何让AsyncTask并行执行?
正如上面所说,如果你确定自己做好了同步处理,或者你没有在不同的AsyncTask里面访问共享资源,需要AsyncTask能够并行处理任务的话,你可以用带有两个参数的executeOnExecutor
执行任务:
1234567
new AsyncTask<Void, Void, Vo @Override protected Void doInBackground(Void... params) { // do something return null; }}.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR);
更好的AsyncTask
从上面的分析得知,AsyncTask有如下问题:
默认的AsyncTask如果处理的任务过多,会导致程序直接崩溃;
AsyncTask类必须在主线程初始化,必须在主线程创建,不然在API 16以下很大概率崩溃。
如果你曾经使用过AsyncTask,以后不用了;在Android 4.4以下,进程内也默认有5个AsyncTask线程;在Android 4.4以上,默认有CPU + 1
个线程。
Android 3.0以上的AsyncTask默认是串行执行任务的;如果要并行执行需要调用低版本没有的API,处理麻烦。
因此我们对系统的AsyncTask做了一些修改,在不同Android版本提供一致的行为,并且提高了使用此类的安全性,主要改动如下:
添加对于任务过多导致崩溃的异常保护;在这里进行必要的数据统计上报工作;如果出现这个问题,说明AsyncTask不适合这种场景了,需要考虑重构;
移植API 22对于Handler的处理;这样就算在线程创建异步任务,也不会有任何问题;
提供串行执行和并行执行的execute
方法;默认串行执行,如果明确知道自己在干什么,可以使用executeParallel
并行执行。
在doInbackgroud
里面频繁崩溃的地方加上try..catch
;自己处理数据上报工作。
完整代码见gist,BetterAsyncTask
原文地址:http://weishu.me/2016/01/18/dive-into-asynctask/
大体流程
• excute()方法中首先直接调用preExcute()方法
• AsyncTask的构造方法构造了mWorkder这个WorkerRunnable对象.再用mWorker构造一个FutureTask对象丢到线程池里面去执行
• AsyncTask的成员变量有个InternalHandler,构造的时候进行初始化.
• FutureTask转调mWorker的run方法最后在线程池的子线程调用doInBackGround()方法.
• 然后FutureTask调用setResult()方法把运行结果返回,完成后调用FutureTask的done方法.这里面用InternalHandler发了个消息给主线程,最后拿到结果调用 finish()
- FutureTask类
它需要Callable接口类型的参数,在AsyncTask类中,创建了WorkerRunnable的实现类和FutureTask类,在run方法中调用
private static abstract class WorkerRunnable<Params, Result>
implements Callable<Result> {
Params[] mParams;
}
public AsyncTask() {
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
//调用熟悉的doInBackground方法,在子线程中执行,
return postResult(doInBackground(mParams));
}
};
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occured
while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
//执行execute方法时
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
onPreExecute();
mWorker.mParams = params;
//将任务添加到Executor的实现类ThreadPoolExecutor里面
exec.execute(mFuture);
}
//FutureTask任务
public class FutureTask<V> implements RunnableFuture<V>{
private volatile int state;//任务运行的状态
//构造函数java.util.concurrent.Callable
public FutureTask(Callable<V> callable) {
}
//取消任务通过线程执行interrupt
public boolean cancel(boolean mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
//此方法调用awaitDone,它会阻塞线程,一直等到线程执行完成
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
//变量c就是构造函数传的Callable<V> callable对象
public void run() {
if (c != null && state == NEW) {
V result;
result = c.call();
//call里面的任务执行完毕之后,在set方法里面调用finishCompletion方法,
之后调用done方法表示任务执行完成。
set(result);
}
}
//消除和信号,所有等待的线程,调用done()
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
//异步任务执行完成,回调
protected void done() {
}
}
ThreadPoolExecutor
任务执行在这个类里面用到了ReentrantLock锁,实现了ExecutorService接口。
//addWorker方法第二个参数core含义:true表示核心线程5,false表示非核心线程128
public void execute(Runnable command) {
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
//调用handler.rejectedExecution抛出异常
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
for (;;) {
int wc = workerCountOf(c);//当前work线程数量
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
Worker w = new Worker(firstTask);
Thread t = w.thread;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
workers.add(w);
} finally {
mainLock.unlock();
}
t.start();
return true;
}
InternalHandler
任务执行完时在子线程,此时会发送一个handler消息,handler接收到这个消息就会调用AsyncTask的finish方法,接着调用onPostExecute。
private void finish(Result result) {
if (isCancelled()) {
onCancelled(result);
} else {
onPostExecute(result);
}
mStatus = Status.FINISHED;
}
private static class InternalHandler extends Handler {
public void handleMessage(Message msg) {
AsyncTaskResult result = (AsyncTaskResult) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
异步任务执行的结果,主线程无法轻易的获取
Java FutureTask 异步任务操作提供了便利性
1.获取异步任务的返回值
2.监听异步任务的执行完毕
3.取消异步任务
doBackground(call)
call的返回值在Future的done方法中获取
->onPostExecute
new MyTask().execute();
实例化:
new AsyncTask() -> new FutureTask()
执行:
Executor.execute(mFuture) -> SerialExecutor.myTasks(队列)
-> (线程池)THREAD_POOL_EXECUTOR.execute
线程池中的所有线程,为了执行异步任务
CORE_POOL_SIZE 核心线程数
MAXIMUM_POOL_SIZE 最大线程数量
KEEP_ALIVE 1s闲置回收
TimeUnit.SECONDS 时间单位
sPoolWorkQueue 异步任务队列
sThreadFactory 线程工厂
如果当前线程池中的数量小于corePoolSize,创建并添加的任务。
如果当前线程池中的数量等于corePoolSize,缓冲队列 workQueue未满,那么任务被放入缓冲队列、等待任务调度执行。
如果当前线程池中的数量大于corePoolSize,缓冲队列workQueue已满,并且线程池中的数量小于maximumPoolSize,新提交任务会创建新线程执行任务。
如果当前线程池中的数量大于corePoolSize,缓冲队列workQueue已满,并且线程池中的数量等于maximumPoolSize,新提交任务由Handler处理。
当线程池中的线程大于corePoolSize时,多余线程空闲时间超过keepAliveTime时,会关闭这部分线程。
public static final Executor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
1.线程池容量不够抛出异常
2.内存泄露
3.一个线程,一个异步任务(?)
测试代码
public class FutureTest1 {
public static void main(String[] args) {
System.out.println("main ID: " + Thread.currentThread().getId());
Task work = new Task();
FutureTask<Integer> future = new FutureTask<Integer>(work) {
// 异步任务执行完成,回调
@Override
protected void done() {
try {
System.out.println("done:" + get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
};
// 线程池(使用了预定义的配置)
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(future);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
// 取消异步任务
// future.cancel(true);
try {
// 阻塞,等待异步任务执行完毕
System.out.println(future.get()); // 获取异步任务的返回值
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
// 异步任务
static class Task implements Callable<Integer> {
// 返回异步任务的执行结果
@Override
public Integer call() throws Exception {
int i = 0;
for (; i < 10; i++) {
try {
System.out.println(Thread.currentThread().getName() + "_"
+ i);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("ID: " + Thread.currentThread().getId());
return i;
}
}
}