Java并发学习笔记——第十章 Executor框架

Java并发学习笔记——第十章 Executor框架

Java中可以使用线程来异步执行任务,但若为每一个任务创建一个新线程来执行,这种策略可能会使处于高负荷状态的应用最终崩溃。

Java的线程即是工作单元,又是执行机制。Java将工作单元与执行机制分离:

  • 工作单元包括:RunnableCallable
  • 执行机制由Executor框架提供。

Executor框架

Executor的两级调度模型

在JVM线程模型中,Java线程被一对一映射到本地操作系统线程中。Java线程与本地操作系统线程生命周期相同。

在上层,Java多线程通常将应用分解为若干任务,使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;然后操作系统将这些线程映射到本地操作系统线程中。

[图片上传失败...(image-139c95-1640854819997)]


Executor框架的结构与成员组件

Executor的框架结构

主要由三大部分组成:

  • 任务:包括被执行任务需要实现的接口(RunnableCallable)。
  • 任务的执行:包括任务执行机制的核心接口Executor,以及继承自ExecutorExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutorScheduledThreadPoolExecutor)。
  • 异步计算的结果:包括接口Future和实现了Future接口的FutureTask类。

Executor:是一个接口,是Executor框架的基础。

ThreadPoolExecutor:是线程池的核心实现类,用来执行被提交的任务。

ScheduledThreadPoolExecutor:是一个实现类,可以在给定延迟后运行命令,或定期执行命令。比Timer更灵活,功能更强大。

FutureFutureTask类:代表异步计算的结果。

RunnableCallable的实现类都可以被ThreadPoolExecutorScheduledThreadPoolExecutor执行。

使用过程:

  1. 主线程创建实现RunnableCallable接口的任务对象,工具类Executors可以把一个Runnable对象封装为一个Callable对象。
  2. Runnable对象或Callable对象提交给ExecutorService执行。(execute()submit()
  3. 若调用的submit()ExecutorService将返回一个FutureTask,执行FutureTask.get()等待任务执行完成;FutureTask.cancel(boolean) 取消此任务的执行。

Executor框架的成员

主要成员:ThreadPoolExecutorScheduledThreadPoolExecutorFuture接口、Runnable接口、Callable接口、Executors

ThreadPoolExecutor

通常使用工厂类Executors创建ThreadPoolExecutorExecutors可以创建三种类型的ThreadPoolExecutor

  • FixedThreadPool:使用固定线程数的线程池。该线程池适用于为了满足资源管理需求,而需要限制当前线程数量的应用场景,如:负载比较重的服务器。
  • SingleThreadExecutor:使用单个线程的线程池。该线程池需要保证顺序地执行各个任务,且在任意时间点不会有多个线程是活动的的应用场景。
  • CachedThreadPool:根据需要改变的大小无界的线程池。适用于执行很多的短期异步任务的小程序、负载较轻的服务器。
ScheduledThreadPoolExecutor

通常使用工厂类Executors创建ScheduledThreadPoolExecutorScheduledThreadPoolExecutor可以创建两种类型的ScheduledThreadPoolExecutor

  • ScheduledThreadPoolExecutor:包含若干个线程的ScheduledThreadPoolExecutor。适用于为了满足资源管理的需求而需要限制后台线程的数量的应用场景。
  • SingleThreadScheduledExecutor:包含单个线程的ScheduledThreadPoolExecutor。适用于需要单个后台线程执行周期任务。可以保证顺序地执行各个任务的应用场景。
Future

FutureTask类实现了Future接口,该类用来表示异步计算的结果。当使用submit()将任务提交给ThreadPoolExecutorScheduledThreadPoolExecutor时,会返回一个FutureTask对象。

Runnable接口和Callable接口

这两个接口的实现类都可以被ThreadPoolExecutorScheduledThreadPoolExecutor执行。他们的区别在于Runnable不会返回结果,而Callable可以返回结果。

除了手动创建实现Callable接口的对象外,还可以使用工厂类Executors把一个Runnable包装成一个Callable

public static Callable<Object> callable(Runnable task); //假设该方法将把一个Runnable包装成一个Callable返回Callable1
public static <T> Callable<T> callable(Runnable task, T result) //假设该方法把一个Runnable和一个待返回的结果包装成一个Callable返回Callable2

submit(Callable<T> task) 返回一个FutureTask对象后,可以执行FutureTask.get()方法等待任务执行完成,在任务成功完成后该方法将返回该任务的结果。假设submit()提交了上面的Callable1Callable2,则FutureTask1.get()将返回nullFutureTask2.get()将返回result对象。

不同的ThreadPoolExecutor

Executor框架的核心类是ThreadPoolExecutor,它是线程池的实现类,主要由下列4个组件构成:

  • corePool:核心线程池大小。
  • maximumPool:最大线程池的大小。
  • BlockingQueue:暂时保存任务的工作队列。
  • RejectedExecutionHandler:拒绝策略。

通过Executor框架的工具类Executors,可以创建3种ThreadPoolExecutor

  • FixedThreadPool
  • SingleThreadExecutor
  • CachedThreadPool

FixedThreadPool

一种可重用的固定线程数的线程池。线程数在创建的时候传入,以下为创建FixedThreadPool的源码:

public static ExecutorService newFixedThreadPool(int nThreads) {
    //参数分别为corePoolSize,maximumPoolSize,keepAliveTime,TimeUnit,BlockingQueue
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

在该线程池中,当线程池中的线程数大于核心线程数,因为keepAliveTime设置为0,因此多余的空闲线程会被立即终止。

该线程池使用的工作队列为无界队列LinkedBlockingQueue。因此,运行中的FixedThreadPool不会拒绝任务


SingleThreadExecutor

一种使用单个worker线程的线程池。以下为创建SingleThreadExecutor的源码:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(
        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}

CachedThreadPool

一种会根据需要创建新线程的线程池。以下为创建CachedThreadPool的源码:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

由于CachedThreadPoolmaximumPoolSize是无界的,且SynchronousQueue是一个没有容量的阻塞队列,因此若主线程提交任务的速度高于线程池中已被创建出来的线程的处理任务的速度,CachedThreadPool则会不停地创建新线程,因此可能导致创建过多线程耗尽CPU和内存资源。

keepAliveTime为60,意味着线程池中的空闲线程等待新任务最久等60秒,超过60秒的没收到任务的线程将被终结。

由于coolPoolSize为0,因此CachedThreadPool若在空闲情况下则不会使用任何资源。

ScheduledThreadPoolExecutor详解

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。该组件主要用来在给定的延迟之后运行任务,或定期执行任务。其功能与Timer类似。区别在于前者可以在构造函数中指定多个对应的后台线程数,但后者对应单个后台线程。

ScheduledThreadPoolExecutor为了实现周期性的执行任务,对ThreadPoolExecutor做了一些修改:

  • 使用DelayQueue作为任务队列,该队列存放的是实现了RunnableScheduledFuture接口的ScheduledFutureTask
  • 使用了不同的获取任务方式。
  • 执行周期任务后,做了额外的处理。

ScheduledFutureTask

ScheduledThreadPoolExecutor会将等待执行的任务放入一个DelayQueue中。

ScheduledFutureTask主要包含3个成员变量:

  • time:表示这个任务将被执行的具体时间。
  • sequenceNumber:表示这个任务的序号。
  • period:表示任务的执行周期。

DelayQueue封装了一个优先级队列,该队列会对待调度任务的更小的time做优先级排序,若time相等,则取更小的序号sequenceNumber

ScheduledThreadPoolExecutor中线程执行周期任务的步骤

  1. 线程从DelayQueue获取已到期(time小于当前时间)的ScheduledFutureTask
  2. 线程执行这个任务。
  3. 线程将该任务的time修改为下次将要被执行的时间。
  4. 线程将该任务放回DelayQueue

向DelayQueue获取与添加任务

由于线程池中多线程会多工作队列执行并发操作,因此向工作队列获取、添加任务都需要考虑到并发问题。

当执行DelayQueue.take(),即线程向工作队列获取任务时,需经历3大步骤:

  1. 获取DelayQueueLock

  2. 获取ScheduledFutureTask

    • 若工作队列为空,则当前线程到Lock关联的Condition中等待;
    • 若工作队列中首任务的time比当前时间大,则当前线程到Lock关联的Condition中等待time - nowTime的时间;
    • 成功获取工作队列的首任务,唤醒Condition中所有线程。
  3. 释放Lock

当线程执行任务时达到了任务周期,需要将任务放回DelayQueue,执行DelayQueue.add(),需经历3大步骤:

  1. 获取Lock
  2. 添加ScheduledFutureTask
    • 向工作队列中添加任务;
    • 若添加的任务是头元素,唤醒在Condition中等待的所有线程。
  3. 释放Lock

FutureTask详解

Future接口和实现Future接口的FutureTask类,代表异步计算的结果。

在JDK8中,所有Executor.submit()返回的都是FutureTask对象,但submit()方法本身设置的返回参数是Future

FutureTask不仅实现了Future接口,还实现了Runnable接口,因此FutureTask也可以交给Executor执行或直接由调用线程执行(FutureTask.run())。

FutureTask.get()会阻塞当前线程,直到FutureTask被执行完。

FutureTask的三种状态

  • 未启动。FutureTask.run()未被执行前的状态。
  • 已启动。FutureTask.run()执行中的状态。
  • 已完成。

FutureTask实现机制(内部设计AQS子类)

FutureTask的实现基于AbstractQueuedSynchronizer(简称AQS)。java.util.concurrent中很多可阻塞类都基于AQS实现。

AQS是一个同步框架,它提供以原子性的机制管理同步状态、阻塞和唤醒线程,以及维护被阻塞线程的队列。

每一个AQS实现的同步器都包含两种类型的基础操作:

  • 至少一个acquire操作。这个操作将阻塞当前调用线程,直到AQS的状态允许这个线程继续执行。如:FutureTask.get()
  • 至少一个release操作。这个操作改变AQS的状态,允许一个或多个阻塞线程被解除阻塞。如:FutureTask.run()FutureTask.cancel()

注(个人理解):此处的将线程阻塞或解除阻塞应理解为一个出/入线程等待队列的动作,结合下文。

基于“复合优先于继承”的原则,FutureTask内部声明了一个私有的继承与AQS的子类Sync,所有FutureTask公有方法的调用都会被委托给Sync执行。

该内部子类只需要额外实现状态检查和状态更新的方法即可,这些方法将控制FutureTask的获取和释放操作。

以下举一个多线程执行同一任务的例子:

FutureTask的设计示意图

FuntureTask.get()被执行时:

  1. 线程通过委托调用acquireSharedInterruptibly(),该方法会回调Sync中的tryAcquireShared()确认该FutureTask的状态state是否为完成状态或已取消状态,且runner不为null。(若该任务已被执行完,那runner会被置空吗?)
  2. 若成功则get()立即返回,否则线程将进入线程等待队列等待其他线程执行release操作。
  3. 其他线程执行release操作将唤醒当前线程,当前线程再次执行tryAcquireShared()将返回1,且当前线程离开线程等待队列后将唤醒它的后继线程。

FutureTask.run()被执行时:

  1. 执行构造函数中制定的任务(Callable.call())。
  2. 在任务执行完成后,以原子方式更新同步状态state。若成功更新,设置一个result结果作为Callable.call()的返回值,然后调用AQS.releaseShared()。该方法会回调Sync.tryReleaseShared()执行release操作,唤醒线程等待队列中的第一个线程。
  3. 调用FutureTask.done()

注(个人理解)tryAcquireShared()是否真的是查询runner为空?查询result是否为空似乎更符合逻辑?

关于线程等待队列的级联唤醒:

FutureTask的级联唤醒示意图

当执行FutureTask.get()时,若FutureTask状态不运行FutureTask.get()直接返回,则该线程将进入AQS的线程等待队列中等待(行为如图中线程D,状态为图中线程A、B、C、D)。当某个线程(如线程E)执行了release的两种之一的操作,会唤醒队列中第一个线程。

线程A被唤醒后,首先将自己从队列删除,然后唤醒后继线程B,然后从FutureTask.get()返回。

本章小结

本章的重点在于更好的了解Executor框架,并且更好地使用Executor框架的不同线程池。

其次,FutureTask中对AQS的内部继承并且在这之上做的设计的思想也十分重要。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容