线程池和队列相关概念 部分概念参考链接
1:核心线程:简单来讲就是线程池中能否允许同时并发运行的线程的数量。
2:线程池大小:线程池中最多能够容纳的线程的数量。
3:队列:对提交过来的任务的处理模式,可用于存放当前无法处理的任务。
三种队列:
直接提交:工作队列的默认选项是 SynchronousQueue,它将任务直接提 交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有 内部依赖性的请求集合时出现锁定。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数 连续到达时,此策略允许无界线程具有增长的可能性。
无界队列:使用无界队列(例如,不具有预定义容量 的 LinkedBlockingQueue)将导致此时超过corePoolSize的新任务全部加入队列等待。这样,创建的线程就不会超 过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合 于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有 增长的可能性。这种方法相当于控制了并发的线程数量。
有界队列:当使用有限的 maximumPoolSizes 时,此时超过核心线程后的任务先加入队列等待,超出队列范围后的任务就生成线程,但创建的线程最多不超过线程池的最大允许值。有界队列 (如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型 池可以最大限度地降低CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边 界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降 低吞吐量。
队列和新任务的处理方式
如果队列发过来的任务,发现线程池中正在运行的线程的数量
threadPool.getPoolSize()
小于核心线程corePoolSize(即myQueue.size())
,则立即创建新的线程,无需进入队列等待。-
如果正在运行的线程等于或者大于核心线程,则必须参考提交的任务能否加入队列中去,分为:
-
提交的任务能加入队列中
- 如果提交的任务能加入队列,考虑下队列的值是否有设定,如果没有设定,那么也就是不能创建新的线程,只能在队列中等待,因为理论上队列里面可以容纳无穷大的任务等待。换句话说,此时的线程池中的核心线程数就是池中能否允许的最大线程数。那么池的最大线程数就没有任何意义了。
- 如果提交的任务能加入队列,队列的值是有限定的,那么首先任务进入队列中去等待,一旦队列中满了,则新增加的任务就进入线程池中创建新的线程。一旦线程池中的最大线程数超过了,那么就会拒绝后面的任务。
-
如果提交的任务不能加入队列
- 提交的任务不能加入队列,此时就会创建新的线程加入线程池中,一旦超过线程池中最大的数量,则任务被拒绝。
-
个人理解:初始化完成之后(线程池中的线程此时处于空闲状态),向线程池添加任务。
- 如果正在运行的任务数量小于核心线程数,则立即创建新线程,无需进入队列
- 如果正在运行的任务数量大于等于核心线程数,则判断能否添加任务到队列
- 1)能添加到队列(条件:队列设置容量
capacity
且队列未满或者未设置容量)- 任务添加到队列等待
- 2)不能添加到队列(条件:队列满了)
- 则线程池创建新线程执行任务。一旦超过线程池中最大的数量,则任务被拒绝
- 1)能添加到队列(条件:队列设置容量
- 注: 线程池拒绝处理新任务的情况:
1、当线程数已经达到maxPoolSize,且队列已满,会拒绝新任务
2、当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务
简单模拟多线程发邮件
public class SendMail {
private static final int corePoolSize = 2; // 线程池中所保存的核心线程数。
private static final int maximumPoolSize = 4; //线程池允许创建的最大线程数。
private static final long keepAliveTime = 0; //当前线程池线程总数大于核心线程数时,终止多余的空闲线程的时间。
//邮件队列(线程共享)
private static BlockingQueue<Runnable> emailQueue = new LinkedBlockingDeque<>(6);
//实例化线程池(线程共享)
private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
keepAliveTime, TimeUnit.SECONDS, emailQueue, new ThreadPoolExecutor.AbortPolicy());
//发送邮件方法,每次新建一个任务
public static void send(String content, int time) {
System.out.println("--当前线程池大小【" + threadPool.getPoolSize() + "】,当前队列大小【" + emailQueue.size() + "】");
threadPool.execute(new SendThread(content, time));//加入队列
System.out.println("--当前线程池大小【" + threadPool.getPoolSize() + "】,当前队列大小【" + emailQueue.size() + "】");
System.out.println("\n");
}
public static void main(String[] args) {
for (int i = 0; i < 15; i++) {
send(i + "", i + 1);
}
threadPool.shutdown();
}
}
//定义发邮件线程
public class SendThread implements Runnable {
private String content;
private int seconds;
public SendThread(String content, int seconds) {
this.content = content;
this.seconds = seconds;
}
@Override
public void run() {
try {
//模拟发送邮件。。。。每条5秒钟
Thread.sleep(5 * 1000);
System.out.println("sendMail:" + content);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}