1.如何创建一个线程池?
//核心线程池大小
int corePoolSize = 10;
//线程池最大容量
int maximunPoolSize = 20;
//当线程数量大于核心线程数量时,多余的线程在终止之前等待新任务的最大时间
long keepAliveTime = 1;
//时间单位
TimeUnit timeUnit = TimeUnit.MINUTES;
//工作队列
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
//线程工厂
ThreadFactory threadFactory = Executors.defaultThreadFactory();
//拒绝策略
RejectedExecutionHandler rejectHandler = new ThreadPoolExecutor.AbortPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximunPoolSize,
keepAliveTime,
timeUnit,
workQueue,
threadFactory,
rejectHandler
);
2.线程池运行机制
a.new线程池时,线程池工作队列中已经被添加的Runnable是否会立即被执行?
不会,除非new之后调用prestartAllCoreThreads启动所有核心线程
//核心线程池大小
int corePoolSize = 10;
//线程池最大容量
int maximunPoolSize = 20;
//当线程数量大于核心线程数量时,多余的线程在终止之前等待新任务的最大时间
long keepAliveTime = 1;
//时间单位
TimeUnit timeUnit = TimeUnit.MINUTES;
//工作队列
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
//线程工厂
ThreadFactory threadFactory = Executors.defaultThreadFactory();
//拒绝策略
RejectedExecutionHandler rejectHandler = new ThreadPoolExecutor.AbortPolicy();
//往工作队列添加任务
for (int i = 0; i < 10; i++) {
workQueue.put(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
}
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximunPoolSize,
keepAliveTime,
timeUnit,
workQueue,
threadFactory,
rejectHandler
);
//不调用不会执行
//executor.prestartAllCoreThreads();
b.当添加的任务个数X小于核心线程数Y时,线程池会启动几个线程?
小于等于X,具体多少取决于每个任务执行的时间,我们取两种极端的情况来说明
第一
假如for循环添加了5个任务,核心线程数为10,这5个任务执行的时间相对都比较长,假设每个任务都需要6s执行完成,那么最终一定有5个线程被创建出来在运行,可以检验一下,为什么?任务数小于核心线程数的时候,在没有空闲线程的情况下会创建新的线程来执行任务
//核心线程池大小
int corePoolSize = 10;
//线程池最大容量
int maximunPoolSize = 20;
//当线程数量大于核心线程数量时,多余的线程在终止之前等待新任务的最大时间
long keepAliveTime = 1;
//时间单位
TimeUnit timeUnit = TimeUnit.MINUTES;
//工作队列
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
//线程工厂
ThreadFactory threadFactory = Executors.defaultThreadFactory();
//拒绝策略
RejectedExecutionHandler rejectHandler = new ThreadPoolExecutor.AbortPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximunPoolSize,
keepAliveTime,
timeUnit,
workQueue,
threadFactory,
rejectHandler
);
for (int i = 0; i < 5; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程池中活跃线程个数为:"+executor.getActiveCount());
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"执行完成");
}
});
}
第二
既然举得是极端的例子,那就可能不切实际,同样假如for循环添加了5个任务,核心线程数为10,我们假线程从创建执行到完成的时间无限接近于0,那么在主线程创建这5个线程的时候,首先创建第一个,然后在即将创建第2个线程的时候,发现第一个线程已经执行完成,线程池中有一个空闲线程,那么就不会再创建新的线程,而直接使用这个线程,这样以来,整个过程中只有一个活跃线程,虽然这不太可能发生,但也能说明一下问题,既创建的线程个数是和任务有关系的
c.当添加的任务个数X大于核心线程数Y,但是小于核心线程数和工作队列Z的和时(Y < X < Y+Z),线程池会启动几个线程?
最多启动Y个核心线程,有可能小于Y,同上取决于任务执行的时间。如下,核心线程数为10,工作队列大小为10,开启了15个任务,无法被立即执行的任务会放入队列等待
//核心线程池大小
int corePoolSize = 10;
//线程池最大容量
int maximunPoolSize = 20;
//当线程数量大于核心线程数量时,多余的线程在终止之前等待新任务的最大时间
long keepAliveTime = 1;
//时间单位
TimeUnit timeUnit = TimeUnit.MINUTES;
//工作队列
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);
//线程工厂
ThreadFactory threadFactory = Executors.defaultThreadFactory();
//拒绝策略
RejectedExecutionHandler rejectHandler = new ThreadPoolExecutor.AbortPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximunPoolSize,
keepAliveTime,
timeUnit,
workQueue,
threadFactory,
rejectHandler
);
for (int i = 0; i <15 ; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程池中活跃线程个数为:"+executor.getActiveCount());
}
});
}
打印结果:
线程池中活跃线程个数为:10
线程池中活跃线程个数为:10
线程池中活跃线程个数为:8
线程池中活跃线程个数为:10
线程池中活跃线程个数为:10
线程池中活跃线程个数为:9
线程池中活跃线程个数为:10
线程池中活跃线程个数为:10
线程池中活跃线程个数为:10
线程池中活跃线程个数为:10
线程池中活跃线程个数为:5
线程池中活跃线程个数为:5
线程池中活跃线程个数为:3
线程池中活跃线程个数为:3
线程池中活跃线程个数为:2
d.当添加的任务个数X大于核心线程数Y+ 工作队列长度Z的和(X>Y+Z)时,启动的线程个数:
(a). X(任务数)- Z(工作队列长度) <= M(线程池最大容量),此时会启动 X(任务数)- Z(工作队列长度) 个线程,如下,核心线程数10,队列长度20,线程池最大容量30,启动50个任务
//核心线程池大小
int corePoolSize = 10;
//线程池最大容量
int maximunPoolSize = 30;
//当线程数量大于核心线程数量时,多余的线程在终止之前等待新任务的最大时间
long keepAliveTime = 30;
//时间单位
TimeUnit timeUnit = TimeUnit.SECONDS;
//工作队列
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(20);
//线程工厂
ThreadFactory threadFactory = Executors.defaultThreadFactory();
//拒绝策略
RejectedExecutionHandler rejectHandler = new ThreadPoolExecutor.AbortPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximunPoolSize,
keepAliveTime,
timeUnit,
workQueue,
threadFactory,
rejectHandler
);
for (int i = 0; i <50 ; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程池中活跃线程个数为:"+executor.getActiveCount());
}
});
}
打印结果:
线程池中活跃线程个数为:30
线程池中活跃线程个数为:30
线程池中活跃线程个数为:30
线程池中活跃线程个数为:30
线程池中活跃线程个数为:30
线程池中活跃线程个数为:30
线程池中活跃线程个数为:30
线程池中活跃线程个数为:30
线程池中活跃线程个数为:30
线程池中活跃线程个数为:29
。。。
(b). X(任务数)- Z(工作队列长度) > M(线程池最大容量),此时会启动 M(线程池最大容量)个线程,超出的线程执行拒绝策略,如下,核心线程数10,队列长度20,线程池最大容量30,启动51个任务
//核心线程池大小
int corePoolSize = 10;
//线程池最大容量
int maximunPoolSize = 30;
//当线程数量大于核心线程数量时,多余的线程在终止之前等待新任务的最大时间
long keepAliveTime = 30;
//时间单位
TimeUnit timeUnit = TimeUnit.SECONDS;
//工作队列
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(20);
//线程工厂
ThreadFactory threadFactory = Executors.defaultThreadFactory();
//拒绝策略
RejectedExecutionHandler rejectHandler = new ThreadPoolExecutor.AbortPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximunPoolSize,
keepAliveTime,
timeUnit,
workQueue,
threadFactory,
rejectHandler
);
for (int i = 0; i <51 ; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程池中活跃线程个数为:"+executor.getActiveCount());
}
});
}
打印结果:
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@36baf30c rejected from java.util.concurrent.ThreadPoolExecutor@7a81197d[Running, pool size = 30, active threads = 30, queued tasks = 20, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at TestThreadPool.main(TestThreadPool.java:30)
线程池中活跃线程个数为:30
线程池中活跃线程个数为:30
线程池中活跃线程个数为:29
线程池中活跃线程个数为:27
线程池中活跃线程个数为:28
线程池中活跃线程个数为:28
线程池中活跃线程个数为:28
线程池中活跃线程个数为:27
。。。
3.线程池拒绝策略
我们代码中使用的拒绝策略是
RejectedExecutionHandler rejectHandler = new ThreadPoolExecutor.AbortPolicy();
其实系统给我们提供了几种,都是通过继承RejectedExecutionHandler类实现的,当然也可以自定义,都是比较简单的,我们来看看系统提供的这几种,核心就在rejectedExecution方法
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
AbortPolicy:
直接抛出异常,组织系统正常工作
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
CallerRunsPolicy:
只要线程池没有shutDown,就在调用者线程中执行当前被丢弃的任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
DiscardPolicy:
直接丢弃任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
DiscardOldestPolicy:
丢弃最老的一个任务(任务队列中第一个),再尝试提交任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
要自定义只需要继承RejectedExecutionHandler,重写方法rejectedExecution,在里边实现自己的策略即可
Executors框架
1.Executor、Executors、ExecutorService,ThreadPoolExecutor的区别?
Executor是一个接口,而且是线程池的顶层接口,继承关系为ThreadPoolExecutor继承自AbstractExecutorService,AbstractExecutorService实现了ExecutorService接口,ExecutorService接口又实现了Executor接口,Executor接口只提供了一个方法execute用于任务的执行,ExecutorService接口在Executor基础上进行扩展,提供了线程任务生命周期管理的方法
,AbstractExecutorService则是ExecutorService接口的一个抽象类,ThreadPoolExecutor则是AbstractExecutorService抽象类的一个具体子类
我们这样要说的是Executors,注意他和Executor 的区别,Executors是一个线程池框架,你可以把它看作一个工具类,因为它内部是通过封装了ThreadPoolExecutor来提供一些不同功能的线程池,主要的区别就是ThreadPoolExecutor的那些参数,核心线程数啊,最大线程数啊这些不同而已
2.Executors提供的几种线程池
a.newCachedThreadPool()
这个线程池没有核心线程,最大线程数为Integer.MAX_VALUE,表明在任务足够多的情况下,他可以创建无数的线程来执行。这一特点决定了它适用于一些数量多但是执行时间较短的任务,任务执行完成之后,新的任务可以重用空闲的线程,可以保证每一个任务都立即有线程来执行。如果执行耗时长的任务会导致极多的线程被创建出来,对内存的威胁会很大。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
b.newFixedThreadPool()
这是一个固定线程数量的线程池,他的核心线程数就等于最大线程数,所以无论创建多少任务,都最多只会有这么多个线程运行,多余的任务进入队列排队,队列装不下的执行拒绝策略
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
c.newScheduledThreadPool()
能延迟执行,定时执行的线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
d.newWorkStealingPool()
工作窃取,使用多个队列来减少竞争
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
e.newSingleThreadExecutor()
只有一个线程的线程池,无论提交多少任务,都是一个一个排队执行
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
f.newSingleThreadScheduledExecutor()
单线程能延迟执行,定时执行的线程池
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
线程池使用注意事项
1.避免使用Executors框架创建线程池
使用Executors创建的线程池需要对Executors框架创建的各个线程池特点和使用场景有一个深刻的认识,不然很容易发生OOM,比如newFixedThreadPool
和newSingleThreadExecutor
允许的请求队列长度为Integer.MAX_VALUE,可能导致请求队列中堆积大量等待的任务,导致内存耗尽
如下例子,创建一个线程数为2 的固定大小线程池,然后不断的往里边添加任务,因为队列可以存放Integer.MAX_VALUE个任务,所以内存剧增,导致发生了oom(为了oom出现的更明显,我们配置一下VM Options)
在VM Options选项中添加这些配置
-Xms60m (设置程序初始化的时候内存栈的大小为60M)
-Xmx60m (设置你的应用程序(不是JVM)能够使用的最大内存数60M)
-XX:+HeapDumpOnOutOfMemoryError (发生OOM时将栈信息dump到HeapDumpPath指定的路径中)
-XX:HeapDumpPath=/Users/renzm/Desktop/a/b
添加完配置之后记得点一下save configuration选项
发生OOM时就可以去HeapDumpPath指定的路径下找到异常文件java_pid25049.hprof了。这个文件怎么查看呢,可以在IBM官网下载一个HeapAnalyzer软件,他是一个jar包,通过命令行启动,java -jar .\ha456.jar
,ha456是jar包名称,启动后打开java_pid25049.hprof文件,查看chart项,大致会是这个样子
从图中可以看出,有99%以上的内存都是被线程池的工作队列占用了,可以定位到oom发生的位置
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2);
while(true){
fixedThreadPool.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
打印:
pool-1-thread-1
pool-1-thread-2java.lang.OutOfMemoryError: GC overhead limit exceeded
Dumping heap to /Users/renzm/Desktop/a/b/java_pid25049.hprof ...
pool-1-thread-1
Heap dump file created [100583477 bytes in 0.873 secs]
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.concurrent.AbstractExecutorService.newTaskFor(AbstractExecutorService.java:87)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:111)
at TestExecutors.main(TestExecutors.java:11)
pool-1-thread-2
newCachedThreadPool
和 newScheduledThreadPool
允许创建的线程数量为Integer.MAX_VALUE,可能导致大量线程被创建,导致OOM
2.创建的线程池核心线程数量不要过大
核心线程数量过大会导致CPU时间片的频繁切换
为什么要用线程池?
a.降低资源消耗。 通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
b.提高响应速度。 当任务到达时,任务可以不需要的等到线程创建就能立即执行。
c.提高线程的可管理性。 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
《阿里巴巴Java开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险**
Executors 返回线程池对象的弊端如下:
FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致OOM。
CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致OOM。
线程池的工作原理:线程池可以减少创建和销毁线程的次数,从而减少系统资源的消耗,当一个任务提交到线程池时
a. 首先判断核心线程池中的线程是否已经满了,如果没满,则创建一个核心线程执行任务,否则进入下一步
b. 判断工作队列是否已满,没有满则加入工作队列,否则执行下一步
c. 判断线程数是否达到了最大值,如果不是,则创建非核心线程执行任务,否则执行饱和策略,默认抛出异常
线程池的种类
1.FixedThreadPool:可重用固定线程数的线程池,只有核心线程,没有非核心线程,核心线程不会被回收,有任务时,有空闲的核心线程就用核心线程执行,没有则加入队列排队
2.SingleThreadExecutor:单线程线程池,只有一个核心线程,没有非核心线程,当任务到达时,如果没有运行线程,则创建一个线程执行,如果正在运行则加入队列等待,可以保证所有任务在一个线程中按照顺序执行,和FixedThreadPool的区别只有数量
3.CachedThreadPool:按需创建的线程池,没有核心线程,非核心线程有Integer.MAX_VALUE个,每次提交
任务如果有空闲线程则由空闲线程执行,没有空闲线程则创建新的线程执行,适用于大量的需要立即处理的并且耗时较短的任务
4.ScheduledThreadPoolExecutor:继承自ThreadPoolExecutor,用于延时执行任务或定期执行任务,核心线程数固定,线程总数为Integer.MAX_VALUE