JDK 线程池使用过程中,很多人都知道有一些关键参数需要配置,
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
也大致知道线程池的大致原理,但是不一定能解释某些现象。
有个系统,设计大致是这样的:服务A 发送消息到 MQ(具体来说是 kafka),消费端调用服务 B 实际执行消费操作。公司的中间件对 kafka 做了一层封装,能够动态配置一些参数,动态重建 consumer 应用新的配置,其中有一个参数就是并行度 parallelCount ,含义是:对于同一个 partition 分配多少个线程并行处理消息。
系统设计之初,就考虑了使用这个配置来动态调整整个系统的承载能力,因为流量弹性比较高,少的时候一天没有调用量,多的时候可能需要在较短时间内处理几十万到上百万的消息,处理时间甚至可能需要根据下游系统性能调整。所以这个参数的动态调整至关重要。
但是实际上线之后,一次大批量调用,观察到并行度调整似乎没有达到预期效果,默认 parallelCount = 1,如果业务能保证不依赖消息顺序,则可以调整并行度提高吞吐量
有一天线上收到告警,消息积压。于是赶紧调整并行度,
n=1,
n=2,
…
一切有序进行中,消息处理速度整体不断增加
n=8,
n=9,
n=10
…
n=16
但是观察线上监控,似乎处理能力不再增加了?这是怎么回事?
我记得下游系统 actual service 配置了最大 64 线程,这还差很多呢,怎么就不线性增长了?
下游系统响应变慢?
开始的时候猜想,是不是下游系统处理能力不够了?请求的响应速度变慢,所以请求堆积起来了?
由于下游系统是个外部的 HTTP 服务,所以无从得知,但是从历史经验来看,远远达不到这个系统的瓶颈,因为这个系统其实有很多的外部调用方,我们的请求量不见得算很大。
而且从 actual service 的内部打点来看,实际执行 HTTP 调用的地方 TP99 并没有变慢,和平时一样。
下游系统限流?
这是有可能的,因为下游系统这个HTTP服务本身有对各个接入放有限流,但是查了文档,当前调用量还远没有达到限流阈值。
系统内部分析
那么问题只会出现在系统内部了
查看监控
consumer 应用内部,当时的线程堆栈采样可以看出来,kafka consumer 端线程数量确实有 16 个
这样也就排除了 consumer 并行度调整不生效的问题。
actual service 内部,查看当时的线程堆栈采样,对应 consumer 并行度 16 的时候,service 内部用于处理任务的专用线程池,thread-count == 10
起初很奇怪,细想一下终于明白了。JDK 线程池确实是这个逻辑
简单来说,就是 threadCount > coreSize ,先开始排队,队列满再扩充线程池
//java.util.concurrent.ThreadPoolExecutor#execute
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
JDK 代码注释中有解释,代码短小精悍。
优化方案
知道问题所在了,那么怎么解决呢?
初步想,有两种方案。
方案一
直接把 coreSize 设置到一个足够大的值,比如 64,或者干脆配置一个 fixed size 的线程池
优点:简单直接,能解决问题
缺点:请求量低的时候大量线程闲置,浪费系统资源
方案二
这也是本篇的精髓所在了,改造 JDK 线程池。
既然缺陷在于先排队后扩容,延迟了扩容的时机,那就改成先扩容后排队,这样就能确保在一定空间下处理能力线性增长了。
怎么做呢?分析上面的代码,第二个 if 语句,isRunning(c) && workQueue.offer(command)
如果入队成功了就不会创建线程,所以只要重载 Queue,判断当前 threadCount > coreSize && threadCount < maxCount 的时候返回 false,就可以了,等到 threadCount > maxSize 的时候再实际执行入队操作。
其实这就是 tomcat 线程池的做法,细节上需要注意:queue 需要感知到 threadPool 当前的 count,需要做一些改造。
看源码:tomcat 8.0.30 版本
//org.apache.tomcat.util.threads.TaskQueue#offer
@Override
public boolean offer(Runnable o) {
//we can't do any checks
if (parent==null) return super.offer(o);
//we are maxed out on threads, simply queue the object
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//we have idle threads, just add it to the queue
if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
//if we have less threads than maximum force creation of a new thread
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
//if we reached here, we need to add it to the queue
return super.offer(o);
}
创建的时候持有 Pool 的引用
// org.apache.catalina.core.StandardThreadExecutor#startInternal
@Override
protected void startInternal() throws LifecycleException {
taskqueue = new TaskQueue(maxQueueSize);
TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
executor.setThreadRenewalDelay(threadRenewalDelay);
if (prestartminSpareThreads) {
executor.prestartAllCoreThreads();
}
taskqueue.setParent(executor);
setState(LifecycleState.STARTING);
}
剥离开 tomcat 的一些不相关的参数,简单改造一下就可以用了。
感谢 tomcat ,随便一看都是宝藏