CompletableFuture

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.concurrent.*;

@Slf4j
public class CompletableFutureTest {
    ExecutorService EXECUTOR = new ThreadPoolExecutor(5, 20, 0L,
            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(200), new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * runAsync 无返回值
     * 如果在参数中包含指定executor的话,任务在这个executor执行;
     * 如果没有指定executor,在ForkJoinPool.commonPool()线程池中运行
     */
    @Test
    public void runAsyncMethod() {
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println("current thread:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("result:" + i);
        }, EXECUTOR);
    }

    /**
     * supplyAsync有返回值
     * 如果在参数中包含指定executor的话,任务在这个executor执行;
     * 如果没有指定executor,在ForkJoinPool.commonPool()线程池中运行
     * whenComplete 能感知异常,能感知结果,但没方法给返回值
     * exceptionally能感知异常,不能感知结果,能给返回值。相当于,如果出现异常就返回这个值
     *
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void supplyAsyncMethod() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("current thread:" + Thread.currentThread().getId());
            //int i = 10 / 2;
            int i = 10 / 0;
            System.out.println("result:" + i);
            return i;
        }, EXECUTOR).whenComplete((res, exception) -> {
            System.out.println("async task end,result:" + res + ";exception:" + exception);
        }).exceptionally(throwable -> {
            return 10;
        });
        System.out.println("result:" + future.get());
    }

    /**
     * supplyAsync有返回值
     * 如果在参数中包含指定executor的话,任务在这个executor执行;
     * 如果没有指定executor,在ForkJoinPool.commonPool()线程池中运行
     * handle能拿到返回结果,也能的到异常信息,也能修改返回值
     * 带有async的方法,如果指定线程池,就使用指定的线程池,如果没有指定线程池,就使用默认的线程池
     *
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void supplyAsyncMethodByHandle() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("current thread:" + Thread.currentThread().getId());
            int i = 10 / 4;
            System.out.println("result:" + i);
            return i;
        }, EXECUTOR).handle((res, exception) -> {
            if (null != exception) {
                return 0;
            } else {
                return res * 2;
            }
        });
        System.out.println("result:" + future.get());
    }

    /**
     * 任务线性化
     * thenRunAsync:不能接收上一次的执行结果,也没返回值
     *
     * @throws InterruptedException
     */
    @Test
    public void taskSerial() throws InterruptedException {
        CompletableFuture.supplyAsync(() -> {
            System.out.println("current thread:" + Thread.currentThread().getId());
            int i = 10 / 4;
            System.out.println("result:" + i);
            return i;
        }, EXECUTOR).thenRunAsync(() -> {
            System.out.print("task 2 start....");
        }, EXECUTOR);
    }

    /**
     * 任务线性化
     * thenAcceptAsync:能接收上一次的执行结果,但没返回值
     */
    @Test
    public void taskSerialAccept() {
        CompletableFuture.supplyAsync(() -> {
            System.out.println("current thread:" + Thread.currentThread().getId());
            int i = 10 / 4;
            System.out.println("result:" + i);
            return i;
        }, EXECUTOR).thenAcceptAsync(res -> {
            System.out.print("task 2 start...." + res);
        }, EXECUTOR);
    }

    /**
     * 任务线性化
     * 前面两个任务都完成,才执行任务3
     * thenApplyAsync:能接收上一次的执行结果,有可以有返回值
     */
    @Test
    public void taskSerialApply() throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("current thread:" + Thread.currentThread().getId());
            int i = 10 / 4;
            System.out.println("result:" + i);
            return i;
        }, EXECUTOR).thenApplyAsync(res -> {
            System.out.println("task 2 start...." + res);
            return "hello:" + res + "\n";
        }, EXECUTOR);
        System.out.print("result:" + future.get());
    }

    /**
     * runAfterBothAsync
     * 任务1和任务2都完成后,在执行任务3,不感知任务1,2的结果,也没有返回值
     *
     * @throws InterruptedException
     */
    @Test
    public void taskBianPai() throws InterruptedException {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task 1:" + Thread.currentThread().getId());
            int i = 10 / 4;
            System.out.println("task 1 end...");
            return i;
        }, EXECUTOR);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task 2:" + Thread.currentThread().getId());
            try {
                Thread.sleep(3000);
                System.out.println("task 2 end...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }, EXECUTOR);
        future1.runAfterBothAsync(future2, () -> {
            System.out.println("task 3 start...");
        }, EXECUTOR);
    }

    /**
     * thenAcceptBothAsync
     * 任务1和任务2都完成后,在执行任务3,感知任务1,2的结果,也没有返回值
     */
    @Test
    public void taskBianPaiAccept() {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task 1:" + Thread.currentThread().getId());
            int i = 10 / 4;
            System.out.println("task 1 end...");
            return i;
        }, EXECUTOR);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task 2:" + Thread.currentThread().getId());
            System.out.println("task 2 end...");
            try {
                Thread.sleep(3000);
                System.out.println("task 2 end...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }, EXECUTOR);
        CompletableFuture<Void> completableFuture = future1.thenAcceptBothAsync(future2, (f1, f2) -> {
            System.out.println(f1);
            System.out.println(f2);
            System.out.println("task 3 start...,f1:" + f1 + ",f2:" + f2);
        }, EXECUTOR);
    }

    /**
     * thenCombineAsync
     * 任务1和任务2都完成后,在执行任务3,感知任务1,2的结果,可以自己带返回值
     */
    @Test
    public void taskBianPaiCombine() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task 1:" + Thread.currentThread().getId());
            int i = 10 / 4;
            System.out.println("task 1 end...");
            return i;
        }, EXECUTOR);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task 2:" + Thread.currentThread().getId());
            System.out.println("task 2 end...");
            try {
                Thread.sleep(3000);
                System.out.println("task 2 end...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }, EXECUTOR);
        CompletableFuture<String> completableFuture = future1.thenCombineAsync(future2, (f1, f2) -> {
            return "task 3 start...,f1:" + f1 + ",f2:" + f2;
        }, EXECUTOR);
        System.out.println(completableFuture.get());
    }

    /**
     * 三任务组合,前两个任务只要有一个完成,就执行任务3
     */

    /**
     * thenCombineAsync
     * 两个任务只要有一个完成,就执行任务3,不感知结果,自己没返回值
     */
    @Test
    public void taskRun() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task 1:" + Thread.currentThread().getId());
            int i = 10 / 4;
            System.out.println("task 1 end...");
            return i;
        }, EXECUTOR);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task 2:" + Thread.currentThread().getId());
            System.out.println("task 2 end...");
            try {
                Thread.sleep(3000);
                System.out.println("task 2 end...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }, EXECUTOR);
        CompletableFuture<Void> completableFuture = future1.runAfterEitherAsync(future2, () -> {
            System.out.println("task 3 start.....");
        }, EXECUTOR);
    }

    /**
     * acceptEitherAsync
     * 两个任务只要有一个完成,就执行任务3,感知结果,自己没返回值
     */
    @Test
    public void taskAccept() {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task 1:" + Thread.currentThread().getId());
            int i = 10 / 4;
            System.out.println("task 1 end...");
            return String.valueOf(i);
        }, EXECUTOR);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task 2:" + Thread.currentThread().getId());
            System.out.println("task 2 end...");
            try {
                Thread.sleep(3000);
                System.out.println("task 2 end...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }, EXECUTOR);
        CompletableFuture<Void> completableFuture = future1.acceptEitherAsync(future2, (res) -> {
            System.out.println("task 3 start....." + res);
        }, EXECUTOR);
    }

    /**
     * applyToEitherAsync
     * 两个任务只要有一个完成,就执行任务3,感知结果,自己有返回值
     */
    @Test
    public void taskApply() throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task 1:" + Thread.currentThread().getId());
            int i = 10 / 4;
            System.out.println("task 1 end...");
            return String.valueOf(i);
        }, EXECUTOR);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task 2:" + Thread.currentThread().getId());
            System.out.println("task 2 end...");
            try {
                Thread.sleep(3000);
                System.out.println("task 2 end...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }, EXECUTOR);
        CompletableFuture<String> completableFuture = future1.applyToEitherAsync(future2, (res) -> {
            System.out.println("task 3 start....." + res);
            return "task 3 end...";
        }, EXECUTOR);
        System.out.println(completableFuture.get());
    }

    /**
     * 多任务组合
     */
    /**
     * allOf 所有任务都执行完
     *
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void multiTaskCombine() throws ExecutionException, InterruptedException {
        CompletableFuture<String> productInfo = CompletableFuture.supplyAsync(() -> {
            System.out.println("product info");
            return "product info";
        }, EXECUTOR);
        CompletableFuture<String> productProperty = CompletableFuture.supplyAsync(() -> {
            System.out.println("product property");
            return "product property";
        }, EXECUTOR);
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
                System.out.println("查询商品介绍信息");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "华为...";
        }, EXECUTOR);
        CompletableFuture<Void> allOf = CompletableFuture.allOf(productInfo, productProperty, future);
        allOf.get();
    }

    /**
     * anyOf 其中有一个任务执行完就可以
     *
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void multiTaskCombineAny() throws ExecutionException, InterruptedException {
        CompletableFuture<String> productInfo = CompletableFuture.supplyAsync(() -> {
            System.out.println("product info");
            return "product info";
        }, EXECUTOR);
        CompletableFuture<String> productProperty = CompletableFuture.supplyAsync(() -> {
            System.out.println("product property");
            return "product property";
        }, EXECUTOR);
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
                System.out.println("查询商品介绍信息");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "华为...";
        }, EXECUTOR);
        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(productInfo, productProperty, future);
        anyOf.get();
    }


}

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容