2024-01-08 CompletableFuture异步编程

Runnable+Thread虽然提供了多线程的能力但是没有返回值。

Callable+Thread的方法提供多线程和返回值的能力但是在获取返回值的时候会阻塞主线程。

Future+ThreadPool会引入更多额外的线程池,随着CPU调度线程数的增加,会导致更严重的资源争用,宝贵的CPU资源被损耗在上下文切换上,而且线程本身也会占用系统资源,且不能无限增加。

CompletableFuture支持任务零依赖、一元依赖、二元依赖和多元依赖。并且代码清晰,可读性高。

@Test
     public void test1() throws ExecutionException, InterruptedException {
         CompletableFuture<Integer> future = new CompletableFuture<>();
         Integer integer = future.get();
     }

     /**
      * runAsync
      * 使用场景:提交异步任务,无返回值
      * @throws ExecutionException
      * @throws InterruptedException
      */
     @Test
     public void testRunAsync() throws ExecutionException, InterruptedException {
         System.out.println("main start");
         CompletableFuture.runAsync(() -> {
             System.out.println("读取文件开始");
             try {
                 Thread.sleep(3000);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
             System.out.println("读取文件结束");
         });
         try {
             Thread.sleep(4000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         System.out.println("主线程结束");
     }

     /**
      * supplyAsync
      * 使用场景:提交异步任务,有返回值
      * @throws ExecutionException
      * @throws InterruptedException
      */
     @Test
     public void testSupplyAsync() throws ExecutionException, InterruptedException {
         System.out.println("main start");
         CompletableFuture<String> resultFuture = CompletableFuture.supplyAsync(() -> {
             System.out.println("读取文件开始");
             try {
                 Thread.sleep(3000);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
             return "读取文件结束";
         });
         String result = resultFuture.get();
         System.out.println("异步线程返回结果:"+result);
         System.out.println("主线程结束");
     }

     /**
      * 使用自定义线程池
      * @throws ExecutionException
      * @throws InterruptedException
      */
     @Test
     public void test4() throws ExecutionException, InterruptedException {
         System.out.println("main start");
         ExecutorService executorService = Executors.newFixedThreadPool(4);
         CompletableFuture<String> resultFuture = CompletableFuture.supplyAsync(() -> {
             System.out.println("读取文件开始");
             try {
                 Thread.sleep(3000);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
             return "读取文件结束";
         },executorService);
         String result = resultFuture.get();
         System.out.println("异步线程返回结果:"+result);
         System.out.println("主线程结束");
     }

     /**
      * thenApply
      * 任务编排:下一个任务需要上一个任务的返回值
      *
      * @throws ExecutionException
      * @throws InterruptedException
      */
     @Test
     public void test5() throws ExecutionException, InterruptedException {
        //需求:异步读取文件中的内容,读取完成后将文件内容切割为数组返回敏感词
         System.out.println("main start");
         ExecutorService executorService = Executors.newFixedThreadPool(4);
         CompletableFuture<String> readFileFuture = CompletableFuture.supplyAsync(() -> {
             System.out.println("读取文件开始");
             return "tmd,尼玛,日了狗";
         },executorService);
         CompletableFuture<String[]> completableFuture = readFileFuture.thenApply(text -> {
             System.out.println("把文件内容转换成敏感词数组");
             return text.split(",");
         });
         System.out.println("main continue");
         String[] strings = completableFuture.get();
         System.out.println("敏感词词组:"+ Arrays.toString(strings));
         System.out.println("主线程结束");
     }

     /**
      * thenAccept 无返回值, 一般是作为整个调用链路的最后一步
      * @throws ExecutionException
      * @throws InterruptedException
      */
     @Test
     public void testThenAccept() throws ExecutionException, InterruptedException {
         //需求:异步读取文件中的内容,读取完成后将文件内容切割为数组返回敏感词
         System.out.println("main start");
         ExecutorService executorService = Executors.newFixedThreadPool(4);
         CompletableFuture.supplyAsync(() -> {
             System.out.println("读取文件开始");
             return "tmd,尼玛,日了狗";
         },executorService).thenApply(text -> {
             System.out.println("把文件内容转换成敏感词数组");
             return text.split(",");
         }).thenAccept(arrays -> System.out.println("打印敏感词数组:"+Arrays.toString(arrays)));
         System.out.println("main continue");
         Thread.sleep(4000);
         System.out.println("主线程结束");
     }

     /**
      * thenRun 无入参,无返回值,
      * 作用:当异步任务完成后,只想得到一个完整的通知,不使用上一步异步任务的结果,
      *       通常会放到链式操作的末端,和thenAccept相似。
      * @throws ExecutionException
      * @throws InterruptedException
      */
     @Test
     public void testThenRun() throws ExecutionException, InterruptedException {
         //需求:异步读取文件中的内容,读取完成后将文件内容切割为数组返回敏感词
         System.out.println("main start");
         ExecutorService executorService = Executors.newFixedThreadPool(4);
         CompletableFuture.supplyAsync(() -> {
             System.out.println("读取文件开始");
             return "tmd,尼玛,日了狗";
         },executorService).thenRun(() -> {
             System.out.println("文件读取完成");
         });
         System.out.println("main continue");
         Thread.sleep(4000);
         System.out.println("主线程结束");
     }

     /**
      * thenCompose
      * 作用:编排两个依赖关系的异步任务,需要使用thenCompose
      * @throws ExecutionException
      * @throws InterruptedException
      */
     @Test
     public void testThenCompose() throws ExecutionException, InterruptedException {
         //需求:异步读取文件中的内容,读取完成后将文件内容切割为数组返回敏感词
         System.out.println("main start");
         ExecutorService executorService = Executors.newFixedThreadPool(4);
         CompletableFuture<String[]> completableFuture = CompletableFuture.supplyAsync(() -> {
             System.out.println("读取文件开始");
             return "tmd,尼玛,日了狗";
         }, executorService).thenCompose(content ->
                 CompletableFuture.supplyAsync(() -> {
                             System.out.println("把内容转换成敏感词数组");
                             String[] split = content.split(",");
                             return split;
                         }
                 ));
         System.out.println("main continue");
         Thread.sleep(4000);
         System.out.println("敏感词数组:"+Arrays.toString(completableFuture.get()));
         System.out.println("主线程结束");
     }

     /**
      * thenCompose
      * 作用:如果两个Future之间没有依赖关系,但是希望两个future独立运行并且在两者都完成之后执行回调操作
      * @throws ExecutionException
      * @throws InterruptedException
      */
     @Test
     public void testThenCombine() throws ExecutionException, InterruptedException {
         //需求:异步读取文件中的内容,读取完成后将文件内容切割为数组返回敏感词
         System.out.println("main start");
         ExecutorService executorService = Executors.newFixedThreadPool(4);
         CompletableFuture<String[]> readFuture = CompletableFuture.supplyAsync(() -> {
             System.out.println("敏感词汇解析开始");
             return "tmd,尼玛,日了狗".split(",");
         }, executorService);
         CompletableFuture<String> contentFuture = CompletableFuture.supplyAsync(() -> {
             System.out.println("读取文件开始");
             return "你tmd,尼玛,日了狗眼瞎了";
         }, executorService);
         CompletableFuture<String> combineFuture = readFuture.thenCombine(contentFuture, (words, content) -> {
             System.out.println("开始替换操作");
             for (String word:words) {
                 content = content.replace(word, "**");
             }
             return content;
         });
         System.out.println("main continue");
         Thread.sleep(4000);
         System.out.println("脱敏后:"+combineFuture.get());
         System.out.println("主线程结束");
     }

     /**
      * allOf
      * 作用:有多个需要独立运行的future,并在所有这些future都完成后执行一些需求
      *
      *  @throws ExecutionException
      * @throws InterruptedException
      */
     @Test
     public void testAllOf() throws ExecutionException, InterruptedException {
         //需求:异步读取文件中的内容,读取完成后将文件内容切割为数组返回敏感词
         System.out.println("main start");
         ExecutorService executorService = Executors.newFixedThreadPool(4);
         CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
             try {
                 Thread.sleep(20);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
             return "future1";
         }, executorService);
         CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
             try {
                 Thread.sleep(10);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
             return "future2";
         }, executorService);
         CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
             try {
                 Thread.sleep(4);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
             return "future3";
         }, executorService);
         CompletableFuture<Object> objectCompletableFuture = CompletableFuture.anyOf(future1, future2, future3);
         System.out.println("main continue");
         Thread.sleep(30);
         System.out.println("胜出者:"+objectCompletableFuture.get());
         System.out.println("主线程结束");
     }

     /**
      *
      * anyOf
      * 当给定的多个异步任务中有任意的future完成时,需要执行一些操作,可以用anyOf方法
      *
      * @throws ExecutionException
      * @throws InterruptedException
      */
     @Test
     public void testAnyOf() throws ExecutionException, InterruptedException {
         //需求:异步读取文件中的内容,读取完成后将文件内容切割为数组返回敏感词
         System.out.println("main start");
         ExecutorService executorService = Executors.newFixedThreadPool(4);
         CompletableFuture<String[]> readFuture = CompletableFuture.supplyAsync(() -> {
             System.out.println("敏感词汇解析开始");
             return "tmd,尼玛,日了狗".split(",");
         }, executorService);
         CompletableFuture<String> contentFuture = CompletableFuture.supplyAsync(() -> {
             System.out.println("读取文件开始");
             return "你tmd,尼玛,日了狗眼瞎了";
         }, executorService);
         CompletableFuture<String> combineFuture = readFuture.thenCombine(contentFuture, (words, content) -> {
             System.out.println("开始替换操作");
             for (String word:words) {
                 content = content.replace(word, "**");
             }
             return content;
         });
         System.out.println("main continue");
         Thread.sleep(4000);
         System.out.println("脱敏后:"+combineFuture.get());
         System.out.println("主线程结束");
     }

使用须知

1.异步回调要传线程池
前面提到,异步回调方法可以选择是否传递线程池参数Executor,这里我们建议强制传线程池,且根据实际情况做线程池隔离。

当不传递线程池时,会使用ForkJoinPool中的公共线程池CommonPool,这里所有调用将共用该线程池,核心线程数=处理器数量-1(单核核心线程数为1),所有异步回调都会共用该CommonPool,核心与非核心业务都竞争同一个池中的线程,很容易成为系统瓶颈。手动传递线程池参数可以更方便的调节参数,并且可以给不同的业务分配不同的线程池,以求资源隔离,减少不同业务之间的相互干扰。

死锁问题

public Object doGet() {
  ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
  CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
  //do sth
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("child");
        return "child";
      }, threadPool1).join();//子任务
    }, threadPool1);
  return cf1.join();
}

如上代码块所示,doGet方法第三行通过supplyAsync向threadPool1请求线程,并且内部子任务又向threadPool1请求线程。threadPool1大小为10,当同一时刻有10个请求到达,则threadPool1被打满,子任务请求线程时进入阻塞队列排队,但是父任务的完成又依赖于子任务,这时由于子任务得不到线程,父任务无法完成。主线程执行cf1.join()进入阻塞状态,并且永远无法恢复。

为了修复该问题,需要将父任务与子任务做线程池隔离,两个任务请求不同的线程池,避免循环依赖导致的阻塞。

2.异步RPC调用注意不要阻塞IO线程池
服务异步化后很多步骤都会依赖于异步RPC调用的结果,这时需要特别注意一点,如果是使用基于NIO(比如Netty)的异步RPC,则返回结果是由IO线程负责设置的,即回调方法由IO线程触发,CompletableFuture同步回调(如thenApply、thenAccept等无Async后缀的方法)如果依赖的异步RPC调用的返回结果,那么这些同步回调将运行在IO线程上,而整个服务只有一个IO线程池,这时需要保证同步回调中不能有阻塞等耗时过长的逻辑,否则在这些逻辑执行完成前,IO线程将一直被占用,影响整个服务的响应。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容