为什么要有线程池
因为不让主要的service线程卡主,可以继续serve新来的task。记得刚毕业的时候面试,面试官问如果一个客户端请求服务端80端口,建立连接后服务端的端口是多少,如果这时又有两个请求进来了它们和哪个端口建立连接等等。其实和这个很类似,主线程很快处理完请求,fork主进程或者创建新线程,“异步”地运行这个请求,这样主线程就不会被block,自然端口也是服务器端1024-65535之间的随机端口。
那么问题来了,如果任意来一个新的Task我们都建一个新的Thread然后执行,随后完成,再结束它。这里会有很多开销,包括创建Thread的CPU和内存,执行结束后再通过GC回收这个内存资源。
那么怎么可以提升它的效率呢,这就好像PV操作,就像火车站飞机场,在没有旅客的时候,出租车(consumer)都在等待,乘客(producer)来了,出租车(consumer)从队列里接走一名乘客,等待乘客减一,出租车也减一,以此类推。此处乘客相当于向线程池提交的Task,线程池里的Thread相当于出租车。乘客太多,出租车太少,会造成乘客排队过长,最终导致车站崩溃。如果乘客太少,出租车太多,会造成资源浪费,出租车接不到人,干等,也赚不到钱,其他地方需要坐出租车的人等不到车。那么黄金配比是多少呢?
线程数 = CPU核数 * (1 + 线程等待时间 / 线程执行时间)
出租车和乘客的模型只可以类比,不可以完全套用在线程池问题上,因为车站里可没有CPU核数这个东西,如果算作可以同时处理Task的个数,那么对于出租车来说,是趋于无线大的,因为公路允许非常多的车同时行驶,就是说只要资源允许,理想情况是,有多少乘客就有多少出租车,因为可以一下全都处理完,但现实是车站的车次和客流量的平均值是一定的,出租车资源也是有限的,那么只要出租车数的平均值可以匹配上这个客流量的平均值即可。这就是不同于线程池的地方。
扯远了,说回线程池,那么现在有了公式,怎么算理想的大小呢。这就得加一些Metric(数值统计,标记)了,比如在执行I/O任务的时候记录下时间,这就是等待时间,再在Task的开头结尾记录下,然后最终减去这个等待时间,剩下的就是CPU真正处理的时间。为什么I/O算作等待时间,因为I/O任务基本都是内核在与网络或磁盘等做交互,等待对方处理完后把数据写入缓冲区,我们本机的CPU是不花时间的,当缓冲区准备完毕会产生一个interrupt(中断)通知CPU,此时CPU把这个Task的状态从waiting改为ready,然后继续参与时间片轮转。所以其实I/O也是会用一小部分CPU的时间的,但考虑到相比于等待I/O和写入缓冲区并产生interrupt,这些CPU时间基本可以忽略不计。最终我们有了waiting time和CPU time,就可以按照核数大致算出一个合理的线程池大小,记住这里的大小只是理论上的,最终还应该结合Stress Test以及真实产品的情况再做调整。
线程池的参数
参数请参考之前我写过的一个文章,ThreadPoolExecutor。
Thread Pool基本上有这么几个比较重要的参数,coreSize
,maxSize
,blockingQueue
,rejectedExceptionHandler
。
- coreSize就是线程池要维护的核心线程数,大于这个的话,如果超过了keepAlive时间就会被干掉,最终保持coreSize
- maxSize就是最大的线程数,超过这个数量就会被拒绝,rejectedExceptionHandler就会收到通知并处理
- blockingQueue,阻塞队列,大致有四种,常用三种,BlockingArrayList,BlockingLinkedList和SynchronousQueue还有DelayedWorkerQueue。BlockingArrayList和BlockingLinkedList就是数组和链表,这个数组初始化的时候必须指定大小,链表也可以设置大小,这个大小就是blocking queue size后面会讲到。SynchronousQueue就是offer的同时必须有poll,所以可以简单理解为它是一个size为0的queue,offer的时候会返回false。Delayed queue因为没有用到,不在这里展开。
- rejectedExceptionHandler,如果超过了maxSize会抛出异常,这个handler就是来处理异常的,大致有这么几种handler,AbortPolicy,默认的策略,直接抛出RuntimeException,DiscardPolicy,安静的丢弃,CallerRunPolicy,如果线程池没有关闭则主线程来执行任务,DiscardOldestPolicy和DiscardPolicy类似,不过是从queue里取出最老的丢弃,然后再重新提交。
分配策略
如果一个任务提交进来:
- 先判断当前的poolSize是否小于coreSize,如果小于则直接创建;
- 如果大于等于,则把Task提交到BlockingQueue;
- 根据BlockingQueue的特性进行排队,如果排队失败或者队列已满,则尝试创建新线程;
- 如果总线程数小于maxSize则创建成功并开始运行,如果大于等于则抛出异常;
- RejectExceptionHandler接收到异常进行下一步处理;
遇到的问题
产品里遇到的问题是用于日志记录和展示方法调用图的一个线程池满了,导致无法再创建新的线程,最终拒绝任务,让整个请求失败。是一个P100的Metric,P99.9都没有问题,所以本地很难复现,只可以通过修改线程池大小为1同时增大任务处理时间尝试复现。
这里就先不贴图了,当线程池满了后,默认采用了AbortPolicy然后让整个请求失败了。我们是Tier-1的Service,对于这种用于debug的线程池我们是不允许fail掉整个请求的。解决办法也很简单,就是采用DiscardPolicy,同时override里面的handler方法,添加自己的代码对异常进行记录,这样我们就对何时失败以及单位时间内抛出了多少异常了如指掌了,这也很帮助我们调整合理的线程池大小。