Java多线程技术之五(JUC之执行器框架)

一、Executor框架概述

并发编程的一般方式是将任务拆分为一些子任务,然后使用多个线程来执行这些子任务,最后取得任务结果。Executor框架提供了一种将任务创建、任务执行过程、获取任务结果相分离的方法,让我们无需处理线程使用的细节,使并发编程更加方便。

Executor框架包括3个部分:
  1. 任务: 抽象成Runnable接口和Callable<V>接口。
  2. 任务结果: 抽象成Future<V>接口。
  3. 执行器: 抽象成Executor接口和CompletionService接口。
Executor框架的体系结构
Executor的体系结构

二、任务

Executor框架将任务抽象成Runnable接口和Callable接口,都是用来定义任务然后放进线程执行的。Runnable接口我们已经了解过了,它有只有一个不能返回值也不能抛出检查异常的run()方法。Callable接口与Runnable接口类似,只有一个call()方法,但call()方法返回值并且可能抛出检查异常。

public interface Callable<V> {
    V call() throws Exception;
}

三、任务结果

Executor框架将任务结果抽象成Future接口,Future翻译过来就是未来的意思,表明任务结果在未来才能得到,Future是任务完成后保留任务结果的对象,也可以说Future表示异步计算的结果。

所谓异步调用其实就是另启一个线程来完成调用中的部分计算,使调用者继续执行或返回,而不需要等待计算结果,但调用者仍然需要取线程的计算结果。
··

Future接口方法:
  • V get()
    获取任务结果。任务结果只能在异步计算完成后获取,如果异步计算还没有完成,get()方法会阻塞调用它的线程直到计算完成。

  • V get(long timeout, TimeUnit unit)
    get()的重载方法,限定了超时时间。

  • boolean cancel(boolean mayInterruptIfRunning)
    试图取消对此任务的执行。如果任务已完成或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。

  • boolean isDone()
    检测任务是否完成。可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法都将返回true。

  • boolean isCancelled()
    如果在任务正常完成前将其取消,则返回 true。

FutureTask类

FutureTask类实现了Future接口,也实现了Runnable接口,因此可将FutureTask提交给Executor执行。
FutureTask类有两个构造方法:

  • FutureTask(Callable<V> callable)
    创建一个FutureTask,一旦运行就执行给定的Callable。
  • FutureTask(Runnable runnable, V result)
    创建一个FutureTask,一旦运行就执行给定的Runnable,成功完成时get返回给定的结果。
ForkJoinTask类

ForkJoinTask类实现了Future接口,不过它是一个抽象类,ForkJoinTask是为了配合ForkJoinPool执行器实现任务的调度执行的。ForkJoinTask的作用是将任务拆分成小的子任务,并等待子任务的执行结果,然后将子任务的执行结果组合成父任务的结果,此过程会递归的进行下去。ForkJoinTask只适用于没有循环依赖的纯函数计算或孤立对象的操作,否则执行可能会遇到某种形式的死锁,因为任务彼此循环等待。
通常我们不会直接实现ForkJoinTask,而是实现其三个抽象子类,使用的时候,仅仅只需要提供任务的拆分与执行即可。

ForkJoinTask的三个抽象子类
  • RecursiveAction
    用于不返回结果的计算。
  • RecursiveTask
    用于返回结果的计算。
  • CountedCompleter
    用于那些操作完成之后触发钩子函数的操作。
CompletionStage接口

CompletionStage接口是Java8新增的一个接口,大量用在Lambda表达式计算过程中,用来描述多线程任务的时序关系:串行、并行、组合、异常。

CompletionStage接口的方法比较多,但这些方法遵循下列规律。

方法参数类型:

  • Runnable:不使用上一阶段的任务结果并且本阶段不返回值。
  • Function:会对上一阶段的结果做变换。
  • Consumer:消费上一阶段的结果。
  • BiFunction:组合另一个CompletionStage的计算结果做变换。
  • BiConsumer:组合另一个CompletionStage的计算结果做消费。
  • Executor:使用指定的执行器。

方法名中含有:

  • async:异步执行任务。

  • run:方法参数是Runnable类型。

  • apply:方法参数是Function类型。

  • accept:方法参数是Consumer类型。

  • handle:方法参数是Function类型,执行时机同whenComplete。

  • then:上一个阶段正常完成时执行。

  • whenComplete:上一个阶段正常完成或者抛出异常时执行。

  • exceptionally:上一个阶段抛出异常时执行。

  • compose:复合上一个阶段的执行结果和本阶段的执行结果并返回。

  • combine:与另一个CompletionStage成and组合关系,当两个CompletionStage都完成的时候执行。

  • both:与另一个CompletionStage成and组合关系,当两个CompletionStage都正常完成的时候执行。

  • either:与另一个CompletionStage成or组合关系,当任意一个CompletionStage完成的时候执行

  • allOf:当所有的CompletableFuture都执行完后执行。

  • anyOf:当任意一个CompletableFuture执行完后执行。

CompletableFuture类

CompletableFuture类实现了Future接口和CompletionStage接口,提供了非常强大的Future的扩展功能,可以帮助简化异步编程的复杂性,提供了函数式编程能力。

创建CompletableFuture对象的方法:

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier, Executor executor)

默认情况下CompletableFuture会使用ForkJoinPool.commonPool()线程池

CompletableFuture类常用方法

  • public T get()
    获取计算结果,该方法为阻塞方法会等待计算结果完成。
  • public T join()
    和 get() 方法类似也是主动阻塞线程,等待计算结果。和get() 方法有细微的差别。
  • public boolean complete(T value)
    立即完成计算,并把结果设置为传的值,返回是否设置成功。
  • public boolean completeExceptionally(Throwable ex)
    立即完成计算,并抛出异常。

四、执行器

Executor框架将执行器抽象成Executor接口和CompletionService接口。
Executor接口中只有一个void execute(Runnable command)方法,表示在未来某个时间执行给定的命令。一般不直接使用Executor接口而是使用它的子接口ExecutoreService。CompletionService接口可以更简便地完成异步任务的执行。

ExecutorService接口

ExecutorService接口添加了一些用来管理Executor生命周期和任务生命周期的方法,可以提交、执行和跟踪一个或多个任务。

Executor有运行、关闭、终止三种状态。Executor创建时处于运行状态。调用shutdown()后处于关闭状态,再往Executor提交任务将抛出RejectedExecutionException。所有任务执行完后,Executor处于终止状态。

ExecutorService接口常用方法:

  • Future<?> submit(Runnable task)
    提交一个Runnable任务用于执行,并返回一个表示该任务的结果的Future。

  • <T> Future<T> submit(Callable<T> task)
    提交一个返回值的任务用于执行,返回一个表示任务的结果的Future。

  • <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    执行给定的批量任务,返回任意一个已成功完成的任务结果。

  • <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
    执行给定的批量任务,返回任意一个已成功完成的任务结果,timeout规定了该方法最大的超时等待时间。

  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    执行给定的批量任务,当所有任务完成时,返回保存了任务状态和结果的Future列表。

  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
    执行给定的批量任务,当所有任务完成时,返回保存了任务状态和结果的Future列表,timeout规定了该方法最大的超时等待时间。
    如果该方法执行超时,那么其中一些任务将没有完成,但仍然返回Future列表。

  • void shutdown()
    关闭Executor,允许早前已提交的任务继续执行,但不接受新任务。

  • List<Runnable> shutdownNow()
    试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。

ThreadPoolExecutor类

ThreadPoolExecutor类实现了ExecutorService接口,ThreadPoolExecutor是一种线程池执行器。

线程池预先创建大量空闲的线程,程序将一个任务传给线程池,线程池就会启动一条线程来执行这个任务,执行结束以后该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务。本篇不对线程池作讲解,详细的线程池知识可以参阅 Java线程池详解

ThreadPoolExecutor的构造函数参数比较多,一般不使用构造函数创建ThreadPoolExecutor对象,创建ThreadPoolExecutor对象通常使用Executors执行器工厂类。

ForkJoinPool类

ForkJoinPool类实现了ExecutorService接口,ForkJoinPool也是一种线程池执行器,它负责线程的管理和执行ForkJoinTask。创建ForkJoinPool对象通常使用静态方法ForkJoinPool.commonPool()。

ForkJoinPool的作用是为了实现将一个大任务拆分成多个小任务后,使用fork可以将小任务分发给其他线程同时处理,使用join可以将多个线程处理的结果进行汇总,这个过程递归的进行下去,最终得出最初提交的那个大任务的结果。ForkJoinPool为了提高任务的并行度和吞吐量做了非常多而且复杂的设计实现,其中最著名的就是工作窃取机制。 详细的可以参阅 ForkJoinPool入门篇

ScheduledExecutorService接口

ScheduledExecutorService接口继承自ExecutorService接口,它添加了任务延后执行或周期性执行的功能,可以在给定延迟后或者周期性地执行任务。
ScheduledExecutorService接口方法:

  • ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
    创建并执行在给定延迟后启用的一次性操作。

  • <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
    创建并执行在给定延迟后启用的ScheduledFuture。

  • ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
    创建和执行一个周期性的操作,操作将在initialDelay后首次执行,然后在initialDelay + period后执行,接着在initialDelay + 2 * period后执行,依此类推。

  • ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,TimeUnit unit)
    创建和执行一个周期性的操作,操作将在initialDelay后首次执行,之后的每一次执行终止和下一次执行开始之间都会有delay参数的延迟

scheduleAtFixedRate方法于任务本身执行时间无关,但是当任务本身执行时间超过了period参数指定的时间,则会导致下一次的任务在上一次任务执行完后立即执行(这里不会并发执行任务)。
scheduleWithFixedDelay方法和任务本身执行时间相关,即下一次任务执行的开始时间永远和上一次任务执行的结束时间之间有一个delay的时间间隔。

ScheduledThreadPoolExecutor类

ScheduledThreadPoolExecutor类实现了ScheduledExecutorService接口,创建ScheduledThreadPoolExecutor对象通常使用Executors执行器工厂类。

CompletionService接口

CompletionService维护了一个保存已完成任务的Future队列,Future队列按照任务完成顺序排序。

CompletionService接口方法:

  • Future<V> submit(Callable<V> task)
    提交一个返回值的任务用于执行,返回一个表示任务结果的Future。

  • Future<V> submit(Runnable task, V result)
    提交一个Runnable任务用于执行,并返回一个表示该任务结果的Future。

  • Future<V> poll()
    获取下一个已完成的Future,如果队列为空则返回null。

  • Future<V> poll(long timeout, TimeUnit unit)
    获取下一个已完成的Future,如果队列为空则等待指定的时间,等待超时后返回null。

  • Future<V> take()
    获取下一个Future,如果队列为空则等待。

ExecutorCompletionService类

ExecutorCompletionService类实现了CompletionService接口。

ExecutorCompletionService的构造方法:

  1. ·ExecutorCompletionService(Executor executor)·
  2. ·ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)·
    这两个构造方法都需要传入一个执行器executor,如果不指定completionQueue,那么默认会使用LinkedBlockingQueue。任务的结果Future对象就是加入到completionQueue中。
执行器工厂Executors类

Executors类是JUC提供的执行器工厂类,有一系列创建执行器的工厂方法,它们均为大多数使用场景预定义了设置,使用起来更加方便。

ExecutorService工厂方法

  • Executors.newCachedThreadPool()
    创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。
  • Executors.newFixedThreadPool(int corePoolSize)
    创建一个使用固定数量线程的线程池。
  • Executors.newSingleThreadExecutor()
    创建一个使用单个线程的执行器。

ScheduledExecutorService工厂方法

  • Executors.newSingleThreadScheduledExecutor()
    创建一个单线程执行器,它可以在给定延迟后或者周期性地执行任务。
  • Executors.newScheduledThreadPool(int corePoolSize)
    创建一个线程池,它可以在给定延迟后或者周期性地执行任务。

五、ThreadPoolExecutor使用示例

无返回结果的任务

public class ThreadPoolDemo {

    public static void main(String[] args) {
        // 创建一个固定线程数的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        // 创建实现了Runnable接口的任务对象
        Runnable task = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " is running.");
            }
        };
        // 将任务程放入池中进行执行
        executorService.submit(task);       
        // 关闭线程池
        executorService.shutdown();
    }
}

有返回结果的任务

public class ThreadPoolDemo {

    public static void main(String[] args) {
        // 创建一个可根据需要创建新线程的线程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        // 创建实现了Callable接口的任务对象
        Callable<Integer> task = new Callable<Integer>{
            @Override
            public Integer call() throws Exception {
                System.out.println("开始计算任务");
                Thread.sleep(3000);
                return 100;
            }
        };
        // 将任务程放入池中进行执行
        Future<Integer> result = executorService.submit(task);
        try {
            System.out.println("task运行结果"+result.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        // 关闭线程池
        executorService.shutdown();
    }
}

六、FutureTask使用示例

public class FutureTaskDemo {

    public static void main(String[] args) {
        Callable<Integer> task = new Callable<Integer>{
            @Override
            public Integer call() throws Exception {
                System.out.println("开始计算任务");
                Thread.sleep(3000);
                return 100;
            }
        };
        FutureTask<Integer> futureTask  = new FutureTask<Integer>(task);  
        Thread thread = new Thread(futureTask);  
        thread.start();
        try {
            Integer result = futureTask.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

七、ForkJoinTask使用示例

高级用法参阅 JDK 7 中的 Fork/Join 模式

// RecursiveTask返回值,RecursiveAction不返回值
class SumTask extends RecursiveTask<Long> {
    private Long start;// 起始值
    private Long end;// 结束值
    public static final Long critical = 100000L;// 临界值

    public SumTask(Long start, Long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        // 起始值差值
        Long lenth = end - start;        
        // 判断是否超过临界值
        if (lenth <= critical) { // 小于临界值直接求和
            Long sum = 0L;
            for (Long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        } else {  // 大于临界值拆分
            Long middle = (end + start) / 2;
            // 拆分成左右两个
            SumTask left = new SumTask(start, middle);
            SumTask right = new SumTask(middle + 1, end);
            // 分别执行左右子任务
            left.fork();
            right.fork();
            // 合并左右子任务结果
            return left.join() + right.join();
        }

    }
}

public class ForkJoinPoolDemo {

    public static void main(String[] args) {
        // 创建一个ForkJoinPool线程池
        ForkJoinPool pool = ForkJoinPool.commonPool();
        // 创建ForkJoinTask任务对象,参数为起始值与结束值
        ForkJoinTask<Long> task = new SumTask(0L, 10000000000L);      
        // 将任务程放入池中进行执行
        Future<Long> result = pool.submit(task);
        try {
            System.out.println("task运行结果"+result.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }        
        // 关闭线程池
        pool.shutdown();
    }
}

八、ScheduledThreadPool使用示例

public class ScheduledThreadPoolDemo {

    public static void main(String[] args) {
        // 创建一个支持定时及周期性执行任务的线程池
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);
        // 创建实现了Runnable接口的任务对象
        Runnable task = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " is running.");
            }
        };
        // 将任务程放入池中进行执行
        scheduler.scheduleAtFixedRate(task, 10, 10, TimeUnit.SECONDS);  
        // 关闭线程池
        scheduler.shutdown();
    }
}

九、CompletionService使用示例

public class CompletionServiceDemo {

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        CompletionService<Integer> completionServcie = new ExecutorCompletionService<Integer>(pool);

        try {
            // 将任务批量放入池中执行
            for (int i = 0; i < 10; i++) {
                completionServcie.submit(new Task(i));
            }
            for (int i = 0; i < 10; i++) {
                // 获取已完成的任务的结果
                Future<Integer> future = completionServcie.take();
                System.out.println(future.get());
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        // 关闭线程池
        pool.shutdown();
    }
}

class Task implements Callable<Integer> {
    private int i;

    Task(int i) {
        this.i = i;
    }

    @Override
    public Integer call() throws Exception {
        Thread.sleep(new Random().nextInt(5000));
        System.out.println(Thread.currentThread().getName() + "   " + i+ " is running.");
        return i;
    }
}

十、CompletableFuture使用示例

更多示例参阅 通过实例理解 JDK8 的 CompletableFuture

public class CompletableFutureDemo {

    public static void main(String[] args) {
        // 串行执行,不处理上一个任务的结果
        CompletableFuture.runAsync(()->{
            log("任务1");
        }).thenRun(()->{
            log("任务2");
        });

        // 串行执行,结果传递到下一个任务
        CompletableFuture.supplyAsync(() -> {
            log("任务3");
            return "xxx";
        }).thenAccept(result -> {
            log("任务4接收到" + result);
        });

        // 并行执行,合并2个并行任务的结果
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 3)
            .thenCombine(CompletableFuture.supplyAsync(() -> 8), (a, b) -> a + b);
        System.out.println(future.join());

        // 上一个任务完成或者抛出异常后,将结果和异常作为参数传递到下一个任务
        CompletableFuture.supplyAsync(() -> 3).whenComplete((r,throwable)-> System.out.println(r));

    }

    private static void log(String msg){        
        try {
            Thread.sleep(new Random().nextInt(5000));
            System.out.println(Thread.currentThread().getName()+ " => " + msg);
        } catch (Exception e) {
            e.printStackTrace();
        }         
    }

}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343