1.Thread使用
new Thread的弊端:
- 每次new Thread新建对象,性能差;
- 每次新建Thread实例,线程缺乏统一管理,可能无限制的新建线程,相互竞争,有可能占用过多的系统资源导致死机或者OOM
- 缺少更多功能,如更多执行、定期执行、线程中断
所以,在实践中使用多线程时,并不会使用Thread,而是使用线程池。
2.线程池使用
线程池的好处:
- 重用存在的线程,减少对象的创建、消亡的开销,性能佳
- 可以有效控制最大并发线程数,提高系统资源利用率,同时可以避免过多资源竞争,避免阻塞
- 提供定时执行、定期执行、单线程、并发数控制等功能
3.ThreadPoolExecutor类的注释说明
ExecutorService(ThreadPoolExecutor的顶层接口)使用线程池中的线程执行每个提交的任务,通常我们使用Executors的工厂方法来创建ExecutorService。
线程池解决了两个不同的问题:
<1> 提升性能:它们通常在执行大量异步任务时,由于减少了每个任务的调用开销,并且它们提供了一种限制和管理资源(包括线程)的方法,使得性能提升明显;
<2> 统计信息:每个ThreadPoolExecutor保持一些基本的统计信息,例如完成的任务数量。
为了在广泛的上下文中有用,此类提供了许多可调参数和可扩展性钩子。 但是,在常见场景中,我们预配置了几种线程池,我们敦促程序员使用更方便的Executors的工厂方法直接使用。
- Executors.newCachedThreadPool(无界线程池,自动线程回收)
- Executors.newFixedThreadPool(固定大小的线程池);
- Executors.newSingleThreadExecutor(单一后台线程);
4.ThreadPoolExecutor类参数
重要初始化参数:
<1> corePoolSize:核心线程数量
<2> maximumPoolSize:线程池最大线程数
<3> workQueue:阻塞队列,存储等待执行的任务,很重要,会对线程池运行过程产生重大影响
<4> keepAliveTime:线程没有任务执行时最多保持多久时间终止
<5> unit:keepAliveTime的时间单位
<6> threadFactory:线程工厂,用来创建线程
<7> rejectHandler:当拒绝处理任务时的策略
- 当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
- 当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行 。
- 当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务。
- 当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理。
- 当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程。
-
当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭。
4.1 workQueue
workQueue是一个BlockingQueue,用于存放提交的任务,队列的实际容量与线程池大小相关联。
主要有三种队列策略:
<1> Direct handoffs 直接握手队列
Direct handoffs 的一个很好的默认选择是 SynchronousQueue,它将任务交给线程而不需要保留。这里,如果没有线程立即可用来运行它,那么排队任务的尝试将失败,因此将构建新的线程。
此策略在处理可能具有内部依赖关系的请求集时避免锁定。Direct handoffs 通常需要无限制的maximumPoolSizes来避免拒绝新提交的任务。 但得注意,当任务持续以平均提交速度大于平均处理速度时,会导致线程数量会无限增长问题。
<2> Unbounded queues 无界队列
当所有corePoolSize线程繁忙时,使用无界队列(例如,没有预定义容量的LinkedBlockingQueue)将导致新任务在队列中等待,从而导致maximumPoolSize的值没有任何作用。当每个任务互不影响,完全独立于其他任务时,这可能是合适的; 例如,在网页服务器中, 这种队列方式可以用于平滑瞬时大量请求。但得注意,当任务持续以平均提交速度大余平均处理速度时,会导致队列无限增长问题。
<3> Bounded queues 有界队列
一个有界的队列(例如,一个ArrayBlockingQueue)和有限的maximumPoolSizes配置有助于防止资源耗尽,但是难以控制。队列大小和maximumPoolSizes需要 相互权衡:
- 使用大队列和较小的maximumPoolSizes可以最大限度地减少CPU使用率,操作系统资源和上下文切换开销,但会导致人为的低吞吐量。如果任务经常被阻塞(比如I/O限制),那么系统可以调度比我们允许的更多的线程。
- 使用小队列通常需要较大的maximumPoolSizes,这会使CPU更繁忙,但可能会遇到不可接受的调度开销,这也会降低吞吐量。
4.2 拒绝策略(RejectedExecutionHandler的实现类)
拒绝任务有两种情况:1.线程池被关闭 2.任务队列已满且maximumPoolSize已满;
无论哪种情况,都会调用RejectedExecutionHandler的rejectedExecution方法。预定义了四种处理策略:
<1> AbortPolicy
默认的策略,直接抛出RejectedExecutionException运行时异常
<2> CallerRunsPolicy
提供一个简单的反馈控制机制,可以减慢提交新任务的速度
<3> DiscardPolicy
直接丢弃新提交的任务
<4> DiscardOldestPolicy
如果执行器没有关闭,队列头的任务将会被丢弃,然后执行器重新尝试执行任务(如果失败,则重复这一过程)
4.ThreadPoolExecutor的几种状态
RUNNING:运行态,可以处理新任务并执行队列中的任务
SHUTDOWN:关闭态,不接受新任务,但是处理队列中已存在的任务
STOP:停止态,不接受新任务,不处理队列中任务,并且打断运行中任务
TIDYING:整理态,所有任务已经结束,workerCount=0,将执行terminated()方法
TERMINATED:结束态,terminated()方法已完成
5.ThreadPoolExecutor的几种方法
<1> execute():提交任务,交给线程池执行
<2> submit():提交任务,能够返回执行结果,相当于execute+Future
<3> shutdown():关闭线程池,等待任务都执行完(进入shutdown状态)
<4> shutdownNow():关闭线程池,不等待任务执行完(进入stop状态)
适用于监控
<5> getTaskCount():线程池已执行和未执行的任务总数
<6> getCompletedTaskCount():已完成的任务数量
<7> getPoolSize():线程池当前的线程数量
<8> getActiveCount():当前线程池中正在执行任务的线程数量
6.Executor框架接口
- Executors.newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
- Executors.newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
- Executors.newScheduledThreadPool
创建一个周期线程池,支持定时及周期性任务执行。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
- Executors.newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}