(三)JUC并发工具

一、线程池

1.1 为什么要使用线程池

  在 Java 诞生之初是没有线程池的概念的,而是先有线程,随着线程数的不断增加,人们发现需要一个专门的类来管理它们,于是才诞生了线程池。没有线程池的时候,每发布一个任务就需要创建一个新的线程,这样在任务少时是没有问题的,但是任务多了,我们就需要循环来创建线程,并且提交任务如下面代码所示:

/** 
* 描述:     for循环新建10个线程 
*/ 
public class TenTask { 
 
    public static void main(String[] args) { 
        for (int i = 0; i < 10; i++) { 
            Thread thread = new Thread(new Task());
            thread.start();
        } 
    } 
 
    static class Task implements Runnable { 
 
        public void run() { 
            System.out.println("Thread Name: " + Thread.currentThread().getName());
        } 
    } 
}

  但是创建线程时会产生系统开销,并且每个线程还会占用一定的内存等资源,更重要的是我们创建如此多的线程也会给稳定性带来危害,因为每个系统中,可创建线程的数量是有一个上限的,不可能无限的创建。线程执行完需要被回收,大量的线程又会给垃圾回收带来压力。但我们的任务确实非常多,如果都在主线程串行执行,那效率也太低了,那应该怎么办呢?于是便诞生了线程池来平衡线程与系统资源之间的关系。

1.2 如何去使用线程池

  线程池就好比一个池塘,池塘里的水是有限且可控的,比如我们选择线程数固定数量的线程池,假设线程池有 5 个线程,但此时的任务大于 5 个,线程池会让余下的任务进行排队,而不是无限制的扩张线程数量,保障资源不会被过度消耗。如代码所示,我们往 5 个线程的线程池中放入 10000 个任务并打印当前线程名字,结果会是怎么样呢?

/** 
* 描述:     用固定线程数的线程池执行10000个任务 
*/ 
public class ThreadPoolDemo { 
 
    public static void main(String[] args) { 
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10000; i++) { 
            service.execute(new Task());
        } 
    System.out.println(Thread.currentThread().getName());
    } 
 
    static class Task implements Runnable { 
 
        public void run() { 
            System.out.println("Thread Name: " + Thread.currentThread().getName());
        } 
    } 
}

  如打印结果所示,打印的线程名始终在 Thread Name: pool-1-thread-1~5 之间变化,并没有超过这个范围,也就证明了线程池不会无限制地扩张线程的数量,始终是这5个线程在工作。

Thread Name: pool-1-thread-1
Thread Name: pool-1-thread-2
Thread Name: pool-1-thread-3
Thread Name: pool-1-thread-4
Thread Name: pool-1-thread-5
Thread Name: pool-1-thread-5
Thread Name: pool-1-thread-5
Thread Name: pool-1-thread-5
Thread Name: pool-1-thread-5
Thread Name: pool-1-thread-2
Thread Name: pool-1-thread-1
Thread Name: pool-1-thread-5
Thread Name: pool-1-thread-3
Thread Name: pool-1-thread-5

1.3 使用线程池的好处

使用线程池比手动创建线程主要有三点好处

  • 第一点,线程池可以解决线程生命周期的系统开销问题,同时还可以加快响应速度。因为线程池中的线程是可以复用的,我们只用少量的线程去执行大量的任务,这就大大减小了线程生命周期的开销。而且线程通常不是等接到任务后再临时创建,而是已经创建好时刻准备执行任务,这样就消除了线程创建所带来的延迟,提升了响应速度,增强了用户体验。

  • 第二点,线程池可以统筹内存和 CPU 的使用,避免资源使用不当。线程池会根据配置和任务数量灵活地控制线程数量,不够的时候就创建,太多的时候就回收,避免线程过多导致内存溢出,或线程太少导致 CPU 资源浪费,达到了一个完美的平衡。

  • 第三点,线程池可以统一管理资源。比如线程池可以统一管理任务队列和线程,可以统一开始或结束任务,比单个线程逐一处理任务要更方便、更易于管理,同时也有利于数据统计,比如我们可以很方便地统计出已经执行过的任务的数量。

1.4 线程池的参数

线程池参数

  我们来看下线程池中各个参数的含义,corePoolSize 是核心线程数,也就是常驻线程池的线程数量,与它对应的是 maxPoolSize,表示线程池最大线程数量,当我们的任务特别多而 corePoolSize 核心线程数无法满足需求的时候,就会将任务存档到workQueue工作队列中,如果工作队列满了,就会向线程池中增加线程,以便应对任务突增的情况,若是线程数大于maxPoolSize了,那么线程池就不会再增加线程数量了而会拒绝。

  第三个参数是 keepAliveTime + 时间单位,当线程池中线程数量多于核心线程数时,而此时又没有任务可做,线程池就会检测线程的 keepAliveTime,如果超过规定的时间,无事可做的线程就会被销毁,以便减少内存的占用和资源消耗。如果后期任务又多了起来,线程池也会根据规则重新创建线程,所以这是一个可伸缩的过程,比较灵活,我们也可以用 setKeepAliveTime 方法动态改变 keepAliveTime 的参数值。

  第四个参数是 ThreadFactory,ThreadFactory 实际上是一个线程工厂,它的作用是生产线程以便执行任务。我们可以选择使用默认的线程工厂,创建的线程都会在同一个线程组,并拥有一样的优先级,且都不是守护线程,我们也可以选择自己定制线程工厂,以方便给线程自定义命名,不同的线程池内的线程通常会根据具体业务来定制不同的线程名,通常情况下也没有这个必要,用它默认的就可以了。

   第五个参数是 workQueue(工作队列),工作队列一般有三种:

  • SynchronousQueue(直接交接队列),这个队列没有容量,若是我们的任务比较少,我们可以使用它直接将任务交给线程去处理,那么我们的maxPoolSize就要设置大一点,不然我们没有队列进行缓冲,那么当最大线程容量到了以后,任务就会被拒绝。
  • LinkedBlockingQueue(无界队列) 这个队列没有界限,可以一直放,所以使用无界队列那我们的maxPoolSize这个不管设多大都没有用了,当核心线程数满了以后,任务会被放进队列,并且永远放不满,要是线程处理的速度比任务递交的速度慢的话,那么队列中的任务会一直增加,这样容易造成内存浪费,甚至是oom异常。
  • ArrayBlockingQueue(有界队列) 这个队列可以设置队列大小,一旦超过任务存储数量它的界限,那么就会创建新的线程来执行提交的任务。


    线程池执行顺序

1.5 线程池类

1.5.1 线程池的类继承体系

在这里有两个核新的类: ThreadPoolExector 和 ScheduledThreadPoolExecutor ,后者不仅可以执行某个任务,还可以周期性地执行任务。
向线程池中提交的每个任务,都必须实现 Runnable 接口,通过最上面的 Executor 接口中的execute(Runnable command) 向线程池提交任务。
然后,在 ExecutorService 中,定义了线程池的关闭接口 shutdown() ,还定义了可以有返回值的任务,也就是 Callable。

1.5.2 ThreadPoolExecutor

基于线程池的实现原理,下面是ThreadPoolExector的核心数据结构:

public class ThreadPoolExecutor extends AbstractExecutorService {
 //...
 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 // 存放任务的阻塞队列
 private final BlockingQueue<Runnable> workQueue;
 // 对线程池内部各种变量进行互斥访问控制
 private final ReentrantLock mainLock = new ReentrantLock();
 // 线程集合
 private final HashSet<Worker> workers = new HashSet<Worker>();
 //...
}

每一个线程是一个Worker对象。Worker是ThreadPoolExector的内部类,核心数据结构如下:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
 // ...
 final Thread thread; // Worker封装的线程
 Runnable firstTask; // Worker接收到的第1个任务
 volatile long completedTasks; // Worker执⾏完毕的任务个数
 // ...
}

由定义会发现,Worker继承于AQS,也就是说Worker本身就是一把锁。这把锁有什么用处呢?用于线程池的关闭、线程执行任务的过程中。

1.6 线程池的特点

线程池希望保持较少的线程数,并且只有在负载变得很大时才增加线程。

线程池只有在任务队列填满时才创建多于 corePoolSize 的线程,如果使用的是无界队列(例如 LinkedBlockingQueue),那么由于队列不会满,所以线程数不会超过 corePoolSize。

通过设置 corePoolSize 和 maximumPoolSize 为相同的值,就可以创建固定大小的线程池。

通过设置 maximumPoolSize 为很高的值,例如 Integer.MAX_VALUE,就可以允许线程池创建任意多的线程。

1.7 线程池的4 种拒绝策略

  • 第一种拒绝策略是 AbortPolicy,这种拒绝策略在拒绝任务时,会直接抛出一个类型为 RejectedExecutionException 的 RuntimeException,让你感知到任务被拒绝了,于是你便可以根据业务逻辑选择重试或者放弃提交等策略。

  • 第二种拒绝策略是 DiscardPolicy,这种拒绝策略正如它的名字所描述的一样,当新任务被提交后直接被丢弃掉,也不会给你任何的通知,相对而言存在一定的风险,因为我们提交的时候根本不知道这个任务会被丢弃,可能造成数据丢失。

  • 第三种拒绝策略是 DiscardOldestPolicy,如果线程池没被关闭且没有能力执行,则会丢弃任务队列中的头结点,通常是存活时间最长的任务,这种策略与第二种不同之处在于它丢弃的不是最新提交的,而是队列中存活时间最长的,这样就可以腾出空间给新提交的任务,但同理它也存在一定的数据丢失风险。

  • 第四种拒绝策略是 CallerRunsPolicy,相对而言它就比较完善了,当有新任务提交后,如果线程池没被关闭且没有能力执行,则把这个任务交于提交任务的线程执行,也就是谁提交任务,谁就负责执行任务。这样做主要有两点好处:
    (1)新提交的任务不会被丢弃,这样也就不会造成业务损失。
    (2)由于谁提交任务谁就要负责执行任务,这样提交任务的线程就得负责执行任务,而执行任务又是比较耗时的,在这段期间,提交任务的线程被占用,也就不会再提交新的任务,减缓了任务提交的速度,相当于是一个负反馈。在此期间,线程池中的线程也可以充分利用这段时间来执行掉一部分任务,腾出一定的空间,相当于是给了线程池一定的缓冲期。

1.8 线程池执行原理:


步骤一:判断当前线程数是否⼤于或等于corePoolSize。如果小于,则新建线程执行;如果大于,则进入步骤二。
步骤二:判断队列是否已满。如未满,则放⼊;如已满,则进⼊步骤三。
步骤三:判断当前线程数是否⼤于或等于maxPoolSize。如果小于,则新建线程执行;如果大于,则进入步骤四。
步骤四:根据拒绝策略,拒绝任务。
总结⼀下:首先判断corePoolSize,其次判断blockingQueue是否已满,接着判断maxPoolSize,最后使用拒绝策略。
很显然,基于这种流程,如果队列是无界的,将永远没有机会走到步骤三,也即maxPoolSize没有使用,也一定不会走到步骤四。

1.9 线程池的优雅关闭

1.9.1 线程池的生命周期

   线程池的关闭,较之线程的关闭更加复杂。当关闭⼀个线程池的时候,有的线程还正在执行某个任务,有的调用者正在向线程池提交任务,并且队列中可能还有未执行的任务。因此,关闭过程不可能是瞬时的,而是需要⼀个平滑的过渡,这就涉及线程池的完整生命周期管理。

在JDK 7以后,把线程数量(workerCount)和线程池状态(runState)这两个变量打包存储在⼀个字段里面,即ctl变量。



由上面的代码可以看到,ctl变量被拆成两半,最高的3位用来表示线程池的状态,低的29位表示线程的个数。线程池的状态有五种,分别是RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED。

线程池生命周期

   线程池有两个关闭方法,shutdown()和shutdownNow(),这两个方法会让线程池切换到不同的状态。在队列为空,线程池也为空之后,进入TIDYING 状态;最后执行⼀个钩自方法terminated(),进入TERMINATED状态,线程池才真正关闭。
   这里的状态迁移有⼀个非常关键的特征:从小到大迁移,-1,0,1,2,3,只会从小的状态值往大的状态值迁移,不会逆向迁移。例如,当线程池的状态在TIDYING=2时,接下来只可能迁移到TERMINATED=3,不可能迁移回STOP=1或者其他状态。
   除 terminated()之外,线程池还提供了其他几个钩子方法,这些方法的实现都是空的。如果想实现自己的线程池,可以重写这几个方法:

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }

1.9.1 正确关闭线程池的步骤

   关闭线程池的过程为:在调⽤ shutdown()或者shutdownNow()之后,线程池并不会⽴即关闭,接下来需要调用awaitTermination() 来等待线程池关闭。关闭线程池的正确步骤如下:

// executor.shutdownNow();
executor.shutdown();
try {
 boolean flag = true;
 do {
 flag = ! executor.awaitTermination(500, TimeUnit.MILLISECONDS);
 } while (flag);
} catch (InterruptedException e) {
 // ...
}

awaitTermination(...)方法的内部实现很简单,如下所示。不断循环判断线程池是否到达了最终状态
TERMINATED,如果是,就返回;如果不是,则通过termination条件变量阻塞⼀段时间,之后继续判断。

  public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                // 判断线程池状态是否 TERMINATED
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

1.9.3 shutdown()与shutdownNow()的区别

  1. shutdown()不会清空任务队列,会等所有任务执行完成,shutdownNow()清空任务队列。
  2. shutdown()只会中断空闲的线程,shutdownNow()会中断所有线程。
shutdown
shutdownNow

1.10 线程池线程复用原理

   我们知道线程池会使用固定数量或可变数量的线程来执行任务,但无论是固定数量或可变数量的线程,其线程数量都远远小于任务数量,面对这种情况线程池可以通过线程复用让同一个线程去执行不同的任务,那么线程复用背后的原理是什么呢?

   线程池可以把线程和任务进行解耦,线程归线程,任务归任务,摆脱了之前通过 Thread 创建线程时的一个线程必须对应一个任务的限制。在线程池中,同一个线程可以从 BlockingQueue 中不断提取新任务来执行,其核心原理在于线程池对 Thread 进行了封装,并不是每次执行任务都会调用 Thread.start() 来创建新线程,而是让每个线程去执行一个“循环任务”,在这个“循环任务”中,不停地检查是否还有任务等待被执行,如果有则直接去执行这个任务,也就是调用任务的 run 方法,把 run 方法当作和普通方法一样的地位去调用,相当于把每个任务的 run() 方法串联了起来,所以线程数量并不增加。

   如流程图所示,当提交任务后,线程池首先会检查当前线程数,如果此时线程数小于核心线程数,比如最开始线程数量为 0,则新建线程并执行任务,随着任务的不断增加,线程数会逐渐增加并达到核心线程数,此时如果仍有任务被不断提交,就会被放入 workQueue 任务队列中,等待核心线程执行完当前任务后重新从 workQueue 中提取正在等待被执行的任务。此时,假设我们的任务特别的多,已经达到了 workQueue 的容量上限,这时线程池就会启动后备力量,也就是 maxPoolSize 最大线程数,线程池会在 corePoolSize 核心线程数的基础上继续创建线程来执行任务,假设任务被不断提交,线程池会持续创建线程直到线程数达到 maxPoolSize 最大线程数,如果依然有任务被提交,这就超过了线程池的最大处理能力,这个时候线程池就会拒绝这些任务,我们可以看到实际上任务进来之后,线程池会逐一判断 corePoolSize 、workQueue 、maxPoolSize ,如果依然不能满足需求,则会拒绝任务。

我们接下来具体看看代码是如何实现的,我们从 execute 方法开始分析,源码如下所示:

public void execute(Runnable command) {
    if (command == null) {
        throw new NullPointerException();
    }
    int c = ctl.get();
    // 如果当前线程数小于corePoolSize,则启动新线程
    if (workerCountOf(c) < corePoolSize) {
        // 添加Worker,并将command设置为Worker线程的第一个任务开始执行。
        if (addWorker(command, true)) {
            return;
        }
        c = ctl.get();
    }
    // 如果当前的线程数大于或等于corePoolSize,则调用workQueue.offer放入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 如果线程池正在停止,则将command任务从队列移除,并拒绝command任务请求。
        if (!isRunning(recheck) && remove(command)) {
            reject(command);
        }
        // 放入队列中后发现没有线程执行任务,开启新线程
        else if (workerCountOf(recheck) == 0) {
            addWorker(null, false);
        }
    }
    // 线程数大于maxPoolSize,并且队列已满,调用拒绝策略
    else if (!addWorker(command, false)) {
        reject(command);
    }
}

// 该方法用于启动新线程。如果第二个参数为true,则使用corePoolSize作为上限,否则使用maxPoolSize作为上限。
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get(); ;) {
        // 如果线程池状态值起码是SHUTDOWN和STOP,或则第一个任务不是null,或者工作队列为空
        // 则添加worker失败,返回false
        if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty())) {
            return false;
        }
        for (;;) {
            // 工作线程数达到上限,要么是corePoolSize要么是maximumPoolSize,启动线程失败
            if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) {
                return false;
            }
            // 增加worker数量成功,返回到retry语句
            if (compareAndIncrementWorkerCount(c)) {
                break retry;
            }
            c = ctl.get(); // Re-read ctl
            // 如果线程池运行状态起码是SHUTDOWN,则重试retry标签语句,CAS
            if (runStateAtLeast(c, SHUTDOWN)) {
                continue retry;
            }
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // worker数量加1成功后,接着运行:
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 新建worker对象
        w = new Worker(firstTask);
        // 获取线程对象
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // 加锁
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                    // 由于线程已经在运行中,无法启动,抛异常
                    if (t.isAlive()) { // precheck that t is startable
                        throw new IllegalThreadStateException();
                    }
                    // 将线程对应的worker加入worker集合
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize) {
                        largestPoolSize = s;
                    }
                    workerAdded = true;
                }
            } finally {
                // 释放锁
                mainLock.unlock();
            }
            // 如果添加worker成功,则启动该worker对应的线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 如果启动新线程失败
        if (!workerStarted) {
            // workCount - 1
            addWorkerFailed(w);
        }
    }
    return workerStarted;
}

可以看出,在 execute 方法中,多次调用 addWorker 方法把任务传入,addWorker 方法会添加并启动一个 Worker,这里的 Worker 可以理解为是对 Thread 的包装,Worker 内部有一个 Thread 对象,它正是最终真正执行任务的线程,所以一个 Worker 就对应线程池中的一个线程,addWorker 就代表增加线程。线程复用的逻辑实现主要在 Worker 类中的 run 方法里执行的 runWorker 方法中,简化后的 runWorker 方法代码如下所示:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    // 当前Worker对象封装的线程
    final Thread thread;
    // 线程需要运行的第一个任务。可以是null,如果是null,则线程从队列获取任务
    Runnable firstTask;
    // 记录线程执行完成的任务数量,每个线程一个计数器
    volatile long completedTasks;

    /**
     * 使用给定的第一个任务并利用线程工厂创建Worker实例
     * @param firstTask 线程的第一个任务,如果没有,就设置为null,此时线程会从队列获取任务。
     */
    Worker(Runnable firstTask) {
        setState(-1); // 线程处于阻塞状态,调用runWorker的时候中断
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    // 调用ThreadPoolExecutor的runWorker方法执行线程的运行
    public void run() {
        runWorker(this);
    }

    // Worker的实际运行逻辑
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 中断Worker封装的线程
        w.unlock();
        boolean completedAbruptly = true;
        try {
            // 如果线程初始任务不是null,或者从队列获取的任务不是null,表示该线程应该执行任务。
            while (task != null || (task = getTask()) != null) {
                // 获取线程锁
                w.lock();
                // 如果线程池停止了,确保线程被中断
                // 如果线程池正在运行,确保线程不被中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted()) {
                    // 获取到任务后,再次检查线程池状态,如果发现线程池已经停止,則给自己发送中断信号
                    wt.interrupt();
                }
                try {
                    // 任务执行之前的钩子方法,实现为空
                    beforeExecute(wt, task);
                    try {
                        task.run();
                        // 任务执行结束后的钩子方法,实现为空
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    // 任务执行完成,将task设置为null
                    task = null;
                    // 线程已完成的任务数加1
                    w.completedTasks++;
                    // 释放线程锁
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // Worker退出
            processWorkerExit(w, completedAbruptly);
        }
    }

可以看出,实现线程复用的逻辑主要在一个不停循环的 while 循环体中。
通过取 Worker 的 firstTask 或者通过 getTask 方法从 workQueue 中获取待执行的任务。
直接调用 task 的 run 方法来执行具体的任务(而不是新建线程)。
在这里,我们找到了最终的实现,通过取 Worker 的 firstTask 或者 getTask方法从 workQueue 中取出了新任务,并直接调用 Runnable 的 run 方法来执行任务,也就是如之前所说的,每个线程都始终在一个大循环中,反复获取任务,然后执行任务,从而实现了线程的复用。

1.11 线程池Executors工具类

concurrent包提供了Executors工具类,利用它可以创建各种不同类型的线程池:

  • FixedThreadPool<固定长度线程池>

   第一种线程池叫作 FixedThreadPool,它的核心线程数和最大线程数是一样的,所以可以把它看作是固定线程数的线程池,它的特点是线程池中的线程数除了初始阶段需要从 0 开始增加外,之后的线程数量就是固定的,就算任务数超过线程数,线程池也不会再创建更多的线程来处理任务,而是会把超出线程处理能力的任务放到任务队列中进行等待。而且就算任务队列满了,到了本该继续增加线程数的时候,由于它的最大线程数和核心线程数是一样的,所以也无法再增加新的线程了。

   如图所示,线程池有 t0~t9,10 个线程,它们会不停地执行任务,如果某个线程任务执行完了,就会从任务队列中获取新的任务继续执行,期间线程数量不会增加也不会减少,始终保持在 10 个。

  • CachedThreadPool<缓存线程池>

   第二种线程池是 CachedThreadPool,可以称作可缓存线程池,它的特点在于线程数是几乎可以无限增加的(实际最大可以达到 Integer.MAX_VALUE,为 2^31-1,这个数非常大,所以基本不可能达到),而当线程闲置时还可以对线程进行回收。也就是说该线程池的线程数量不是固定不变的,当然它也有一个用于存储提交任务的队列,但这个队列是 SynchronousQueue,队列的容量为0,实际不存储任何任务,它只负责对任务进行中转和传递,所以效率比较高。

   当我们提交一个任务后,线程池会判断已创建的线程中是否有空闲线程,如果有空闲线程则将任务直接指派给空闲线程,如果没有空闲线程,则新建线程去执行任务,这样就做到了动态地新增线程。

  • ScheduledThreadPool<周期性线程池>

   第三个线程池是 ScheduledThreadPool,它支持定时或周期性执行任务。比如每隔 10 秒钟执行一次任务,而实现这种功能的方法主要有 3 种,如代码所示:

ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
 
service.schedule(new Task(), 10, TimeUnit.SECONDS);
 
service.scheduleAtFixedRate(new Task(), 10, 10, TimeUnit.SECONDS);
 
service.scheduleWithFixedDelay(new Task(), 10, 10, TimeUnit.SECONDS);

那么这 3 种方法有什么区别呢?

   第一种方法 schedule 比较简单,表示延迟指定时间后执行一次任务,如果代码中设置参数为 10 秒,也就是 10 秒后执行一次任务后就结束。

   第二种方法 scheduleAtFixedRate 表示以固定的频率执行任务,它的第二个参数 initialDelay 表示第一次延时时间,第三个参数 period 表示周期,也就是第一次延时后每次延时多长时间执行一次任务。

  第三种方法 scheduleWithFixedDelay 与第二种方法类似,也是周期执行任务,区别在于对周期的定义,之前的 scheduleAtFixedRate 是以任务开始的时间为时间起点开始计时,时间到就开始执行第二次任务,而不管任务需要花多久执行;而 scheduleWithFixedDelay 方法以任务结束的时间为下一次循环的时间起点开始计时。

   举个例子,假设某个同学正在熬夜写代码,需要喝咖啡来提神,假设每次喝咖啡都需要花10分钟的时间,如果此时采用第2种方法 scheduleAtFixedRate,时间间隔设置为 1 小时,那么他将会在每个整点喝一杯咖啡,以下是时间表:

00:00: 开始喝咖啡

00:10: 喝完了

01:00: 开始喝咖啡

01:10: 喝完了

02:00: 开始喝咖啡

02:10: 喝完了

   但是假设他采用第3种方法 scheduleWithFixedDelay,时间间隔同样设置为 1 小时,那么由于每次喝咖啡需要10分钟,而 scheduleWithFixedDelay 是以任务完成的时间为时间起点开始计时的,所以第2次喝咖啡的时间将会在1:10,而不是1:00整,以下是时间表:

00:00: 开始喝咖啡

00:10: 喝完了

01:10: 开始喝咖啡

01:20: 喝完了

02:20: 开始喝咖啡

02:30: 喝完了

  • SingleThreadExecutor

   第四种线程池是 SingleThreadExecutor,它会使用唯一的线程去执行任务,原理和 FixedThreadPool 是一样的,只不过这里线程只有一个,如果线程在执行任务的过程中发生异常,线程池也会重新创建一个线程来执行后续的任务。这种线程池由于只有一个线程,所以非常适合用于所有任务都需要按被提交的顺序依次执行的场景,而前几种线程池不一定能够保障任务的执行顺序等于被提交的顺序,因为它们是多线程并行执行的。

  • SingleThreadScheduledExecutor

   第五个线程池是 SingleThreadScheduledExecutor,它实际和第三种 ScheduledThreadPool 线程池非常相似,它只是 ScheduledThreadPool 的一个特例,内部只有一个线程,如源码所示:

new ScheduledThreadPoolExecutor(1)
它只是将 ScheduledThreadPool 的核心线程数设置为了 1。
5种线程池的参数图

总结上述的五种线程池,我们以核心线程数、最大线程数,以及线程存活时间三个维度进行对比,如表格所示。

第一个线程池 FixedThreadPool,它的核心线程数和最大线程数都是由构造函数直接传参的,而且它们的值是相等的,所以最大线程数不会超过核心线程数,也就不需要考虑线程回收的问题,如果没有任务可执行,线程仍会在线程池中存活并等待任务。

第二个线程池 CachedThreadPool 的核心线程数是 0,而它的最大线程数是 Integer 的最大值,线程数一般是达不到这么多的,所以如果任务特别多且耗时的话,CachedThreadPool 就会创建非常多的线程来应对。

  • ForkJoinPool(jdk1.7新增加)

   第六种线程池 ForkJoinPool,这个线程池是在 JDK 7 加入的,它的名字 ForkJoin 也描述了它的执行机制,主要用法和之前的线程池是相同的,也是把任务交给线程池去执行,线程池中也有任务队列来存放任务。但是 ForkJoinPool 线程池和之前的线程池有两点非常大的不同之处。第一点它非常适合执行可以产生子任务的任务。

   如图所示,我们有一个 Task,这个 Task 可以产生三个子任务,三个子任务并行执行完毕后将结果汇总给 Result,比如说主任务需要执行非常繁重的计算任务,我们就可以把计算拆分成三个部分,这三个部分是互不影响相互独立的,这样就可以利用 CPU 的多核优势,并行计算,然后将结果进行汇总。这里面主要涉及两个步骤,第一步是拆分也就是 Fork,第二步是汇总也就是 Join。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容