五、线程池
在我们的开发中经常会使用到多线程。例如在Android中,由于主线程的诸多限制,像网络请求等一些耗时的操作我们必须在子线程中运行。我们往往会通过new Thread来开启一个子线程,待子线程操作完成以后通过Handler切换到主线程中运行。这么以来我们无法管理我们所创建的子线程,并且无限制的创建子线程,它们相互之间竞争,很有可能由于占用过多资源而导致死机或者OOM。所以在Java中为我们提供了线程池来管理我们所创建的线程。
为什么要使用多线程?
-
更多的处理器核心
线程是大多数操作系统调度的基本单元,一个程序作为一个进程来运行,程序运行过程中能够创建多个线程,而一个线程在一个时刻只能运行在一个处理器核心上,采用多线程技术,将计算逻辑分配到多个处理器核心上,就会显示减少程序的处理时间,而现在计算机都是多核处理器,随着核心的增加而变得更加有效率
-
更快的响应时间
在编写一些较为复杂的代码(指的是复杂的逻辑),例如,一笔订单的创建,包括订单的数据插入、生成订单快照、通知卖家和处理货品销售数量等,我们可以使用多线程技术,将一些操作性不强的操作(生成订单、通知卖家)派发给其他线程来处理。这样的好处是响应用户请求的线程能够尽快地处理完成,缩短响应时间,提升用户体验
-
更好的编程模型
Java为多线程编程提供了良好、考究并且一致的编程模型,使开发人员能够更加专注于问题的解决,为遇到的问题建立合适的模型,而不是考虑如何将其多线程化
线程池的优势
重用线程池中的线程,避免因为线程创建和销毁所带的性能开销
能够有效控制线程池的最大并发数,避免大量的线程之间因为互相抢占系统资源而导致的阻塞现象
能够对线程进行简单的管理,并提供定时执行以及指定间隔循环执行功能
线程池的使用
-
使用步骤
// 创建线程池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory); // 向线程池提交任务 threadPool.execute(new Runnable() { @Override public void run() { ... // 线程执行的任务 } }); // 关闭线程池 threadPool.shutdown(); // 设置线程池的状态为SHUTDOWN,然后中断所有没有正在执行任务的线程 threadPool.shutdownNow(); // 设置线程池的状态为 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表
-
七大参数
-
corePoolSize
线程池中的核心线程数,默认情况下,核心线程一直存活在线程池中,即便他们在线程池中处于闲置状态。除非我们将ThreadPoolExecutor的allowCoreThreadTimeOut属性设为true的时候,这时候处于闲置的核心线程在等待新任务到来时会有超时策略,这个超时时间由keepAliveTime来指定。一旦超过所设置的超时时间,闲置的核心线程就会被终止。
-
maximumPoolSize
线程池中所容纳的最大线程数,如果活动的线程达到这个数值以后,后续的新任务将会被阻塞。包含核心线程数+非核心线程数。
-
keepAliveTime
非核心线程闲置时的超时时长,对于非核心线程,闲置时间超过这个时间,非核心线程就会被回收。只有对ThreadPoolExecutor的allowCoreThreadTimeOut属性设为true的时候,这个超时时间才会对核心线程产生效果。
-
TimeUnit
用于指定keepAliveTime参数的时间单位。他是一个枚举,可以使用的单位有天(TimeUnit.DAYS),小时(TimeUnit.HOURS),分钟(TimeUnit.MINUTES),毫秒(TimeUnit.MILLISECONDS),微秒(TimeUnit.MICROSECONDS, 千分之一毫秒)和毫微秒(TimeUnit.NANOSECONDS, 千分之一微秒);
-
workQueue
线程池中保存等待执行的任务的阻塞队列。通过线程池中的execute方法提交的Runable对象都会存储在该队列中。
-
ThreadFactory
线程工厂,为线程池提供新线程的创建。ThreadFactory是一个接口,里面只有一个newThread方法。 默认为DefaultThreadFactory类
-
RejectedExecutionHandler
饱和策略
-
-
执行流程
提交任务后,如果线程池的线程数未达到核心线程数,则创建核心线程处理任务
如果线程数大于或等于核心线程数,则将任务加入到任务队列中,线程池中的空闲线程会不断地从任务队列中取出任务进行处理。
如果任务队列已经满了,并且线程数没有达到最大的线程数,则会创建非核心线程去处理任务
如果线程数超过了最大的线程数,则执行饱和策略。
-
常用的线程池
-
定长线程池(FixedThreadPool)
它是一种线程数量固定的线程池,当线程处于空闲状态时,它们并不会被回收,除非线程池关闭了。当所有线程都处于活动状态时,新任务都会处于等待状态,直到有线程空闲出来。由于FixedThreadPool只有核心线程并且这些核心线程不会被回收,这意味能够更加快速的响应外界的请求。核心线程没有超时机制,另外任务队列没有大小限制(LinkedBlockingQueue<Runnable>)。
特点:只有核心线程,线程数量固定,任务队列为链表结构的有界队列。
应用场景:控制线程最大并发数
-
定时线程池(ScheduledThreadPool )
主要用于执行定时任务和具有固定周期的重复任务。核心线程数固定,非核心线程数没有限制,并且当非核心线程闲置时会立即被回收。任务队列为延时阻塞队列(DelayedWorkQueue<RunnableScheduledFuture>),Future为任务的包装类
特点:核心线程数量固定,非核心线程数量无限,执行完闲置10ms后回收,任务队列为延时阻塞队列。
应用场景:执行定时或周期性的任务。
-
可缓存线程池(CachedThreadPool)
它是一个线程数量不定的线程池,只有非核心线程,并且最大线程数为Integer.MAX_VALUE,实际上就相当于最大线程数量可以任意大。当线程中的线程都处于活跃状态时,线程池就会创建新的线程来处理任务,否则就会利用空闲的线程来处理新任务。线程池中的线程都有超时机制,60秒,超过闲置时间就会被回收。使用的任务队列为不存储元素的SynchronousQueue<Runnable>,相当于一个空集合,这就导致任何线程都会立刻被执行。当整个线程池都属于空闲状态时,线程池中的线程都会超时而停止,这个时候池中相当于是没有线程的,几乎不会占有任何系统资源的。
特点:无核心线程,非核心线程数量无限,执行完闲置60s后回收,任务队列为不存储元素的阻塞队列。
应用场景:执行大量、耗时少的任务。
-
单线程化线程池(SingleThreadExecutor)
内部只有一个核心线程,它确保所有任务都在同一个线程中按顺序执行,SingleThreadExecutor存在的意义在于统一所有的外界任务到一个线程中,使得这些任务不需要处理线程同步问题。任务队列为LinkedBlockingQueue<Runnable>
特点:只有1个核心线程,无非核心线程,执行完立即回收,任务队列为链表结构的有界队列。
应用场景:不适合并发但可能引起IO阻塞性及影响UI线程响应的操作,如数据库操作、文件操作等。
对比:
补充:
-
阻塞队列
-
ArrayBlockingQueue
基于数组实现的有界的阻塞队列,该队列按照FIFO(先进先出)原则对队列中的元素进行排序。
-
LinkedBlockingQueue
基于链表实现的有界阻塞队列,该队列按照FIFO(先进先出)原则对队列中的元素进行排序,容量默认为Integer.MAX_VALUE。当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize(几乎不可能达到),因此使用该工作队列时,参数maxPoolSize其实是不起作用的。
-
SynchronousQueue
内部没有任何容量的阻塞队列。在它内部没有任何的缓存空间。对于SynchronousQueue中的数据元素只有当我们试着取走的时候才可能存在,每一个put必须等待一个take操作
-
PriorityBlockingQueue
具有优先级的无限阻塞队列。
-
DelayQueue
是支持延时获取元素的无界阻塞队列。要求元素都实现Delayed接口,通过执行延时从队列中提取任务,时间没到任务取不出来。
-
FixedThreadPool和SingleThreadExecutor:主要问题是堆积的请求处理队列均采用LinkedBlockingQueue,可能会耗费非常大的内存,甚至OOM。
-
-
如何配置参数
需要针对具体情况而具体处理,不同的任务类别应采用不同规模的线程池,任务类别可划分为CPU密集型任务、IO密集型任务和混合型任务。(N代表CPU个数)
-
cpu密集型任务
这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
当线程数量太小,同一时间大量请求将被阻塞在线程队列中排队等待执行线程,此时 CPU 没有得到充分利用;当线程数量太大,被创建的执行线程同时在争取 CPU 资源,又会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。通过测试可知,4~6 个线程数是最合适的。
-
io密集型任务
这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
当线程数量在 8 时,线程平均执行时间是最佳的,这个线程数量和我们的计算公式所得的结果就差不多。
-
混合型
可以拆分为CPU密集型任务和IO密集型任务,当这两类任务执行时间相差无几时,通过拆分再执行的吞吐率高于串行执行的吞吐率,但若这两类任务执行时间有数据级的差距,那么没有拆分的意义。
线程池的核心线程数=(线程等待时间/线程CPU时间+1)*CPU核心数;
-
快速响应用户请求
从用户体验角度看,这个结果响应的越快越好,如果一个页面半天都刷不出,用户可能就放弃查看这个商品了。而面向用户的功能聚合通常非常复杂,伴随着调用与调用之间的级联、多级级联等情况,业务开发同学往往会选择使用线程池这种简单的方式,将调用封装成任务并行的执行,缩短总体响应时间。另外,使用线程池也是有考量的,这种场景最重要的就是获取最大的响应速度去满足用户,所以应该不设置队列去缓冲并发任务,调高corePoolSize和maxPoolSize去尽可能创造多的线程快速执行任务。
-
快速处理批量任务
这种场景需要执行大量的任务,我们也会希望任务执行的越快越好。这种情况下,也应该使用多线程策略,并行计算。但与响应速度优先的场景区别在于,这类场景任务量巨大,并不需要瞬时的完成,而是关注如何使用有限的资源,尽可能在单位时间内处理更多的任务,也就是吞吐量优先的问题。所以应该设置队列去缓冲并发任务,调整合适的corePoolSize去设置处理任务的线程数。在这里,设置的线程数过多可能还会引发线程上下文切换频繁的问题,也会降低处理任务的速度,降低吞吐量。
我们要提高线程池的处理能力,一定要先保证一个合理的线程数量,也就是保证 CPU 处理线程的最大化。在此前提下,我们再增大线程池队列,通过队列将来不及处理的线程缓存起来。在设置缓存队列时,我们要尽量使用一个有界队列,以防因队列过大而导致的内存溢出问题。
-
Java线程池ThreadPoolExecutor八大拒绝策略浅析
内置:4个
-
AbortPolicy(中止策略)
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } 功能:当触发拒绝策略时,直接抛出拒绝执行的异常,中止策略的意思也就是打断当前执行流程 使用场景:这个就没有特殊的场景了,但是一点要正确处理抛出的异常。 ThreadPoolExecutor中默认的策略就是AbortPolicy,ExecutorService接口的系列ThreadPoolExecutor因为都没有显示的设置拒绝策略,所以默认的都是这个。 但是请注意,ExecutorService中的线程池实例队列都是无界的,也就是说把内存撑爆了都不会触发拒绝策略。当自己自定义线程池实例时,使用这个策略一定要处理好触发策略时抛的异常,因为他会打断当前的执行流程。
-
CallerRunsPolicy(调用者运行策略)
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } 功能:当触发拒绝策略时,只要线程池没有关闭,就由提交任务的当前线程处理。 使用场景:一般在不允许失败的、对性能要求不高、并发量较小的场景下使用,因为线程池一般情况下不会关闭,也就是提交的任务一定会被运行,但是由于是调用者线程自己执行的,当多次提交任务时,就会阻塞后续任务执行,性能和效率自然就慢了。
-
DiscardOldestPolicy(弃老策略)
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } } 功能:如果线程池未关闭,就弹出队列头部的元素,然后尝试执行当前任务 使用场景:这个策略还是会丢弃任务,丢弃时也是毫无声息,但是特点是丢弃的是老的未执行的任务,而且是待执行优先级较高的任务。 基于这个特性,我能想到的场景就是,发布消息,和修改消息,当消息发布出去后,还未执行,此时更新的消息又来了,这个时候未执行的消息的版本比现在提交的消息版本要低就可以被丢弃了。因为队列中还有可能存在消息版本更低的消息会排队执行,所以在真正处理消息的时候一定要做好消息的版本比较。
-
DiscardPolicy(丢弃策略)
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } 功能:直接静悄悄的丢弃这个任务,不触发任何动作 使用场景:如果你提交的任务无关紧要,你就可以使用它 。因为它就是个空实现,会悄无声息的吞噬你的的任务。所以这个策略基本上不用了
第三方实现的拒绝策略
-
dubbo[ˈdʌbəʊ]中的线程拒绝策略
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy { protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class); private final String threadName; private final URL url; private static volatile long lastPrintTime = 0; private static Semaphore guard = new Semaphore(1); public AbortPolicyWithReport(String threadName, URL url) { this.threadName = threadName; this.url = url; } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { String msg = String.format("Thread pool is EXHAUSTED!" + " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," + " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!", threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), url.getProtocol(), url.getIp(), url.getPort()); logger.warn(msg); dumpJStack(); throw new RejectedExecutionException(msg); } private void dumpJStack() { //省略实现 } } 可以看到,当dubbo的工作线程触发了线程拒绝后,主要做了三个事情,原则就是尽量让使用者清楚触发线程拒绝策略的真实原因。 1)输出了一条警告级别的日志,日志内容为线程池的详细设置参数,以及线程池当前的状态,还有当前拒绝任务的一些详细信息。可以说,这条日志,使用dubbo的有过生产运维经验的或多或少是见过的,这个日志简直就是日志打印的典范,其他的日志打印的典范还有spring。得益于这么详细的日志,可以很容易定位到问题所在 2)输出当前线程堆栈详情,这个太有用了,当你通过上面的日志信息还不能定位问题时,案发现场的dump线程上下文信息就是你发现问题的救命稻草。 3)继续抛出拒绝执行异常,使本次任务失败,这个继承了JDK默认拒绝策略的特性
-
Netty中的线程池拒绝策略
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler { NewThreadRunsPolicy() { super(); } public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { final Thread t = new Thread(r, "Temporary task executor"); t.start(); } catch (Throwable e) { throw new RejectedExecutionException( "Failed to start a new thread", e); } } } Netty中的实现很像JDK中的CallerRunsPolicy,舍不得丢弃任务。不同的是,CallerRunsPolicy是直接在调用者线程执行的任务。而 Netty是新建了一个线程来处理的。 所以,Netty的实现相较于调用者执行策略的使用面就可以扩展到支持高效率高性能的场景了。但是也要注意一点,Netty的实现里,在创建线程时未做任何的判断约束,也就是说只要系统还有资源就会创建新的线程来处理,直到new不出新的线程了,才会抛创建线程失败的异常。
-
activeMq中的线程池拒绝策略
new RejectedExecutionHandler() { @Override public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { try { executor.getQueue().offer(r, 60, TimeUnit.SECONDS); } catch (InterruptedException e) { throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker"); } throw new RejectedExecutionException("Timed Out while attempting to enqueue Task."); } }); activeMq中的策略属于最大努力执行任务型,当触发拒绝策略时,在尝试一分钟的时间重新将任务塞进任务队列,当一分钟超时还没成功时,就抛出异常
-
pinpoint中的线程池拒绝策略
public class RejectedExecutionHandlerChain implements RejectedExecutionHandler { private final RejectedExecutionHandler[] handlerChain; public static RejectedExecutionHandler build(List<RejectedExecutionHandler> chain) { Objects.requireNonNull(chain, "handlerChain must not be null"); RejectedExecutionHandler[] handlerChain = chain.toArray(new RejectedExecutionHandler[0]); return new RejectedExecutionHandlerChain(handlerChain); } private RejectedExecutionHandlerChain(RejectedExecutionHandler[] handlerChain) { this.handlerChain = Objects.requireNonNull(handlerChain, "handlerChain must not be null"); } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { for (RejectedExecutionHandler rejectedExecutionHandler : handlerChain) { rejectedExecutionHandler.rejectedExecution(r, executor); } } } pinpoint的拒绝策略实现很有特点,和其他的实现都不同。他定义了一个拒绝策略链,包装了一个拒绝策略列表,当触发拒绝策略时,会将策略链中的rejectedExecution依次执行一遍。
-