ThreadPoolExecutor
实现了生产者/消费者模式,
任务队列:线程池自己维护
消费者:线程池的工作者线程
生产者: 任务提交者
当我们碰到类似生产者/消费者问题时,应该优先考虑直接使用线程池,
Java并发包中 线程池的实现类是ThreadPoolExecutor
,
↑它继承自AbstractExecutorService,实现了ExecutorService
,是一个任务执行器
主要好处: 节约线程创建销毁的时间
2 更快开始
3 统一管理
构造方法& 主要参数
public ThreadPoolExecutor(
//控制线程池中线程的个数
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
//任务队列
BlockingQueue<Runnable> workQueue)
//不怎么用 ThreadFactory RejectedExecutionHandler 一般用默认值
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
//对创建的线程进行一些配置
ThreadFactory threadFactory,
//拒绝策略
RejectedExecutionHandler handler)
线程池大小& 执行流程
线程池的大小主要与四个参数有关:
-
corePoolSize
有新任务到来的时候,如果当前线程个数小于corePoolSiz,就会创建一个新线程来执行该任务,需要说明的是,即使其他线程现在也是空闲的,也会创建新线程。
不过,如果线程个数大于等于corePoolSiz,那就不会立即创建新线程了,它会先尝试排队
数量建议: IO等待多的, 核心线程数可以设大点
计算密集型的:cpu+1
IO密集型的:cpu2
公式:Ncpu(1+等待时间)/总时间
maximumPoolSize
最大值,不管有多少任务,都不会创建比这个值大的线程个数。
如果队列满了或其他原因不能立即入队,会继续创建线程,直到线程数达到maximumPoolSize。-
keepAliveTime
线程个数大于corePoolSize时,
非核心线程,最长等待时间,
如果该值为0,表示所有线程都不会超时终止。
这几个参数除了可以在构造方法中进行指定外,还可以通过getter/setter方法进行查看和修改。
除了这些静态参数,ThreadPoolExecutor还可以查看关于线程和任务数的一些动态数字:
//返回当前线程个数
public int getPoolSize()
//返回线程池曾经达到过的最大线程个数
public int getLargestPoolSize()
//返回线程池自创建以来所有已完成的任务数
public long getCompletedTaskCount()
//返回所有任务数,包括所有已完成的加上所有排队待执行的
public long getTaskCount()
等待队列
只要是是阻塞队列 (实现BlockingQueue
接口的队列)都行
队列
比如:
- LinkedBlockingQueue:基于链表,可以指定最大长度,默认是无界的, 传了参数可以有界
- PriorityBlockingQueue:基于堆,无界,优先
无界:线程个数最多只能达到corePoolSize,队列永远不会满,,新的任务总会排队,参数maximumPoolSize无效
- ArrayBlockingQueue:基于数组,有界
- SynchronousQueue:没有实际存储空间
没有实际存储空间: 就是队列里一个都没得存,
当尝试排队时,只有正好有空闲线程在等待接受任务时,才会入队成功,
否则,总是会创建新线程,直到达到maximumPoolSize。
如何创造一个线程
构造参数ThreadFactory,它是一个接口:
public interface ThreadFactory {
Thread newThread(Runnable r);
}
这个接口根据Runnable创建一个Thread,
一般不自己弄,用默认的
默认实现是Executors类中的静态内部DefaultThreadFactory,
主要就是创建一个线程,给线程设置一个名称( pool-<线程池编号>-thread-<线程编号>),设置daemon属性为false,设置线程优先级为标准默认优先级。
如果排队满了,线程也达到最大,怎么办?任务拒绝策略
是一个限流的思想, 防止全线崩溃, 一般可以自定义加点日志,
如果关闭了还来提交任务, 也是会用拒绝策略
默认情况下,抛出异常,类型为RejectedExecutionException
。
拒绝策略是可以自定义的(构造参数RejectedExecutionHandler
),
RejectedExecutionHandler接口,这个接口的定义为:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
ThreadPoolExecutor实现了四种处理方式:
- ThreadPoolExecutor.AbortPolicy:这就是默认的方式,抛出异常
private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
ThreadPoolExecutor.DiscardPolicy:静默处理,忽略新任务,不抛异常,也不执行
ThreadPoolExecutor.DiscardOldestPolicy:将等待时间最长的任务扔掉,然后自己排队
ThreadPoolExecutor.CallerRunsPolicy:在任务提交者线程中执行任务,而不是交给线程池中的线程执行
而AbortPolicy的rejectedExecution实现就是抛出异常,如下所示:
拒绝策略只有在队列有界,且maximumPoolSize有限的情况下才会触发。
如果队列无界,服务不了的任务总是会排队,请求处理队列可能会消耗非常大的内存,甚至引发内存不够的异常。
如果队列有界但maximumPoolSize无限,可能会创建过多的线程,占满CPU和内存,使得任何任务都难以完成。
所以,在任务量非常大的场景中,让拒绝策略有机会执行是保证系统稳定运行很重要的方面。
注意: 依赖关系任务提交同一个线程,可能死锁
如下,任务A被提交到线程池,任务A里面提交了任务B到同个线程池,要等B完成才往下走,但是任务B可能在线程池里面排队,等A任务完成
//里面是有界队列
static ExecutorService executor = Executors.newFixedThreadPool(5);
static class TaskA implements Runnable {
@Override
public void run() {
//其他事
//Thread.sleep(100);
Future<?> future = executor.submit(new TaskB());
try {
future.get();//要任务B完成才能完成 可能任务B在排队等A完成
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("finished task A");
}
}
线程池终止
都是会改变线程池的状态,:
-
shutdown
优雅关闭, 一般是用这个, 只是不让提交新的, 已经提交的执行完 shutdownNow
后者, 连队列里面都不做了, 正进行的线程也尽力阻止,
尽力是说interrupt
, 至于任务是不是会响应就不一定了, 也可能不响应, 还是执行完
unable to create new native thread
线程总数超过操作系统限制了
- 是不是真的要那么多线程?线程太多了, 小一点
- 加机器