概念
基于池化思想管理和使用线程的机制
优点
- 降低资源消耗:重复利用已创建的线程,降低线程创建和销毁曹成的损耗
- 提高响应速度:无需等待线程创建可立即执行
- 提高线程的可管理性:可控的线程数量,对线程进行统一的分配、调优和监控
-提供更多更强大功能:有各种功能的线程池、提供阻塞队列、提供拒绝方案
创建类型
创建类型主要分为Executors和ThreadPoolExecutor,其中Executors是对ThreadPoolExecutor进行了特定场景的封装。
阿里的规范中强制要求按照ThreadPoolExecutor的方式显示的创建线程。
Executors
共有六种常见创建方式
- 1.Executors.newSingleThreadExecutor
创建单个线程数的线程池,保证先进先出的执行顺序
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static void singleThreadExecutor() {
// 创建线程池
ExecutorService threadPool = Executors.newSingleThreadExecutor();
// 执行任务
for (int i = 0; i < 10; i++) {
final int index = i;
threadPool.execute(() -> {
System.out.println(index + ":任务被执行");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
});
}
}
- 2.Executors.newSingleThreadScheduledExecutor
创建一个单线程的可以执行延迟任务的线程池
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
//源码可知,是核心线程数为1的线程池
public static void SingleThreadScheduledExecutor() {
// 创建线程池
ScheduledExecutorService threadPool = Executors.newSingleThreadScheduledExecutor();
// 添加定时执行任务(2s 后执行)
System.out.println("添加任务,时间:" + new Date());
threadPool.schedule(() -> {
System.out.println("任务被执行,时间:" + new Date());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
}, 2, TimeUnit.SECONDS);
}
- 3.Executors.newFixedThreadPool
创建固定大小线程池,超时的线程会在队列中等待
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static void fixedThreadPool() {
// 创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(2);
// 执行任务
threadPool.execute(() -> {
System.out.println("任务被执行,线程:" + Thread.currentThread().getName());
});
}
- 4.Executors.newCachedThreadPool
创建可缓存的线程池,若线程数超过,缓存一段时间后会回收,若线程数不够,则新建线程
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static void cachedThreadPool() {
// 创建线程池
ExecutorService threadPool = Executors.newCachedThreadPool();
// 执行任务
for (int i = 0; i < 10; i++) {
threadPool.execute(() -> {
System.out.println("任务被执行,线程:" + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
});
}
}
- 5.Executors.newScheduledThreadPool
创建一个可以执行延迟任务的线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static void scheduledThreadPool() {
// 创建线程池
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(5);
// 添加定时执行任务(1s 后执行)
System.out.println("添加任务,时间:" + new Date());
threadPool.schedule(() -> {
System.out.println("任务被执行,时间:" + new Date());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
}, 1, TimeUnit.SECONDS);
}
- 6.Executors.newWorkStealingPool
创建一个抢占式执行的线程池
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static void workStealingPool() {
// 创建线程池
ExecutorService threadPool = Executors.newWorkStealingPool();
// 执行任务
for (int i = 0; i < 10; i++) {
final int index = i;
threadPool.execute(() -> {
System.out.println(index + " 被执行,线程名:" + Thread.currentThread().getName());
});
}
// 确保任务执行完成
while (!threadPool.isTerminated()) {
}
}
ThreadPoolExecutor
public static void myThreadPoolExecutor() {
// 创建线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
// 执行任务
for (int i = 0; i < 10; i++) {
final int index = i;
threadPool.execute(() -> {
System.out.println(index + " 被执行,线程名:" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
常见的参数解析如下:
1.int corePoolSize
核心线程数,线程池中始终存活的线程数2.int maximumPoolSize
最大线程数,线程池中允许的最大线程数,当线程池的任务队列满了之后,可以直接创建线程,当前总的线程数不能超过该值3.long keepAliveTime
最大线程数可以存活的时间,当线程中没有任务执行时,最大线程就会销毁一部分,最终保持核心线程数量的线程4.TimeUnit unit
上述存活时间的单位,从天开始往下,总共有7种5.BlockingQueue<Runnable> workQueue
阻塞队列,用来存储线程池等待执行的任务,均为线程安全
常见的阻塞队列有如下几种:
(1)ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列
(2)LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列
(3)SynchronousQueue:一个不存储元素的阻塞队列,即直接提交给线程不保持它们
(4)PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列
(5)DelayQueue:一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素
(6)LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。与SynchronousQueue类似,还含有非阻塞方法。
(7)LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。6.ThreadFactory threadFactory
线程工厂,用来创建线程,默认为正常优先级、非守护线程
扩展:可加入beforeExecute()、afterExecute()和terminated()三个接口来实现对每个人物的开始和结束时间的监控或者做一些其他的功能。7.RejectedExecutionHandler handler
拒绝策略,拒绝处理任务时的策略
常见的拒绝策略有以下几种:
(1)AbortPolicy:拒绝并抛出错误
(2)CallerRunsPolicy:使用当前调用的线程来执行此任务
(3)DiscardOldestPolicy:抛弃队列头部(最旧)的一个任务,并执行当前任务
(4)DiscardPolicy:忽略并抛弃当前任务
(5)NewThreadRunsPolicy:新建一个线程来处理任务(netty)
(6)AbortPolicyWithReport:输出日志、输出堆栈详情、抛出异常(dubbo)