字节跳动二面!面试官直接问我生产环境下如何监控线程池?还好我看了这篇文章!

线程池的监控很重要,对于前面章节讲的动态参数调整,其实还是得依赖于线程池监控的数据反馈之后才能做出调整的决策。还有就是线程池本身的运行过程对于我们来说像一个黑盒,我们没办法了解线程池中的运行状态时,出现问题没有办法及时判断和预警。

对于监控这类的场景,核心逻辑就是要拿到关键指标,然后进行上报,只要能实时拿到这些关键指标,就可以轻松实现监控以及预警功能。

ThreadPoolExecutor中提供了以下方法来获取线程池中的指标。

  • getCorePoolSize():获取核心线程数。
  • getMaximumPoolSize:获取最大线程数。
  • getQueue():获取线程池中的阻塞队列,并通过阻塞队列中的方法获取队列长度、元素个数等。
  • getPoolSize():获取线程池中的工作线程数(包括核心线程和非核心线程)。
  • getActiveCount():获取活跃线程数,也就是正在执行任务的线程。
  • getLargestPoolSize():获取线程池曾经到过的最大工作线程数。
  • getTaskCount():获取历史已完成以及正在执行的总的任务数量。

除此之外,ThreadPoolExecutor中还提供了一些未实现的钩子方法,我们可以通过重写这些方法来实现更多指标数据的获取。

  • beforeExecute,在Worker线程执行任务之前会调用的方法。
  • afterExecute,在Worker线程执行任务之后会调用的方法。
  • terminated,当线程池从状态变更到TERMINATED状态之前调用的方法。

比如我们可以在beforeExecute方法中记录当前任务开始执行的时间,再到afterExecute方法来计算任务执行的耗时、最大耗时、最小耗时、平均耗时等。

线程池监控的基本原理

我们可以通过Spring Boot提供的Actuator,自定义一个Endpoint来发布线程池的指标数据,实现线程池监控功能。当然,除了Endpoint以外,我们还可以通过JMX的方式来暴露线程池的指标信息,不管通过什么方法,核心思想都是要有一个地方看到这些数据。

了解对于Spring Boot应用监控得读者应该知道,通过Endpoint发布指标数据后,可以采用一些主流的开源监控工具来进行采集和展示。如图10-9所示,假设在Spring Boot应用中发布一个获取线程池指标信息的Endpoint,那么我们可以采用Prometheus定时去抓取目标服务器上的Metric数据,Prometheus会将采集到的数据通过Retrieval分发给TSDB进行存储。这些数据可以通过Prometheus自带的UI进行展示,也可以使用Grafana图表工具通过PromQL语句来查询Prometheus中采集的数据进行渲染。最后采用AlertManager这个组件来触发预警功能。

在这里插入图片描述

<center>图10-9 线程池指标监控</center>

图10-9中所涉及到的工具都是比较程度的开源监控组件,大家可以自行根据官方教程配置即可,而在本章节中要重点讲解的就是如何自定义Endpoint发布线程池的Metric数据。

在Spring Boot应用中发布线程池信息

对于线程池的监控实现,笔者开发了一个相对较为完整的小程序,主要涉及到几个功能:

  • 可以通过配置文件来构建线程池。
  • 扩展了ThreadPoolExecutor的实现。
  • 发布一个自定义的Endpoint。

该小程序包含的类以及功能说明如下:

  • ThreadPoolExecutorForMonitor:扩展ThreadPoolExecutor的实现类。
  • ThreadPoolConfigurationProperties:绑定application.properties的配置属性。
  • ThreadPoolForMonitorManager:线程池管理类,实现线程池的初始化。
  • ThreadPoolProperties:线程池基本属性。
  • ResizeLinkedBlockingQueue:这个类是直接复制了LinkedBlockingQueue,提供了setCapacity方法,在前面有讲解到,源码就不贴出来。
  • ThreadPoolEndpoint:自定义Endpoint。

ThreadPoolExecutorForMonitor

继承了ThreadPoolExecutor,实现了beforeExecuteafterExecute,在原有线程池的基础上新增了最短执行时间、最长执行时间、平均执行耗时的属性。

public class ThreadPoolExecutorForMonitor extends ThreadPoolExecutor {

  private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

  private static final String defaultPoolName="Default-Task";

  private static ThreadFactory threadFactory=new MonitorThreadFactory(defaultPoolName);

  public ThreadPoolExecutorForMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory,defaultHandler);
  }
  public ThreadPoolExecutorForMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,String poolName) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,new MonitorThreadFactory(poolName),defaultHandler);
  }
  public ThreadPoolExecutorForMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler,String poolName) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory,handler);
  }

  //最短执行时间
  private long minCostTime;
  //最长执行时间
  private long maxCostTime;
  //总的耗时
  private AtomicLong totalCostTime=new AtomicLong();

  private ThreadLocal<Long> startTimeThreadLocal=new ThreadLocal<>();

  @Override
  public void shutdown() {
    super.shutdown();
  }

  @Override
  protected void beforeExecute(Thread t, Runnable r) {
    startTimeThreadLocal.set(System.currentTimeMillis());
    super.beforeExecute(t, r);
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    long costTime=System.currentTimeMillis()-startTimeThreadLocal.get();
    startTimeThreadLocal.remove();
    maxCostTime=maxCostTime>costTime?maxCostTime:costTime;
    if(getCompletedTaskCount()==0){
      minCostTime=costTime;
    }
    minCostTime=minCostTime<costTime?minCostTime:costTime;
    totalCostTime.addAndGet(costTime);
    super.afterExecute(r, t);
  }

  public long getMinCostTime() {
    return minCostTime;
  }

  public long getMaxCostTime() {
    return maxCostTime;
  }

  public long getAverageCostTime(){//平均耗时
    if(getCompletedTaskCount()==0||totalCostTime.get()==0){
      return 0;
    }
    return totalCostTime.get()/getCompletedTaskCount();
  }

  @Override
  protected void terminated() {
    super.terminated();
  }

  static class MonitorThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    MonitorThreadFactory(String poolName) {
      SecurityManager s = System.getSecurityManager();
      group = (s != null) ? s.getThreadGroup() :
      Thread.currentThread().getThreadGroup();
      namePrefix = poolName+"-pool-" +
        poolNumber.getAndIncrement() +
        "-thread-";
    }

    public Thread newThread(Runnable r) {
      Thread t = new Thread(group, r,
                            namePrefix + threadNumber.getAndIncrement(),
                            0);
      if (t.isDaemon())
        t.setDaemon(false);
      if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY);
      return t;
    }
  }
}

ThreadPoolConfigurationProperties

提供了获取application.properties配置文件属性的功能,

@ConfigurationProperties(prefix = "monitor.threadpool")
@Data
public class ThreadPoolConfigurationProperties {

    private List<ThreadPoolProperties>  executors=new ArrayList<>();

}

线程池的核心属性声明。

@Data
public class ThreadPoolProperties {

    private String poolName;
    private int corePoolSize;
    private int maxmumPoolSize=Runtime.getRuntime().availableProcessors();
    private long keepAliveTime=60;
    private TimeUnit unit= TimeUnit.SECONDS;
    private int queueCapacity=Integer.MAX_VALUE;
}

上述配置类要生效,需要通过@EnableConfigurationProperties开启,我们可以在Main方法上开启,代码如下。

@EnableConfigurationProperties(ThreadPoolConfigurationProperties.class)
@SpringBootApplication
public class ThreadPoolApplication {

    public static void main(String[] args) {
        SpringApplication.run(ThreadPoolApplication.class, args);
    }
}

application.properties

配置类创建好之后,我们就可以在application.properties中,通过如下方式来构建线程池。

monitor.threadpool.executors[0].pool-name=first-monitor-thread-pool
monitor.threadpool.executors[0].core-pool-size=4
monitor.threadpool.executors[0].maxmum-pool-size=8
monitor.threadpool.executors[0].queue-capacity=100

monitor.threadpool.executors[1].pool-name=second-monitor-thread-pool
monitor.threadpool.executors[1].core-pool-size=2
monitor.threadpool.executors[1].maxmum-pool-size=4
monitor.threadpool.executors[1].queue-capacity=40

ThreadPoolForMonitorManager

用来实现线程池的管理和初始化,实现线程池的统一管理,初始化的逻辑是根据application.properties中配置的属性来实现的。

  • 从配置类中获得线程池的基本配置。
  • 根据配置信息构建ThreadPoolExecutorForMonitor实例。
  • 把实例信息保存到集合中。
@Component
public class ThreadPoolForMonitorManager {

  @Autowired
  ThreadPoolConfigurationProperties poolConfigurationProperties;

  private final ConcurrentMap<String,ThreadPoolExecutorForMonitor> threadPoolExecutorForMonitorConcurrentMap=new ConcurrentHashMap<>();

  @PostConstruct
  public void init(){
    poolConfigurationProperties.getExecutors().forEach(threadPoolProperties -> {
      if(!threadPoolExecutorForMonitorConcurrentMap.containsKey(threadPoolProperties.getPoolName())){
        ThreadPoolExecutorForMonitor executorForMonitor=new ThreadPoolExecutorForMonitor(
          threadPoolProperties.getCorePoolSize(),
          threadPoolProperties.getMaxmumPoolSize(),
          threadPoolProperties.getKeepAliveTime(),
          threadPoolProperties.getUnit(),
          new ResizeLinkedBlockingQueue<>(threadPoolProperties.getQueueCapacity()),
          threadPoolProperties.getPoolName());
        threadPoolExecutorForMonitorConcurrentMap.put(threadPoolProperties.getPoolName(),executorForMonitor);
      }
    });
  }

  public ThreadPoolExecutorForMonitor getThreadPoolExecutor(String poolName){
    ThreadPoolExecutorForMonitor threadPoolExecutorForMonitor=threadPoolExecutorForMonitorConcurrentMap.get(poolName);
    if(threadPoolExecutorForMonitor==null){
      throw new RuntimeException("找不到名字为"+poolName+"的线程池");
    }
    return threadPoolExecutorForMonitor;
  }

  public ConcurrentMap<String,ThreadPoolExecutorForMonitor> getThreadPoolExecutorForMonitorConcurrentMap(){
    return this.threadPoolExecutorForMonitorConcurrentMap;
  }
}

ThreadPoolEndpoint

使用Spring-Boot-Actuator发布Endpoint,用来暴露当前应用中所有线程池的Metric数据。

读者如果不清楚在Spring Boot中自定义Endpoint,可以直接去Spring官方文档中配置,比较简单。

@Configuration
@Endpoint(id="thread-pool")
public class ThreadPoolEndpoint {
  @Autowired
  private ThreadPoolForMonitorManager threadPoolForMonitorManager;

  @ReadOperation
  public Map<String,Object> threadPoolsMetric(){
    Map<String,Object> metricMap=new HashMap<>();
    List<Map> threadPools=new ArrayList<>();
    threadPoolForMonitorManager.getThreadPoolExecutorForMonitorConcurrentMap().forEach((k,v)->{
      ThreadPoolExecutorForMonitor tpe=(ThreadPoolExecutorForMonitor) v;
      Map<String,Object> poolInfo=new HashMap<>();
      poolInfo.put("thread.pool.name",k);
      poolInfo.put("thread.pool.core.size",tpe.getCorePoolSize());
      poolInfo.put("thread.pool.largest.size",tpe.getLargestPoolSize());
      poolInfo.put("thread.pool.max.size",tpe.getMaximumPoolSize());
      poolInfo.put("thread.pool.thread.count",tpe.getPoolSize());
      poolInfo.put("thread.pool.max.costTime",tpe.getMaxCostTime());
      poolInfo.put("thread.pool.average.costTime",tpe.getAverageCostTime());
      poolInfo.put("thread.pool.min.costTime",tpe.getMinCostTime());
      poolInfo.put("thread.pool.active.count",tpe.getActiveCount());
      poolInfo.put("thread.pool.completed.taskCount",tpe.getCompletedTaskCount());
      poolInfo.put("thread.pool.queue.name",tpe.getQueue().getClass().getName());
      poolInfo.put("thread.pool.rejected.name",tpe.getRejectedExecutionHandler().getClass().getName());
      poolInfo.put("thread.pool.task.count",tpe.getTaskCount());
      threadPools.add(poolInfo);
    });
    metricMap.put("threadPools",threadPools);
    return metricMap;
  }
}

如果需要上述自定义的Endpoint可以被访问,还需要在application.properties文件中配置如下代码,意味着thread-pool Endpoint允许被访问。

management.endpoints.web.exposure.include=thread-pool

TestController

提供使用线程池的方法,用来实现在调用之前和调用之后,通过Endpoint获取到Metric数据的变化。

@RestController
public class TestController {

  private final String poolName="first-monitor-thread-pool";
  @Autowired
  ThreadPoolForMonitorManager threadPoolForMonitorManager;

  @GetMapping("/execute")
  public String doExecute(){
    ThreadPoolExecutorForMonitor tpe=threadPoolForMonitorManager.getThreadPoolExecutor(poolName);
    for (int i = 0; i < 100; i++) {
      tpe.execute(()->{
        try {
          Thread.sleep(new Random().nextInt(4000));
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      });
    }
    return "success";
  }
}

效果演示

访问自定义Endpoint: http://ip:8080/actuator/thread-pool,就可以看到如下数据。我们可以把这个Endpoint配置到Prometheus中,Prometheus会定时抓取这些指标存储并展示,从而完成线程池的整体监控。

{
    "threadPools":[
        {
            "thread.pool.queue.name":"com.concurrent.demo.ResizeLinkedBlockingQueue",
            "thread.pool.core.size":2,
            "thread.pool.min.costTime":0,
            "thread.pool.completed.taskCount":0,
            "thread.pool.max.costTime":0,
            "thread.pool.task.count":0,
            "thread.pool.name":"second-monitor-thread-pool",
            "thread.pool.largest.size":0,
            "thread.pool.rejected.name":"java.util.concurrent.ThreadPoolExecutor$AbortPolicy",
            "thread.pool.active.count":0,
            "thread.pool.thread.count":0,
            "thread.pool.average.costTime":0,
            "thread.pool.max.size":4
        },
        {
            "thread.pool.queue.name":"com.concurrent.demo.ResizeLinkedBlockingQueue",
            "thread.pool.core.size":4,
            "thread.pool.min.costTime":65,
            "thread.pool.completed.taskCount":115,
            "thread.pool.max.costTime":3964,
            "thread.pool.task.count":200,
            "thread.pool.name":"first-monitor-thread-pool",
            "thread.pool.largest.size":4,
            "thread.pool.rejected.name":"java.util.concurrent.ThreadPoolExecutor$AbortPolicy",
            "thread.pool.active.count":4,
            "thread.pool.thread.count":4,
            "thread.pool.average.costTime":1955,
            "thread.pool.max.size":8
        }
    ]
}

总结

线程池的整体实现并不算太复杂,但是里面涉及到的一些思想和理论是可以值得我们去学习和借鉴,如基于阻塞队列的生产者消费者模型的实现、动态扩容的思想、如何通过AQS来实现安全关闭线程池、降级方案(拒绝策略)、位运算等。实际上越底层的实现,越包含更多技术层面的思想和理论。

线程池在实际使用中,如果是新手,不建议直接用Executors中提供的工厂方法,因为线程池中的参数会影响到内存以及CPU资源的占用,我们可以自己集成ThreadPoolExecutor这个类,扩展一个自己的实现,也可以自己构造ThreadPoolExecutor实例,这样能够更好的了解线程池中核心参数的意义避免不必要的生产问题。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352

推荐阅读更多精彩内容