CompletableFuture搭配Supplier函数使用心得

什么是CompletableFuture?

在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,结合了Future的优点,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

CompletableFuture被设计在Java中进行异步编程。异步编程意味着在主线程之外创建一个独立的线程,与主线程分隔开,并在上面运行一个非阻塞的任务,然后通知主线程进展,成功或者失败。

通过这种方式,你的主线程不用为了任务的完成而阻塞/等待,你可以用主线程去并行执行其他的任务。 使用这种并行方式,极大地提升了程序的表现。

什么是Supplier?

DK提供了大量常用的函数式接口以丰富Lambda的典型使用场景,它们主要在 java.util.function 包中被提供。

java.util.function.Supplier<T> 接口仅包含一个无参的方法: T get() 。用来获取一个泛型参数指定类型的对象数据。

下面通过2个Demo列出疑问点和最终问题的解答
  • 首先定义一个线程池,严谨点不要使用JDK自带的工具类
    /**
     * OMS订单线程池
     */
    public static final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
            new ThreadPoolExecutor(
                    50,
                    100,
                    60L,
                    TimeUnit.SECONDS,new LinkedBlockingQueue(20000),
                    new DefaultThreadFactory("omsOrder"),
                    new ThreadPoolExecutor.CallerRunsPolicy()));
}
  • 为了方便验证,我重写了DefaultThreadFactory线程池工厂自定义线程名称,这样方便测试
public class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    /**
     * 自定义线程名称
     * @param threadName
     */
    DefaultThreadFactory(String threadName) {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                poolNumber.getAndIncrement() +
                "-thread-"+threadName;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                namePrefix + threadNumber.getAndIncrement(),
                0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}
  • Demo1:
    @Override
    public CommonResp sendSms() {
        //准备基础数据,一个订单集合,多少个订单就发送多个条短信
        List<OmsOrderDO> omsOrderDOList = iOmsOrderRepository.queryListOmsOrderDO();
        //批量发送
        List<Pair<Long,String>> resultList = new CopyOnWriteArrayList<>();
        CompletableFuture[] completableFutures = omsOrderDOList.stream().map(omsOrderDO ->
                CompletableFuture.supplyAsync(() -> test(omsOrderDO, this::send), ExecutorConfig.executorService).whenCompleteAsync((val, e) -> {
                    if (val != null) {
                        resultList.add(val);
                    }
                }).exceptionally(e -> {
                    log.error("com.formssi.mall.order.application.impl.OmsOrderItemServiceImpl.sendSms:e{}", e);
                    return null;
                })
        ).toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(completableFutures).join();
        System.out.println(Thread.currentThread().getName()+"主线程执行完成");
        resultList.forEach(System.out::println);
        return CommonResp.ok(resultList);
    }

上面几行代码的逻辑是:到数据库查询订单集合,遍历通过CompletableFuture线程池批量调用send方法发送短信,然后打印主线程的线程名,send方面也打印了当前执行的线程名称,代码如下:

    private Pair<Long,String> send(OmsOrderDO omsOrderDO){
        //调用短信服务发送短信
        MessageCmd messageCmd = new MessageCmd();
        messageCmd.setTos(Arrays.asList(omsOrderDO.getBillReceiverEmail()));
        messageCmd.setSubject(omsOrderDO.getOrderSn());
        messageCmd.setText(omsOrderDO.getReceiverName());
        CommonResp smsCaptcha = commonClient.sendBatchSMS(messageCmd);
        //打印当前线程的名称
        System.out.println("我是线程:"+Thread.currentThread().getName());
        return new Pair(omsOrderDO.getId(),smsCaptcha.getCode());
    }

然后执行,看结果:

我是线程:pool-1-thread-omsOrder1
我是线程:pool-1-thread-omsOrder2
我是线程:pool-1-thread-omsOrder3
http-nio-18086-exec-3主线程执行完成
  • 下面来看Demo2:
    @Override
    public CommonResp sendSms2() {
        //基础数据
        List<OmsOrderDO> omsOrderDOList = iOmsOrderRepository.queryListOmsOrderDO();
        //遍历基础数据的同事去调用send方法发送短信,将返回的结果放到集合
        List<Supplier<Pair<Long,String>>> omsOrderDOSupplierList = new ArrayList<>();
        for (OmsOrderDO omsOrderDO:omsOrderDOList){
            omsOrderDOSupplierList.add(()->send(omsOrderDO));
        }
        //遍历集合,将结果放入到CompletableFuture,这里我就有一个疑问了,这里放入的是一个发送结果,
        // 那是不是代码走到这里的时候就sen方法就已经同步的执行完了,没有走线程池呢?
        List<Pair<Long,String>> resultList = new CopyOnWriteArrayList<>();
        CompletableFuture[] completableFutures = omsOrderDOSupplierList.stream().map(sup ->
                CompletableFuture.supplyAsync(sup, ExecutorConfig.executorService).whenCompleteAsync((r, e) -> {
                    if (r != null){
                        resultList.add(r);
                    }
                }).exceptionally(e -> {
                    log.error("com.formssi.mall.order.application.impl.OmsOrderItemServiceImpl.sendSms2:e{}", e);
                    return null;
                })
        ).toArray(CompletableFuture[]::new);
        System.out.println(Thread.currentThread().getName()+"主线程执行完成");
        CompletableFuture.allOf(completableFutures).join();
        return CommonResp.ok(resultList);
    }

上面注释我写明我的疑问点,我一直认为这样写send方法是没有经过线程池的,为了证明send方式是走的同步还是异步,执行输出线程的名称就能确定了,输入结果如下:

我是线程:pool-1-thread-omsOrder4
http-nio-18086-exec-1主线程执行完成
我是线程:pool-1-thread-omsOrder5
我是线程:pool-1-thread-omsOrder6

从上面执行的结果来看,send方法就是通过线程池执行的,都快自闭了,怎么都想不明白,后来请教我同事龙哥,他给我说了情况,我才理解:首先Supplier是一个函数,这个函数有一个特征类似延迟加载,也可以理解成一个回调函数,只有执行Supplier.get()的时候这个函数才是真正的执行,所以我上面 omsOrderDOSupplierList.add(()->send(omsOrderDO));这行代码只是放了一个函数进去,没有真正的执行send方法,为了验证我同事的这个说法,我把线程池那一段代码注释掉在执行,看下send方法里面的日志是否会打印,代码如下:

    @Override
    public CommonResp sendSms2() {
        //基础数据
        List<OmsOrderDO> omsOrderDOList = iOmsOrderRepository.queryListOmsOrderDO();
        //遍历基础数据的同事去调用send方法发送短信,将返回的结果放到集合
        List<Supplier<Pair<Long,String>>> omsOrderDOSupplierList = new ArrayList<>();
        for (OmsOrderDO omsOrderDO:omsOrderDOList){
            omsOrderDOSupplierList.add(()->send(omsOrderDO));
        }
        //遍历集合,将结果放入到CompletableFuture,这里我就有一个疑问了,这里放入的是一个发送结果,
        // 那是不是代码走到这里的时候就sen方法就已经同步的执行完了,没有走线程池呢?
        List<Pair<Long,String>> resultList = new CopyOnWriteArrayList<>();
/*        CompletableFuture[] completableFutures = omsOrderDOSupplierList.stream().map(sup ->
                CompletableFuture.supplyAsync(sup, ExecutorConfig.executorService).whenCompleteAsync((r, e) -> {
                    if (r != null){
                        resultList.add(r);
                    }
                }).exceptionally(e -> {
                    log.error("com.formssi.mall.order.application.impl.OmsOrderItemServiceImpl.sendSms2:e{}", e);
                    return null;
                })
        ).toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(completableFutures).join();*/
        System.out.println(Thread.currentThread().getName()+"主线程执行完成");
        return CommonResp.ok(resultList);
    }
  • 重启服务,然后看下结果:
2022-05-11 17:21:00.908  INFO 1300 --- [           main] c.a.c.n.registry.NacosServiceRegistry    : nacos registry, DEFAULT_GROUP oms-server 10.31.3.154:18086 register finished
2022-05-11 17:21:00.995  INFO 1300 --- [           main] com.formssi.mall.order.OrderApplication  : Started OrderApplication in 9.394 seconds (JVM running for 10.426)
2022-05-11 17:21:13.794  INFO 1300 --- [io-18086-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2022-05-11 17:21:13.794  INFO 1300 --- [io-18086-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2022-05-11 17:21:13.804  INFO 1300 --- [io-18086-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 10 ms
http-nio-18086-exec-1主线程执行完成

果然没有打印send里面的线程日志,说明send方法确实没有执行,最后在看下omsOrderDOSupplierList.add(()->send(omsOrderDO));中的这个omsOrderDOSupplierList这个对象里面是不是放的对应的函数,如图:


微信图片_20220511172525.png

从上图来看,omsOrderDOSupplierList确实存放3个函数,好了收工!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352

推荐阅读更多精彩内容