线程池
程序启动一个新线程成本是比较高的,因为它涉及到要与操作系统进行交互
而使用线程池可以很好的提高性能,尤其是当程序中要创建大量生存期很短的线程时
更应该考虑使用线程池
线程池里的每个线程代码结束后并不会死亡
而是再次回到线程池中成为空闲状态,等待下一个对象再来使用
JDK5之前,要手动实现线程池,从JDK 5开始,Java内置支持线程池
Executor和ExecutorService
Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池
而只是一个执行线程的工具
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
真正的线程池接口是ExecutorService,定义了各种方法
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
ThreadPoolExecutor的构造方法
构造线程池的基本参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:核心线程数,指保留的线程池大小(不超过maximumPoolSize值时,线程池中最多有corePoolSize 个线程工作)
- maximumPoolSize:指的是线程池的最大大小(线程池中最大有maximumPoolSize 个线程可运行)
- keepAliveTime :线程数大于核心时,空闲线程结束的超时时间(当一个线程不工作时,超过keepAliveTime 指定时间将停止该线程)
- unit:是一个枚举,表示 keepAliveTime 的单位(有NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS,7个可选值)
- workQueue:表示存放任务的队列(存放需要被线程池执行的线程队列)
- threadFactory:线程工厂
- handler:拒绝策略(添加任务失败后如何处理该任务)
corePoolSize、maximumPoolSize、BlockingQueue之间的关系
所有的BlockingQueue都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互:
- 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。
- 如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。
- 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。
corePoolSize一般不会设置的很大,会根据CPU核心数和需求场景来设置
其实线程池的配置主要在这三个参数了,他们之间是有相互关系的
看Demo:
class ThreadRunnable implements Runnable {
@Override
public void run() {
try {
Thread.sleep(1000);//不加延时 可能出不来效果
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ":正在执行");
}
}
创建线程池并添加任务
ExecutorService pool = new ThreadPoolExecutor(
4,
10,
60,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(128)
);
for (int i = 0; i < 30; i++) {
pool.execute(new ThreadRunnable());
}
结果:
如果将 new LinkedBlockingDeque<>(128)的128改为5
再看结果:
错误详情:
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task ThreadRunnable@7ea987ac rejected from java.util.concurrent.ThreadPoolExecutor@12a3a380[Running, pool size = 10, active threads = 10, queued tasks = 5, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at ThreadPoolDemo.main(ThreadPoolDemo.java:22)
那为什么会出现这样的情况呢?
首先看下线程池是如何工作的:
首先任务加入队列,创建线程,从任务队列获取任务并执行
- 如果任务队列只有一个任务的话,其实只创建一个线程就够了
- 如果任务大于数大于1,会创建多个线程,最多创建核心线程数个线程
- 如果任务数大于最大线程数,其他任务会加入到队列,等待核心线程数的任务执行完,再从队列获取任务执行,直到队列中没有任务为止,线程处于空闲状态,超过keepAliveTime,将会被销毁
- 如果任务数超过队列的限界后,将会继续创建线程数目到最大线程数,会执行拒绝策略
Demo设置的核心线程数4,最大线程数10,任务总数为20
- 在队列为128的时候,会将任务加入队列,等待核心线程的任务执行完,再去队列获取新的任务
如果没有任务了 - 在队列长度改为5的时候,任务来不及执行,且队列长度不够,最大线程数+队列长度=15 < 20
队列不够存放了,就会报错
源码:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
线程池排队策略
排队有三种通用策略:
-
直接提交。工作队列的默认选项是 [
SynchronousQueue
],
它将任务直接提交给线程而不保持它们。
在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,
因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。
直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。
当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
-
直接提交。工作队列的默认选项是 [
-
无界队列。使用无界队列(例如,不具有预定义容量的 [
LinkedBlockingQueue
]
将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。
这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)
当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;
例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,
当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
-
无界队列。使用无界队列(例如,不具有预定义容量的 [
-
有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 [
ArrayBlockingQueue
])
有助于防止资源耗尽,但是可能较难调整和控制。
队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、
操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。
如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。
使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,
这样也会降低吞吐量。
-
有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 [
4.PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序升序排序(ArrayBlockingQueue和LinkedBlockingQueue 都是采用FIFO原则来确定线程执行的先后顺序),
当然也可以通过构造函数来指定Comparator来对元素进行排序。
需要注意的是PriorityBlockingQueue不能保证同优先级元素的顺序。