一、线程池类图
特别鸣谢programmer_at的图
二、线程池ThreadPoolExecutor的基本介绍
(1)成员变量
// 核心线程数,线程池在阻塞获取任务时可以保持永久存活的线程的最大值。
// 当线程池中线程数大于corePoolSize 时,之后创建的线程在getTask()时调用workQueue.poll(timeOut),等待timeOut时间没有新任务则返回null。退出线程。
// 当线程池中线程数小于corePollSize且allowCoreThreadTimeOut设置核心线程不允许退出,那么getTask()时调用workQueue.take()方法,会阻塞线程获取任务。
// 具体workQueue #poll() #take()方法源码解析,详见下文。
private volatile int corePoolSize;
// 线程池中允许的最大的线程数,这里使用volatile修饰,保证多线程下的可见性
private volatile int maximumPoolSize;
// Woker从workQueue获取任务的最大等待时间,超过这个时间后,worker会被回收掉(run方法执行完毕,线程不可复生)
private volatile long keepAliveTime;
// 提交的任务的阻塞队列,通过不同的策略实现不同的线程池机制。
// LinkBlockingQueue 为例:
// #poll(timeOut)会等待timeout的时间,如果超时会返回null
// #take()会一直等待,直到返回数据。实现原理是通过ReenTranLock重入锁的await() / signal()实现。
private final BlockingQueue<Runnable> workQueue;
// 线程池中最大的pool size,只会增加不会减少,其是一个统计信息
private int largestPoolSize;
// 内部运行的Worker存放的地方,通过ReentranLock保证线程安全。
// 存入到这个数组中的Worker才是真正的线程,里边run()方法会不断调用getTake(),从workQueue队列里拿出新的runnable执行,
// workQueue就是存储的所有任务队列。Worker是线程,workers是线程set
private final HashSet<Worker> workers = new HashSet<Worker>();
// 内部的一个独占锁,主要保证线程池的一些统计信息(最大的线程数、完成的任务数)和worker添加到集合的安全性
private final ReentrantLock mainLock = new ReentrantLock();
// 线程安全类型,最高位为符号位,次高3位为状态值,低28位为当前的线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 是否允许核心线程从阻塞队列获取任务时销毁。默认为false
private volatile boolean allowCoreThreadTimeOut;
// 内部为worker提供任务执行的线程的生成工厂。我们通过自定义的工厂来使得业务日志更为清晰或者执行不同的业务逻辑
private volatile ThreadFactory threadFactory;
// 拒绝策略,默认拒绝策略为抛出异常。线程池的拒绝策略是策略模式在JDK中的一个应用点。可以自定义拒绝策略,在生产者的速度远远大于消费者时将超出的任务持久化到外部存储。
private volatile RejectedExecutionHandler handler;
(2)参数说明
public ThreadPoolExecutor(
//核心线程数
int corePoolSize,
//最大线程数
int maximumPoolSize,
//线程存活时间,如果allowCoreThreadTimeOut不设置为true,那么这个参数只作用于非核心线程。
long keepAliveTime,
//线程存活时间单位
TimeUnit unit,
//阻塞工作队列(任务缓存队列及排队策略)
BlockingQueue<Runnable> workQueue,
//线程创建工厂,若需要自定义线程某些参数时使用。
ThreadFactory threadFactory,
//拒绝处理器,当线程超过最大线程数,并且所有线程均处在处理runnable状态。那么就由拒绝处理器回调
RejectedExecutionHandler handler
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
说明:
- 1、workQueue的优先级 > maximumPoolSize。
即workQueue队列如果无限大,那么线程最大不会超过corePoolSize。因为execute()时,
先判断workQueue.offer(command)。只有workQUeue队列装不下的时候才会创建超过corePool的线程
- 2、keepAliveTime
保活时间只对非核心线程有效,或者对设置过allowsCoreThreadTimeOut()的线程池中的核心线程有效。
(3)state状态说明
这里使用一个AtomicInteger来记录state和count两种数据。状态和数量其实是需要原子性操作的,毕竟线程池状态和数量是有必然关联的。
这里如果使用两个AtomicInteger参数分别记录state和count,仍然无法做到线程安全,还需要lock锁来帮助。但是这里巧妙的使用一个AtomicInteger的高3位、低29位拆分,即保证了线程安全还省去了lock的麻烦。
private static final int COUNT_BITS = Integer.SIZE - 3;// 32 - 3 = 29
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; -1左移29位:10000000 00000000 00000000 00000001 => 10100000 00000000 00000000 00000000
private static final int SHUTDOWN = 0 << COUNT_BITS; 0左移29:00000000 00000000 00000000 00000000 => 00000000 00000000 00000000 00000000
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
1、复习单位
这里我们以RUNNING 、TERMINATED为例,计算一下什么意思
1、字节:byte, 位:bit
2、1字节 = 8位
3、int = 4个字节 = 4 * 8 = 32位
int作为有符号使用。第一位的位置1、0分别代表负数、正数。
那么就是说 int的取值在 [- 2^31 , 2^31 - 1]
取值为啥正数需要-1,因为第一位是符号位。所以正数的时候进1上不去。
例如两位时最大就是 01 = 2^1 - 1 = 1
2、计算RUNNING、TERMINATED、CAPACITY
(1)RUNNING
-1等于
10000000 00000000 00000000 00000001
左移29位
=> 10100000 00000000 00000000 00000000
(2)TERMINATED
00000000 00000000 00000000 00000011
=》
01100000 00000000 00000000 00000000
(3)CAPACITY
private static final int CAPACITY = (1 << COUNT_BITS) - 1;//00011111 11111111 11111111 11111111
运算过程:
1 = 00000000 00000000 00000000 00000001
左移29位=》
00100000 00000000 00000000 00000000
- 1 减1 =》
00011111 11111111 11111111 11111111
3、使用状态时通过runStateOf()方法获取高三位的数据。
runStateOf(int c) {
return c & ~CAPACITY;
}
1、~ 取反运算:相当于变成了 11100000 00000000 00000000 00000000
2、& 与运算:0 & 1 = 0;1 & 1 = 1
所以就相当于只取前三位数据。因为后边都是0,与运算完全是0,所以有效位就是前三位了。只是比较大的int值而已。
4、使用个数时通过workerCountOf(int c)获取低29位的数据。
private static int workerCountOf(int c) {
return c & CAPACITY;
}
直接和00011111 11111111 11111111 11111111与运算
(4)主要方法说明
- ThreadPoolExecutor 构造方法
- execute(Rannable) 执行任务方法
- shutdown() 关闭线程池,设置SHUTDOWN:会等待task执行完成。
- shutdownNow() 关闭线程池,设置STOP:直接关闭线程池,返回没有执行的task
- awaitTermination(long timeout, TimeUnit unit) 阻塞获取线程池状态。最长等待timeout时间获取线程池是否已经停止了。超时返回false
- isShutdown() 线程状态是否在SHUTDOWN之后。
- isTerminating() 线程是否出在 SHUTDOWN 和 TERMINATED 之间。
- isTerminated() 线程是否已经处在TERMINATED状态了。
- allowCoreThreadTimeOut() 设置是否允许核心线程退出。
三、线程池的使用示例
public static void main(String[] args) {
//核心线程数
int corePoolSize = 5;
//最大线程数
int maximumPoolSize = 10;
//线程存活时间
long keepAliveTime = 60;
//存活时间单位
TimeUnit unit = TimeUnit.SECONDS;
//任务阻塞队列
BlockingQueue<Runnable> workQueue = new LinkedBlockingDeque<>(5);
//线程工厂
ThreadFactory threadFactory = Executors.defaultThreadFactory();
//拒绝处理器
RejectedExecutionHandler handler = new myRejectedExecutionHanlder();
//创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
//设置核心线程是否可以退出。默认是false。
//如果设置true,那么keepAliveTime存活时间将在核心线程上同样生效。
// executor.allowCoreThreadTimeOut(true);
for (int i = 0; i < 10; i++) {
executor.execute(new MyTask(i));
System.out.println("线程池中线程数:" + executor.getPoolSize()
+ " ,队列中等待执行的任务数:" + executor.getQueue().size()
+ " ,已执行完成的任务数:" + executor.getCompletedTaskCount()
+ " ,待执行和执行中的任务数:" + executor.getTaskCount()
);
}
}
static class MyTask implements Runnable {
private int taskId;
public MyTask(int taskId) {
this.taskId = taskId;
}
@Override
public void run() {
System.out.println("正在执行taskId " + taskId);
try {
Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("执行完毕taskId " + taskId);
}
}
/**
* 自定义任务拒绝处理器
* 这里只做演示,和系统AbortPolicy实现一致。
*/
static class myRejectedExecutionHanlder implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
executor.toString());
}
}
①执行结果
正在执行taskId 0
线程池中线程数:1 ,队列中等待执行的任务数:0 ,已执行完成的任务数:0 ,待执行和执行中的任务数:1
线程池中线程数:2 ,队列中等待执行的任务数:0 ,已执行完成的任务数:0 ,待执行和执行中的任务数:2
正在执行taskId 1
线程池中线程数:3 ,队列中等待执行的任务数:0 ,已执行完成的任务数:0 ,待执行和执行中的任务数:3
正在执行taskId 2
线程池中线程数:4 ,队列中等待执行的任务数:0 ,已执行完成的任务数:0 ,待执行和执行中的任务数:4
正在执行taskId 3
线程池中线程数:5 ,队列中等待执行的任务数:0 ,已执行完成的任务数:0 ,待执行和执行中的任务数:5
正在执行taskId 4
线程池中线程数:5 ,队列中等待执行的任务数:1 ,已执行完成的任务数:0 ,待执行和执行中的任务数:6
线程池中线程数:5 ,队列中等待执行的任务数:2 ,已执行完成的任务数:0 ,待执行和执行中的任务数:7
线程池中线程数:5 ,队列中等待执行的任务数:3 ,已执行完成的任务数:0 ,待执行和执行中的任务数:8
线程池中线程数:5 ,队列中等待执行的任务数:4 ,已执行完成的任务数:0 ,待执行和执行中的任务数:9
线程池中线程数:5 ,队列中等待执行的任务数:5 ,已执行完成的任务数:0 ,待执行和执行中的任务数:10
执行完毕taskId 0
执行完毕taskId 1
正在执行taskId 5
正在执行taskId 6
执行完毕taskId 3
正在执行taskId 7
执行完毕taskId 4
执行完毕taskId 2
正在执行taskId 8
正在执行taskId 9
执行完毕taskId 6
执行完毕taskId 5
执行完毕taskId 7
执行完毕taskId 9
执行完毕taskId 8
②执行结果
将for循环改成15次执行结果如下:
正在执行taskId 0
线程池中线程数:1 ,队列中等待执行的任务数:0 ,已执行完成的任务数:0 ,待执行和执行中的任务数:1
线程池中线程数:2 ,队列中等待执行的任务数:0 ,已执行完成的任务数:0 ,待执行和执行中的任务数:2
正在执行taskId 1
线程池中线程数:3 ,队列中等待执行的任务数:0 ,已执行完成的任务数:0 ,待执行和执行中的任务数:3
正在执行taskId 2
线程池中线程数:4 ,队列中等待执行的任务数:0 ,已执行完成的任务数:0 ,待执行和执行中的任务数:4
正在执行taskId 3
线程池中线程数:5 ,队列中等待执行的任务数:0 ,已执行完成的任务数:0 ,待执行和执行中的任务数:5
正在执行taskId 4
线程池中线程数:5 ,队列中等待执行的任务数:1 ,已执行完成的任务数:0 ,待执行和执行中的任务数:6
线程池中线程数:5 ,队列中等待执行的任务数:2 ,已执行完成的任务数:0 ,待执行和执行中的任务数:7
线程池中线程数:5 ,队列中等待执行的任务数:3 ,已执行完成的任务数:0 ,待执行和执行中的任务数:8
线程池中线程数:5 ,队列中等待执行的任务数:4 ,已执行完成的任务数:0 ,待执行和执行中的任务数:9
线程池中线程数:5 ,队列中等待执行的任务数:5 ,已执行完成的任务数:0 ,待执行和执行中的任务数:10
线程池中线程数:6 ,队列中等待执行的任务数:5 ,已执行完成的任务数:0 ,待执行和执行中的任务数:11
正在执行taskId 10
线程池中线程数:7 ,队列中等待执行的任务数:5 ,已执行完成的任务数:0 ,待执行和执行中的任务数:12
正在执行taskId 11
线程池中线程数:8 ,队列中等待执行的任务数:5 ,已执行完成的任务数:0 ,待执行和执行中的任务数:13
正在执行taskId 12
线程池中线程数:9 ,队列中等待执行的任务数:5 ,已执行完成的任务数:0 ,待执行和执行中的任务数:14
正在执行taskId 13
线程池中线程数:10 ,队列中等待执行的任务数:5 ,已执行完成的任务数:0 ,待执行和执行中的任务数:15
正在执行taskId 14
执行完毕taskId 2
执行完毕taskId 13
执行完毕taskId 11
执行完毕taskId 12
正在执行taskId 7
执行完毕taskId 14
执行完毕taskId 10
执行完毕taskId 1
执行完毕taskId 0
执行完毕taskId 4
执行完毕taskId 3
正在执行taskId 9
正在执行taskId 8
正在执行taskId 6
正在执行taskId 5
执行完毕taskId 6
执行完毕taskId 7
执行完毕taskId 5
执行完毕taskId 8
执行完毕taskId 9
③执行结果
将for循环设置20次,执行结果如下:
正在执行taskId 0
线程池中线程数:1 ,队列中等待执行的任务数:0 ,已执行完成的任务数:0 ,待执行和执行中的任务数:1
线程池中线程数:2 ,队列中等待执行的任务数:0 ,已执行完成的任务数:0 ,待执行和执行中的任务数:2
正在执行taskId 1
线程池中线程数:3 ,队列中等待执行的任务数:0 ,已执行完成的任务数:0 ,待执行和执行中的任务数:3
正在执行taskId 2
线程池中线程数:4 ,队列中等待执行的任务数:0 ,已执行完成的任务数:0 ,待执行和执行中的任务数:4
正在执行taskId 3
线程池中线程数:5 ,队列中等待执行的任务数:0 ,已执行完成的任务数:0 ,待执行和执行中的任务数:5
正在执行taskId 4
线程池中线程数:5 ,队列中等待执行的任务数:1 ,已执行完成的任务数:0 ,待执行和执行中的任务数:6
线程池中线程数:5 ,队列中等待执行的任务数:2 ,已执行完成的任务数:0 ,待执行和执行中的任务数:7
线程池中线程数:5 ,队列中等待执行的任务数:3 ,已执行完成的任务数:0 ,待执行和执行中的任务数:8
线程池中线程数:5 ,队列中等待执行的任务数:4 ,已执行完成的任务数:0 ,待执行和执行中的任务数:9
线程池中线程数:5 ,队列中等待执行的任务数:5 ,已执行完成的任务数:0 ,待执行和执行中的任务数:10
线程池中线程数:6 ,队列中等待执行的任务数:5 ,已执行完成的任务数:0 ,待执行和执行中的任务数:11
正在执行taskId 10
线程池中线程数:7 ,队列中等待执行的任务数:5 ,已执行完成的任务数:0 ,待执行和执行中的任务数:12
正在执行taskId 11
线程池中线程数:8 ,队列中等待执行的任务数:5 ,已执行完成的任务数:0 ,待执行和执行中的任务数:13
正在执行taskId 12
线程池中线程数:9 ,队列中等待执行的任务数:5 ,已执行完成的任务数:0 ,待执行和执行中的任务数:14
正在执行taskId 13
线程池中线程数:10 ,队列中等待执行的任务数:5 ,已执行完成的任务数:0 ,待执行和执行中的任务数:15
正在执行taskId 14
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.crm.web.test.PwdTest$MyTask@45fe3ee3 rejected from java.util.concurrent.ThreadPoolExecutor@4cdf35a9[Running, pool size = 10, active threads = 10, queued tasks = 5, completed tasks = 0]
at com.crm.web.test.PwdTest$myRejectedExecutionHanlder.rejectedExecution(PwdTest.java:83)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.crm.web.test.PwdTest.main(PwdTest.java:44)
执行完毕taskId 10
执行完毕taskId 1
执行完毕taskId 0
执行完毕taskId 11
执行完毕taskId 4
执行完毕taskId 3
执行完毕taskId 2
正在执行taskId 9
正在执行taskId 8
正在执行taskId 7
正在执行taskId 6
正在执行taskId 5
执行完毕taskId 13
执行完毕taskId 14
执行完毕taskId 12
执行完毕taskId 7
执行完毕taskId 8
执行完毕taskId 6
执行完毕taskId 9
执行完毕taskId 5
分析:
1、任务缓存队列workQueue和maximumPoolSize的关系
当执行任务超过核心线程时,之后加入的任务会优先进入workQueue队列中。如果队列无限长,那么maximumPoolSize将无意义。因为线程池的队列被撑爆oom,线程数都不会大于核心线程数。
2、核心线程创建完成后,若没有设置allowCoreThreadTimeOut,那么核心线程永远不会退出。除非主动调用shutdown()等方法。
3、如果正在执行和待执行的任务数 > maxinumPoolSize + workQueue.size() 那么就会走到拒绝Handler中。
那么我们的分析到底对不对呢?接下来的源码分析,将验证我们的分析结果。