线程池是日常开发中常用的技术,使用也非常简单,不过想使用好线程池也不是件容易的事,开发者需要不断探索底层的实现原理,才能在不同的场景中选择合适的策略,最大程度发挥线程池的作用以及避免踩坑。
一、线程池工作流程
以下是Java线程池的工作流程,涉及创建线程的参数及拒绝策略,如果读者对这部分内容不太了解,可参考其他的文档,本文不在赘述。
二、线程池进阶
1、线程池的创建
需要手动通过ThreadPoolExecutor创建,使用者要非常明确业务场景并定制线程池,避免误用可能导致的问题。
以下是阿里巴巴Java开发手册中的描述:
ThreadFactory:推荐使用guava中的ThreadFactoryBuilder创建:
new ThreadFactoryBuilder().setNameFormat("name-%d").build();
2、阻塞队列在线程池中的使用
很多同学一看到阻塞队列就自然的认为出入队列都是阻塞的,使用的阻塞队列也就没必要关心拒绝策略了,其实不然,阻塞队列在任务提交和任务获取阶段使用了不同的策略。
任务提交阶段:调用的阻塞队列的offer方法,这个方法是非阻塞的,如果插入队列失败会直接返回false,并触发拒绝策略;
获取任务阶段:使用的是take方法,此方法是阻塞的;
3、保证提交阶段任务不丢失
有三种方法:使用CallerRunsPolicy拒绝策略、自定义拒绝策略、使用MQ系统保证任务不丢失。
(1)CallerRunsPolicy拒绝策略
ThreadPoolExecutor.CallerRunsPolicy:由提交任务的线程处理
这种是最简单的策略,但需要注意的是如果任务耗时较长,会阻塞提交任务的线程,可能会成为系统瓶颈。
(2)自定义拒绝策略
既然Java线程默认使用的是offer提交任务,那我们可以自定义拒绝策略在任务提交失败时改为put阻塞提交。
缺点也是会阻塞提交线程,不过相比CallerRunsPolicy策略更能发挥多线程的优势。
RejectedExecutionHandler executionHandler = (r, executor) -> { try { executor.getQueue().put(r); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RejectedExecutionException("Producer thread interrupted", e); } };
(3)配合MQ保证任务不丢失
使用默认的ThreadPoolExecutor.AbortPolicy策略,如果抛出RejectedExecutionException异常则返回给MQ消费失败,MQ会保证自动重试。
4、保证队列、未执行完成的任务不丢失
当服务停止的时候,线程池中队列和活跃线程中未执行完成的任务可能会造成数据丢失,首先说下结论:无论采取任何策略,在Java层都不能100%保证不丢,比如机器突然断电的情况。我们还是可以采取一定的措施尽量避免任务丢失。
(1)线程池关闭
线程池关闭有两个方法:
shutdownNow方法:线程池拒绝接收新提交的任务,同时立马关闭线程池,线程池里的任务不再执行,并抛出InterruptedException异常。
shutdown方法:线程池拒绝接收新提交的任务,同时等待线程池里的任务执行完毕后关闭线程池。
(2)注册关闭钩子
使用以下方法注册JVM进程关闭钩子,在钩子方法中执行线程池关闭、未处理完成的任务持久化保存等。
Runtime.getRuntime().addShutdownHook()
需要注意的是:钩子方法在使用kill -9杀死进程时不会执行,一般的杀进程的方式是先执行kill,等待一段时间,如果进程还没杀死,再执行kill -9。
要保证队列中的任务不丢失,需要消费队列中的数据,发送到外部MQ中;
保证未执行完成的任务不丢失,需要在抛出InterruptedException异常后,将任务参数保证到MQ中;
需要注意的是:1)尽量不要把未完成的任务保存到本地磁盘,尤其是在经常扩缩容的弹性集群里;2)捕获InterruptedException异常后,不要做重试等耗时操作;3)需要监控任务都发送到MQ中的时间,以便调整kill -9强制执行前的等待时间。
(3)使用MQ保证任务必须执行完成
通过上面介绍的两种方式,可以处理大部分正常停止服务丢数据的任务。不过对于极端情况下,比如断电、断网等,需要严格保证任务不丢失的场景还是不能满足业务需要,这种情况下就需要依赖MQ。
方案是使用线程池的submit方法提交任务,通过future获取到任务执行完成再返回给MQ消费完成。在MQ中如何保证数据不丢失是另外一个复杂的话题了,这里不再深入探讨。
需要注意的是,如果采用这种方案,需要保证处理任务的幂等性,在操作步骤比较多的时候,复杂性也会很高。
5、ThreadLocal变量
ThreadLocal中变量的作用域是当前线程,使用线程池后会因跨线程导致数据不能传递,如果业务中使用了ThreadLocal,需要额外处理这种场景。
(1)InheritableThreadLocal
InheritableThreadLocal是在父子线程中自动传递参数,在线程池场景中不适用。
(2)手动处理
在提交任务前把ThreadLocal中的值取出来,在线程池执行时再set到线程池中线程的ThreadLocal中,并且在finally中清理数据。
缺点是每个线程池都要处理一遍,如果对上下文不熟悉,有漏传的风险。
(3)TransmittableThreadLocal
阿里开源地址:TransmittableThreadLocal
原理是通过javaagent自动处理ThreadLocal跨线程池传参,对业务开发者无感知,也是推荐的方案。
6、异常处理
(1)异常感知
execute方法:抛异常会被提交任务线程感知;
submit方法:抛异常不会被提交任务线程感知,在Future.get()执行时会被感知;
(2)统一处理方案1:异步任务里统一catch
在线程池的执行逻辑最外层,包装try、catch,处理所有异常。
缺点是: 1)所有的不同任务都要trycatch,增加了代码量。2)不存在checkedexception的地方也需要都trycatch起来,代码丑陋。
(3)统一处理方案2:覆写统一异常处理方法
此方案有两种常用实现:1)自定义线程池,继承ThreadPoolExecutor并覆写其afterExecute方法;2)创建线程池时自定义ThreadFactory,在实现里手动创建线程池,并调用Thread.setUncaughtExceptionHandler注册统一异常处理器。
(4)统一处理方案3:Future
任务提交都使用submit,并在Future.get()时捕获所有异常。
三、总结
本文从创建线程池、队列注意事项、如何保证任务不丢失、ThreadLocal、异常等方面总结了笔者的一些思考,各位读者可以对照下自己的使用场景,看本文提到的问题是否都考虑到了呢,或者你还有什么线程池方面的使用经验,欢迎交流分享。
本文链接:Java线程池进阶
作者简介:木小丰,美团Java技术专家,专注分享软件研发实践、架构思考。