Java线程池

一、池化技术

程序的运行,其本质上,是对系统资源(CPU、内存、磁盘、网络等等)的使用,如何高效的使用这些资源是编程优化演进的一个方向,池化技术就是非常重要的一项优化手段。

池化技术简单点来说,就是提前保存大量的资源,以备不时之需。在机器资源有限的情况下,使用池化技术可以大大的提高资源的利用率,提升性能等。在编程领域,比较典型的池化技术有:线程池连接池内存池对象池等。

下面代码可以创建一个线程:

public class Application { 
    public static void main(String[] args) throws Exception { 
        new Thread(new Runnable() { 
            @Override 
            public void run() { 
                System.out.println("线程运行......"); 
            } 
        }).start(); 
    } 
}

实现Runnable接口就可以实现一个简单的线程。可以利用上多核CPU。当一个任务结束,当前线程就接收。但很多时候,我们不止会执行一个任务。如果每次都是如此的创建线程、执行任务,销毁线程,会造成很大的性能开销。

那能否一个线程创建后,执行完一个任务后,又去执行另一个任务,而不是销毁?这就是线程池能解决的问题。这也就是池化技术的思想,通过预先创建好多个线程,放在池中,这样可以在需要使用线程的时候直接获取,避免多次重复创建、销毁带来的开销。

二、线程池的使用

1. 通过Executors创建:

ExecutorService executorService1 = Executors.newFixedThreadPool(3);
        ExecutorService executorService2 = Executors.newSingleThreadExecutor();
        ExecutorService executorService3 = Executors.newCachedThreadPool();
        ExecutorService executorService4 = Executors.newScheduledThreadPool(3);
        executorService1.execute(new Runnable() {
            @Override
            public void run() {
                
            }
        });

通过Executors工厂类中的静态方法可以更简单的进行线程池的创建。在阿里巴巴Java开发手册中,明确说明不允许使用Executors,因为Executors的不正确使用会带来若干问题,如OOM等。

图1. 阿里巴巴Java开发手册中关于线程池的约束

2. 通过ThreadPoolExecutor创建:

   ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));
    threadPoolExecutor.execute(new Runnable() {
        @Override
        public void run() {

        }
    });

我们在实践中开发中,应尽量选择此方式,并进行完善的异常控制机制。

三、线程池的类设计

我们先看一下ThreadPoolExecutor类概要图:


图2. ThreadPoolExecutor类设计
  • ExecutorService是真正的线程池接口。

  • Executor是线程池的顶级接口,只是一个执行线程的工具,只提供一个execute(Runnable command)的方法,真正的线程池接口是ExecutorService。

  • AbstractExecutorService实现了ExecutorService接口,实现了其中大部分的方法(有未实现的方法,所以被声明为Abstract)。

  • ThreadPoolExecutor,继承了AbstractExecutorService,是ExecutorService的默认实现。

另外,Executors也是ThreadPoolExecutor相关类,生产各种类型线程池,上文已作介绍,不推荐使用。

ThreadPoolExecutor类

1. 构造方法

java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。

在ThreadPoolExecutor类中提供了四个构造方法:

    /**
     * 注释省略.
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    /**
     * 注释省略.
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    /**
     * 注释省略.
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    /**
     * 注释省略.
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

前面三个构造方法缺省部分参数,会使用缺省参数的默认值作为相应实参,调用最后一个构造方法。通过构造方法即可创建线程池。

构造方法中各个参数的含义:

  • corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。

  • maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程。

  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0。

  • unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
    TimeUnit.DAYS; // 天
    TimeUnit.HOURS; // 小时
    TimeUnit.MINUTES; // 分钟
    TimeUnit.SECONDS; // 秒
    TimeUnit.MILLISECONDS; // 毫秒
    TimeUnit.MICROSECONDS; // 微妙
    TimeUnit.NANOSECONDS; // 纳秒

  • workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

    • ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
      常用的workQueue类型:
    • SynchronousQueue:这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现「线程数达到了maximumPoolSize而不能新建线程」的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大。
    • LinkedBlockingQueue:这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize。
    • ArrayBlockingQueue:可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则发生错误。
    • DelayQueue:队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务。
  • threadFactory:线程工厂,主要用来创建线程。如果没指定的话,默认会使用Executors.defaultThreadFactory(),一般来说,我们会在这里对线程设置名称、异常处理器等。

  • handler:表示当拒绝处理任务时的策略,有以下四种取值:
    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。

图3. 线程池的执行流程图
2. 方法
  • execute()
  • submit()
  • shutdown()
  • shutdownNow()
  • getQueue()
  • getPoolSize()
  • getActiveCount()
  • getCompletedTaskCount()

execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。

shutdown()和shutdownNow()是用来关闭线程池的。

getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,还有在下文提到的runwork()、addwork()、processworkerExit() 方法等等。

AbstractExecutorService类

下面我们看一下AbstractExecutorService的各方法:

public abstract class AbstractExecutorService implements ExecutorService {

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {...};
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {...};
    public Future<?> submit(Runnable task) {};
    public <T> Future<T> submit(Runnable task, T result) {...};
    public <T> Future<T> submit(Callable<T> task) {...};
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                            boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {...};
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {...};
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {...};
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {...};
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {...};
}

AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。

ExecutorService类

public interface ExecutorService extends Executor {
 
    void shutdown();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
 
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

ExecutorService继承了Executor接口。

Executor接口

public interface Executor {
    void execute(Runnable command);
}

上述类与接口之间的关系
Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

然后ThreadPoolExecutor继承了类AbstractExecutorService。

四、线程池的执行过程

AtomicInteger ctl 相关代码

    /**
     * 注释省略.
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

其中ctl这个 AtomicInteger 的功能很强大,其高3位用于维护线程池运行状态,低29位维护线程池中线程数量

  1. RUNNING:-1 << COUNT_BITS,即高3位为1,低29位为0,该状态的线程池会接收新任务,也会处理在阻塞队列中等待处理的任务。
  2. SHUTDOWN:0 << COUNT_BITS,即高3位为0,低29位为0,该状态的线程池不会再接收新任务,但还会处理已经提交到阻塞队列中等待处理的任务。
  3. STOP:1 << COUNT_BITS,即高3位为001,低29位为0,该状态的线程池不会再接收新任务,不会处理在阻塞队列中等待的任务,而且还会中断正在运行的任务。
  4. TIDYING:2 << COUNT_BITS,即高3位为010,低29位为0,所有任务都被终止了,workerCount为0,为此状态时还将调用terminated()方法。
  5. TERMINATED:3 << COUNT_BITS,即高3位为100,低29位为0,terminated()方法调用完成后变成此状态。

这些状态均由int型表示,大小关系为 RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED,这个顺序基本上也是遵循线程池从运行终止这个过程。
runStateOf(int c) 方法:c & 高3位为1,低29位为0的~CAPACITY,用于获取高3位保存的线程池状态。
workerCountOf(int c) 方法:c & 高3位为0,低29位为1的CAPACITY,用于获取低29位的线程数量。
ctlOf(int rs, int wc) 方法:参数rs表示runState,参数wc表示workerCount,即根据runState和workerCount打包合并成ctl。

核心方法源码

execute(Runnable command)方法
   /**
    * Executes the given task sometime in the future.  The task
    * may execute in a new thread or in an existing pooled thread.
    *
    * If the task cannot be submitted for execution, either because this
    * executor has been shutdown or because its capacity has been reached,
    * the task is handled by the current {@code RejectedExecutionHandler}.
    *
    * @param command the task to execute
    *                (待执行的command)
    * @throws RejectedExecutionException at discretion of
    *         {@code RejectedExecutionHandler}, if the task
    *         cannot be accepted for execution
    * @throws NullPointerException if {@code command} is null
    */
    public void execute(Runnable command) {
          // 需要执行的任务command为空,抛出空指针异常
          if (command == null)  // 1
              throw new NullPointerException();

          /*
          * 执行的流程实际上分为三步:
          * 1. 如果运行的线程小于 corePoolSize,以用户给定的 Runable 对象新开一个线程去执行
          *    并且执行addWorker方法会以原子性操作去检查 runState 和 workerCount,以防止当返回false的
          *    时候添加了不应该添加的线程
          * 2. 如果任务能够成功添加到队列当中,我们仍需要对添加的线程进行双重检查,有可能添加的线程在前
          *    一次检查时已经死亡,又或者在进入该方法的时候线程池关闭了。所以我们需要复查状态,并有有必
          *    要的话需要在停止时回滚入列操作,或者在没有线程的时候新开一个线程
          * 3. 如果任务无法入列,那我们需要尝试新增一个线程,如果新建线程失败了,我们就知道线程可能关闭了
          *    或者饱和了,就需要拒绝这个任务
          *
          */

          // 获取线程池的控制状态
          int c = ctl.get();    // 2

          // 通过workCountOf方法算workerCount值,小于corePoolSize
          if (workerCountOf(c) < corePoolSize) {
              // 添加任务到worker集合当中
              if (addWorker(command, true)) 
                  return;    // 成功返回
              
              c = ctl.get();    // 失败的话再次获取线程池的控制状态
          }

          /*
          * 判断线程池是否正处于RUNNING状态
          * 是的话添加Runnable对象到workQueue队列当中
          */
          if (isRunning(c) && workQueue.offer(command)) {    // 3
              int recheck = ctl.get();    // 再次获取线程池的状态

              // 再次检查状态
              // 线程池不处于RUNNING状态,将任务从workQueue队列中移除
              if (! isRunning(recheck) && remove(command))
                  // 拒绝任务
                  reject(command);
              // workerCount等于0
              else if (workerCountOf(recheck) == 0)    // 4
                  // 添加worker
                  addWorker(null, false);
          }
          // 加入阻塞队列失败,则尝试以线程池最大线程数新开线程去执行该任务
          else if (!addWorker(command, false))     // 5 
              // 执行失败则拒绝任务
              reject(command);
      }

我们来说一下上面这个代码的流程:
1. 首先判断任务是否为空,空则抛出空指针异常
2. 不为空则获取线程池控制状态,判断小于corePoolSize,添加到worker集合当中执行。若成功,则返回;失败的话再接着获取线程池控制状态,因为只有状态变了才会失败,所以重新获取
3. 判断线程池是否处于运行状态,是的话则添加command到阻塞队列,加入时也会再次获取状态并且检测状态是否不处于运行状态,不处于的话则将command从阻塞队列移除,并且拒绝任务
4. 如果线程池里没有了线程,则创建新的线程去执行获取阻塞队列的任务执行
5. 如果以上都没执行成功,则需要开启最大线程池里的线程来执行任务,失败的话就丢弃

图4. execute方法执行流程图

addWorker(Runnable firstTask, boolean core)方法
     private boolean addWorker(Runnable firstTask, boolean core) {
          //外部循环标记
          retry:
          //外层死循环
          for (;;) {
              //获取线程池控制状态
              int c = ctl.get();
              //获取runState
              int rs = runStateOf(c);
      ​
              // Check if queue empty only if necessary.

              /**
              *1.如果线程池runState至少已经是SHUTDOWN
              *2\. 有一个是false则addWorker失败,看false的情况
              * - runState==SHUTDOWN,即状态已经大于SHUTDOWN了
              * - firstTask为null,即传进来的任务为空,结合上面就是runState是SHUTDOWN,但是
              *  firstTask不为空,代表线程池已经关闭了还在传任务进来
              * - 队列为空,既然任务已经为空,队列为空,就不需要往线程池添加任务了
              */
              if (rs >= SHUTDOWN &&  //runState大于等于SHUTDOWN,初始位RUNNING
                  ! (rs == SHUTDOWN &&  //runState等于SHUTDOWN
                     firstTask == null &&  //firstTask为null
                     ! workQueue.isEmpty()))  //workQueue队列不为空
                  return false;
      ​
              //内层死循环
              for (;;) {
                  //获取线程池的workerCount数量
                  int wc = workerCountOf(c);
                  //如果workerCount超出最大值或者大于corePoolSize/maximumPoolSize
                  //返回false
                  if (wc >= CAPACITY ||
                      wc >= (core ? corePoolSize : maximumPoolSize))
                      return false;
                  //通过CAS操作,使workerCount数量+1,成功则跳出循环,回到retry标记
                  if (compareAndIncrementWorkerCount(c))
                      break retry;

                  //CAS操作失败,再次获取线程池的控制状态
                  c = ctl.get();  // Re-read ctl
                  //如果当前runState不等于刚开始获取的runState,则跳出内层循环,继续外层循环
                  if (runStateOf(c) != rs)
                      continue retry;
                  // else CAS failed due to workerCount change; retry inner loop
                  //CAS由于更改workerCount而失败,继续内层循环
              }
          }
      ​
          //通过以上循环,能执行到这是workerCount成功+1了

          //worker开始标记
          boolean workerStarted = false;
          //worker添加标记
          boolean workerAdded = false;
          //初始化worker为null
          Worker w = null;
          try {
              //初始化一个当前Runnable对象的worker对象
              w = new Worker(firstTask);
              //获取该worker对应的线程
              final Thread t = w.thread;
              //如果线程不为null
              if (t != null) {
                  //初始线程池的锁
                  final ReentrantLock mainLock = this.mainLock;
                  //获取锁
                  mainLock.lock();
                  try {
                      // Recheck while holding lock.
                      // Back out on ThreadFactory failure or if
                      // shut down before lock acquired.
                      //获取锁后再次检查,获取线程池runState
                      int rs = runStateOf(ctl.get());
      ​
                      //当runState小于SHUTDOWN或者runState等于SHUTDOWN并且firstTask为null
                      if (rs < SHUTDOWN ||
                          (rs == SHUTDOWN && firstTask == null)) {

                          //线程已存活
                          if (t.isAlive()) // precheck that t is startable
                              //线程未启动就存活,抛出IllegalThreadStateException异常
                              throw new IllegalThreadStateException();

                          //将worker对象添加到workers集合当中
                          workers.add(w);
                          //获取workers集合的大小
                          int s = workers.size();
                          //如果大小超过largestPoolSize
                          if (s > largestPoolSize)
                              //重新设置largestPoolSize
                              largestPoolSize = s;
                          //标记worker已经被添加
                          workerAdded = true;
                      }
                  } finally {
                      //释放锁
                      mainLock.unlock();
                  }
                  //如果worker添加成功
                  if (workerAdded) {
                      //启动线程
                      t.start();
                      //标记worker已经启动
                      workerStarted = true;
                  }
              }
          } finally {
              //如果worker没有启动成功
              if (! workerStarted)
                  //workerCount-1的操作
                  addWorkerFailed(w);
          }
          //返回worker是否启动的标记
          return workerStarted;
     }

简单说一下代码流程:
1. 获取线程池的控制状态,进行判断,不符合则返回false,符合则下一步
2. 死循环,判断workerCount是否大于上限,或者大于corePoolSize/maximumPoolSize,没有的话则对workerCount+1操作
3. 如果不符合上述判断或+1操作失败,再次获取线程池的控制状态,获取runState与刚开始获取的runState相比,不一致则跳出内层循环继续外层循环,否则继续内层循环
4. +1操作成功后,使用重入锁ReentrantLock来保证往workers当中添加worker实例,添加成功就启动该实例

addWorker方法有4种传参的方式(在execute方法中使用了前3种):
addWorker(command, true)
addWorker(command, false)
addWorker(null, false)
addWorker(null, true)
第一个:线程数小于corePoolSize时,放一个需要处理的task进Workers Set。如果Workers Set长度超过corePoolSize,就返回false。
第二个:当队列被放满时,就尝试将这个新来的task直接放入Workers Set,而此时Workers Set的长度限制是maximumPoolSize。如果线程池也满了的话就返回false。
第三个:放入一个空的task进workers Set,长度限制是maximumPoolSize。这样一个task为空的worker在线程执行的时候会去任务队列里拿任务,这样就相当于创建了一个新的线程,只是没有马上分配任务。
第四个:这个方法就是放一个null的task进Workers Set,而且是在小于corePoolSize时,如果此时Set中的数量已经达到corePoolSize那就返回false,什么也不干。实际使用中是在prestartAllCoreThreads()方法,这个方法用来为线程池预先启动corePoolSize个worker等待从workQueue中获取任务执行。

图5. addWorker方法执行流程图
addWorkerFailed(Worker w)
     private void addWorkerFailed(Worker w) {
          //重入锁
          final ReentrantLock mainLock = this.mainLock;
          //获取锁
          mainLock.lock();
          try {
              //如果worker不为null
              if (w != null)
                  //workers移除worker
                  workers.remove(w);
              //通过CAS操作,workerCount-1
              decrementWorkerCount();
              tryTerminate();
          } finally {
              //释放锁
              mainLock.unlock();
          }
     }

addWorker方法添加worker失败,并且没有成功启动任务的时候,就会调用此方法,将任务从workers中移除,并且workerCount做-1操作。

tryTerminate()
     final void tryTerminate() {
          //死循环
          for (;;) {
              //获取线程池控制状态
              int c = ctl.get();

              /*
              *线程池处于RUNNING状态
              *线程池状态最小大于TIDYING
              *线程池==SHUTDOWN并且workQUeue不为空
              *直接return,不能终止
              */
              if (isRunning(c) ||
                  runStateAtLeast(c, TIDYING) ||
                  (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                  return;

              //如果workerCount不为0
              if (workerCountOf(c) != 0) { // Eligible to terminate
                  interruptIdleWorkers(ONLY_ONE);
                  return;
              }
      ​
              //获取线程池的锁
              final ReentrantLock mainLock = this.mainLock;
              //获取锁
              mainLock.lock();
              try {
                  //通过CAS操作,设置线程池状态为TIDYING
                  if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                      try {
                          terminated();
                      } finally {
                          //设置线程池的状态为TERMINATED
                          ctl.set(ctlOf(TERMINATED, 0));
                          //发送释放信号给在termination条件上等待的线程
                          termination.signalAll();
                      }
                      return;
                  }
              } finally {
                  //释放锁
                  mainLock.unlock();
              }
              // else retry on failed CAS
          }
     }

当对线程池执行了非正常成功逻辑的操作时,都会需要执行tryTerminate尝试终止线程池。

runWorker(Worker w)
     final void runWorker(Worker w) {
        //获取当前线程
        Thread wt = Thread.currentThread();
        //获取worker里的任务
        Runnable task = w.firstTask;
        //将worker实例的任务赋值为null
        w.firstTask = null;

        /*
         *unlock方法会调用AQS的release方法
         *release方法会调用具体实现类也就是Worker的tryRelease方法
         *也就是将AQS状态置为0,允许中断
         */
        w.unlock(); // allow interrupts
        //是否突然完成
        boolean completedAbruptly = true;
        try {
            //worker实例的task不为空,或者通过getTask获取的不为空
            while (task != null || (task = getTask()) != null) {
                //获取锁
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                /*
                 *获取线程池的控制状态,至少要大于STOP状态
                 *如果状态不对,检查当前线程是否中断并清除中断状态,并且再次检查线程池状态是否大于STOP
                 *如果上述满足,检查该对象是否处于中断状态,不清除中断标记
                 */
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    //中断改对象
                    wt.interrupt();
                try {
                    //执行前的方法,由子类具体实现
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x;
                        throw x;
                    } catch (Error x) {
                        thrown = x;
                        throw x;
                    } catch (Throwable x) {
                        thrown = x;
                        throw new Error(x);
                    } finally {
                        //执行完后调用的方法,也是由子类具体实现
                        afterExecute(task, thrown);
                    }
                } finally {//执行完后
                    //task设置为null
                    task = null;
                    //已完成任务数+1
                    w.completedTasks++;
                    //释放锁
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //处理并退出当前worker
            processWorkerExit(w, completedAbruptly);
        }
    }

该方法的作用就是去执行任务。执行步骤如下:
1. 首先在方法一进来,就执行了w.unlock(),这是为了将AQS的状态改为0,因为只有getState() >= 0的时候,线程才可以被中断
2. 判断firstTask是否为空,为空则通过getTask()获取任务,不为空接着往下执行
3. 判断是否符合中断状态,符合的话设置中断标记
4. 执行beforeExecute(),task.run(),afterExecute()方法
5. 任何一个出异常都会导致任务执行的终止;进入processWorkerExit来退出任务
6. 正常执行的话会接着回到步骤2

图6. runWorker方法执行流程图

getTask()
     private Runnable getTask() {
          //标志是否获取任务超时
          boolean timedOut = false; // Did the last poll() time out?
      ​
          //死循环
          for (;;) {
              //获取线程池的控制状态
              int c = ctl.get();
              //获取线程池的runState
              int rs = runStateOf(c);
      ​
              // Check if queue empty only if necessary.
              /*
              *判断线程池的状态,出现以下两种情况
              *1、runState大于等于SHUTDOWN状态
              *2、runState大于等于STOP或者阻塞队列为空
              *将会通过CAS操作,进行workerCount-1并返回null
              */
              if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                  decrementWorkerCount();
                  return null;
              }
      ​
              //获取线程池的workerCount
              int wc = workerCountOf(c);
      ​
              // Are workers subject to culling?

              /*
              *allowCoreThreadTimeOut:是否允许core Thread超时,默认false
              *workerCount是否大于核心核心线程池
              */
              boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
      ​
              /*
              *1、wc大于maximumPoolSize或者已超时
              *2、队列不为空时保证至少有一个任务
              */
              if ((wc > maximumPoolSize || (timed && timedOut))
                  && (wc > 1 || workQueue.isEmpty())) {
                  /*
                  *通过CAS操作,workerCount-1
                  *能进行-1操作,证明wc大于maximumPoolSize或者已经超时
                  */
                  if (compareAndDecrementWorkerCount(c))
                      //-1操作成功,返回null
                      return null;
                  //-1操作失败,继续循环
                  continue;
              }
      ​
              try {
                  /*
                  *wc大于核心线程池
                  *执行poll方法
                  *小于核心线程池
                  *执行take方法
                  */
                  Runnable r = timed ?
                      workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                  workQueue.take();
                  //判断任务不为空返回任务
                  if (r != null)
                      return r;
                  //获取一段时间没有获取到,获取超时
                  timedOut = true;
              } catch (InterruptedException retry) {
                  timedOut = false;
              }
          }
      }

在上面的runWorker方法当中我们可以看出,当firstTask为空的时候,会通过该方法来接着获取任务去执行,执行逻辑顺序如下:
1. 获取线程池控制状态和runState,判断线程池是否已经关闭或者正在关闭,是的话则workerCount-1操作返回null
2. 获取workerCount判断是否大于核心线程池
3. 判断workerCount是否大于最大线程池数目或者已经超时,是的话workerCount-1,-1成功则返回null,不成功则回到步骤1重新继续
4. 判断workerCount是否大于核心线程池,大于则用poll方法从队列获取任务,否则用take方法从队列获取任务
5. 判断任务是否为空,不为空则返回获取的任务,否则回到步骤1重新继续

图7. getTask方法执行流程图

processWorkerExit(Worker w, boolean completedAbruptly)
     private void processWorkerExit(Worker w, boolean completedAbruptly) {
          /*
          *completedAbruptly:在runWorker出现,代表是否突然完成的意思
          *也就是在执行任务过程当中出现异常,就会突然完成,传true
          *
          *如果是突然完成,需要通过CAS操作,workerCount-1
          *不是突然完成,则不需要-1,因为getTask方法当中已经-1
          *
          *下面的代码注释貌似与代码意思相反了
          */
          if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
              decrementWorkerCount();
      ​
          //生成重入锁
          final ReentrantLock mainLock = this.mainLock;
          //获取锁
          mainLock.lock();
          try {
              //线程池统计的完成任务数completedTaskCount加上worker当中完成的任务数
              completedTaskCount += w.completedTasks;
              //从HashSet<Worker>中移除
              workers.remove(w);
          } finally {
              //释放锁
              mainLock.unlock();
          }
      ​
          //因为上述操作是释放任务或线程,所以会判断线程池状态,尝试终止线程池
          tryTerminate();
      ​
          //获取线程池的控制状态
          int c = ctl.get();

          //判断runState是否小鱼STOP,即是RUNNING或者SHUTDOWN
          //如果是RUNNING或者SHUTDOWN,代表没有成功终止线程池
          if (runStateLessThan(c, STOP)) {
              /*
              *是否突然完成
              *如若不是,代表已经没有任务可获取完成,因为getTask当中是while循环
              */
              if (!completedAbruptly) {
                  /*
                  *allowCoreThreadTimeOut:是否允许core thread超时,默认false
                  *min-默认是corePoolSize
                  */
                  int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                  //允许core thread超时并且队列不为空
                  //min为0,即允许core thread超时,这样就不需要维护核心核心线程池了
                  //如果workQueue不为空,则至少保持一个线程存活
                  if (min == 0 && ! workQueue.isEmpty())
                      min = 1;
                  //如果workerCount大于min,则表示满足所需,可以直接返回
                  if (workerCountOf(c) >= min)
                      return; // replacement not needed
              }
              //如果是突然完成,添加一个空任务的worker线程--这里我也不太理解
              addWorker(null, false);
          }
      }

明显的,在执行任务当中,会去获取任务进行执行,那既然是执行任务,肯定就会有执行完或者出现异常中断执行的时候,那这时候肯定也会有相对应的操作。代码逻辑如下:

  1. 首先判断线程是否突然终止,如果是突然终止,通过CAS,workerCount-1
  2. 统计线程池完成任务数,并将worker从workers当中移除
  3. 判断线程池状态,尝试终止线程池
  4. 线程池没有成功终止
    • 判断是否突然完成任务,不是则进行下一步,是则进行第三步
    • 如允许核心线程超时,队列不为空,则至少保证一个线程存活
    • 添加一个空任务的worker线程

Worker内部类

我们在上面已经算是挺详细地讲了线程池执行任务execute的执行流程和一些细节,在上面频繁地出现了一个字眼,那就是worker实例,那么这个worker究竟是什么呢?

private final class Worker
          extends AbstractQueuedSynchronizer
          implements Runnable
      {
          /**
           * This class will never be serialized, but we provide a
           * serialVersionUID to suppress a javac warning.
           */
          private static final long serialVersionUID = 6138294804551838833L;
  ​
          /** 工作线程,如果工厂失败则为空. */
          final Thread thread;
          /** 初始化任务,有可能为空 */
          Runnable firstTask;
          /** 已完成的任务计数 */
          volatile long completedTasks;
  ​
          /**
           * 创建并初始化第一个任务,使用线程工厂来创建线程
           * 初始化有3步
           *1、设置AQS的同步状态为-1,表示该对象需要被唤醒
           *2、初始化第一个任务
           *3、调用ThreadFactory来使自身创建一个线程,并赋值给worker的成员变量thread
           */
          Worker(Runnable firstTask) {
              setState(-1); // inhibit interrupts until runWorker
              this.firstTask = firstTask;
              this.thread = getThreadFactory().newThread(this);
          }
  ​
    //重写Runnable的run方法
          /** Delegates main run loop to outer runWorker  */
          public void run() {
              //调用ThreadPoolExecutor的runWorker方法
              runWorker(this);
          }
  ​
          // Lock methods
          //
          // The value 0 represents the unlocked state.
          // The value 1 represents the locked state.
    //代表是否独占锁,0-非独占  1-独占
          protected boolean isHeldExclusively() {
              return getState() != 0;
          }

    //重写AQS的tryAcquire方法尝试获取锁
          protected boolean tryAcquire(int unused) {
           //尝试将AQS的同步状态从0改为1
              if (compareAndSetState(0, 1)) {
               //如果改变成,则将当前独占模式的线程设置为当前线程并返回true
                  setExclusiveOwnerThread(Thread.currentThread());
                  return true;
              }
              //否则返回false
              return false;
          }
  ​
    //重写AQS的tryRelease尝试释放锁
          protected boolean tryRelease(int unused) {
           //设置当前独占模式的线程为null
              setExclusiveOwnerThread(null);
              //设置AQS同步状态为0
              setState(0);
              //返回true
              return true;
          }
  ​
    //获取锁
          public void lock()        { acquire(1); }
          //尝试获取锁
          public boolean tryLock()  { return tryAcquire(1); }
          //释放锁
          public void unlock()      { release(1); }
          //是否被独占
          public boolean isLocked() { return isHeldExclusively(); }
  ​
          void interruptIfStarted() {
              Thread t;
              if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                  try {
                      t.interrupt();
                  } catch (SecurityException ignore) {
                  }
              }
          }
  }

我们可以看到Worker内部类继承AQS同步器并且实现了Runnable接口,所以Worker很明显就是一个可执行任务并且又可以控制中断、起到锁效果的类。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352

推荐阅读更多精彩内容