使用线程池 ThreadPoolExecutor 过程中你是否有以下痛点呢?
代码中创建了一个 ThreadPoolExecutor,但是不知道那几个核心参数设置多少比较合适
凭经验设置参数值,上线后发现需要调整,改代码重启服务,非常麻烦
线程池相对开发人员来说是个黑盒,运行情况不能及时感知到,直到出现问题
如果你有以上痛点,动态可监控线程池(DynamicTp)或许能帮助到你。
如果看过 ThreadPoolExecutor 的源码,大概可以知道它对核心参数基本都有提供 set / get 方法以及一些扩展方法,可以在运行时动态修改、获取相应的值。
现在大多数的互联网项目其实都会微服务化部署,有一套自己的服务治理体系,微服务组件中的分布式配置中心扮演的就是动态修改配置,实时生效的角色。那么我们是否可以结合配置中心来做运行时线程池参数的动态调整呢?答案是肯定的,而且配置中心相对都是高可用的,使用它也不用过于担心配置推送出现问题这类事儿,而且也能减少研发动态线程池组件的难度和工作量。
综上,可以总结出以下的背景:
广泛性:在 Java 开发中,想要提高系统性能,线程池已经是一个 90% 以上的人都会选择使用的基础工具
不确定性:项目中可能会创建很多线程池,既有 IO 密集型的,也有 CPU 密集型的,但线程池的参数并不好确定;需要有套机制在运行过程中动态去调整参数
无感知性:线程池运行过程中的各项指标一般感知不到;需要有套监控报警机制在事前、事中就能让开发人员感知到线程池的运行状况,及时处理
高可用性:配置变更需要及时推送到客户端;需要有高可用的配置管理推送服务,配置中心是现在大多数互联网系统都会使用的组件,与之结合可以大幅度减少开发量及接入难度
直接上代码
1.导包
<!-- Spring Cloud Alibaba + Nacos 场景必须用 spring-cloud-starter 版本 -->
<dependency>
<groupId>org.dromara.dynamictp</groupId>
<artifactId>dynamic-tp-spring-cloud-starter-nacos</artifactId>
<version>1.1.7</version>
</dependency>
2.线程池配置
package com.iflytek.swk.bjswk.config;
import org.dromara.dynamictp.core.support.DynamicTp;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class ThreadPoolConfiguration {
/**
* 通过{@link DynamicTp} 注解定义普通juc线程池
*/
@DynamicTp("jucThreadPoolExecutor")
@Bean
public ThreadPoolExecutor jucThreadPoolExecutor() {
return (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
}
/**
* 通过{@link DynamicTp} 注解定义spring线程池
*/
@DynamicTp("threadPoolTaskExecutor")
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("task-");
executor.initialize();
return executor;
}
// dtpExecutor1 完全由 DynamicTp 框架根据 Nacos 配置自动创建
// 配置路径:spring.dynamic.tp.executors[0].threadPoolName=dtpExecutor1
}
3.nacos配置
# ============== 动态线程池配置 ==============
spring.dynamic.tp.enabled=true
spring.dynamic.tp.enabledCollect=true
spring.dynamic.tp.collectorTypes=micrometer,logging
spring.dynamic.tp.logPath=/home/logs/dynamictp/bjswk-web/
spring.dynamic.tp.monitorInterval=5
# 告警渠道
spring.dynamic.tp.platforms[0].platform=email
spring.dynamic.tp.platforms[0].platformId=1
spring.dynamic.tp.platforms[0].receivers=lal@qq.com
# 线程池配置
spring.dynamic.tp.executors[0].threadPoolName=dtpExecutor1
spring.dynamic.tp.executors[0].threadPoolAliasName=测试线程池
spring.dynamic.tp.executors[0].corePoolSize=4
spring.dynamic.tp.executors[0].maximumPoolSize=8
spring.dynamic.tp.executors[0].queueCapacity=2000
spring.dynamic.tp.executors[0].queueType=VariableLinkedBlockingQueue
spring.dynamic.tp.executors[0].rejectedHandlerType=CallerRunsPolicy
spring.dynamic.tp.executors[0].keepAliveTime=60
spring.dynamic.tp.executors[0].threadNamePrefix=test
spring.dynamic.tp.executors[0].allowCoreThreadTimeOut=false
spring.dynamic.tp.executors[0].waitForTasksToCompleteOnShutdown=true
spring.dynamic.tp.executors[0].awaitTerminationSeconds=5
spring.dynamic.tp.executors[0].preStartAllCoreThreads=false
spring.dynamic.tp.executors[0].runTimeout=200
spring.dynamic.tp.executors[0].queueTimeout=100
spring.dynamic.tp.executors[0].taskWrapperNames=ttl,mdc
spring.dynamic.tp.executors[0].notifyEnabled=true
spring.dynamic.tp.executors[0].platformIds=1
4.测试
@GetMapping("/test")
@ApiOperation(value = "测试线程池")
public Response test() {
// 通过 DtpRegistry 动态获取最新的线程池实例
DtpExecutor dtpExecutor1 = DtpRegistry.getDtpExecutor("dtpExecutor1");
log.info("获取到动态线程池实例: {}", dtpExecutor1.getThreadPoolName());
log.info("corePoolSize: {}", dtpExecutor1.getCorePoolSize());
log.info("maximumPoolSize: {}", dtpExecutor1.getMaximumPoolSize());
dtpExecutor1.execute(() -> log.info("test task executed"));
// 返回当前线程池参数
Map<String, Object> result = new HashMap<>();
result.put("threadPoolName", dtpExecutor1.getThreadPoolName());
result.put("corePoolSize", dtpExecutor1.getCorePoolSize());
result.put("maximumPoolSize", dtpExecutor1.getMaximumPoolSize());
result.put("queueSize", dtpExecutor1.getQueue().size());
result.put("queueCapacity", dtpExecutor1.getQueueCapacity());
result.put("activeCount", dtpExecutor1.getActiveCount());
result.put("taskCount", dtpExecutor1.getTaskCount());
return Response.success(result);
}
5.在nacos上修改,然后通过接口调用看结果是否变化