回顾线程的基本知识
我们最常见的创建线程的几种方法:一是继承Thread类,二是实现Runnable的接口,三是实现Callable接口。单个线程的创建和销毁会消耗机器资源,如果线程数量多的话,频繁的创建和销毁会大大的降低程序运行的效率,耗费大量的内存,因为正常来说线程执行完毕后死亡,会被当作垃圾回收。怎么能让线程复用、统一管理命名线程、提高资源的使用率?线程池就能完美的解决这个问题。
为什么使用线程池
线程池是一种多线程处理形式,处理过程中将任务添加到队列,线程池在工程启动时并不会立即创建大量空闲的线程,程序将一个任务传给线程池,线程池就会启动一条线程来执行这个任务。执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务。
简而言之,线程池就是一组线程的集合。
SpringBoot配置线程池
首先在启动类中配置如下Bean
@Bean("defaultThreadPool")
public ThreadPoolExecutor defaultThreadPool() {
log.info("start asyncServiceExecutor");
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HiveMetric-pool-%d").build();
// 核心线程数
int corePoolSize = 10;
// 配置最大线程数
int maximumPoolSize = 20;
// 当线程数大于内核数时,这是多余的空闲线程将在终止之前等待新任务的最长时间
long keepAliveTime = 2L;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024), threadFactory, new ThreadPoolExecutor.AbortPolicy());
log.info("线程池,{}", threadPoolExecutor);
return threadPoolExecutor;
}
1)首先创建一个ThreadFactoryBuilder
给线程统一命名,方便管理及检索日志;
2)通过ThreadPoolExecutor
来创建一个线程池,该类来自于JUC包下,下面介绍一下重要的几个参数;
corePoolSize
:核心线程数量
maximumPoolSize
:线程池中的线程总数=核心线程数+非核心线程数
keepAliveTime
:当线程数大于内核数时,多余的空闲线程将在终止之前等待新任务的最长时间。
TimeUnit
:时间单位
BlockingQueue<Runnable>
:在执行任务之前用于保留任务的队列。
ThreadFactory
:线程工厂
RejectedExecutionHandler
:因为达到了线程界限和队列容量而在执行被阻止时使用的处理器。线程池中的线程已经用完了,无法继续为新任务服务,同时,等待队列也已经排满了,再也塞不下新任务了。这时候我们就需要拒绝策略机制合理的处理这个问题。JDK 内置的拒绝策略如下:
- AbortPolicy : 直接抛出异常,阻止系统正常运行。
- CallerRunsPolicy : 只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的
任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。 - DiscardOldestPolicy : 丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再
次提交当前任务。 - DiscardPolicy : 该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢
失,这是最好的一种方案。
以上内置拒绝策略均实现了RejectedExecutionHandler
接口,若以上策略仍无法满足实际需要,完全可以自己扩展RejectedExecutionHandler
接口。
3)在service层通过如下方法注入
@Autowired
private ThreadPoolExecutor defaultThreadPool;
ThreadPoolExecutor
继承自AbstractExecutorService
,下面看一下submit()。
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
最后通过Future<V>
的get()
的方法取出线程的执行结果。
for (HostEntity hostEntity : hiveServer2Info) {
String hostName = hostEntity.getHostName();
Future<JSONObject> submit = defaultThreadPool.submit(() -> stringConvertJsonObject(hostName, getJmxFileContent(hostName, metricFileLocation)));
map.put(hostName, submit);
}
for (Map.Entry<String, Future<JSONObject>> next : map.entrySet()) {
String hostName = next.getKey();
try {
result.put(hostName, next.getValue().get());
} catch (Exception e) {
log.error("线程池获取返回结果异常!主机名:{}", hostName, e);
}
}
之前从多台服务器解析文件由串行改为线程池并行的解决方法,初步优化之后线上接口的响应时间由10s+降为1s。