1 ThreadPoolExecutor
该线程池是比较常用的线程池。参数如下:
参数名 | 解释 |
---|---|
corePoolSize | 核心线程池大小 |
maximumPoolSize | 最大线程池大小 |
keepAliveTime | 线程池中超过corePoolSize数目的空闲进程的最大存活时间;可以allowCoreThreadTimeOut(true)使得核心线程有效时间 |
TimeUnit | keepAliveTime时间单位 |
workQueue | 阻塞任务队列 |
threadFactory | 新建线程工厂 |
RejectedExecutionHandler | 当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理 |
* <li> If fewer than corePoolSize threads are running, the Executor
* always prefers adding a new thread
* rather than queuing.</li>
*
* <li> If corePoolSize or more threads are running, the Executor
* always prefers queuing a request rather than adding a new
* thread.</li>
*
* <li> If a request cannot be queued, a new thread is created unless
* this would exceed maximumPoolSize, in which case, the task will be
* rejected.</li>
1.当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
2.当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
3.当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
4.当提交任务数超过maximumPoolSize+workQueue时,新提交任务由RejectedExecutionHandler处理
5.当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程
6.当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭
1.1 newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
创建线程池的时候默认将corePoolSize和maximumPoolSize设置成相同值,表示不会创建出更多线程。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
仔细看会发现,在创建线程池的时候默认将corePoolSize和maximumPoolSize设置成相同值,这是因为采用的阻塞线程队列采用的是LinkedBlockingQueue,该队列是一个无边界队列,所以所有未在线程中运行的任务都可以进入到该队列。实际上该线程池的大小为Integer.MAX_VALUE。
1.2 newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
这里就是同时将corePoolSize和maximumPoolSize设置成1。
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
1.3 newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
- 工作线程的创建数量几乎没有限制(其实也有限制的,数目为Interger. MAX_VALUE), 这样可灵活的往线程池中添加线程。
- 如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为1分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
- 在使用CachedThreadPool时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
这里将maximumPoolSize设置成了Integer.MAX_VALUE,是由于SynchronousQueue阻塞队列的大小是比较小的,如果不将maximumPoolSize设置成比较大的数就容易抛出异常。
一般都不建议使用Integer.MAX_VALUE大小的线程池,容易堆积大量的请求和创建大量的线程。
2 ForkJoinPool
ForkJoinPool原理类似分治法的思想,先把大的任务分成若干个小任务并计算,最后把所有小任务的计算结果合并起来。
2.1 newWorkStealingPool
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
工作窃取线程池,默认所有线程都存在一个自己的任务队列,当自己线程所有任务执行完毕时,可以从别的线程的任务队列中获取到未执行的任务放入本线程执行,这就是工作窃取,使用该线程池可以有效提高CPU利用率。
每一个工作线程简单的通过以下两条原则进行活动:
- 若队列非空,则代表自己线程的Task还没执行完毕,取出Task并执行。
- 若队列为空,则随机选取一个其他的工作线程的Task并执行。
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
3.ScheduledThreadPoolExecutor
该线程池继承自ThreadPoolExecutor。
3.1 newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据。
4 排队策略
4.1 Direct handoffs 直接提交
* <li> <em> Direct handoffs.</em> A good default choice for a work
* queue is a {@link SynchronousQueue} that hands off tasks to threads
* without otherwise holding them. Here, an attempt to queue a task
* will fail if no threads are immediately available to run it, so a
* new thread will be constructed. This policy avoids lockups when
* handling sets of requests that might have internal dependencies.
* Direct handoffs generally require unbounded maximumPoolSizes to
* avoid rejection of new submitted tasks. This in turn admits the
* possibility of unbounded thread growth when commands continue to
* arrive on average faster than they can be processed. </li>
在收到提交的任务时,会将任务直接提交给线程,并会不自己持有,如果没有线程立刻能立刻处理该任务,将会让该任务直接失败。
4.2 Unbounded queues 无界队列
* <li><em> Unbounded queues.</em> Using an unbounded queue (for
* example a {@link LinkedBlockingQueue} without a predefined
* capacity) will cause new tasks to wait in the queue when all
* corePoolSize threads are busy. Thus, no more than corePoolSize
* threads will ever be created. (And the value of the maximumPoolSize
* therefore doesn't have any effect.) This may be appropriate when
* each task is completely independent of others, so tasks cannot
* affect each others execution; for example, in a web page server.
* While this style of queuing can be useful in smoothing out
* transient bursts of requests, it admits the possibility of
* unbounded work queue growth when commands continue to arrive on
* average faster than they can be processed. </li>
使用的是一种无界队列,例如LinkedBlockingQueue队列。当线程池中线程数目达到corePoolSize时,所有的任务都会放入队列中进行等待,而不会创建新线程。所以对于maximunPoolSize的值并不会有什么影响。无界队列适合那些任务独立的情况,因为任务之间不会相互影响。
4.3 Bounded queues 有界队列
* <li><em>Bounded queues.</em> A bounded queue (for example, an
* {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
* used with finite maximumPoolSizes, but can be more difficult to
* tune and control. Queue sizes and maximum pool sizes may be traded
* off for each other: Using large queues and small pools minimizes
* CPU usage, OS resources, and context-switching overhead, but can
* lead to artificially low throughput. If tasks frequently block (for
* example if they are I/O bound), a system may be able to schedule
* time for more threads than you otherwise allow. Use of small queues
* generally requires larger pool sizes, which keeps CPUs busier but
* may encounter unacceptable scheduling overhead, which also
* decreases throughput. </li>
有界队列,例如ArrayBlockingQueue。可以用来在有限的线程池中防止资源被耗尽,但是难以维护和控制。
5.保持存活机制
* <dt>Keep-alive times</dt>
*
* <dd>If the pool currently has more than corePoolSize threads,
* excess threads will be terminated if they have been idle for more
* than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).
* This provides a means of reducing resource consumption when the
* pool is not being actively used. If the pool becomes more active
* later, new threads will be constructed. This parameter can also be
* changed dynamically using method {@link #setKeepAliveTime(long,
* TimeUnit)}. Using a value of {@code Long.MAX_VALUE} {@link
* TimeUnit#NANOSECONDS} effectively disables idle threads from ever
* terminating prior to shut down. By default, the keep-alive policy
* applies only when there are more than corePoolSize threads. But
* method {@link #allowCoreThreadTimeOut(boolean)} can be used to
* apply this time-out policy to core threads as well, so long as the
* keepAliveTime value is non-zero. </dd>
如果超过corePoolSize部分的线程如果闲置了超过keepAliveTime的时间(可以通过getKeepAliveTime()方法来获取存活时间),线程将会被终止。这可以减少线程池使用不活跃时资源的浪费。可以通过setKeepAliveTime()方法来设置存活时间。可以通过设置一个Long.MAX_VALUE的值来有效的避免空闲线程在关闭之前终止。allowCoreThreadTimeOut(boolean)可以让corePoolSize的线程也适用存活机制。
6.拒绝策略
* <dt>Rejected tasks</dt>
*
* <dd>New tasks submitted in method {@link #execute(Runnable)} will be
* <em>rejected</em> when the Executor has been shut down, and also when
* the Executor uses finite bounds for both maximum threads and work queue
* capacity, and is saturated. In either case, the {@code execute} method
* invokes the {@link
* RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)}
* method of its {@link RejectedExecutionHandler}. Four predefined handler
* policies are provided:
*
* <ol>
*
* <li> In the default {@link ThreadPoolExecutor.AbortPolicy}, the
* handler throws a runtime {@link RejectedExecutionException} upon
* rejection. </li>
*
* <li> In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread
* that invokes {@code execute} itself runs the task. This provides a
* simple feedback control mechanism that will slow down the rate that
* new tasks are submitted. </li>
*
* <li> In {@link ThreadPoolExecutor.DiscardPolicy}, a task that
* cannot be executed is simply dropped. </li>
*
* <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the
* executor is not shut down, the task at the head of the work queue
* is dropped, and then execution is retried (which can fail again,
* causing this to be repeated.) </li>
*
* </ol>
*
* It is possible to define and use other kinds of {@link
* RejectedExecutionHandler} classes. Doing so requires some care
* especially when policies are designed to work only under particular
* capacity or queuing policies. </dd>
- AbortPolicy:默认直接抛出RejectedExecutionException异常
- CallerRunsPolicy:由提交任务线程负责执行
- DiscardPolicy:直接抛弃
- DiscardOldestPolicy:将消息队列中的第一个任务替换为当前新进来的任务执行。
7.线程池如何执行任务
public void execute(Runnable command) {
//如果Rnnable对象为空,直接抛出空指针异常
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
- 如果当前线程数目少于corePoolSize的大小,会尝试开启一个新线程给任务。同时会自动检测当前线程池状态和工作线程数目(ctl),避免产生不应该有的警告。
- 如果一个任务能够成功入队,仍然需要进行双重检查来判断是否需要开启一个线程。因为可能在上次检查后有新的线程停止或者在进入方法后线程池就停止了。如果线程不再运行状态就从队列中移除并拒绝该任务,否则当没有正在运行的线程就创建一个新线程。
- 如果任务进入队列失败,尝试开启一个新线程,如果新线程创建失败(可能线程池已关闭或者已达到最大值),就拒绝该任务。
8.addWorker()
方法的作用:
检查一个新worker能否按照线程池的状态以及线程数目被添加进去。如果满足条件,会创建一个新worker,并把第一个任务作为自己的第一个任务。如果线程池已经被关闭或停止,该方法就会放回false。同样,如果创建新线程失败也会返回false。如果创建失败,有可能线程工厂回返回一个null,有可能跑出一个OutOfMemoryError的错误,同时我们还要进行回滚。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
首先就是对各种状态的判断,然后通过CAS将workerCont的值加1,如果失败通过自旋的方式去自增。后面,新创建一个worker,并通过ReetranLock进行加锁。检查持有锁后线程池的状态,如果此时线程已经启动会抛出IllegalThreadStateException的异常,同时更新largestPoolSize的值,然后启动线程。如果添加失败,就执行addWorkerFailed()方法。
9.addWorkerFailed
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
先进行加锁,判断Worker是否存在,如果存在,就从workers中移除它。同时,对workCount进行自减。
9.getTask()
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}