Part 1: A walk through the official class Javadoc
/**
* An {@link ExecutorService} that executes each submitted task using
* one of possibly several pooled threads, normally configured
* using {@link Executors} factory methods.
*
 * In other words, an ExecutorService that runs each submitted task on one of a set of pooled
 * threads, with the pools normally configured through the Executors factory methods.
*
* <p>Thread pools address two different problems: they usually
* provide improved performance when executing large numbers of
* asynchronous tasks, due to reduced per-task invocation overhead,
* and they provide a means of bounding and managing the resources,
* including threads, consumed when executing a collection of tasks.
* Each {@code ThreadPoolExecutor} also maintains some basic
* statistics, such as the number of completed tasks.
*
 * Thread pools address two different problems: they usually give better performance when running
 * large numbers of asynchronous tasks, because per-task invocation overhead is reduced, and they
 * provide a way to bound and manage the resources (threads included) consumed while executing a
 * collection of tasks. Each ThreadPoolExecutor also keeps some basic statistics, such as the
 * number of completed tasks.
*
* <p>To be useful across a wide range of contexts, this class
* provides many adjustable parameters and extensibility
* hooks. However, programmers are urged to use the more convenient
* {@link Executors} factory methods {@link
* Executors#newCachedThreadPool} (unbounded thread pool, with
* automatic thread reclamation), {@link Executors#newFixedThreadPool}
* (fixed size thread pool) and {@link
* Executors#newSingleThreadExecutor} (single background thread), that
* preconfigure settings for the most common usage
* scenarios. Otherwise, use the following guide when manually
* configuring and tuning this class:
*
 * To be useful across many contexts, the class exposes many tunable parameters and extension
 * hooks. Programmers are nevertheless encouraged to use the more convenient Executors factory
 * methods: Executors.newCachedThreadPool (an unbounded pool with automatic thread reclamation),
 * Executors.newFixedThreadPool (a fixed-size pool) and Executors.newSingleThreadExecutor (a
 * single background thread), which preconfigure settings for the most common usage scenarios.
 * Otherwise, use the following guide when configuring and tuning the class by hand:
*
* <dl>
*
* <dt>Core and maximum pool sizes</dt>
*
* <dd>A {@code ThreadPoolExecutor} will automatically adjust the
* pool size (see {@link #getPoolSize})
* according to the bounds set by
* corePoolSize (see {@link #getCorePoolSize}) and
* maximumPoolSize (see {@link #getMaximumPoolSize}).
*
 * A ThreadPoolExecutor automatically adjusts the pool size (see getPoolSize) within the bounds
 * set by corePoolSize (see getCorePoolSize) and maximumPoolSize (see getMaximumPoolSize).
*
* When a new task is submitted in method {@link #execute(Runnable)},
* and fewer than corePoolSize threads are running, a new thread is
* created to handle the request, even if other worker threads are
* idle. If there are more than corePoolSize but less than
* maximumPoolSize threads running, a new thread will be created only
* if the queue is full. By setting corePoolSize and maximumPoolSize
* the same, you create a fixed-size thread pool. By setting
* maximumPoolSize to an essentially unbounded value such as {@code
* Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
* number of concurrent tasks. Most typically, core and maximum pool
* sizes are set only upon construction, but they may also be changed
* dynamically using {@link #setCorePoolSize} and {@link
* #setMaximumPoolSize}. </dd>
*
 * When a task is submitted via execute(Runnable) and fewer than corePoolSize threads are running,
 * a new thread is created to handle the request, even if other worker threads are idle. If
 * between corePoolSize and maximumPoolSize threads are running, a new thread is created only when
 * the queue is full. Setting corePoolSize equal to maximumPoolSize gives a fixed-size pool;
 * setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE lets the
 * pool accommodate an arbitrary number of concurrent tasks. Core and maximum pool sizes are
 * usually set only at construction time, but they can also be changed dynamically with
 * setCorePoolSize and setMaximumPoolSize.
*
* <dt>On-demand construction</dt>
*
* <dd>By default, even core threads are initially created and
* started only when new tasks arrive, but this can be overridden
* dynamically using method {@link #prestartCoreThread} or {@link
* #prestartAllCoreThreads}. You probably want to prestart threads if
* you construct the pool with a non-empty queue. </dd>
*
 * By default, even core threads are only created and started when new tasks arrive, but this can
 * be overridden dynamically with prestartCoreThread or prestartAllCoreThreads. You will probably
 * want to prestart threads if you construct the pool with a non-empty queue.
*
* <dt>Creating new threads</dt>
*
* <dd>New threads are created using a {@link ThreadFactory}. If not
* otherwise specified, a {@link Executors#defaultThreadFactory} is
* used, that creates threads to all be in the same {@link
* ThreadGroup} and with the same {@code NORM_PRIORITY} priority and
* non-daemon status. By supplying a different ThreadFactory, you can
* alter the thread's name, thread group, priority, daemon status,
* etc. If a {@code ThreadFactory} fails to create a thread when asked
* by returning null from {@code newThread}, the executor will
* continue, but might not be able to execute any tasks. Threads
* should possess the "modifyThread" {@code RuntimePermission}. If
* worker threads or other threads using the pool do not possess this
* permission, service may be degraded: configuration changes may not
* take effect in a timely manner, and a shutdown pool may remain in a
* state in which termination is possible but not completed.</dd>
*
 * New threads are created through a ThreadFactory. Unless otherwise specified,
 * Executors.defaultThreadFactory is used, which creates threads that all belong to the same
 * ThreadGroup, with NORM_PRIORITY priority and non-daemon status. Supplying a different
 * ThreadFactory lets you change the thread's name, thread group, priority, daemon status, and so
 * on. If the ThreadFactory fails to create a thread when asked (by returning null from newThread),
 * the executor continues but may not be able to execute any tasks. Threads should hold the
 * "modifyThread" RuntimePermission; if worker threads or other threads using the pool lack it,
 * service may degrade: configuration changes may not take effect promptly, and a shut-down pool
 * may remain in a state where termination is possible but never completes.
*
* <dt>Keep-alive times</dt>
*
* <dd>If the pool currently has more than corePoolSize threads,
* excess threads will be terminated if they have been idle for more
* than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).
* This provides a means of reducing resource consumption when the
* pool is not being actively used. If the pool becomes more active
* later, new threads will be constructed. This parameter can also be
* changed dynamically using method {@link #setKeepAliveTime(long,
* TimeUnit)}. Using a value of {@code Long.MAX_VALUE} {@link
* TimeUnit#NANOSECONDS} effectively disables idle threads from ever
* terminating prior to shut down. By default, the keep-alive policy
* applies only when there are more than corePoolSize threads. But
* method {@link #allowCoreThreadTimeOut(boolean)} can be used to
* apply this time-out policy to core threads as well, so long as the
* keepAliveTime value is non-zero. </dd>
*
 * If the pool currently has more than corePoolSize threads, excess threads are terminated once
 * they have been idle for longer than keepAliveTime (see getKeepAliveTime(TimeUnit)). This
 * reduces resource consumption when the pool is not being actively used; if the pool later
 * becomes more active, new threads are constructed. The parameter can also be changed dynamically
 * with setKeepAliveTime(long, TimeUnit). Using Long.MAX_VALUE nanoseconds effectively prevents
 * idle threads from ever terminating before shutdown. By default the keep-alive policy applies
 * only when there are more than corePoolSize threads, but allowCoreThreadTimeOut(boolean) can
 * extend the timeout policy to core threads as well, as long as keepAliveTime is non-zero.
*
* <dt>Queuing</dt>
*
* <dd>Any {@link BlockingQueue} may be used to transfer and hold
* submitted tasks. The use of this queue interacts with pool sizing:
*
* <ul>
*
* <li> If fewer than corePoolSize threads are running, the Executor
* always prefers adding a new thread
* rather than queuing.</li>
*
* <li> If corePoolSize or more threads are running, the Executor
* always prefers queuing a request rather than adding a new
* thread.</li>
*
* <li> If a request cannot be queued, a new thread is created unless
* this would exceed maximumPoolSize, in which case, the task will be
* rejected.</li>
*
* </ul>
*
* There are three general strategies for queuing:
* <ol>
*
* <li> <em> Direct handoffs.</em> A good default choice for a work
* queue is a {@link SynchronousQueue} that hands off tasks to threads
* without otherwise holding them. Here, an attempt to queue a task
* will fail if no threads are immediately available to run it, so a
* new thread will be constructed. This policy avoids lockups when
* handling sets of requests that might have internal dependencies.
* Direct handoffs generally require unbounded maximumPoolSizes to
* avoid rejection of new submitted tasks. This in turn admits the
* possibility of unbounded thread growth when commands continue to
* arrive on average faster than they can be processed. </li>
 * Direct handoffs: a good default work queue is a SynchronousQueue, which hands tasks off to
 * threads without otherwise holding them. An attempt to queue a task fails if no thread is
 * immediately available to run it, so a new thread is constructed. This policy avoids lockups
 * when handling sets of requests that may have internal dependencies. Direct handoffs generally
 * require an unbounded maximumPoolSize to avoid rejecting newly submitted tasks, which in turn
 * admits the possibility of unbounded thread growth when tasks keep arriving faster, on average,
 * than they can be processed.
*
* <li><em> Unbounded queues.</em> Using an unbounded queue (for
* example a {@link LinkedBlockingQueue} without a predefined
* capacity) will cause new tasks to wait in the queue when all
* corePoolSize threads are busy. Thus, no more than corePoolSize
* threads will ever be created. (And the value of the maximumPoolSize
* therefore doesn't have any effect.) This may be appropriate when
* each task is completely independent of others, so tasks cannot
* affect each others execution; for example, in a web page server.
* While this style of queuing can be useful in smoothing out
* transient bursts of requests, it admits the possibility of
* unbounded work queue growth when commands continue to arrive on
* average faster than they can be processed. </li>
 * Unbounded queues: using an unbounded queue (for example a LinkedBlockingQueue without a
 * predefined capacity) makes new tasks wait in the queue whenever all corePoolSize threads are
 * busy, so no more than corePoolSize threads are ever created (and maximumPoolSize has no
 * effect). This may be appropriate when each task is completely independent of the others, so
 * tasks cannot affect one another's execution; for example, in a web page server. While this
 * style of queuing smooths out transient bursts of requests, it admits the possibility of
 * unbounded work-queue growth when tasks keep arriving faster, on average, than they can be
 * processed.
*
* <li><em>Bounded queues.</em> A bounded queue (for example, an
* {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
* used with finite maximumPoolSizes, but can be more difficult to
* tune and control. Queue sizes and maximum pool sizes may be traded
* off for each other: Using large queues and small pools minimizes
* CPU usage, OS resources, and context-switching overhead, but can
* lead to artificially low throughput. If tasks frequently block (for
* example if they are I/O bound), a system may be able to schedule
* time for more threads than you otherwise allow. Use of small queues
* generally requires larger pool sizes, which keeps CPUs busier but
* may encounter unacceptable scheduling overhead, which also
* decreases throughput. </li>
 * Bounded queues: a bounded queue (for example an ArrayBlockingQueue) combined with a finite
 * maximumPoolSize helps prevent resource exhaustion, but is harder to tune and control. Queue
 * size and maximum pool size can be traded off against each other: large queues with small pools
 * minimize CPU usage, OS resources and context-switching overhead, but can yield artificially low
 * throughput. If tasks frequently block (for example because they are I/O bound), the system may
 * be able to schedule time for more threads than you would otherwise allow. Small queues
 * generally require larger pools, which keeps CPUs busier but may incur unacceptable scheduling
 * overhead, which also lowers throughput.
*
* </ol>
*
* </dd>
*
* <dt>Rejected tasks</dt>
*
* <dd>New tasks submitted in method {@link #execute(Runnable)} will be
* <em>rejected</em> when the Executor has been shut down, and also when
* the Executor uses finite bounds for both maximum threads and work queue
* capacity, and is saturated. In either case, the {@code execute} method
* invokes the {@link
* RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)}
* method of its {@link RejectedExecutionHandler}. Four predefined handler
* policies are provided:
 * New tasks submitted via execute(Runnable) are rejected when the Executor has been shut down,
 * and also when the Executor uses finite bounds for both maximum threads and work-queue capacity
 * and is saturated. In either case, execute invokes the rejectedExecution method of its
 * RejectedExecutionHandler. Four predefined handler policies are provided:
*
* <ol>
*
* <li> In the default {@link ThreadPoolExecutor.AbortPolicy}, the
* handler throws a runtime {@link RejectedExecutionException} upon
* rejection. </li>
*
* <li> In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread
* that invokes {@code execute} itself runs the task. This provides a
* simple feedback control mechanism that will slow down the rate that
* new tasks are submitted. </li>
 * In CallerRunsPolicy, the thread that invoked execute runs the task itself. This provides a
 * simple feedback-control mechanism that slows down the rate at which new tasks are submitted.
*
* <li> In {@link ThreadPoolExecutor.DiscardPolicy}, a task that
* cannot be executed is simply dropped. </li>
*
* <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the
* executor is not shut down, the task at the head of the work queue
* is dropped, and then execution is retried (which can fail again,
* causing this to be repeated.) </li>
 * In DiscardOldestPolicy, if the executor is not shut down, the task at the head of the work
 * queue is dropped and execution is retried (which can fail again, causing this to repeat).
*
* </ol>
*
* It is possible to define and use other kinds of {@link
* RejectedExecutionHandler} classes. Doing so requires some care
* especially when policies are designed to work only under particular
* capacity or queuing policies. </dd>
 * Other kinds of RejectedExecutionHandler classes can be defined and used; doing so requires some
 * care, especially when a policy is designed to work only under particular capacity or queuing
 * policies.
*
* <dt>Hook methods</dt>
*
* <dd>This class provides {@code protected} overridable
* {@link #beforeExecute(Thread, Runnable)} and
* {@link #afterExecute(Runnable, Throwable)} methods that are called
* before and after execution of each task. These can be used to
* manipulate the execution environment; for example, reinitializing
* ThreadLocals, gathering statistics, or adding log entries.
* Additionally, method {@link #terminated} can be overridden to perform
* any special processing that needs to be done once the Executor has
* fully terminated.
 * This class provides protected, overridable beforeExecute(Thread, Runnable) and
 * afterExecute(Runnable, Throwable) methods that are called before and after the execution of
 * each task. They can be used to manipulate the execution environment, for example to
 * reinitialize ThreadLocals, gather statistics, or add log entries. In addition, terminated can
 * be overridden to perform any special processing required once the Executor has fully terminated.
*
* <p>If hook or callback methods throw exceptions, internal worker
* threads may in turn fail and abruptly terminate.</dd>
*
* <dt>Queue maintenance</dt>
*
* <dd>Method {@link #getQueue()} allows access to the work queue
* for purposes of monitoring and debugging. Use of this method for
* any other purpose is strongly discouraged. Two supplied methods,
* {@link #remove(Runnable)} and {@link #purge} are available to
* assist in storage reclamation when large numbers of queued tasks
* become cancelled.</dd>
 * getQueue() gives access to the work queue for monitoring and debugging; using it for any other
 * purpose is strongly discouraged. Two supplied methods, remove(Runnable) and purge, help reclaim
 * storage when large numbers of queued tasks become cancelled.
*
* <dt>Finalization</dt>
*
* <dd>A pool that is no longer referenced in a program <em>AND</em>
* has no remaining threads will be {@code shutdown} automatically. If
* you would like to ensure that unreferenced pools are reclaimed even
* if users forget to call {@link #shutdown}, then you must arrange
* that unused threads eventually die, by setting appropriate
* keep-alive times, using a lower bound of zero core threads and/or
* setting {@link #allowCoreThreadTimeOut(boolean)}. </dd>
 * A pool that is no longer referenced in a program and has no remaining threads is shut down
 * automatically. If you want unreferenced pools to be reclaimed even when users forget to call
 * shutdown, arrange for unused threads to eventually die by setting appropriate keep-alive times,
 * using a lower bound of zero core threads, and/or setting allowCoreThreadTimeOut(boolean).
*
* </dl>
*
* <p><b>Extension example</b>.
*
* Most extensions of this class
* override one or more of the protected hook methods. For example,
* here is a subclass that adds a simple pause/resume feature:
 * Most extensions of this class override one or more of the protected hook methods. For example,
 * the following subclass adds a simple pause/resume feature:
*
* <pre> {@code
* class PausableThreadPoolExecutor extends ThreadPoolExecutor {
 * private boolean isPaused; // true while the pool is paused
* private ReentrantLock pauseLock = new ReentrantLock();
* private Condition unpaused = pauseLock.newCondition();
*
* public PausableThreadPoolExecutor(...) { super(...); }
*
* protected void beforeExecute(Thread t, Runnable r) {
* super.beforeExecute(t, r);
* pauseLock.lock();
* try {
 * while (isPaused) unpaused.await(); // if paused, wait on the condition
* } catch (InterruptedException ie) {
* t.interrupt();
* } finally {
* pauseLock.unlock();
* }
* }
*
* public void pause() {
* pauseLock.lock();
* try {
 * isPaused = true; // set the pause flag
* } finally {
* pauseLock.unlock();
* }
* }
*
* public void resume() {
* pauseLock.lock();
* try {
* isPaused = false;
 * unpaused.signalAll(); // on resume, wake every thread waiting on the condition
* } finally {
* pauseLock.unlock();
* }
* }
* }}</pre>
*
* @since 1.5
* @author Doug Lea
*/
public class ThreadPoolExecutor extends AbstractExecutorService {
}
Summary: the class comment (and the field comments later on) already describe things clearly; one careful pass over the Javadoc covers the essentials.
- ThreadPoolExecutor is an ExecutorService implemented with a pool of threads.
- Executors offers convenient factory methods for the common configurations (typically backed by unbounded queues); a small construction sketch follows below.
- When a new task is submitted, the current thread count determines whether a new thread is created or the task is queued.
- If no new thread can be created and the queue is full, the rejection policy is applied.
- Several hook methods are provided for subclasses to extend.
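To make the construction options concrete, here is a small sketch (the class and variable names are made up for illustration; the constructor arguments simply mirror the parameters described in the Javadoc above). It builds one pool through an Executors factory method and one hand-tuned ThreadPoolExecutor with a bounded queue and a CallerRunsPolicy handler:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolConstructionDemo {
    public static void main(String[] args) {
        // Preconfigured pool from an Executors factory method.
        ExecutorService fixed = Executors.newFixedThreadPool(4);

        // Hand-tuned pool: 2 core threads, at most 4 threads, idle non-core
        // threads die after 30s, a bounded queue of 100 tasks, and
        // CallerRunsPolicy as a simple back-pressure mechanism.
        ThreadPoolExecutor tuned = new ThreadPoolExecutor(
                2, 4,
                30L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(100),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < 10; i++) {
            final int id = i;
            tuned.execute(() -> System.out.println(
                    "task " + id + " on " + Thread.currentThread().getName()));
        }

        fixed.shutdown();
        tuned.shutdown();
    }
}

With these (illustrative) settings, submissions that arrive when 4 threads are busy and 100 tasks are already queued would run on the submitting thread rather than being rejected outright.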
Part 2: Core internal fields
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* The main pool control state, ctl, is an atomic integer packing
* two conceptual fields
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down etc
 * In short, ctl, the main pool control state, is an atomic integer packing two conceptual fields:
 *   workerCount — the effective number of threads
 *   runState — whether the pool is running, shutting down, and so on
*
* In order to pack them into one int, we limit workerCount to
* (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
* billion) otherwise representable. If this is ever an issue in
* the future, the variable can be changed to be an AtomicLong,
* and the shift/mask constants below adjusted. But until the need
* arises, this code is a bit faster and simpler using an int.
 * To pack both into one int, workerCount is limited to (2^29)-1 (about 500 million) threads
 * rather than the (2^31)-1 (about 2 billion) otherwise representable. If that ever becomes a
 * problem, the variable can be changed to an AtomicLong and the shift/mask constants below
 * adjusted; until the need arises, an int is a bit faster and simpler.
*
* The workerCount is the number of workers that have been
* permitted to start and not permitted to stop. The value may be
* transiently different from the actual number of live threads,
* for example when a ThreadFactory fails to create a thread when
* asked, and when exiting threads are still performing
* bookkeeping before terminating. The user-visible pool size is
* reported as the current size of the workers set.
 * workerCount is the number of workers that have been permitted to start and not permitted to
 * stop. It may differ transiently from the actual number of live threads, for example when a
 * ThreadFactory fails to create a thread when asked, or when exiting threads are still doing
 * bookkeeping before terminating. The user-visible pool size is reported as the current size of
 * the workers set.
*
* The runState provides the main lifecycle control, taking on values:
*
* RUNNING: Accept new tasks and process queued tasks
*
* SHUTDOWN: Don't accept new tasks, but process queued tasks
*
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
*
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
*
* TERMINATED: terminated() has completed
*
* The numerical order among these values matters, to allow
* ordered comparisons. The runState monotonically increases over
* time, but need not hit each state. The transitions are:
 * The numerical order among these values matters, allowing ordered comparisons. The run state
 * increases monotonically over time, but need not hit every state. The transitions are:
*
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
*
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
*
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
*
* STOP -> TIDYING
* When pool is empty
*
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
*
* Threads waiting in awaitTermination() will return when the
* state reaches TERMINATED.
*
* Detecting the transition from SHUTDOWN to TIDYING is less
* straightforward than you'd like because the queue may become
* empty after non-empty and vice versa during SHUTDOWN state, but
* we can only terminate if, after seeing that it is empty, we see
* that workerCount is 0 (which sometimes entails a recheck -- see
* below).
 * Detecting the transition from SHUTDOWN to TIDYING is less straightforward than one might like,
 * because during the SHUTDOWN state the queue may go from non-empty to empty and back again; we
 * can only terminate when, after seeing that the queue is empty, we also see that workerCount is
 * 0 (which sometimes requires a re-check — see below).
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
//The real upper bound on the worker count; the user-specified maximum can never exceed this value.
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// The five pool run states; note their relative numeric ordering.
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
//Extract the current run state from ctl.
private static int runStateOf(int c) { return c & ~CAPACITY; }
//Extract the current worker (thread) count from ctl.
private static int workerCountOf(int c) { return c & CAPACITY; }
//Pack a run state and a worker count into a single ctl value.
private static int ctlOf(int rs, int wc) { return rs | wc; }
//Whether the pool is in the RUNNING state; RUNNING is the only state whose value is less than SHUTDOWN.
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* The queue used for holding tasks and handing off to worker
* threads. We do not require that workQueue.poll() returning
* null necessarily means that workQueue.isEmpty(), so rely
* solely on isEmpty to see if the queue is empty (which we must
* do for example when deciding whether to transition from
* SHUTDOWN to TIDYING). This accommodates special-purpose
* queues such as DelayQueues for which poll() is allowed to
* return null even if it may later return non-null when delays
* expire.
 * The queue used to hold tasks and hand them off to worker threads. We do not require that
 * workQueue.poll() returning null necessarily means workQueue.isEmpty(), so we rely solely on
 * isEmpty to check whether the queue is empty (which we must do, for example, when deciding
 * whether to transition from SHUTDOWN to TIDYING). This accommodates special-purpose queues such
 * as DelayQueues, whose poll() is allowed to return null even though it may later return non-null
 * once delays expire.
 *
 * Note: this is a BlockingQueue, so a worker thread that has no task to run blocks on this queue
 * via poll or take.
*
*/
private final BlockingQueue<Runnable> workQueue;
/**
* Lock held on access to workers set and related bookkeeping.
* While we could use a concurrent set of some sort, it turns out
* to be generally preferable to use a lock. Among the reasons is
* that this serializes interruptIdleWorkers, which avoids
* unnecessary interrupt storms, especially during shutdown.
* Otherwise exiting threads would concurrently interrupt those
* that have not yet interrupted. It also simplifies some of the
* associated statistics bookkeeping of largestPoolSize etc. We
* also hold mainLock on shutdown and shutdownNow, for the sake of
* ensuring workers set is stable while separately checking
* permission to interrupt and actually interrupting.
 * Lock guarding access to the workers set and related bookkeeping. Although some kind of
 * concurrent set could be used, a lock generally turns out to be preferable: among other reasons,
 * it serializes interruptIdleWorkers, which avoids unnecessary interrupt storms, especially
 * during shutdown (otherwise exiting threads would concurrently interrupt threads that have not
 * yet been interrupted). It also simplifies some of the associated statistics bookkeeping, such
 * as largestPoolSize. mainLock is also held during shutdown and shutdownNow to keep the workers
 * set stable while separately checking permission to interrupt and actually interrupting.
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* Wait condition to support awaitTermination
 * If you want to be notified once the pool has fully terminated, block on this condition via
 * awaitTermination; when termination completes, the pool calls termination.signalAll() to wake
 * the waiters.
*/
private final Condition termination = mainLock.newCondition();
/**
* Tracks largest attained pool size. Accessed only under
* mainLock.
*/
private int largestPoolSize;
/**
* Counter for completed tasks. Updated only on termination of
* worker threads. Accessed only under mainLock.
*/
private long completedTaskCount;
/*
* All user control parameters are declared as volatiles so that
* ongoing actions are based on freshest values, but without need
* for locking, since no internal invariants depend on them
* changing synchronously with respect to other actions.
 * All user-control parameters are declared volatile so that ongoing actions see the freshest
 * values without locking; no internal invariant depends on them changing synchronously with
 * respect to other actions.
*/
/**
* Factory for new threads. All threads are created using this
* factory (via method addWorker). All callers must be prepared
* for addWorker to fail, which may reflect a system or user's
* policy limiting the number of threads. Even though it is not
* treated as an error, failure to create threads may result in
* new tasks being rejected or existing ones remaining stuck in
* the queue.
 * Factory for new threads. All threads are created through it (via addWorker). All callers must
 * be prepared for addWorker to fail, which may reflect a system or user policy limiting the
 * number of threads. Even though it is not treated as an error, failure to create a thread may
 * result in new tasks being rejected or existing ones remaining stuck in the queue.
*
* We go further and preserve pool invariants even in the face of
* errors such as OutOfMemoryError, that might be thrown while
* trying to create threads. Such errors are rather common due to
* the need to allocate a native stack in Thread.start, and users
* will want to perform clean pool shutdown to clean up. There
* will likely be enough memory available for the cleanup code to
* complete without encountering yet another OutOfMemoryError.
 * The implementation goes further and preserves the pool invariants even in the face of errors
 * such as OutOfMemoryError thrown while trying to create threads. Such errors are fairly common
 * because Thread.start needs to allocate a native stack, and users will want to perform a clean
 * pool shutdown to clean up; there will likely be enough memory left for that cleanup code to
 * complete without hitting yet another OutOfMemoryError.
*/
private volatile ThreadFactory threadFactory;
/**
* Handler called when saturated or shutdown in execute.
*/
private volatile RejectedExecutionHandler handler;
/**
* Timeout in nanoseconds for idle threads waiting for work.
* Threads use this timeout when there are more than corePoolSize
* present or if allowCoreThreadTimeOut. Otherwise they wait
* forever for new work.
 * Timeout, in nanoseconds, for idle threads waiting for work. Threads use this timeout when more
 * than corePoolSize threads are present or when allowCoreThreadTimeOut is set; otherwise they
 * wait forever for new work.
*/
private volatile long keepAliveTime;
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
 * If false (the default), core threads stay alive even when idle; if true, core threads use
 * keepAliveTime to time out while waiting for work.
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* Core pool size is the minimum number of workers to keep alive
* (and not allow to time out etc) unless allowCoreThreadTimeOut
* is set, in which case the minimum is zero.
 * The core pool size is the minimum number of workers to keep alive (without allowing time-outs
 * and so on) unless allowCoreThreadTimeOut is set, in which case the minimum is zero.
*/
private volatile int corePoolSize;
/**
* Maximum pool size. Note that the actual maximum is internally
* bounded by CAPACITY.
*/
private volatile int maximumPoolSize;
/**
* The default rejected execution handler
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
}
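To see how the two fields share one int, the following standalone sketch re-implements the shift/mask helpers with the same constants (the class name and the demo values are made up; the constants mirror the JDK code quoted above):

public class CtlBitsDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;   // low 29 bits set: the maximum worker count

    static final int RUNNING    = -1 << COUNT_BITS;        // high 3 bits = 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;        // high 3 bits = 000
    static final int STOP       =  1 << COUNT_BITS;        // high 3 bits = 001
    static final int TIDYING    =  2 << COUNT_BITS;        // high 3 bits = 010
    static final int TERMINATED =  3 << COUNT_BITS;        // high 3 bits = 011

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep the high 3 bits
    static int workerCountOf(int c)  { return c &  CAPACITY; } // keep the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // pack state and count

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 5);                       // a RUNNING pool with 5 workers
        System.out.println(runStateOf(ctl) == RUNNING);    // true
        System.out.println(workerCountOf(ctl));            // 5
        System.out.println(runStateOf(ctl) < SHUTDOWN);    // true: only RUNNING is negative
    }
}

The high three bits carry the run state and the low 29 bits carry the worker count, which is why RUNNING (a negative value) compares less than SHUTDOWN (zero) and isRunning can be a single comparison.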
Summary — the core fields are:
- ctl (AtomicInteger): the main pool control state, packing the run state and the current worker count into one int; updated via CAS.
- workQueue (BlockingQueue&lt;Runnable&gt;): the blocking task queue that holds not-yet-executed tasks; idle worker threads also park on it via take/poll.
- workers (HashSet&lt;Worker&gt;): the set of all created, not-yet-terminated workers; accessed only while holding mainLock.
- mainLock (ReentrantLock): guards state changes and changes to the workers set against concurrent errors.
- termination (Condition): signalled via termination.signalAll() on termination to notify other threads.
- largestPoolSize (int): records the largest pool size ever reached; accessed only under mainLock.
- completedTaskCount (long): the number of completed tasks; updated only when worker threads terminate; accessed only under mainLock.
- threadFactory (ThreadFactory): used to create the new thread whenever a worker is created.
- handler (RejectedExecutionHandler): handles the rejection logic.
- keepAliveTime (long): how long idle threads stay alive.
- allowCoreThreadTimeOut (boolean): whether core threads may time out and terminate.
- corePoolSize (int): the core pool size.
- maximumPoolSize (int): the maximum pool size; the effective maximum never exceeds (1 << 29) - 1.
- State transitions (a shutdown/awaitTermination usage sketch follows below):
  - RUNNING -> SHUTDOWN: on shutdown()
  - (RUNNING or SHUTDOWN) -> STOP: on shutdownNow()
  - SHUTDOWN -> TIDYING: after shutdown(), once both the queue and the pool are empty
  - STOP -> TIDYING: after shutdownNow(), once the pool is empty
  - TIDYING -> TERMINATED: once the terminated() hook has finished
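A minimal lifecycle sketch, assuming a pool constructed as in the earlier example (class and variable names are made up for illustration): shutdown() moves the pool from RUNNING to SHUTDOWN and lets queued tasks drain toward TIDYING and TERMINATED, shutdownNow() moves it to STOP and interrupts workers, and awaitTermination() blocks on the termination condition until the state reaches TERMINATED:

import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LifecycleDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        pool.execute(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // a shutdownNow() would land here
            }
        });

        pool.shutdown();                                 // RUNNING -> SHUTDOWN: no new tasks, queue drains
        if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
            List<Runnable> dropped = pool.shutdownNow(); // -> STOP: interrupt workers, drain the queue
            System.out.println("never started: " + dropped.size());
        }
        System.out.println(pool.isTerminated());         // true once TIDYING -> TERMINATED has completed
    }
}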
Part 3: The inner class Worker
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
 * The Worker class mainly maintains interrupt-control state for threads running tasks, along with
 * other minor bookkeeping. It opportunistically extends AbstractQueuedSynchronizer to simplify
 * acquiring and releasing a lock around each task execution; this protects against interrupts
 * that are meant to wake up a worker waiting for a task from instead interrupting a task that is
 * running. A simple non-reentrant mutual-exclusion lock is implemented rather than using
 * ReentrantLock, because worker tasks should not be able to re-acquire the lock when they invoke
 * pool-control methods such as setCorePoolSize. Additionally, to suppress interrupts until the
 * thread has actually started running tasks, the lock state is initialized to a negative value
 * and cleared on start (in runWorker).
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// inhibit interrupts until runWorker
// How is the interrupt inhibited? Briefly: code that wants to interrupt an idle worker first
// tryLock()s it and only interrupts on success; while the state is still -1 that CAS(0 -> 1)
// cannot succeed, so no interrupt is delivered. After start, runWorker resets the state to 0,
// and from then on the worker can be interrupted.
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
//Interrupt only if the worker has started: state >= 0 means it is no longer the initial -1.
//This method is called from shutdownNow.
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
}
Summary:
- Worker is the pool's wrapper around a single thread; one Worker represents one worker thread.
- Worker extends AQS and is mainly used to indicate whether a task is currently being executed: the locked state means "running a task".
- A Worker's initial state is -1, meaning it has not yet started, which is why runWorker first calls unlock() before it can lock() around task execution (see the sketch below).
- The task-execution logic does not live in Worker itself; it is delegated to the runWorker method.
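To make the non-reentrant AQS mutex idea tangible, here is a stripped-down sketch in the spirit of Worker (the class name is made up, and the thread/firstTask/completedTasks bookkeeping is omitted; the state values follow the description above):

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// State -1: not yet started (interrupts suppressed), 0: unlocked, 1: locked.
class WorkerLikeMutex extends AbstractQueuedSynchronizer {
    WorkerLikeMutex() { setState(-1); }              // mimic Worker's pre-start state

    @Override protected boolean isHeldExclusively() { return getState() != 0; }

    @Override protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {              // only 0 -> 1 succeeds, so the lock is non-reentrant
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    void lock()   { acquire(1); }
    void unlock() { release(1); }

    public static void main(String[] args) {
        WorkerLikeMutex m = new WorkerLikeMutex();
        System.out.println(m.getState());            // -1: a tryLock-guarded interrupt cannot get in yet
        m.unlock();                                  // like runWorker's initial w.unlock(): state becomes 0
        m.lock();                                    // "running a task"
        System.out.println(m.isHeldExclusively());   // true
        m.unlock();
    }
}

The initial unlock() is exactly what runWorker does before its task loop: it moves the state from -1 to 0, after which locking around each task (and interrupting a started worker) behaves normally.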
Part 4: Hook methods for extension
public class ThreadPoolExecutor extends AbstractExecutorService {
/* Extension hooks */
/**
* Method invoked prior to executing the given Runnable in the
* given thread. This method is invoked by thread {@code t} that
* will execute task {@code r}, and may be used to re-initialize
* ThreadLocals, or to perform logging.
 * Invoked in the given thread before the given Runnable is executed. The method is invoked by the
 * thread t that will execute task r and can be used to re-initialize ThreadLocals or to perform
 * logging.
*
* <p>This implementation does nothing, but may be customized in
* subclasses. Note: To properly nest multiple overridings, subclasses
* should generally invoke {@code super.beforeExecute} at the end of
* this method.
 * The default implementation does nothing but can be customized in subclasses. Note: to nest
 * multiple overridings properly, subclasses should generally invoke super.beforeExecute at the
 * end of this method.
*
* @param t the thread that will run task {@code r}
* @param r the task that will be executed
*/
protected void beforeExecute(Thread t, Runnable r) { }
/**
* Method invoked upon completion of execution of the given Runnable.
* This method is invoked by the thread that executed the task. If
* non-null, the Throwable is the uncaught {@code RuntimeException}
* or {@code Error} that caused execution to terminate abruptly.
*
* <p>This implementation does nothing, but may be customized in
* subclasses. Note: To properly nest multiple overridings, subclasses
* should generally invoke {@code super.afterExecute} at the
* beginning of this method.
 * The default implementation does nothing but can be customized in subclasses. Note: to nest
 * multiple overridings properly, subclasses should generally invoke super.afterExecute at the
 * beginning of this method.
*
*
* <p><b>Note:</b> When actions are enclosed in tasks (such as
* {@link FutureTask}) either explicitly or via methods such as
* {@code submit}, these task objects catch and maintain
* computational exceptions, and so they do not cause abrupt
* termination, and the internal exceptions are <em>not</em>
* passed to this method. If you would like to trap both kinds of
* failures in this method, you can further probe for such cases,
* as in this sample subclass that prints either the direct cause
* or the underlying exception if a task has been aborted:
 * When actions are enclosed in tasks (such as FutureTask), either explicitly or via methods such
 * as submit, these task objects catch and maintain computational exceptions, so they do not cause
 * abrupt termination and the internal exceptions are not passed to this method. To trap both
 * kinds of failure here, you can probe further, as in the sample subclass below, which prints
 * either the direct cause or the underlying exception when a task has been aborted:
*
* <pre> {@code
* class ExtendedExecutor extends ThreadPoolExecutor {
* // ...
* protected void afterExecute(Runnable r, Throwable t) {
* super.afterExecute(r, t);
* if (t == null && r instanceof Future<?>) {
* try {
* Object result = ((Future<?>) r).get();
* } catch (CancellationException ce) {
* t = ce;
* } catch (ExecutionException ee) {
* t = ee.getCause();
* } catch (InterruptedException ie) {
* Thread.currentThread().interrupt(); // ignore/reset
* }
* }
* if (t != null)
* System.out.println(t);
* }
* }}</pre>
*
* @param r the runnable that has completed
* @param t the exception that caused termination, or null if
* execution completed normally
*/
protected void afterExecute(Runnable r, Throwable t) { }
/**
* Method invoked when the Executor has terminated. Default
* implementation does nothing. Note: To properly nest multiple
* overridings, subclasses should generally invoke
* {@code super.terminated} within this method.
 * Invoked when the Executor has terminated. The default implementation does nothing. Note: to
 * nest multiple overridings properly, subclasses should generally invoke super.terminated within
 * this method.
*/
protected void terminated() { }
}
Summary: three hook methods are provided for subclasses to extend (a combined example follows below):
- beforeExecute — called before each task is executed
- afterExecute — called after each task is executed
- terminated — called when the executor terminates (between the TIDYING and TERMINATED states)
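Pulling the three hooks together, here is an illustrative subclass (the class name and the timing logic are made up for this sketch; only the super-call ordering follows the nesting advice in the Javadoc above). It measures per-task wall time with a ThreadLocal, logs failures, and reports once the pool has fully terminated:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class TimingThreadPoolExecutor extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();

    TimingThreadPoolExecutor(int core, int max) {
        super(core, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        startNanos.set(System.nanoTime());  // record the start time on the worker thread
        super.beforeExecute(t, r);          // nesting advice: call super at the end here
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);           // nesting advice: call super at the beginning here
        long elapsed = System.nanoTime() - startNanos.get();
        if (t != null)
            System.err.println(r + " failed after " + elapsed + " ns: " + t);
        else
            System.out.println(r + " took " + elapsed + " ns");
        startNanos.remove();
    }

    @Override
    protected void terminated() {
        super.terminated();
        System.out.println("pool fully terminated");
    }
}

Typical use would be new TimingThreadPoolExecutor(2, 4), a few execute(...) calls, and then shutdown(); the terminated() message prints once the last worker has exited.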