引言
在Android开发中,只要是耗时的操作都需要开启一个线程来执行。例如网络访问必须放到子线程中执行,否则会抛异常(NetworkOnMainThreadException),这样做的目的也是为了防止用户在主线程中做耗时操作,这样很容易引起ANR。那么有了线程为什么还需要线程池呢?
线程的创建过程分为3步
- 创建线程 T1
- 执行线程 T2
- 销毁线程 T2
线程创建的总时间T=T1+T2+T3。 可以看出T1,T3是多线程本身的带来的开销,我们渴望减少T1,T3所用的时间,从而减少T的时间。但一些线程的使用者并没有注意到这一点,所以在程序中频繁的创建或销毁线程,这导致T1和T3在T中占有相当比例。显然这是突出了线程的弱点(T1,T3),而不是优点(并发性)。
合理利用线程池能够带来三个好处。
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。
线程池的使用
首先我们来看下Java中线程池的使用。
Executor
Java中线程池基于Executor接口。Executor 接口中定义了一个方法 void execute(Runnable command)
,该方法接收一个 Runable 实例,它用来执行一个任务,任务即一个实现了 Runnable 接口的类。
ExecutorService
ExecutorService 继承自 Executor 接口,它提供了更丰富的实现多线程的方法。
-
shutdown()
方法来平滑地关闭 ExecutorService,调用该方法后,将导致 ExecutorService 停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭 ExecutorService。因此我们一般用该接口来实现和管理多线程。 -
submit(Runnable task)
方法与Executor的execute()
方法效果一样,但是submit有返回值,如果我希望task执行完后,每个task告诉我它的执行结果,是成功还是失败,如果是失败,原因是什么,这时候就需要用到submit,通过返回的Future来获取到结果
ThreadPoolExecutor
ThreadPoolExecutor 是正真线程池的实现类,我们创建线程的时候一般也是使用这个类。先看下构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize 线程池的核心线程数,也可以理解为最小线程数,默认情况下,核心线程在线程池中是一直存活的,即使当前执行的线程数量小于核心线程数量,所有的核心线程均会被创建出来。如果将ThreadPoolExecutor的allowCoreThreadTimeOut设置为ture,那么核心线程就会有超时策略。
- maximumPoolSize 线程池所能容纳的最大线程数,包括核心和非核心线程,当活动线程到达最大值,后续的任务就会被阻塞。
- keepAliveTime 非核心线程闲置时的超时时长,当非核心线程空闲时间超过这个值,就会被终止。如果ThreadPoolExecutor的allowCoreThreadTimeOut设置为true,这个时间同样会作用在核心线程上;
- unit 时间单位
- workQueue 线程池中的任务队列,所有提交的Runable对象会存储在这个参数中
- threadFactory 创建新线程时使用的factory
- handler 当task出现异常时,会通过这个handler通知外界
那么线程池针对不同的线程数量又是怎么执行的呢,主要可以分为以下几种情况
- 线程数 < corePoolSize 所有核心线程都会用来处理任务。
- 线程数 = corePoolSize 缓冲队列workQueue未满,任务被放入缓冲队列。
- corePoolSize < 线程数 < maximumPoolSize 缓冲队列workQueue满,创建新的线程来处理被添加的任务。
- corePoolSize < maximumPoolSize < 线程数 缓冲队列workQueue满,通过 handler所指定的策略来处理此任务。
处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。 handler又有以下四种选择
- CallerRunsPolicy 再次添加该task并执行execute方法
- AbortPolicy 放弃该task并抛出RejectedExecutionException
- DiscardPolicy 放弃当前task
- DiscardOldestPolicy 放弃等待时间最久的task
线程池的进一步封装
看了ThreadPoolExecutor的简单介绍是不是觉得使用过程比较复杂呢,没关系,Java针对这一点还可以通过Executors的工厂方法配置,handler均使用的是defaultHandler即AbortPolicy只要线程数量大于核心线程数就会抛出异常需要自行处理。主要有以下五种:
- newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
核心线程数=最大线程数,并且没有超时策略。也就是说该线程池没有空闲状态,能最快速度响应。
- newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
核心线程数=最大线程数=1,没有超时策略。所有任务在一个线程中处理,不存在同步问题。handler为AbortPolicy。
- newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
核心线程=0 ,最大线程=Integer.MAX_VALUE,这就意味着没有线程数量的限制,超时时间为60毫秒。new SynchronousQueue<Runnable>
这个任务队列比较特殊,简单理解为一个无法存储的任务队列。也就是说,当有新的任务到来,如果有空闲的线程,则将任务给空闲的线程处理,否则直接创建一个新的线程来处理任务。也就是说一旦有任务来就会立刻执行,但是消耗较大,所以比较适合用来处理大量的耗时较短的任务。
- newScheduledThreadPool
private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
核心线程为自定义 ,最大线程=Integer.MAX_VALUE,超时时间为10毫秒,也就是说一旦线程空闲便立即回收。适用于定时任务与周期性任务。
- newWorkStealingPool (Java8中新引入)
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
这个线程池和其它四个不太一样,是Java8中新引入的。线程数默认为主机CPU的可用核心数,且会动态的增加与减少,提交的任务执行顺序并不能得到保证。
Android中的线程池
了解了Java提供的线程池我们就来看下Android中是怎样使用线程池的。
AsyncTask是Android中异步处理的轻量级框架,其中的实现就是一个线程池。
public AsyncTask() {
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Result result = null;
try {
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
result = doInBackground(mParams);
Binder.flushPendingCommands();
} catch (Throwable tr) {
mCancelled.set(true);
throw tr;
} finally {
postResult(result);
}
return result;
}
};
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
在构造方法中初始化了两个变量mWorker与mFuture,这两个变量很重要
- mWorker是一个WorkerRunnable
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
Params[] mParams;
}
WorkerRunnable是一个实现了Callable接口的类。
public interface Callable<V> {
V call() throws Exception;
}
Callable接口里面只定义了一个call方法,返回一个泛型对象。那么Callable到底有什么用呢?线程无论继承Thread类还是实现Runnable方法,执行完任务以后都无法直接返回结果。而Callable接口就弥补了这个缺陷,当call方法执行完毕以后会返回一个泛型对象。WorkerRunnable实现了Callable接口,也就是说我们可以调用mWorker.call()
来获取返回的结果
- mFuture是一个FutureTask
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
参数是一个Callable,也就是mWorker。FutureTask实现了RunnableFuture接口。
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
这个接口又继承了Runable与Future<V>。
Future<V>接口主要是针对Runnable和Callable任务的。
提供了三种功能:
- 判断任务是否完成
boolean isDone()
- 能够中断任务
boolean cancel(boolean mayInterruptIfRunning);
- 能够获取任务执行的结果
V get();
也就是说FutureTask既能够被线程执行,又能提供线程执行任务后返回的结果。
看完了构造方法我们从execute()入手
public final AsyncTask<Params, Progress, Result> execute(Params... params)
{
return executeOnExecutor(sDefaultExecutor, params);
}
在执行execute方法时传入了初始化时的参数与sDefaultExecutor
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
sDefaultExecutor就是一个实现了Executor的基本类,execute中,在一个双端队列中不断的插入Runable对象。ArrayDeque的内部是使用数组形式来实现双端队列的,我们知道队列是FIFO的,只能在队头删除元素,队尾添加元素,而双端队列是在队头和队尾都能够删除和添加元素。需要注意的是ArrayDeque没有容量的限制,队列满了以后会自动进行扩充。
THREAD_POOL_EXECUTOR则是AsyncTask中的关键线程池
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE_SECONDS = 30;
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
public static final Executor THREAD_POOL_EXECUTOR;
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
AsyncTask在静态代码块中初始化了一个ThreadPoolExecutor,核心线程数=[2,4/CPU核心数-1],最大线程数=CPU核心数*2+1,超时时间为30秒,队列长度为128,handler为AbortPolicy。并且设置了allowCoreThreadTimeOut(true),即核心线程超时可被回收。
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;
onPreExecute();
mWorker.mParams = params;
exec.execute(mFuture);
return this;
}
在Asynck调用execute()时,mWorker获取到参数,sDefaultExecutor开始执行execute方法。
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
回过来看,第一次执行时,mActive=null这时候便会执行scheduleNext()
,当队列中有数据的时候便会把数据取出来放进线程池中执行。在执行THREAD_POOL_EXECUTOR.execute(mActive)
时,因为mFuture实现了Runable接口,这个时候便会调用run方法
public void run() {
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
其中的callable就是mWorker,此时会调用mWorker的call()
方法
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
return postResult(doInBackground(mParams));
}
};
这个就是在构造方法中创建的,在call中实现了doInBackground方法,并把返回值给了postResult,postResult中的逻辑就是获取一个Message,然后发送该Message给Handler处理。
简单回顾下,AsyncTask每次将doInBackground()
中的操作添加到线程池中执行,执行完后由handler传递数据。
总结
在Android开发中对于线程池的使用可能仅仅只是一些异步的框架中有封装,我们很少真正使用,仅仅new一个Thread,对于一些频繁的请求与周期性的操作不如尝试试用下线程池。
举个例子:
我们需要每隔10秒进行某个操作,这时你可能会考虑用一个定时器,每次开一个线程去请求,这样T1与T2的时间开销是很大的,此时使用线程池便能大大优化性能。
如有错误的地方欢迎大家留言指正探讨。