CompletableFuture原理与实践

准备

  1. 保证写入时,线程安全的List和Set结构?
    ConcurrentHashMap是线程安全的HashMap,CopyOnWriteArrayList是线程安全的ArrayList。
    CopyOnWriteArraySet是线程安全的HashSet。

  2. 考虑多线程处理任务点?

任务的类型: 计算能力复杂,IO操作;

任务是否异步: 同步,异步

每个子线程是否有依赖关系:有,没有 (例如:使用多线程处理 从1累加到1万)

如果是多个任务,每个任务使用多线程处理,主线程需要等待子线程任务执行完毕之后在进行执行: 在没有CompletableFuture 使用CountDownLatch解决
https://blog.csdn.net/qq_38599840/article/details/120708245

使用CompletableFuture处理依赖任务

  1. 线程池的方式并行获取数据弊端?
  • CPU资源大量浪费在阻塞等待上,导致CPU资源利用率低。在Java 8之前,一般会通过回调的方式来减少阻塞,但是大量使用回调,又引发臭名昭著的回调地狱问题,导致代码可读性和可维护性大大降低。

  • 为了增加并发度,会引入更多额外的线程池,随着CPU调度线程数的增加,会导致更严重的资源争用,宝贵的CPU资源被损耗在上下文切换上,而且线程本身也会占用系统资源,且不能无限增加。
    同步模型下,会导致硬件资源无法充分利用,系统吞吐量容易达到瓶颈。

  1. 保证了多线程写入时安全,咋样使用什么样的多线程呢?
    使用executor.submit(() -> {})处理RPC任务;
    使用future处理每一个子线程是否 终端还是跳过 逻辑
public class Test3 {
    static ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,

            new ArrayBlockingQueue<>(100), new ThreadFactoryBuilder()
            .setNameFormat("coustomThread %d")
            .setUncaughtExceptionHandler((t,e) -> System.out.println("UncaughtExceptionHandler捕获到:" +t.getName()+"发生异常:"+e.getMessage()))
            .build());

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);//线程不安全List
        List<String> test = test(list);
        System.out.println("主线程:" + Thread.currentThread().getName() + ":"  + test);

    }

    public static List<String> test(List<Integer> list) {
        List<String> safeList = new CopyOnWriteArrayList();//线程安全的list

        list.stream().map(i -> executor.submit(() -> {
            //模拟rpc
            try {
                Thread.sleep(100);
                if(i == 3) {
                    int j = i / 0;
                }
                safeList.add(i + "A");
                System.out.println("子线程:" + Thread.currentThread().getName() + "返回参数:" + i + "A");
                return i + "A";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        })).collect(Collectors.toList()).forEach(future -> {
            try {
                if (future != null) {
                    Object o = future.get();
//                    System.out.println("子线程:" + Thread.currentThread().getName() + "返回参数:" + o);
                }
            } catch (InterruptedException e) {
                //如果不抛出异常,那么线程执行不会终端;反之,如果抛出异常,则线程中断
//                throw new RuntimeException(e);
            } catch (ExecutionException e) {
//                throw new RuntimeException(e);
            }
        });

        System.out.println(123);
        return safeList;

    }

``

4. 使用CountDownLatch主线程需要等待子线程任务执行完毕之后在进行执行?

```java

public class Test4 {
    static ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,

            new ArrayBlockingQueue<>(100), new ThreadFactoryBuilder()
            .setNameFormat("coustomThread %d")
            .setUncaughtExceptionHandler((t,e) -> System.out.println("UncaughtExceptionHandler捕获到:" +t.getName()+"发生异常:"+e.getMessage()))
            .build());

    public static void main(String[] args) throws Exception{
        System.out.println("主线程开始:" + Thread.currentThread().getName());
        CountDownLatch downLatch = new CountDownLatch(2);

        //任务1
        AtomicReference<String> q1 = new AtomicReference<>("");//线程安全
        Future<String> future1 = executor.submit(() -> {
            try {
                Thread.sleep(1500);
                q1.set("任务1");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
//            downLatch.countDown();
            return "任务1";

        });

        //任务2
        AtomicReference<String> q2 = new AtomicReference<>("");//线程安全
        Future<String> future2 = executor.submit(() -> {
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            q2.set("任务2" + q1.get());
//            downLatch.countDown();
            return "任务2" + q1.get();

        });
//        downLatch.await();

        Thread.sleep(600);

        System.out.println("获取任务1返回:" + future1.get());

        System.out.println("获取任务2返回:" + future2.get());

        System.out.println("主线程结束:" + Thread.currentThread().getName());



    }

    /*
    *   主线程开始:main
        获取任务1返回:任务1
        获取任务2返回:任务2任务1
        主线程结束:main
    * */


}

CompletableFuture

实例

 public void testCompletableInfo() throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
  
          //调用用户服务获取用户基本信息
          CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() ->
                  //模拟查询商品耗时500毫秒
          {
              try {
                  Thread.sleep(500);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              return "用户A";
          });
  
          //调用商品服务获取商品基本信息
          CompletableFuture<String> goodsFuture = CompletableFuture.supplyAsync(() ->
                  //模拟查询商品耗时500毫秒
          {
              try {
                  Thread.sleep(400);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              return "商品A";
          });
  
          System.out.println("获取用户信息:" + userFuture.get());
          System.out.println("获取商品信息:" + goodsFuture.get());
  
          //模拟主程序耗时时间
          Thread.sleep(600);
          System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
    }

相关的方法使用


public class TestFuture1 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
        //自定义线程池
        ExecutorService executor = Executors.newFixedThreadPool(5);

        //cf1 cf2是0依赖
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "cf1", executor);
        CompletableFuture<String> cf2 = CompletableFuture.completedFuture("cf2");

        //CF3,CF5分别依赖于CF1和CF2,一元依赖
        CompletableFuture<String> cf3 = cf1.thenApply(res1 -> "cf3");
        CompletableFuture<String> cf5 = cf2.thenApply(res2 -> "cf5");

        //cf4 依赖 cf1和cf2, 二元依赖
        CompletableFuture<String> cf4 = cf1.thenCombine(cf2, (res1, res2) -> "cf4");

        //cf6 依赖 cf3, cf4, cf5, 多元依赖
        CompletableFuture<Void> cf6 = CompletableFuture.allOf(cf3, cf4, cf5);

        //最终结果
        CompletableFuture<String> result = cf6.thenApply(v -> "cf6");

//        //模拟主程序耗时时间
//        Thread.sleep(600);
//        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
    }


}


注意事项

  1. fulture需要返回值,才能获取异常信息
    Future需要获取返回值,才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。

小伙伴们使用的时候,注意一下哈,考虑是否加try...catch...或者使用exceptionally方法。

  1. CompletableFuture.get()方法是阻塞的(CompletableFuture.get(5, TimeUnit.SECONDS);)
    CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间。

  2. 不建议使用默认的线程池(ForkJoinPool中的共用线程池CommonPool(CommonPool的大小是CPU核数-1,如果是IO密集的应用,线程数可能成为瓶颈)。)

CompletableFuture代码中又使用了默认的「ForkJoin线程池」,处理的线程个数是电脑「CPU核数-1」。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数;
前面提到,异步回调方法可以选择是否传递线程池参数Executor,这里我们建议强制传线程池,且根据实际情况做线程池隔离。

当不传递线程池时,会使用ForkJoinPool中的公共线程池CommonPool,这里所有调用将共用该线程池,核心线程数=处理器数量-1(单核核心线程数为1),所有异步回调都会共用该CommonPool,核心与非核心业务都竞争同一个池中的线程,很容易成为系统瓶颈。手动传递线程池参数可以更方便的调节参数,并且可以给不同的业务分配不同的线程池,以求资源隔离,减少不同业务之间的相互干扰。

  1. 自定义线程池,注意饱和策略
    CompletableFuture的get()方法是阻塞的,我们一般建议使用future.get(5, TimeUnit.SECONDS)。并且一般建议使用自定义线程池。

但是如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,做好线程池隔离哈。

https://mp.weixin.qq.com/s?__biz=MzIzNjg4OTcyNA==&mid=2247484855&idx=1&sn=ad340b3fe63ab30d2e3e001c0f6edb87&scene=21&key=02eedae091b155c60a961e782b2307ab64d986af0bdcad1a803e2a12d697dd24740aaee860a4fa0fad55cef1a1e488a18bc593982f805ebb39a819488a5896ecbf4198d00f973900948cf2450fe53b502954d43ccf30279c71a744e39e7e6559f5fcc141b7f913c23e5e371fac0c1905fce64eab8c7ee55cb1f3f68436a26cee&ascene=14&uin=NzYxNzQxNTIx&devicetype=Windows+10+x64&version=6308011a&lang=zh_CN&exportkey=n_ChQIAhIQu9%2FTeSfvf6HU%2FTFANi4G7BL0AQIE97dBBAEAAAAAAOHfOF8YvT4AAAAOpnltbLcz9gKNyK89dVj0fLR5hkgYPSrWBvYpc1TcPAiVSvNkWX%2FSHdP%2FD9K%2BTEsBVfqk%2B0oZOhMs%2FN9hSBM77%2FiQZZ%2BphmkyMVh4hMNPlp8%2B%2BpmMqsXyA9jy%2BbvS9EYRHMNvj1UNHI0%2FjCL26xTGiPvunos1SXLWxiYCO47rLZOT%2FwFIMFickER5%2B9%2B15bZdDPLJrUP1vPI%2BDpwWZzckwFZiYtElitvSZd2tTe0k0Adc92mbaS34HH4clKUGLFXomPHfSV5cMf9fHayGoHu%2BTJAWjW9QNM8FoiFIVQk%3D&acctmode=0&pass_ticket=%2BfzqdTXdyhHUndM9%2F8BTisVSSsPUfiRdNFUYSuosHmHEOb2SSG2ljnJnKZfSy7vL3E1zqV87pHg%2BAagXuPt85w%3D%3D&wx_header=1&fontgear=2

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,734评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,931评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,133评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,532评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,585评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,462评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,262评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,153评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,587评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,792评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,919评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,635评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,237评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,855评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,983评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,048评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,864评论 2 354

推荐阅读更多精彩内容