如需转载请评论或简信,并注明出处,未经允许不得转载
目录
前言
相信很多同学都有一定的多线程开发经验,常见的实现多线程的方式有两种,一种是直接继承Thread类,一种是通过实现Runnable接口。但这两种方式都有一个共同的问题,就是线程运行完毕后都会被虚拟机销毁,当需要使用的时候又会创建新的线程。而Java中创建和销毁一个线程是比较昂贵的操作,需要系统调用。如果线程数量多的话,频繁的创建和销毁线程会大大浪费时间和效率,同时会浪费内存。所以本文将通过对线程池的学习,从而更加高效的使用多线程
在面向对象编程中,创建和销毁对象是很费时间的,提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁。如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因。比如大家所熟悉的数据库连接池正是遵循这一思想而产生的
对Java对象池相关概念有兴趣的也可以看:对象池技术:如何正确创建对象
概念
线程池就是首先创建一些线程,将它们的集合称为线程池。使用线程池可以很好地提高性能,线程池在系统启动时即创建大量空闲的线程,程序将一个任务传给线程池,线程池就会启动一条线程来执行这个任务,执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务
作用
线程池的作用简单来说就是通过线程复用,减少线程的创建于销毁的性能损耗
工作机制
- 任务不是直接提交给某个线程,而是提交给整个线程池,线程池在拿到任务后,就在内部寻找是否有空闲的线程,如果有,则将任务交给某个空闲的线程
- 一个线程同时只能执行一个任务,但可以同时向一个线程池提交多个任务
UML类图
源码分析
Executor
Executor
是一个接口,其只定义了一个execute()
方法:void execute(Runnable command);
Executor
只能提交Runnable
形式的任务,不支持提交Callable
带有返回值的任务
public interface Executor {
void execute(Runnable command);
}
ExecutorService
ExecutorService
也是一个接口,在Executor
的基础上加入了线程池的生命周期管理,我们可以通过shutdown()
或者shutdownNow()
方法来关闭我们的线程池。
ExecutorService
支持提交Callable
形式的任务,提交完Callable
任务后我们拿到一个Future
,它代表一个异步任务执行的结果
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
shutdown()
和shutdownNow()
区别
- 相同点
这两个方法都是非阻塞的,调用后立即返回,不会等待线程池关闭完成。如果我们需要等待线程池处理完成再返回可以使用ExecutorService#awaitTermination
来完成
- 不同点
shutdown()
会等待线程池中已经运行的任务和阻塞队列中等待执行的任务执行完成
shutdownNow()
会尝试中断线程池中已经运行的任务,阻塞队列中等待的任务不会再被执行,阻塞队列中等待执行的任务会作为返回值返回
ThreadPoolExecutor
这个类是线程池最核心的类
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
-
corePoolSize:核心线程数量,当有新任务在
execute()
方法提交时,会执行以下判断:- 如果运行的线程少于
corePoolSize
,则创建新线程来处理任务,即使线程池中的其他线程是空闲的 - 如果线程池中的线程数量大于等于
corePoolSize
且小于maximumPoolSize
,则只有当workQueue
满时才创建新的线程去处理任务 - 如果设置的
corePoolSize
和maximumPoolSize
相同,则创建的线程池的大小是固定的,这时如果有新任务提交,若workQueue
未满,则将请求放入workQueue
中,等待有空闲的线程去从workQueue
中取任务并处理 - 如果运行的线程数量大于等于
maximumPoolSize
,这时如果workQueue
已经满了,则通过handler
所指定的策略来处理任务
所以,任务提交时,判断的顺序为
corePoolSize
–>workQueue
–>maximumPoolSize
- 如果运行的线程少于
maximumPoolSize:最大线程数量
-
workQueue:等待队列,当任务提交时,如果线程池中的线程数量大于等于
corePoolSize
的时候,把该任务封装成一个Worker
对象放入等待队列。workQueue主要有三种:SynchronousQueue:
SynchronousQueue
没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。使用SynchronousQueue阻塞队列一般要求maximumPoolSizes
为无界,避免线程拒绝执行操作LinkedBlockingQueue:
LinkedBlockingQueue
是一个无界(没有大小限制)缓存等待队列。当前执行的线程数量达到corePoolSize
的数量时,剩余的元素会在阻塞队列里等待,在使用此阻塞队列时maximumPoolSizes
就相当于无效了-
ArrayBlockingQueue
ArrayBlockingQueue
是一个有界缓存等待队列,可以指定缓存队列的大小,当线程数量大于corePoolSize
时,多余的任务会缓存在ArrayBlockingQueue
队列中等待有空闲的线程时继续执行;当ArrayBlockingQueue
满时,则又会开启新的线程去执行,直到线程数量达到maximumPoolSize
;当线程数已经达到最大的maximumPoolSize
时,再有新的任务到达时会执行拒绝执行策略(RejectedExecutionException
)
keepAliveTime:线程池维护线程所允许的空闲时间。当线程池中的线程数量大于
corePoolSize
的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime
threadFactory:它是
ThreadFactory
类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory()
来创建线程。使用默认的ThreadFactory
来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称-
handler:它是
RejectedExecutionHandler
类型的变量,表示线程池的饱和策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。线程池提供了4种策略:-
AbortPolicy:直接抛出异常(
RejectedExecutionException
),这是默认策略 - CallerRunsPolicy:用调用者所在的线程来执行任务
- DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务
- DiscardPolicy:直接丢弃任务
-
AbortPolicy:直接抛出异常(
如果线程池中的线程数量大于 corePoolSize
时,如果某线程空闲时间超过keepAliveTime
,线程将被终止,直至线程池中的线程数目不大于corePoolSize
;
如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过 keepAliveTime
,线程也会被终止
ThreadPoolExecutor#execute()方法执行流程如下:
ScheduledThreadPoolExecutor
ThreadPoolExecutor
子类,它在ThreadPoolExecutor
基础上加入了任务定时执行的功能
Executors
java提供Executors
类直接创建四种线程池
newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
线程池的监控
通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用
- getTaskCount:线程池已经执行的和未执行的任务总数
- getCompletedTaskCount:线程池已完成的任务数量,该值小于等于taskCount
-
getLargestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了
maximumPoolSize
- getPoolSize:线程池当前的线程数量
- getActiveCount:当前线程池中正在执行任务的线程数量
通过这些方法,可以对线程池进行监控,在ThreadPoolExecutor
类中提供了几个空方法,如beforeExecute()
方法,afterExecute()
方法和terminated()
方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自ThreadPoolExecutor
来进行扩展
总结
ThreadPoolExecutor
的使用还是很有技巧的。使用无界workQueue
可能会耗尽系统资源,使用有界workQueue
可能不能很好的满足性能,需要调节线程数和workQueue
大小,所以需要根据不同场景进行调节