02_线程池_2_基础概念

一:类官方注释预览

/**
 * An {@link ExecutorService} that executes each submitted task using
 * one of possibly several pooled threads, normally configured
 * using {@link Executors} factory methods.
 * 
 * 一种ExecutorService,它使用可能的几个池线程之一执行每个提交的任务,
 * 这些线程池通常使用Executors工厂方法进行配置。
 *
 * <p>Thread pools address two different problems: they usually
 * provide improved performance when executing large numbers of
 * asynchronous tasks, due to reduced per-task invocation overhead,
 * and they provide a means of bounding and managing the resources,
 * including threads, consumed when executing a collection of tasks.
 * Each {@code ThreadPoolExecutor} also maintains some basic
 * statistics, such as the number of completed tasks.
 * 
 * 线程池解决两个不同的问题:它们通常在执行大量异步任务时提供更好的性能,这是因为减少了每个任务的调用开销,
 * 它们还提供了一种方法来限制和管理执行任务集合时所消耗的资源,包括线程。
 * 每个 ThreadPoolExecutor 还维护一些基本统计信息,例如已完成任务的数量。
 *
 * <p>To be useful across a wide range of contexts, this class
 * provides many adjustable parameters and extensibility
 * hooks. However, programmers are urged to use the more convenient
 * {@link Executors} factory methods {@link
 * Executors#newCachedThreadPool} (unbounded thread pool, with
 * automatic thread reclamation), {@link Executors#newFixedThreadPool}
 * (fixed size thread pool) and {@link
 * Executors#newSingleThreadExecutor} (single background thread), that
 * preconfigure settings for the most common usage
 * scenarios. Otherwise, use the following guide when manually
 * configuring and tuning this class:
 * 
 * 为了在广泛的上下文中有用,这个类提供了许多可调整的参数和可扩展性挂钩。
 * 但是,建议程序员使用更方便的Executors工厂方法
 * Executors.newCachedThreadPool(具有自动线程回收功能的无界线程池)、
 * Executors.newFixedThreadPool(固定大小的线程池)、
 * Executors.newSingleThreadExecutor(单后台线程),最常见的使用场景的预配置设置。
 * 否则,请在手动配置和优化此类时使用以下指南:
 *
 * <dl>
 *
 * <dt>Core and maximum pool sizes</dt>
 *
 * <dd>A {@code ThreadPoolExecutor} will automatically adjust the
 * pool size (see {@link #getPoolSize})
 * according to the bounds set by
 * corePoolSize (see {@link #getCorePoolSize}) and
 * maximumPoolSize (see {@link #getMaximumPoolSize}).
 * 
 * ThreadPoolExecutor将根据corePoolSize(请参阅getCorePoolSize)
 * 和maximumPoolSize(请参阅getMaximumPoolSize)设置的界限自动调整池大小(请参阅getPoolSize)。
 *
 * When a new task is submitted in method {@link #execute(Runnable)},
 * and fewer than corePoolSize threads are running, a new thread is
 * created to handle the request, even if other worker threads are
 * idle.  If there are more than corePoolSize but less than
 * maximumPoolSize threads running, a new thread will be created only
 * if the queue is full.  By setting corePoolSize and maximumPoolSize
 * the same, you create a fixed-size thread pool. By setting
 * maximumPoolSize to an essentially unbounded value such as {@code
 * Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
 * number of concurrent tasks. Most typically, core and maximum pool
 * sizes are set only upon construction, but they may also be changed
 * dynamically using {@link #setCorePoolSize} and {@link
 * #setMaximumPoolSize}. </dd>
 * 
 * 当execute(Runnable)方法提交任务时,
 * 如果正在运行的线程数小于corePoolSize,就会新建一个线程来处理这个请求,即使其它工作线程空闲。
 * 如果正在运行的线程数大于corePoolSize 小于maximumPoolSize,只有当queue满了的时候,才会创建一个新线程。
 * 通过设置相同的corePoolSize和maximumPoolSize,可以创建一个固定大小的线程池。
 * 通过将maximumPoolSize设置为一个基本无界的值(例如Integer.MAX_VALUE),将允许池子容纳任意数量的并发任务。
 * 大多数情况下,corePoolSize和maximumPoolSize仅在构建时设置,
 * 但是他们也可以通过setCorePoolSize和setMaximumPoolSize动态修改。
 *
 * <dt>On-demand construction</dt>
 * 
 * 按需构建
 *
 * <dd>By default, even core threads are initially created and
 * started only when new tasks arrive, but this can be overridden
 * dynamically using method {@link #prestartCoreThread} or {@link
 * #prestartAllCoreThreads}.  You probably want to prestart threads if
 * you construct the pool with a non-empty queue. </dd>
 * 
 * 默认情况下,只有在新任务到达时才初始创建和启动核心线程,但这可以使用方法 prestartCoreThread 或 prestartAllCoreThreads 动态覆盖。
 * 如果您使用非空队列构造池,您可能想要预启动线程
 *
 * <dt>Creating new threads</dt>
 * 
 * 创建新线程
 *
 * <dd>New threads are created using a {@link ThreadFactory}.  If not
 * otherwise specified, a {@link Executors#defaultThreadFactory} is
 * used, that creates threads to all be in the same {@link
 * ThreadGroup} and with the same {@code NORM_PRIORITY} priority and
 * non-daemon status. By supplying a different ThreadFactory, you can
 * alter the thread's name, thread group, priority, daemon status,
 * etc. If a {@code ThreadFactory} fails to create a thread when asked
 * by returning null from {@code newThread}, the executor will
 * continue, but might not be able to execute any tasks. Threads
 * should possess the "modifyThread" {@code RuntimePermission}. If
 * worker threads or other threads using the pool do not possess this
 * permission, service may be degraded: configuration changes may not
 * take effect in a timely manner, and a shutdown pool may remain in a
 * state in which termination is possible but not completed.</dd>
 * 
 * 使用一个ThreadFactory来创建新线程。如果没有特别指定,会使用Executors.defaultThreadFactory,
 * 创建的线程都在同一个 ThreadGroup 中,并具有相同的 NORM_PRIORITY 优先级和非守护进程状态。
 * 通过提供一个不同的ThreadFactory,你可以改变线程的名字、线程组、优先级、守护状态等。
 * 当请求创建线程,但是ThreadFactory通过newThread方法创建失败,executor将继续执行,但是可能不会处理任何任务。
 * 线程应该拥有RuntimePermission的"modifyThread"权限。如果工作线程或其它使用池的线程不拥有此权限,
 * 服务可能会降级:配置更改可能无法及时生效,关闭池可能会保持可以终止但未完成的状态。
 *
 * <dt>Keep-alive times</dt>
 *
 * <dd>If the pool currently has more than corePoolSize threads,
 * excess threads will be terminated if they have been idle for more
 * than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).
 * This provides a means of reducing resource consumption when the
 * pool is not being actively used. If the pool becomes more active
 * later, new threads will be constructed. This parameter can also be
 * changed dynamically using method {@link #setKeepAliveTime(long,
 * TimeUnit)}.  Using a value of {@code Long.MAX_VALUE} {@link
 * TimeUnit#NANOSECONDS} effectively disables idle threads from ever
 * terminating prior to shut down. By default, the keep-alive policy
 * applies only when there are more than corePoolSize threads. But
 * method {@link #allowCoreThreadTimeOut(boolean)} can be used to
 * apply this time-out policy to core threads as well, so long as the
 * keepAliveTime value is non-zero. </dd>
 * 
 * 如果池中当前的线程数量大于corePoolSize,如果空闲时间超过keepAliveTime,过量的将被终止。
 * 这提供了一种在未积极使用池时减少资源消耗的方法。如果池稍后变得更加活跃,则将构建新线程。
 * 该参数也可以通过setKeepAliveTime来动态修改。使用Long.MAX_VALUE这个值可以在关闭前禁止终止线程。
 * 默认情况下,仅当有超过 corePoolSize 的线程时,keep-alive 策略才适用。
 * 但是方法  allowCoreThreadTimeOut(boolean) 也可将此超时策略应用于核心线程,只要 keepAliveTime 值不为零。
 *
 * <dt>Queuing</dt>
 *
 * <dd>Any {@link BlockingQueue} may be used to transfer and hold
 * submitted tasks.  The use of this queue interacts with pool sizing:
 * 
 * 任何 BlockingQueue 都可用于传输和保持提交的任务。此队列的使用与池大小相互影响:
 *
 * <ul>
 *
 * <li> If fewer than corePoolSize threads are running, the Executor
 * always prefers adding a new thread
 * rather than queuing.</li>
 * 如果正在运行的线程数少于corePoolSize,Executor总是添加一个新线程 而不是排队。
 *
 * <li> If corePoolSize or more threads are running, the Executor
 * always prefers queuing a request rather than adding a new
 * thread.</li>
 * 如果正在运行的线程数大于等于corePoolSize,Executor总是将请求进行排队,而不是添加一个新的线程。
 *
 * <li> If a request cannot be queued, a new thread is created unless
 * this would exceed maximumPoolSize, in which case, the task will be
 * rejected.</li>
 * 如果一个请求不能进行排队,将创建一个新的线程,除非这会超过maximumPoolSize,这种情况下任务将被拒绝。
 *
 * </ul>
 *
 * There are three general strategies for queuing:
 * 排队的三种一般性策略:
 * <ol>
 *
 * <li> <em> Direct handoffs.</em> A good default choice for a work
 * queue is a {@link SynchronousQueue} that hands off tasks to threads
 * without otherwise holding them. Here, an attempt to queue a task
 * will fail if no threads are immediately available to run it, so a
 * new thread will be constructed. This policy avoids lockups when
 * handling sets of requests that might have internal dependencies.
 * Direct handoffs generally require unbounded maximumPoolSizes to
 * avoid rejection of new submitted tasks. This in turn admits the
 * possibility of unbounded thread growth when commands continue to
 * arrive on average faster than they can be processed.  </li>
 * 直接传递。工作队列的一个很好的默认选择是 SynchronousQueue ,它将任务交给线程而不用其他方式保留它们。
 * 在这里,如果没有线程可以立即运行任务,则尝试将任务排队会失败,因此将构造一个新线程。
 * 此策略在处理可能具有内部依赖关系的请求集时避免锁定。
 * 直接传递通常需要无限的MaximumPoolSize,以避免拒绝新提交的任务。
 * 这继而允许线程的无限增长(当任务继续以平均快于其处理速度的速度到达时)。
 *
 * <li><em> Unbounded queues.</em> Using an unbounded queue (for
 * example a {@link LinkedBlockingQueue} without a predefined
 * capacity) will cause new tasks to wait in the queue when all
 * corePoolSize threads are busy. Thus, no more than corePoolSize
 * threads will ever be created. (And the value of the maximumPoolSize
 * therefore doesn't have any effect.)  This may be appropriate when
 * each task is completely independent of others, so tasks cannot
 * affect each others execution; for example, in a web page server.
 * While this style of queuing can be useful in smoothing out
 * transient bursts of requests, it admits the possibility of
 * unbounded work queue growth when commands continue to arrive on
 * average faster than they can be processed.  </li>
 * 无界队列。使用一个无界队列(例如一个未预定义容量的LinkedBlockingQueue),
 * 将导致新任务在所有corePoolSize线程忙时在队列中等待。
 * 因此,创建的线程不会超过corePoolSize。(因此,maximumPoolSize的值没有任何影响。)
 * 当每个任务完全独立于其他任务时,这可能是合适的,因此任务不会影响其他任务的执行;
 * 例如,在网页服务器中。虽然这种排队方式有助于消除瞬时的请求突发,
 * 但当命令继续以平均比处理速度更快的速度到达时,工作队列可能会无限增长。
 *
 * <li><em>Bounded queues.</em> A bounded queue (for example, an
 * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
 * used with finite maximumPoolSizes, but can be more difficult to
 * tune and control.  Queue sizes and maximum pool sizes may be traded
 * off for each other: Using large queues and small pools minimizes
 * CPU usage, OS resources, and context-switching overhead, but can
 * lead to artificially low throughput.  If tasks frequently block (for
 * example if they are I/O bound), a system may be able to schedule
 * time for more threads than you otherwise allow. Use of small queues
 * generally requires larger pool sizes, which keeps CPUs busier but
 * may encounter unacceptable scheduling overhead, which also
 * decreases throughput.  </li>
 * 有界队列。有界队列(例如,ArrayBlockingQueue)在与有限的maximumPoolSizes
 * 一起使用时有助于防止资源耗尽,但可能更难优化和控制。
 * 队列大小和最大池大小可以相互权衡:使用大型队列和小型池可以最大限度地
 * 减少CPU使用、操作系统资源和上下文切换开销,但也可能导致人为的低吞吐量。
 * 如果任务经常阻塞(例如,如果它们是I/O绑定的),系统可能能够为更多的线程安排时间,而不是您允许的线程。
 * 使用小队列通常需要更大的池大小,这使CPU更繁忙,但可能会遇到不可接受的调度开销,这也会降低吞吐量。
 *
 * </ol>
 *
 * </dd>
 *
 * <dt>Rejected tasks</dt>
 * 拒绝的任务
 *
 * <dd>New tasks submitted in method {@link #execute(Runnable)} will be
 * <em>rejected</em> when the Executor has been shut down, and also when
 * the Executor uses finite bounds for both maximum threads and work queue
 * capacity, and is saturated.  In either case, the {@code execute} method
 * invokes the {@link
 * RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)}
 * method of its {@link RejectedExecutionHandler}.  Four predefined handler
 * policies are provided:
 * 当执行器已关闭时,以及当执行器对最大线程和工作队列容量使用有限边界且已饱和时,
 * 在方法execute(Runnable)中提交的新任务将被拒绝。
 * 在这两种情况下,execute方法调用其RejectedExecutionHandler的rejectedExecution()方法。
 * 提供四个预定义的策略处理器:
 *
 * <ol>
 *
 * <li> In the default {@link ThreadPoolExecutor.AbortPolicy}, the
 * handler throws a runtime {@link RejectedExecutionException} upon
 * rejection. </li>
 * 在默认的AbortPolicy中,处理程序在拒绝时抛出运行时RejectedExecutionException。
 *
 * <li> In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread
 * that invokes {@code execute} itself runs the task. This provides a
 * simple feedback control mechanism that will slow down the rate that
 * new tasks are submitted. </li>
 * 在CallerRunsPolicy中,调用execute本身的线程运行任务。
 * 这提供了一种简单的反馈控制机制,可以降低提交新任务的速度。
 *
 * <li> In {@link ThreadPoolExecutor.DiscardPolicy}, a task that
 * cannot be executed is simply dropped.  </li>
 * 在DiscardPolicy中,无法执行的任务被简单地丢弃。
 *
 * <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the
 * executor is not shut down, the task at the head of the work queue
 * is dropped, and then execution is retried (which can fail again,
 * causing this to be repeated.) </li>
 * 在DiscardOldestPolicy中,如果未关闭执行器,则会丢弃工作队列头部的任务,
 * 然后重试执行(这可能再次失败,导致重复执行)
 *
 * </ol>
 *
 * It is possible to define and use other kinds of {@link
 * RejectedExecutionHandler} classes. Doing so requires some care
 * especially when policies are designed to work only under particular
 * capacity or queuing policies. </dd>
 * 可以定义和使用其他类型的RejectedExecutionHandler类。
 * 这样做需要一定的谨慎,特别是当策略设计为仅在特定容量或排队策略下工作时。
 *
 * <dt>Hook methods</dt>
 * 钩子方法
 *
 * <dd>This class provides {@code protected} overridable
 * {@link #beforeExecute(Thread, Runnable)} and
 * {@link #afterExecute(Runnable, Throwable)} methods that are called
 * before and after execution of each task.  These can be used to
 * manipulate the execution environment; for example, reinitializing
 * ThreadLocals, gathering statistics, or adding log entries.
 * Additionally, method {@link #terminated} can be overridden to perform
 * any special processing that needs to be done once the Executor has
 * fully terminated.
 * 此类提供了protected的可重写的
 * beforeExecute(Thread, Runnable)和
 * afterExecute(Runnable, Throwable)方法,这些方法在每个任务执行之前和之后都会被调用。
 *
 * <p>If hook or callback methods throw exceptions, internal worker
 * threads may in turn fail and abruptly terminate.</dd>
 * 如果钩子或回调方法抛出异常,内部工作线程可能会失败并突然终止。
 *
 * <dt>Queue maintenance</dt>
 * 队列维护
 *
 * <dd>Method {@link #getQueue()} allows access to the work queue
 * for purposes of monitoring and debugging.  Use of this method for
 * any other purpose is strongly discouraged.  Two supplied methods,
 * {@link #remove(Runnable)} and {@link #purge} are available to
 * assist in storage reclamation when large numbers of queued tasks
 * become cancelled.</dd>
 * 方法getQueue()允许访问工作队列以进行监视和调试。
 * 强烈反对将此方法用于任何其他目的。
 * 提供的两种方法,remove(Runnable)和purge可用于在取消大量排队的任务时帮助进行存储回收。
 *
 * <dt>Finalization</dt>
 *
 * <dd>A pool that is no longer referenced in a program <em>AND</em>
 * has no remaining threads will be {@code shutdown} automatically. If
 * you would like to ensure that unreferenced pools are reclaimed even
 * if users forget to call {@link #shutdown}, then you must arrange
 * that unused threads eventually die, by setting appropriate
 * keep-alive times, using a lower bound of zero core threads and/or
 * setting {@link #allowCoreThreadTimeOut(boolean)}.  </dd>
 * 程序中不再引用且没有剩余线程的池将自动shutdown。
 * 如果您希望确保即使用户忘记调用shutdown也能回收未引用的池,
 * 那么您必须通过设置适当的keep-alive时间、使用零核心线程的下限
 * 和/或设置allowCoreThreadTimeOut(boolean)。来安排未使用的线程最终死亡。
 *
 * </dl>
 *
 * <p><b>Extension example</b>. 
 * 扩展示例
 *
 * Most extensions of this class
 * override one or more of the protected hook methods. For example,
 * here is a subclass that adds a simple pause/resume feature:
 * 此类的大多数扩展重写一个或多个受保护的钩子方法。
 * 例如,下面是一个子类,它添加了一个简单的暂停/恢复功能:
 *
 *  <pre> {@code
 * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
 *   private boolean isPaused;//表示是否暂停
 *   private ReentrantLock pauseLock = new ReentrantLock();
 *   private Condition unpaused = pauseLock.newCondition();
 *
 *   public PausableThreadPoolExecutor(...) { super(...); }
 *
 *   protected void beforeExecute(Thread t, Runnable r) {
 *     super.beforeExecute(t, r);
 *     pauseLock.lock();
 *     try {
 *       while (isPaused) unpaused.await(); //如果已暂停,等在Condition上
 *     } catch (InterruptedException ie) {
 *       t.interrupt();
 *     } finally {
 *       pauseLock.unlock();
 *     }
 *   }
 *
 *   public void pause() {
 *     pauseLock.lock();
 *     try {
 *       isPaused = true;//设置暂停标志
 *     } finally {
 *       pauseLock.unlock();
 *     }
 *   }
 *
 *   public void resume() {
 *     pauseLock.lock();
 *     try {
 *       isPaused = false;
 *       unpaused.signalAll();//恢复,则唤醒所有等待在Condition上的线程
 *     } finally {
 *       pauseLock.unlock();
 *     }
 *   }
 * }}</pre>
 *
 * @since 1.5
 * @author Doug Lea
 */
public class ThreadPoolExecutor extends AbstractExecutorService {
}

总结:类注释和变量注释都描述的很清楚了(过一遍注释就都清楚了)。

  1. 是一个用线程池实现的ExecutorService。
  2. 通过Executors可以方便的构造不同类型的线程池(无界队列的)。
  3. 新任务提交时,根据当前线程数来决定是创建新线程还是加入队列。
  4. 无法创建线程队列也满了,则执行拒绝策略。
  5. 提供了一些钩子方法供子类使用。

二:核心内部变量

public class ThreadPoolExecutor extends AbstractExecutorService {

    /**
     * The main pool control state, ctl, is an atomic integer packing
     * two conceptual fields
     *   workerCount, indicating the effective number of threads
     *   runState,    indicating whether running, shutting down etc
     * 主池控制状态,ctl,是一个包含两个概念字段的原子整数:
     *    workerCount, 指示有效线程数
     *    runState,    只是是否正在运行、关闭等。
     *
     * In order to pack them into one int, we limit workerCount to
     * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
     * billion) otherwise representable. If this is ever an issue in
     * the future, the variable can be changed to be an AtomicLong,
     * and the shift/mask constants below adjusted. But until the need
     * arises, this code is a bit faster and simpler using an int.
     * 为了将他们打包进一个int,我们限制workerCount为(2^29)-1 (大约5亿)个线程而不是(2^31)-1(20亿)个。
     * 如果这在将来成为问题,可以将变量更改为AtomicLong,并调整下面的移位/掩码常数。
     * 但是,在需要之前,使用int代码会更快、更简单一些。
     *
     * The workerCount is the number of workers that have been
     * permitted to start and not permitted to stop.  The value may be
     * transiently different from the actual number of live threads,
     * for example when a ThreadFactory fails to create a thread when
     * asked, and when exiting threads are still performing
     * bookkeeping before terminating. The user-visible pool size is
     * reported as the current size of the workers set.
     * workCount是允许启动和不允许停止的worker数量。
     * 该值可能瞬间的不同于活动线程的实际数量,例如,当ThreadFactory在被请求时无法创建线程,
     * 并且退出的线程在终止之前仍在执行簿记时。用户可见池大小将报告为worker集合的当前大小。
     *
     * The runState provides the main lifecycle control, taking on values:
     * runState提供主要的生命周期控制,具有以下值:
     *
     *   RUNNING:  Accept new tasks and process queued tasks
     *             接受新任务并处理排队的任务
     *             
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *             不接受新任务,但是执行排队的任务
     *             
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *             不接受新任务,不执行排队的任务,并且中断正在执行的任务。
     *             
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *             所有任务已经结束,workCount为0,转换到状态TIDYING的线程将运行terminated()钩子方法
     *             
     *   TERMINATED: terminated() has completed 
     *               terminated()已完成
     *
     * The numerical order among these values matters, to allow
     * ordered comparisons. The runState monotonically increases over
     * time, but need not hit each state. The transitions are:
     * 这些值之间的数字顺序很重要,以便进行有序比较。
     * 运行状态随时间单调增加,但不需要达到每个状态。这些转变是:
     *
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     *    调用shutdown()时,可能隐式地在finalize()中
     *    
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     *    调用shutdownNow()时
     *    
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     *    当队列和池都是空的时候
     *    
     * STOP -> TIDYING
     *    When pool is empty
     *    当池是空的时候
     *    
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
     *    当terminated()钩子方法执行完时
     *
     * Threads waiting in awaitTermination() will return when the
     * state reaches TERMINATED.
     * 在awaitTermination()中等待的线程将在状态达到TERMINATED时返回。
     *
     * Detecting the transition from SHUTDOWN to TIDYING is less
     * straightforward than you'd like because the queue may become
     * empty after non-empty and vice versa during SHUTDOWN state, but
     * we can only terminate if, after seeing that it is empty, we see
     * that workerCount is 0 (which sometimes entails a recheck -- see
     * below).
     * 检测从SHUTDOWN到TIDYING的转换并不像您希望的那样简单,
     * 因为队列在非空之后可能变为空,而在SHUTDOWN状态期间,队列也可能变为空,
     * 我们看到workerCount为0(有时需要重新检查——见下文)。
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    private static final int COUNT_BITS = Integer.SIZE - 3;
    
    //实际最大线程数,用户指定的最大线程数不会超过此值。
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 五个线程池状态,留意一下大小关系
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    //获取当前线程池状态,从ctl中解析出来。
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    
    //获取当前worker数量,即线程数量,也是通过ctl解析出来。
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    
    //将状态和worker数量两个属性组成一个ctl值
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    //用于判断当前线程池是否是运行状态,只有RUNNING的值小于SHUTDOWN
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    /**
     * The queue used for holding tasks and handing off to worker
     * threads.  We do not require that workQueue.poll() returning
     * null necessarily means that workQueue.isEmpty(), so rely
     * solely on isEmpty to see if the queue is empty (which we must
     * do for example when deciding whether to transition from
     * SHUTDOWN to TIDYING).  This accommodates special-purpose
     * queues such as DelayQueues for which poll() is allowed to
     * return null even if it may later return non-null when delays
     * expire.
     * 用于保存任务并将其传递给工作线程的队列。
     * 我们不要求workQueue.poll()返回null必然意味着workQueue.isEmpty()为空,
     * 因此,仅依靠isEmpty来查看队列是否为空(例如,在决定是否从关闭转换为清理时,我们必须这样做)。
     * 这适用于特殊用途的队列,例如允许poll()返回null的DelayQueues,
     * 即使延迟过期后poll()可能返回非null。
     * 
     * 注意:这里是一个BlockingQueue,所以一个线程没有任务执行的时候,是通过poll、take阻塞在该queue上的。
     * 
     */
    private final BlockingQueue<Runnable> workQueue;

    /**
     * Lock held on access to workers set and related bookkeeping.
     * While we could use a concurrent set of some sort, it turns out
     * to be generally preferable to use a lock. Among the reasons is
     * that this serializes interruptIdleWorkers, which avoids
     * unnecessary interrupt storms, especially during shutdown.
     * Otherwise exiting threads would concurrently interrupt those
     * that have not yet interrupted. It also simplifies some of the
     * associated statistics bookkeeping of largestPoolSize etc. We
     * also hold mainLock on shutdown and shutdownNow, for the sake of
     * ensuring workers set is stable while separately checking
     * permission to interrupt and actually interrupting.
     * 锁定workers集合和相关簿记的访问权限。
     * 虽然我们可以使用某种类型的并发集,但事实证明,通常最好使用锁。
     * 其中一个原因是它序列化了中断的workers???,从而避免了不必要的中断风暴,尤其是在关闭期间。
     * 否则,退出的线程将同时中断那些尚未中断的线程。
     * 它还简化了一些与largestPoolSize等相关的统计簿记。
     * 我们还在shutdown和shutdownNow时保持mainLock,以确保workers设置稳定,
     * 同时分别检查是否允许中断和实际中断。
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     * 包含池中所有worker线程的集合。仅在持有mainLock时访问。
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /**
     * Wait condition to support awaitTermination
     * 支持awaitTermination方法的一个Condition
     * 就是你想在线程池终止结束后被通知,可以通过awaitTermination方法await到该Condition上,
     * 等线程池终止完成后,会进行terminal.signalAll()进行唤醒。
     */
    private final Condition termination = mainLock.newCondition();

    /**
     * Tracks largest attained pool size. Accessed only under
     * mainLock.
     * 跟踪达到的最大池大小。仅在mainLock下访问。
     */
    private int largestPoolSize;

    /**
     * Counter for completed tasks. Updated only on termination of
     * worker threads. Accessed only under mainLock.
     * 已完成任务的计数器。仅在工作线程终止时更新。仅在mainLock下访问。
     */
    private long completedTaskCount;

    /*
     * All user control parameters are declared as volatiles so that
     * ongoing actions are based on freshest values, but without need
     * for locking, since no internal invariants depend on them
     * changing synchronously with respect to other actions.
     * 所有用户控制参数都声明为volatile,因此正在进行的操作基于最新的值,
     * 但不需要锁定,因为没有内部不变数据依赖于它们相对于其他操作同步更改。
     */

    /**
     * Factory for new threads. All threads are created using this
     * factory (via method addWorker).  All callers must be prepared
     * for addWorker to fail, which may reflect a system or user's
     * policy limiting the number of threads.  Even though it is not
     * treated as an error, failure to create threads may result in
     * new tasks being rejected or existing ones remaining stuck in
     * the queue.
     * 新线程的工厂。所有线程通过这个工厂来创建(通过addWorker方法)。
     * 所有调用者都必须为addWorker失败做好准备,这可能反映了系统或用户限制线程数量的策略。
     * 即使未将其视为错误,创建线程的失败也可能会导致新任务被拒绝,或现有任务仍卡在队列中。
     *
     * We go further and preserve pool invariants even in the face of
     * errors such as OutOfMemoryError, that might be thrown while
     * trying to create threads.  Such errors are rather common due to
     * the need to allocate a native stack in Thread.start, and users
     * will want to perform clean pool shutdown to clean up.  There
     * will likely be enough memory available for the cleanup code to
     * complete without encountering yet another OutOfMemoryError.
     * 我们更进一步,保留池不变量,即使在遇到错误(例如OutOfMemoryError)时也是如此,
     * 这些错误可能在尝试创建线程时抛出。由于需要在Thread.start中分配本机堆栈,
     * 因此此类错误非常常见,用户需要执行清理池关闭以进行清理。
     * 可能会有足够的内存来完成清理代码,而不会遇到另一个OutOfMemoryError。
     */
    private volatile ThreadFactory threadFactory;

    /**
     * Handler called when saturated or shutdown in execute.
     * execute中在饱和或关闭时调用的处理程序。
     */
    private volatile RejectedExecutionHandler handler;

    /**
     * Timeout in nanoseconds for idle threads waiting for work.
     * Threads use this timeout when there are more than corePoolSize
     * present or if allowCoreThreadTimeOut. Otherwise they wait
     * forever for new work.
     * 空闲线程等待工作的超时时间(纳秒)。
     * 当目前存在线程多于corePoolSize或allowCoreThreadTimeOut(允许核心线程超时)时,线程使用此超时。
     * 否则,他们将永远等待新的工作。
     */
    private volatile long keepAliveTime;

    /**
     * If false (default), core threads stay alive even when idle.
     * If true, core threads use keepAliveTime to time out waiting
     * for work.
     * 如果是false(默认值),核心线程即使在空闲时也保持活着。
     * 如果是true,核心线程在等待工作时使用keepAliveTime来进行超时。
     */
    private volatile boolean allowCoreThreadTimeOut;

    /**
     * Core pool size is the minimum number of workers to keep alive
     * (and not allow to time out etc) unless allowCoreThreadTimeOut
     * is set, in which case the minimum is zero.
     * 核心线程数是保持存活状态(不允许超时等)的最小工作线程数,
     * 除非设置了allowCoreThreadTimeOut,在这种情况下,最小值为零。
     */
    private volatile int corePoolSize;

    /**
     * Maximum pool size. Note that the actual maximum is internally
     * bounded by CAPACITY.
     * 最大池大小。请注意,实际最大值在内部由CAPACITY限制。
     */
    private volatile int maximumPoolSize;

    /**
     * The default rejected execution handler
     * 默认被拒绝的执行程序。
     */
    private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();

}

总结:有以下核心变量

  1. ctl(AtomicInteger):线程池的主要状态变量,封装了运行状态和当前线程数两个值。
    通过CAS操作更改。
  2. workQueue(BlockingQueue<Runnable>):阻塞的任务队列,用于保存未执行的任务,
    也用来hold住空闲的worker线程,通过take、poll方法。
  3. workers(HashSet<Worker>):Worker线程集合,存着所有创建出来的未终止的worker。
    仅在持有mainLock时访问
  4. mainLock(ReentrantLock):各种状态变更、workers集合变更时控制多线程并发导致错误。
  5. termination(Condition): 一个Condition,用于终止时通过termination.signalAll()通知其它线程。
  6. largestPoolSize(ing):用于记录达到的最大池大小。仅在mainLock下访问
  7. completedTaskCount(long):用于记录已完成的任务数量。仅在工作线程终止时更新。仅在mainLock下访问
  8. threadFactory(ThreadFactory): 用于再创建worker时创建新的线程。
  9. handler(RejectedExecutionHandler):用于处理拒绝逻辑。
  10. keepAliveTime(long):空闲线程存活时间。
  11. allowCoreThreadTimeOut(boolean):表示核心线程是否允许超时终止。
  12. corePoolSize:核心线程数的最大值。
  13. maximumPoolSize:线程池的最大值,实际最大值不会超过(1 << 29) - 1
  14. 状态的变化:
    • RUNNING -> SHUTDOWN 调用shutdown时
    • (RUNNING or SHUTDOWN) -> STOP 调用shutdownNow时
    • SHUTDOWN -> TIDYING 调用shutdown后,当队列和池都为空的时候
    • STOP -> TIDYING 调用shutdownNow后,当池为空的时候
    • TIDYING -> TERMINATED 当terminated()钩子方法执行完时

三:内部类Worker

public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * Class Worker mainly maintains interrupt control state for
     * threads running tasks, along with other minor bookkeeping.
     * This class opportunistically extends AbstractQueuedSynchronizer
     * to simplify acquiring and releasing a lock surrounding each
     * task execution.  This protects against interrupts that are
     * intended to wake up a worker thread waiting for a task from
     * instead interrupting a task being run.  We implement a simple
     * non-reentrant mutual exclusion lock rather than use
     * ReentrantLock because we do not want worker tasks to be able to
     * reacquire the lock when they invoke pool control methods like
     * setCorePoolSize.  Additionally, to suppress interrupts until
     * the thread actually starts running tasks, we initialize lock
     * state to a negative value, and clear it upon start (in
     * runWorker).
     * Worker类主要维护运行任务的线程的中断控制状态,以及其他次要的簿记。
     * 此类适时地扩展AbstractQueuedSynchronizer,以简化获取和释放围绕每个任务执行的锁的过程。
     * 这可以防止旨在唤醒等待任务的工作线程的中断,而不是中断正在运行的任务。
     * 我们实现了一个简单的不可重入互斥锁,而不是使用可重入锁(ReentrantLock),
     * 因为我们不希望工作任务在调用诸如setCorePoolSize之类的池控制方法时能够重新获取锁。
     * 此外,为了在线程实际开始运行任务之前抑制中断,我们将锁定状态初始化为负值,
     * 并在启动时清除它(在runWorker中)。
     */
    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
    {
        /** Thread this worker is running in.  Null if factory fails. */
        // 用来运行此worker的线程,如果工厂失败则为null。
        final Thread thread;

        /** Initial task to run.  Possibly null. */
        //要运行的初始任务。可能为null。
        Runnable firstTask;

        /** Per-thread task counter */
        //每个线程完成的任务计数器
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            // inhibit interrupts until runWorker 
            // 在运行worker之前阻止中断,咋阻止呢?
            // 先简单说下就是中断前会先来lock,lock成功才会中断,这时state时-1,所以lock不会成功。
            // start之后,lock会变为0,这时候就可以中断了。
            setState(-1);
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        //将主运行循环委托给外部runWorker方法
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // 0表示未锁定状态
        // The value 1 represents the locked state.
        // 1表示锁定状态

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                //如果已经启动了,则中断,state>=0,已经不是初始状态-1,表示已经启动
                //该方法是在shutdownNow是被调用的
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
}

总结:

  1. Worker是线程池中一个线程的封装类,一个worker就代表一个线程。
  2. worker是AQS的实现类,主要是用来表示是否正在执行任务,锁定状态表示正在执行任务。
  3. worker的初始状态为-1,表示还未启动,所以lock(执行任务)之前,需要现unlock再lock。
  4. 任务的执行逻辑不再worker中,被委托到了runWorker方法。

四:用于扩展的钩子方法

public class ThreadPoolExecutor extends AbstractExecutorService {
    /* Extension hooks */
    //扩展的钩子

    /**
     * Method invoked prior to executing the given Runnable in the
     * given thread.  This method is invoked by thread {@code t} that
     * will execute task {@code r}, and may be used to re-initialize
     * ThreadLocals, or to perform logging.
     * 在给定线程中执行给定Runnable之前调用的方法。
     * 此方法由执行任务r的线程t调用,可用于重新初始化ThreadLocals或执行日志记录
     *
     * <p>This implementation does nothing, but may be customized in
     * subclasses. Note: To properly nest multiple overridings, subclasses
     * should generally invoke {@code super.beforeExecute} at the end of
     * this method.
     * 此实现不做任何事情,但可以在子类中进行自定义。
     * 注意:为了正确嵌套多个重写,子类通常应该在这个方法的末尾调用super.beforeExecute。
     *
     * @param t the thread that will run task {@code r}
     * @param r the task that will be executed
     */
    protected void beforeExecute(Thread t, Runnable r) { }

    /**
     * Method invoked upon completion of execution of the given Runnable.
     * This method is invoked by the thread that executed the task. If
     * non-null, the Throwable is the uncaught {@code RuntimeException}
     * or {@code Error} that caused execution to terminate abruptly.
     * 在完成给定Runnable的执行时调用的方法。此方法由执行任务的线程调用。
     *
     * <p>This implementation does nothing, but may be customized in
     * subclasses. Note: To properly nest multiple overridings, subclasses
     * should generally invoke {@code super.afterExecute} at the
     * beginning of this method.
     * 此实现不做任何事情,但可以在子类中进行自定义。
     * 注意:为了正确嵌套多个重写,子类通常应该在这个方法的开始处调用super.afterExecute
     *
     *
     * <p><b>Note:</b> When actions are enclosed in tasks (such as
     * {@link FutureTask}) either explicitly or via methods such as
     * {@code submit}, these task objects catch and maintain
     * computational exceptions, and so they do not cause abrupt
     * termination, and the internal exceptions are <em>not</em>
     * passed to this method. If you would like to trap both kinds of
     * failures in this method, you can further probe for such cases,
     * as in this sample subclass that prints either the direct cause
     * or the underlying exception if a task has been aborted:
     * 当动作显式或通过submit等方法包含在任务(如FutureTask)中时,
     * 这些任务对象捕获并维护计算异常,因此它们不会导致突然终止,
     * 并且内部异常不会传递给此方法。
     * 如果您想在这个方法中捕获这两种失败,您可以进一步探测这种情况,
     * 例如在这个示例子类中,如果任务已中止,则打印直接原因或底层异常:
     *
     *  <pre> {@code
     * class ExtendedExecutor extends ThreadPoolExecutor {
     *   // ...
     *   protected void afterExecute(Runnable r, Throwable t) {
     *     super.afterExecute(r, t);
     *     if (t == null && r instanceof Future<?>) {
     *       try {
     *         Object result = ((Future<?>) r).get();
     *       } catch (CancellationException ce) {
     *           t = ce;
     *       } catch (ExecutionException ee) {
     *           t = ee.getCause();
     *       } catch (InterruptedException ie) {
     *           Thread.currentThread().interrupt(); // ignore/reset
     *       }
     *     }
     *     if (t != null)
     *       System.out.println(t);
     *   }
     * }}</pre>
     *
     * @param r the runnable that has completed
     * @param t the exception that caused termination, or null if
     * execution completed normally
     */
    protected void afterExecute(Runnable r, Throwable t) { }

    /**
     * Method invoked when the Executor has terminated.  Default
     * implementation does nothing. Note: To properly nest multiple
     * overridings, subclasses should generally invoke
     * {@code super.terminated} within this method.
     * 当执行器终止时调用的方法。默认实现什么都不做。
     * 注意:为了正确嵌套多个重写,子类通常应该在这个方法中调用super.terminated。
     */
    protected void terminated() { }
}

总结:提供了三个子类可以扩展的钩子方法

  1. beforeExecute 执行任务之前调用
  2. afterExecute 执行任务之后调用
  3. terminated 终止时调用(TIDYING和TERMINATED状态之间调用)
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,640评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,254评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,011评论 0 355
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,755评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,774评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,610评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,352评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,257评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,717评论 1 315
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,894评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,021评论 1 350
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,735评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,354评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,936评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,054评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,224评论 3 371
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,974评论 2 355

推荐阅读更多精彩内容