Java并发(十):Executor框架

一. 简介

1. Executor框架的两级调度模型

  1. 应用程序通过Executor框架控制上层的调度;
  2. 下层的调度由操作系统内核控制;

Java线程(java.lang.Thread)被一对一映射为本地操作系统线程。

2. Executor框架的结构

  • 任务。实现Runnable接口或Callable接口的任务;

  • 任务的执行。核心接口Executor、继承Executor接口的ExecutorService接口、实现了ExecutorService接口的两个关键类ThreadPoolExecutor、ScheduledThreadPoolExecutor;

  • 异步计算的结果。Future接口、实现Future接口的FutureTask类。

  1. 主线程创建任务(实现Rnnable/Callable接口)
  2. 把任务提交给线程池执行
  3. 异步计算的结果(如果有的话)保存在Future中,主线程通过get()方法等待任务执行完成并获取结果,也可以通过FutureTask.cancel()取消此任务的执行。
Executor框架的成员
  1. ThreadPoolExecutor

ThreadPoolExecutor通常使用工厂类Executors来创建:

  • Executors.newFixedThreadPool:固定线程数
  • Executors.newSingleThreadExecutor:单线程数
  • Executors.newCachedThreadPool:0 - Integer.MAX_VALUE个线程数
  1. ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor通常使用工厂类Executors来创建:

  • Executors.newScheduledThreadPool:固定线程数
  • Executors.newSingleThreadScheduledExecutor:单线程数
  1. Future接口

Future接口和实现Future接口的FutureTask类用来表示异步计算的结果。

  1. Runnable接口和Callable接口
  • Runnable接口不会返回结果
  • Callable接口可以返回结果

二. ThreadPoolExecutor详解

1. FixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

FixedThreadPool使用LinkedBlockingQueue作为任务队列(队列容量为Integer.MAX_VALUE,可被看成是无界队列),使用无界队列对线程池的影响:

  • 由于线程池中线程数达到corePoolSize后,新任务会放在无界队列中,因此maxPoolSize无效,同时keepAliveTime无效;
  • 运行中的FixedThreadPool不会拒绝任务。

2. SingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

类似于FixedThreadPool,但是corePoolSize和maxPoolSize均被设置为1。

3. CachedThreadPool

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

CachedThreadPool的corePoolSize被设置为0,maxPoolSize被设置为Integer.MAX_VALUE,因此线程池的线程数可看成是无界的。

CachedThreadPool的任务队列使用SynchronousQueue,该队列没有容量,不存储任何任务。

  1. 如果当前maxPool中有空闲的线程正在执行SynchronousQueue.poll()操作(这个poll操作会让该空闲线程保持60s,如果超时则销毁该线程),同时主线程提交了任务,即SynchronousQueue.offer(Runnable task),此时两者配对成功,任务被提交给该空闲线程执行;
  2. 如果maxPool为空,或者maxPool中没有空闲线程,则没有线程执行SynchronousQueue.poll()操作,此时CachedThreadPool创建一个新线程执行任务。

注:
SynchronousQueue是一个没有容量的阻塞队列,插入操作和移除操作必须配对成功才会执行,插入的内容被移除操作直接从队列中取走。

三. ScheduledThreadPoolExecutor详解

主要用来在给定的延迟之后运行任务,或者定期执行任务。

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
  1. ScheduledThreadPoolExecutor使用DelayedWorkQueue作为任务队列

  2. 待调度的任务被封装成ScheduledFutureTask,主要包含如下成员变量:

  • long型成员变量time,表示这个任务将要被执行的具体时间;
  • long型成员变量sequenceNumber,表示这个任务被添加到ScheduledThreadPoolExecutor中的序号;
  • long型成员变量period,表示任务执行的间隔周期。

time小的任务被排在前面,如果time相同,sequenceNumber小的排在前面。

  1. 向ScheduledThreadPoolExecutor提交任务时,会在DelayedWorkQueue中添加ScheduledFutureTask

  2. 线程池中的线程从DelayedWorkQueue中获取满足要求的ScheduledFutureTask,然后执行任务。

ScheduledThreadPoolExecutor中执行周期任务
  1. 线程从DelayedWorkQueue中获取已到期的ScheduledFutureTask(time大于等于当前时间)
  2. 执行这个ScheduledFutureTask
  3. 修改ScheduledFutureTask的time变量为下次将要被执行的时间
  4. 线程将这个修改time后的ScheduledFutureTask放回DelayedWorkQueue中。

四. FutureTask详解

FutureTask实现了Future接口,可以用来表示异步计算的结果。

FutureTask实现了Runnable接口,可提交给线程执行。

FutureTask的三种状态:未启动、已启动、已完成。

FutureTask的实现:
基于AbstractQueuedSynchronizer(AQS),类似于ReentrantLock,利用AQS来原子性管理同步状态、阻塞和唤醒线程、维护被阻塞线程的队列。

Future接口

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

线程实现Callable<V>接口,并使用submit()方法提交到线程池,可利用Future<V>(超时)获取执行结果。

这里的超时时间是每个任务的超时时间。

对于超时的任务,可以利用Future<V>取消该任务,而不影响其它任务的正常执行。

@SpringBootApplication
public class TestApplication {

    static AtomicInteger atomicInteger = new AtomicInteger(0);

    static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    public static void main(String[] args) {
        
        ConfigurableApplicationContext context = SpringApplication.run(TestApplication.class, args);

        for (int i = 0; i < 100; i++) {
            CallableTask task = new CallableTask(atomicInteger);
            Future<String> future = executorService.submit(task);
            
            try {
                String result = future.get(5, TimeUnit.SECONDS);
                System.out.println(result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
                // 在超时异常中判断任务是否执行完成,未完成则取消该任务
                if(!future.isDone()){
                    future.cancel(true);
                }
            }
        }
    }
}



public class CallableTask implements Callable<String> {

    AtomicInteger atomicInteger;

    public CallableTask() {
        super();
    }

    public CallableTask(AtomicInteger atomicInteger) {
        super();
        this.atomicInteger = atomicInteger;
    }

    @Override
    public String call() throws Exception {
        TimeUnit.SECONDS.sleep(new Random().nextInt(10));
        int result = atomicInteger.addAndGet(1);
        return "result: " + result;
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容