线程池架构
Java的线程池架构如下图所示:
Executor接口
该接口只提供了一个execute方法,该方法用于已提交的Runnable任务对象,该接口提供了任务提交和任务执行解耦的方法。
void execute(Runnable command);
ExecutorService接口
ExecutorService主要在Executor上做了一些扩展。
(1) 线程池关闭
// 关闭线程池,该方法执行后,拒绝接受新任务,但之前提交的任务继续执行
void shutdown();
// 立即关闭线程池,该方法执行后,停止所有正在执行及正在等待的任务,并返回等待执行的任务列表
List<Runnable> shutdownNow();
// 判断线程池是否为shutdown状态
boolean isShutdown();
// 判断线程池是否为terminated状态
boolean isTerminated();
// 在shutdown请求后,阻塞等待所有任务执行完毕
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
(2) 异步任务提交
// 提交可执行的任务,并返回Future
// 任务执行完毕后,Future#get()方法返回任务执行的结果
<T> Future<T> submit(Callable<T> task);
// 提交可执行的任务,并返回Future
// 任务执行完毕后,Future#get()方法返回任务执行的结果给result
<T> Future<T> submit(Runnable task, T result);
// 提交可执行的任务,并返回Future
// 任务执行完毕后,Future#get()方法返回null
Future<?> submit(Runnable task);
ScheduledExecutorService
// 在给定延时后,创建并执行一个Runnable任务
// 任务执行完毕后,ScheduledFuture#get()方法会返回null
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
// 在给定延时后,创建并执行一个Callable任务
// 任务执行完毕后,ScheduledFuture#get()方法会返回任务执行结果
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
// 在给定延时后,周期性创建并执行Runnable任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
// 在给定延时后,周期性创建并执行Runnable任务(上次任务结束时间和当前任务开始时间间隔相同)
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePoolSize
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到线程数等于corePoolSize;
如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行。
- maximumPoolSize
线程池允许的最大线程数。
如果当前阻塞队列满了,且继续提交任务,则会创建新的非核心线程来执行任务,前提是当前线程数小于maximumPoolSize。
- keepAliveTime
线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间。
- workQueue
线程池阻塞队列,当线程池中的线程数目超过corePoolSize时,任务会添加到阻塞队列。
- handler
如果当前线程数等于maximumPoolSize时,且阻塞队列也满了,若还有提交的任务,线程池需要采取一种策略处理该任务。
线程池提供4种策略:
(1) AbortPolicy: 直接抛出异常,默认策略;
(2) CallerRunsPolicy: 用调用者所在的线程执行任务;
(3) DiscardOldestPolicy: 丢弃阻塞队列中靠前的任务,并执行当前任务;
(4) DiscardPolicy: 直接丢弃任务。
常用线程池
说到线程池,首先想到的是Executors工具类,其提供了常用的几种线程池:
- 固定线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
该线程池中有固定数目(N)的工作线程,可以同时运行N个任务,当提交的任务数大于N时,后提交的任务将添加到阻塞队列,等待线程池中出现空闲的工作线程。
当线程池中有工作线程宕掉后,此时,线程池会新起一个线程添加到池中,以保证线程池中的工作线程数为N。
- 单一线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
线程池中仅有1个工作线程,当工作线程宕掉后,线程池会新起1个线程作为工作线程。
该线程池只能同时运行1个任务,其他任务只能在阻塞队列中等待。
- 缓存线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
如果线程池无可用线程,则会新建1个线程添加到线程池中。当线程池中的线程超过60秒未使用的话,会自动从线程池中移除。
可以发现,上述3种线程池均创建的是ThreadPoolExecutor。
- 调度线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
该线程池可以再指定的时间内周期性的执行所提交的任务。
线程池状态
- RUNNING
线程池处于RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
线程池一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0。
- SHUTDOWN
线程池处于SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
调用线程池的shutdown()接口时,线程池由RUNNING-->SHUTDOWN。
- STOP
线程池处于STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN)-->STOP。
- TIDYING
当所有的任务已终止,ctl记录的任务数量为0,线程池会变为TIDYING状态。
当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理,可以通过重载terminated()函数来实现。
当线程池处于SHUTDOWN或STOP状态下,阻塞队列为空,并且线程池中执行的任务也为空时,就会由(SHUTDOWN or STOP)-->TIDYING。
- TERMINATED
线程池彻底终止,就变成TERMINATED状态。
线程池处于TIDYING状态时,执行完terminated()之后,就会由TIDYING-->TERMINATED。
ThreadPoolExecutor使用一个int值c来表征线程状态和工作线程数目,前3位表征线程状态,后29位表征工作线程数目。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
将相关值转化为二进制如下:
RUNNING = 111 | 0 0000 0000 0000 0000 0000 0000 0000
SHUTDOWN = 000| 0 0000 0000 0000 0000 0000 0000 0000
STOP = 001| 0 0000 0000 0000 0000 0000 0000 0000
TIDYING = 010| 0 0000 0000 0000 0000 0000 0000 0000
TERMINATED = 011| 0 0000 0000 0000 0000 0000 0000 0000
CAPACITY = 000 | 1 1111 1111 1111 1111 1111 1111 1111
~CAPACITY = 111 | 0 0000 0000 0000 0000 0000 0000 0000
runStateOf和workerCountOf分别用于从c中解析出线程状态和工作线程数目,而ctlOf用于将线程状态值和工作线程数目值合成c。