为什么要用线程池?
- 降低资源消耗
- 通过重复利用已创建的线程, 降低线程创建和销毁的造成的消耗
- 提高相应速度
- 任务到达时,可以不需要等待线程创建就能立即执行
- 提高线程的可管理性
- 线程池对线程进行统一的分配、调优可监控
线程池种类
- newSingleThreadExecutor
//只有一个线程的线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- newWorkStealingPool
//可以设置并行级别(数量) 并行级别决定了同一时刻最多有多少个线程在执行 默认是cpu个数
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
- newFixedThreadPool
//固定大小 可重用的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- newCachedThreadPool
// 创建没有数量上线的线程池 线程不够就创建 不用的60s就销毁
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- newScheduledThreadPool
// 维持一定数量的线程 线程即使空闲 也不会被回收
//可以定时或者周期执行任务
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
Executor框架
image
- 接口
- Executor 一个运行新任务的简单接口
- ExecutorService 扩展了Executor接口,添加了一些管理执行器和任务生命周期的方法
- ScheduledExecutorService 扩展了ExecutorService。支持Future和定时执行任务
public interface Executor {
//只有一个方法
void execute(Runnable command);
}
- ThreadPoolExecutor分析
public class ThreadPoolExecutor extends AbstractExecutorService {
ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// ctl 控制线程池的运行状态(runState) 和线程池有效线程(workCount)的数量进行控制的字段
//获取运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取运行状态
private static int workerCountOf(int c) { return c & CAPACITY; }
// 获取运行状态和活动线程数的值。
private static int ctlOf(int rs, int wc) { return rs | wc; }
runState 线程池的运行状态 看下图
// 能接受新提交的任务,也能处理阻塞队列中的任务
private static final int RUNNING = -1 << COUNT_BITS;
//关闭状态,不接收新任务,却能继续处理阻塞队列中的任务。调用shutdown()使线程进入该状态
private static final int SHUTDOWN = 0 << COUNT_BITS;
//不接收新任务,不处理队列中任务,中断正在处理任务的线程。调用shutdownNow()使线程进入该状态
private static final int STOP = 1 << COUNT_BITS;
//所有任务都终止了,workerCount=0,可以进入该状态
private static final int TIDYING = 2 << COUNT_BITS;
//在terminated() 方法执行完后进入该状态
private static final int TERMINATED = 3 << COUNT_BITS;
}
-
线程池运行状态图
image
构造函数 参数很重要
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
……
//核心线程数量
this.corePoolSize = corePoolSize;
//最大线程数量
this.maximumPoolSize = maximumPoolSize;
//等待队列
this.workQueue = workQueue;
// 线程池维护线程所允许的空闲时间
this.keepAliveTime = unit.toNanos(keepAliveTime);
//用来创建新线程
this.threadFactory = threadFactory;
//拒绝策略
this.handler = handler;
}
-
执行流程
image
或者这个图
image
调用ThreadPoolExecutor的execute提交线程,首先检查corePool
- 如果CorePool内的线程<CorePoolSize,新创建线程执行任务,即使线程池中其他线程是空闲的。
- 如果>=CorePoolSize且<maximumPoolSize,
- 且Queue不满,将线程加入workQueue
- 如果满了,创建新线程处理任务
- 如果>=maximumPoolSize
- 如果Queue不满,放入Queue
- Queue满了,执行拒绝策略。
所以判断的顺序为 corePoolSize --> workQueue --> maximumPoolSize。
- 拒绝策略
- AbortPolicy:直接抛出异常,这是默认策略;
- CallerRunsPolicy:用调用者所在的线程来执行任务;
- DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
- DiscardPolicy:直接丢弃任务;
阻塞队列
- ArrayBlockingQueue
- 基于数组实现的阻塞队列
- 创建时必须指定容量大小
- 可以指定公平性或非公平性,默认非公平,即不保证等待时间长的最优先访问队列
- LinkedBlockingQueue
- 基于链表实现
- 如果不指定容量大小,默认是Integer.MAX_VALUE
- PriorityBlockingQueue
- 无界阻塞队列
- 按照元素优先级进行排序,优先级高的先出队
- SynchronousQueue
- 同步阻塞队列
- 队列大小为1,一个元素要放入该队列中必须有一个线程在等待获取元素
-
DelayQueue
- DelayedWorkQueue
- BlockingDeque
- 双向阻塞队列
- TransferQueue