线程池
线程是宝贵的内存资源、单个线程约占1MB空间,过多分配易造成内存溢出
频繁的创建及销毁线程会增加虚拟机回收频率、资源开销,造成程序性能下降
线程池是线程的容器。可设定线程分配的数量上限,将预先创建的线程对象存入池中,并重用线程池中的线程对象,避免线程频繁的创建和销毁。
一、JDK提供创建线程池的四种快捷方式
//创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
//适用于需要保证顺序执行各个任务。
ExecutorService pool01 = Executors.newSingleThreadExecutor();
//创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。因为采用无界的阻塞队列,所以实际线程数量永远不会变化。
//适用于负载较重的场景,对当前线程数量进行限制。(保证线程数可控,不会造成线程过多,导致系统负载更为严重)
ExecutorService pool02 = Executors.newFixedThreadPool(4);
//创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
//适用于负载较轻的场景,执行短期异步任务。(可以使得任务快速得到执行,因为任务时间执行短,可以很快结束,也不会造成cpu过度切换)
ExecutorService pool03 = Executors.newCachedThreadPool();
//创建一个定长线程池,支持定时及周期性任务执行。
//适用于执行延时或者周期性任务。
ScheduledExecutorService pool04 = Executors.newScheduledThreadPool(4);
二、工作原理
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
int corePoolSize:核心线程数
核心线程会一直存活,即使没有任务需要执行。
当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理。
设置 allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭。
可以通过 prestartCoreThread() 或 prestartAllCoreThreads() 方法来提前启动线程池中的基本线程。
int maxPoolSize:最大线程数
线程池所允许的最大线程个数
maxPoolSize>当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务。
当线程数=maxPoolSize,且任务队列已满时,线程池会根据handle策略处理,默认是AbortPolicy 丢弃任务,抛运行时异常。
long keepAliveTime:非核心线程空闲保持时间
当线程空闲时间达到 keepAliveTime 时,线程会退出,直到线程数量= corePoolSize。
如果 allowCoreThreadTimeout = true,则会直到线程数量=0。
TimeUnit unit:时间单位
TimeUnit是一个枚举类型 ,包括以下属性:
NANOSECONDS : 1微毫秒 = 1微秒 / 1000 MICROSECONDS : 1微秒 = 1毫秒 / 1000 MILLISECONDS : 1毫秒 = 1秒 /1000 SECONDS : 秒 MINUTES : 分 HOURS : 小时 DAYS : 天
BlockingQueue workQueue:任务队列容量(阻塞队列)
当核心线程数达到最大时,新任务会放在队列中排队等待执行。
-
常用的几个阻塞队列:
-
LinkedBlockingQueue
链式阻塞队列,底层数据结构是链表,默认大小是
Integer.MAX_VALUE
,也可以指定大小。 -
ArrayBlockingQueue
数组阻塞队列,底层数据结构是数组,需要指定队列的大小。
-
SynchronousQueue
同步队列,内部容量为0,每个put操作必须等待一个take操作,反之亦然。
-
DelayQueue
延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素 。
-
ThreadFactory threadFactory:线程工厂
用于批量创建线程,统一在创建线程时设置一些参数,如是否守护线程、线程的优先级等。如果不指定,会新建一个默认的线程工厂。
RejectedExecutionHandler handler:任务拒绝处理器
-
两种情况会拒绝处理任务:
- 当线程数已经达到 maxPoolSize,切队列已满,会拒绝新任务。
- 当线程池被调用 shutdown() 后,会等待线程池里的任务执行完毕,再 shutdown。如果在调用shutdown() 和线程池真正 shutdown 之间提交任务,会拒绝新任务。线程池会调用rejectedExecutionHandler 来处理这个任务。如果没有设置默认是 AbortPolicy,会抛出异常。
-
ThreadPoolExecutor 类有几个内部实现类来处理这类情况-handle饱和策略:
- ThreadPoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常。
- ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,然后重新尝试执行程序(如果再次失败,重复此过程)。
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。
三、线程池使用不当的危害
不知道大家在使用线程池的时候,是否存在以下的疑惑,这是我看到的某些同事的代码,他们是这样使用线程池的:
1、 线程池局部变量,new出来使用,没有手动shutdown
2、 线程池局部变量,new出来使用,并且最后手动shutdown
3、 线程池定义为static类型,进行类复用
大家先想想到底哪种方式是正确的,以及错误的方式可能会带来什么问题
方式一:线程池局部使用,没有shutdown
首先,我们明确:局部变量new出来的线程池,执行这段代码的程序的每一个线程都会去创建一个局部的线程池。暂且不说每一个线程都去创建线程池是出于什么神奇的目的,首当其冲的线程池的复用的性质就被打破了。创建出来的线程池都得不到复用,那么还有什么必要花费大精力创建线程池?
所以线程池局部使用本身就是不推荐的使用方式!
其次,我们再来想想,局部使用线程池,同时设置核心线程不为0,且设置allowCoreThreadTimeOut=false(空闲后不回收核心线程池)会导致什么问题?(想都不用想,核心线程池得不到回收,自然会导致OOM)
以下是问题代码:
public static void main(String[] args) {
while (true) {
try {
//newFixedThreadPool不会回收核心线程 可能导致OOM
ExecutorService service = Executors.newFixedThreadPool(1);
service.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000); 模拟处理业务
} catch (InterruptedException e) {
}
}
});
service = null;
} catch (Exception e) {
}
try {
Thread.sleep(2000);
System.gc();
} catch (InterruptedException e) {
}
}
}
那么,是否我们设置核心线程可以回收不就好咯?同样会出现问题。系统可能会根据你设置的线程过期时间,呈现有规律的内存占用上升,然后下降,然后又上升,然后又下降的趋势。你说说这是好的内存运行情况?
方式二:线程池局部使用,使用完后手动shutdown线程池
okok,这种方式OOM的风险降低了,但是又是局部使用局部使用,你干嘛要局部使用线程池呢?这样不就使得每一个线程都会new一个线程池,导致线程池不会复用,这和你不用线程池有什么区别呢?系统还白白花费资源去创建线程池。
方式三:线程池定义为static类型,进行类复用
明显,到这里才是正确的使用线程池的方式。static修饰的类变量只会加载一次,所有的线程共享这一个线程池了呗。以下是正确的使用代码:
public class staticTestExample {
//static 定义线程池
private static ThreadPoolExecutor pool = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
timeout,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(),
new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
//使用线程池
Future<Boolean> submit = pool.submit(() -> true);
}
}
业务中的线程池的复用
这里给出以下两种思路
第一种:根据业务执行类型去创建线程池(同一类型的业务复用一个线程池)
简单来说,业务场景相同,且需要用到线程池的地方,复用一个线程池。比如,拆分任务场景,一次性需要同时拆分100个任务去执行,就可以把这100个相同业务场景的任务交给一个特定命名的线程池处理。这个线程池就是专门去处理任务拆分的。
代码如下:
public class ThreadPoolUtil {
private static final Map<String, ExecutorService> POOL_CATCH = Maps.newConcurrentMap();
/**
* 根据特定名称去缓存线程池
* @param poolName
* @return
*/
public static ExecutorService create(String poolName) {
//map有缓存直接复用缓存中存在的线程池
if (POOL_CATCH.containsKey(poolName)) {
return POOL_CATCH.get(poolName);
}
// synchronized锁字符串常量池字段 防止并发,使得map中缓存的线程池,只会创建一次
synchronized (poolName.intern()) {
int poolSize = Runtime.getRuntime().availableProcessors() * 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize,
30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),new ThreadPoolExecutor.AbortPolicy());
POOL_CATCH.put(poolName, threadPoolExecutor);
return threadPoolExecutor;
}
}
}
public class CorrectTest {
public static void main(String[] args) {
ExecutorService pool = ThreadPoolUtil.create("拆分任务线程池");
//线程池执行任务
Future<Boolean> submit = pool.submit(() -> true);
}
}
第二种:根据用户登陆性质去创建线程池(用一类型的用户复用一个线程池)
简单来说,用户类型且业务场景相同,需要用到线程池的地方,复用一个线程池。
public class CorrectTest {
public static void main(String[] args) {
//模拟得到用户信息
Userinfo = getUserinfo();
//模拟用相同的用户类型(type)去创建线程池
ExecutorService pool = ThreadPoolUtil.create(Userinfo.getType);
//线程池执行任务
Future<Boolean> submit = pool.submit(() -> true);
}
}
总结
- 使用全局线程池而不是局部线程池,否则可能会有连续创建局部线程池的OOM风险
- 就算使用局部线程池,最后一定要shutdown,否则可能导致不回收核心线程的内存泄漏
- 理解线程池是为了复用的,不要代码中随意new一个局部线程池
四、结语
四种常见的线程池基本够我们使用了,但是《阿里巴巴开发手册》不建议我们直接使用Executors类中的线程池,而是通过ThreadPoolExecutor
的方式,这样的处理方式让写的同学需要更加明确线程池的运行规则,规避资源耗尽的风险。
但如果你及团队本身对线程池非常熟悉,又确定业务规模不会大到资源耗尽的程度(比如线程数量或任务队列长度可能达到Integer.MAX_VALUE)时,其实是可以使用JDK提供的这几个接口的,它能让我们的代码具有更强的可读性。