震惊!!!只需三分钟,跟线程池“死锁”说拜拜

1、背景概述

问题表象:线上某一服务突然频繁宕机,所有请求都响应超时

2、排查过程:

不得不说,其中排查过程跟唐僧取经似的,历经千辛万苦,最后才取得真经。

1)、登陆监控平台发现所有mysql请求延迟特别高。

dba也反馈数据库很多大事务未提交,查看线上日志发现许多报数据库连接池没有活跃连接数的错误


image.png

2、查看mysql qps,发现qps并不高,排除了因为并发高导致连接数不够用的原因。

image.png

并且,宕机的该台机器的内存和cpu使用率当时也不高


image.png

后面又基于arthas查看线程堆栈,查看线程是否死锁等等各种方式,都没发现问题。排查到这大家已经都快要奔溃了,有些同学头发都白了,还有些头都想秃了。这到底是啥子问题?

3、柳暗花明

突然一同事大牛,默默在群里发了个traceId,说这个trace的线程走到一半就没往下运行了。机灵的我们灵机一动,立马想起了“不堪回首”的往事。


此处涉及同事隐私,马赛克

会不会是系统中线程池“死锁”了?

果然发现是线程池“死锁”了

这段比较绕,大家可以多看八百遍。
代码片段一和代码片段二使用了同一线程池,并且代码片段一的线程创建并等待代码片段二线程的执行结果
当代码片段一开启的线程把线程池打满后代码片段二从线程池中获取不到线程时,会压入线程池队列等待线程池有资源后再执行。这就导致了代码片段一和代码片段二互相等待。即线程池“死锁”了。
这里只大概描述死锁过程,如需详细了解,请移步原作者博客:https://duapp.yuque.com/team_tech/scp/rlt7sh

//代码片段一
asyncServiceExecutor.execute(() -> {
            for (DeliveryOrderDo deliveryOrderDo : deliveryOrderListDos) {
                try {
                    deliveryOrderModifyService.partAllocateInv(deliveryOrderDo.getDeliveryOrderCode());
                } catch (WmsException e) {
                    log.error("库存分配出错,错误信息:{}", e);
                    log.error("当前出库单号为:{}", deliveryOrderDo.getDeliveryOrderCode());
//                throw new WmsException("当前出库单号为:" + deliveryOrderDo.getDeliveryOrderCode() + ",其" + e.getMessage());
                }
            }
        });
//代码片段二
       /*开启异步线程池进行库存分配*/
        try {
            allocatedPojos.stream().map(
                    item -> CompletableFuture.supplyAsync
                            (() -> {
                                // 多线程异步执行
                                return inventoryAllocated.process(item);
                            }, asyncServiceExecutor)
            ).collect(Collectors.toList()).
                    stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } catch (Exception e) {
            log.error("库存分配异常", e);
        }

3、排查结果:

在后台疯狂调用该功能,结果确实复现了,线程池"死锁"的结论成立
至此,问题解决了一半,剩下一半就是如何去解决和避免此类问题再次发生了,毕竟,头发就那么多,掉着掉着就没了。……^ - ^

4、解决方案:

问题找到了,解决方案其实也简单,只需要主线程和子线程使用不同的线程池来处理相关业务就行了。把上述代码片段二的线程池改为使用另外一个线程池b去执行相关业务代码即可。
但是,如何去避免后续还发生这种线程池死锁问题?毕竟每次排查下来都需要花费大家大量的时间和精力(头发)。

来了来了,重点来了,只需要三分钟,买不了吃亏买不了上当

解决方案一

在创建线程池时指定ThreadFactory的newThread方法上加个主子线程是否是同一线程池的判断。
结果:本地跑测试用例后,发现并没有想象中的抛出异常。原来原因是线程池在创建时,其核心线程数都是项目启动时就被其他线程创建好了的,所以测试时使用到的子线程都是别人家的“孩子”,所以方案一是行不通了

@Data
@Slf4j
public class MyThreadFactory implements ThreadFactory {
    
    private String threadNamePrefix;
    private String nameSpace;
    private AtomicInteger i = new AtomicInteger();
    public MyThreadFactory(String threadNamePrefix){
        this.threadNamePrefix = threadNamePrefix;
    }
    //都是伪代码
    @Override
    public Thread newThread(Runnable r) {
        String currentThreadName = Thread.currentThread().getName();
        if(currentThreadName.startsWith(getThreadNamePrefix())){

            WmsException e = new WmsException(String.format("主线程不能与子线程共用同一个线程池poolName:%s",currentThreadName));
            log.error("wms monitor 主子线程不能用同一个线程池",e);
            //非生产环境,直接抛异常中断线程
            if(!"csprd".equalsIgnoreCase(nameSpace)){
                throw e;
            }
        }
        Thread t = new Thread();
        t.setName(threadNamePrefix+"_"+i.getAndIncrement());
        return t;
    }
}

解决方案二

为线程池生成代理类,在代理方法中去实现主子线程的判断逻辑,具体实现步骤如下:

步骤1、创建线程池代理类

重点说明:
1)WmsThreadPoolTaskExecutor类需要重写实现ThreadPoolTaskExecutor的所有方法
2)nameSpace变量用来控制当发生主子线程属于同一线程池时是否中断线程
3)taskExecutor变量是被代理的线程池对象
4)checkThreadPool方法用来校验是否是同一线程池

/**
 * 线程池添加监控检测
 * @Author: dwq
 * @Date: 2021/8/12 3:27 下午
 */
@Slf4j
public class WmsThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

    @Value("spring.cloud.nacos.config.namespace")
    private String nameSpace;
    //被代理的线程池
    private ThreadPoolTaskExecutor taskExecutor;
    public WmsThreadPoolTaskExecutor(ThreadPoolTaskExecutor taskExecutor){
        this.taskExecutor=taskExecutor;
    }

    @Override
    public void setCorePoolSize(int corePoolSize) {
        taskExecutor.setCorePoolSize(corePoolSize);
    }

    @Override
    public int getCorePoolSize() {
        return taskExecutor.getCorePoolSize();
    }

    @Override
    public void setMaxPoolSize(int maxPoolSize) {
        taskExecutor.setMaxPoolSize(maxPoolSize);
    }

    @Override
    public int getMaxPoolSize() {
        return taskExecutor.getMaxPoolSize();
    }

    @Override
    public void setKeepAliveSeconds(int keepAliveSeconds) {
        taskExecutor.setKeepAliveSeconds(keepAliveSeconds);
    }

    @Override
    public int getKeepAliveSeconds() {
        return taskExecutor.getKeepAliveSeconds();
    }

    @Override
    public void setQueueCapacity(int queueCapacity) {
        taskExecutor.setQueueCapacity(queueCapacity);
    }

    @Override
    public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
        taskExecutor.setAllowCoreThreadTimeOut(allowCoreThreadTimeOut);
    }

    @Override
    public void setTaskDecorator(TaskDecorator taskDecorator) {
        taskExecutor.setTaskDecorator(taskDecorator);
    }

    @Override
    public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
        return taskExecutor.getThreadPoolExecutor();
    }

    @Override
    public int getPoolSize() {
        return taskExecutor.getPoolSize();
    }

    @Override
    public int getActiveCount() {
        return taskExecutor.getActiveCount();
    }

    @Override
    public void execute(Runnable task) {
        taskExecutor.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        taskExecutor.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        //主线程子线程是否是同一线程池校验
        checkThreadPool();
        return taskExecutor.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        checkThreadPool();
        return taskExecutor.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        checkThreadPool();
        return taskExecutor.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        checkThreadPool();
        return taskExecutor.submitListenable(task);
    }

    @Override
    public String getThreadNamePrefix() {
        return taskExecutor.getThreadNamePrefix();
    }
    /**
    * 主线程子线程是否是同一线程池校验
    **/
    private void checkThreadPool(){
        String currentThreadName = Thread.currentThread().getName();
        if(currentThreadName.startsWith(getThreadNamePrefix())){

            WmsException e = new WmsException(String.format("主线程不能与子线程共用同一个线程池poolName:%s",currentThreadName));
            log.error("wms monitor 主子线程不能用同一个线程池",e);
            if(!"csprd".equalsIgnoreCase(nameSpace)){
                throw e;
            }
        }
    }
}

步骤2、创建WmsExecutorBeanPostProcessor类,实现spring的BeanPostProcessor接口

重点说明:
postProcessAfterInitialization方法在bean实例化后执行,此处判断对象是ThreadPoolTaskExecutor类型则返回new WmsThreadPoolTaskExecutor(threadPoolTaskExecutor);代理类

package com.poizon.scm.wms.common.executors;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * @Author: dwq
 * @Date: 2021/8/12 3:13 下午
 */
public class WmsExecutorBeanPostProcessor implements BeanPostProcessor {
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if(bean instanceof ThreadPoolTaskExecutor){
            ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) bean;
            return new WmsThreadPoolTaskExecutor(threadPoolTaskExecutor);
        }
        return bean;
    }
}

步骤3、初始化WmsExecutorBeanPostProcessor类

    @Bean
    public WmsExecutorBeanPostProcessor wmsExecutorBeanPostProcessor(){
        return new WmsExecutorBeanPostProcessor();
    }

步骤4、最后一步,初始化线程池

/**
     * Admin后台复杂任务处理线程池
     *
     * @return
     */
    @Bean("adminThreadPoolExecutor")
    public ThreadPoolTaskExecutor adminThreadPoolExecutor() {
        log.info("start adminThreadPoolExecutor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        //配置核心线程数
        executor.setCorePoolSize(5);

        //配置最大线程数
        executor.setMaxPoolSize(10);

        //配置队列大小
        executor.setQueueCapacity(1000);

        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("adminThreadPoolExecutor-service-");

        //执行初始化
        executor.initialize();

        return executor;
    }

最后附上测试验证方法

    @Test
    public void testThread(){
        Future<Integer> result = adminThreadPoolExecutor.submit(() -> {
            log.info("开始一个线程,threadname:{}", Thread.currentThread().getName());
            try {
                Future<Integer> future1 = adminThreadPoolExecutor.submit(() -> {
                    log.info("开始一个子线程:{}", Thread.currentThread().getName());
                    return 2;
                });
                return future1.get();

            } catch (Exception e) {
                log.error("请求异常", e);
                throw e;
            }
        });
        try {
           int count = result.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

这样即完成了一个线程池的监控。本地测试也符合期望。赶紧去试试吧

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 222,183评论 6 516
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,850评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,766评论 0 361
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,854评论 1 299
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,871评论 6 398
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,457评论 1 311
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,999评论 3 422
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,914评论 0 277
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,465评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,543评论 3 342
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,675评论 1 353
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,354评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,029评论 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,514评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,616评论 1 274
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 49,091评论 3 378
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,685评论 2 360

推荐阅读更多精彩内容