为了避免系统频繁的创建和销毁线程,可以使用线程池来管理线程,以实现线程的复用。同时,线程池还可以帮助管理系统中的线程数量,防止过多的并发线程耗尽系统的资源。
Executor框架
ThreadPoolExecutor
表示一个线程池,而Executors
可以当作是一个线程池工厂类,用于方便的生产线程池,可以通过Executors
的静态方法获得几种预先设定的线程池。从上图可知,ThreadPoolExecutor
类实现了Executor
接口,通过这个接口,线程池ThreadPoolExecutor
可以接受并运行任何实现Runnable
接口的对象。
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newCachedThreadPool()
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
······
1.几种经典线程池
- FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
通过Executors.FixedThreadPool(n)
方法生成的固定线程数量为n的线程池,该线程池中的线程数量始终不变,当有一个新的任务提交时,如果线程中有空闲的线程,则立即执行该任务;如果没有空闲线程,则新的任务会缓存在一个任务队列中,等到线程池中有空闲线程了再执行。线程池使用的是无界队列LinkedBlockingQueue
,任务可以无限的添加进队列中,但是也有耗尽系统资源的风险。
- CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
通过Executors.newCachedThreadPool()
方法生成的可根据实际情况调整线程数量的线程池。线程中的数量不固定,如果有空闲线程则优先使用空闲线程,如果所有线程都忙碌则会创建一个新的线程处理任务。线程组将corePoolSize
设为0,而maximumPoolSize
设为无穷大,也就是说平时线程组中没有线程,如果有任务提交而所有线程繁忙时,线程组会无限制的创建线程,同样可能会耗尽系统资源。线程组使用的SynchronousQueue
容量为0,无法存储任务,只是提交任务的中介,因此所有任务都会立刻提交成功,不会被拒绝。
- SingleThreadExecutor
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
通过Executors.newSingleThreadExecutor()
方法生成线程数量永远为1的线程池。
- SingleThreadScheduledExecutor
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
通过Executors.newSingleThreadScheduledExecutor()
方法生成的线程数量永远为1的线程池。该线程池可以控制线程的启动时间,达到延时启动的效果。
- ScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
通过Executors.newThreadScheduledExecutor(n)
方法生成的固定线程数量为n的线程池。该线程池也可以控制线程的启动时间。
2.ScheduledThreadPool的使用
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
-
schedule()
方法会在给定时间后调用一次指定方法。 -
scheduleAtFixedRate()
会周期性的调用指定方法,在initialDelay
之后执行第一次方法,在上一次方法开始执行后period
时间后再执行下一次方法。上一次任务的执行时长不会影响下一次任务开始执行的时间。 -
scheduleWithFixedDelay()
会周期性的调用指定方法,在initialDelay
之后执行第一次方法,在上一次方法结束执行后delay
时间后再执行下一次方法。上一次任务的执行时长会影响下一次任务开始执行的时间。
3.创建自定义的ThreadPoolExecutor线程池
ThreadPoolExecutor
的构造方法如下,
public ThreadPoolExecutor(
// 线程池中稳定线程数量
int corePoolSize,
// 线程池中最大线程数量
int maximumPoolSize,
// 线程超过稳定数量时,多余的线程的存活时间,
// 超时的线程将被回收
long keepAliveTime,
// 超时时间的单位
TimeUnit unit,
// 任务队列,被提交但尚未被执行的任务存放的地方
BlockingQueue<Runnable> workQueue,
// 线程工厂,用于创建线程
ThreadFactory threadFactory,
// 拒绝策略,当任务太多来不及处理时,如何拒绝任务
RejectedExecutionHandler handler)
任务队列(workQueue)
-
SynchronousQueue
直接提交的队列:SynchronousQueue
没有容量,每一个插入操作都要等待一个删除操作,也就是说SynchronousQueue
只是任务提交的中介,提交的任务不会被真实的保存,而是直接提交给线程组执行,如果没有空闲线程则创建新线程,如果进程数量达到最大值则拒绝任务。因此使用SynchronousQueue
总要配合很大的maximumPoolSize
值以防止任务被拒绝。 -
ArrayBlockingQueue
有界的任务队列:使用ArrayBlockingQueue
时,如果线程池实际线程数 n 小于稳定数量corePoolSize
,则会优先创建新的线程;如果 n 大于corePoolSize
,则将任务存入队列中等待;如果队列存满了,而 n 小于最大线程数maximumPoolSize
,就创建新线程处理任务;如果队列已满,而线程数也达到最大值,则拒绝任务。也就是说,除非系统非常繁忙,否则ArrayBlockingQueue
会使线程数稳定在corePoolSize
。 -
LinkedBlockingQueue
无界的任务队列:除非系统资源耗尽,否则任务可以一直加入队列中。当有新任务到来时,如果线程池实际线程数 n 小于稳定数量corePoolSize
,则会创建新的线程;而如果 n 大于corePoolSize
,则将任务存入队列中等待。也就是说,线程中的数量永远不会超过corePoolSize
,而线程组的拒绝策略也永远不会触发。而如果队列中的任务数量持续增长,将会耗尽系统资源。 -
PriorityBlockingQueue
优先任务队列:优先队列可以控制任务的执行顺序,按照任务的优先级执行任务。
4.线程组执行逻辑
ThreadPoolExecutor
类的核心方法execute()
的执行逻辑如下,源码的英文注解也描述了线程组是如何执行任务的:
- 如果线程组线程数少于稳定数
corePoolSize
,方法会直接尝试调用addWorker()
方法创建一个新的线程来执行任务,此时会原子性的检查runState
和workerCount
的值,如果不满足则会返回false来阻止线程组创建新线程。 - 如果线程数高于
corePoolSize
或创建线程失败,则方法会尝试将任务加入等待队列中,如果加入成功,则任务在队列中等待执行。 - 如果加入队列失败,则方法会尝试再次调用
addWorker()
方法提交线程,这次如果线程数小于maximumPoolSize
则会为任务创建线程执行。而如果这次提交失败,则方法会根据拒绝策略拒绝任务。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
5.拒绝策略
JDK内置四种拒绝策略:
- AbortPolicy:该策略会直接抛出异常,阻止系统正常工作。
- CallerRunsPolicy:只要线程池未关闭,该策略会直接在调用者线程中运行被拒绝的任务。
- DiscardOldestPolicy:该策略将丢弃最老的一个任务,也就是即将被执行的那一个任务,然后尝试再次提交当前任务。
- DiscardOldestPolicy:该策略会直接丢弃当前这个被拒绝的任务。
6.线程工厂(ThreadFactory)
ThreadFactory
是一个接口,用于描述如何生成线程。Executor
中的线程池构造方法都有额外接受一个ThreadFactory
参数的版本。建议使用自定义的ThreadFactory
方法来为创建的线程起个名字,这是ThreadFactory
最简单也是最有用的功能,免得在排查线程问题时面对报错信息无从下手。
7.最优的线程池线程数
线程池的大小对系统性能的影响并不大,因此大小并不需要过分精确,只需要避免极大和极小的极端情况。
线程池线程数 = CPU数量 * CPU使用率 * (1 + 等待时间/计算时间)