什么是CompletableFuture?
在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,结合了Future的优点,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。
CompletableFuture被设计在Java中进行异步编程。异步编程意味着在主线程之外创建一个独立的线程,与主线程分隔开,并在上面运行一个非阻塞的任务,然后通知主线程进展,成功或者失败。
通过这种方式,你的主线程不用为了任务的完成而阻塞/等待,你可以用主线程去并行执行其他的任务。 使用这种并行方式,极大地提升了程序的表现。
什么是Supplier?
DK提供了大量常用的函数式接口以丰富Lambda的典型使用场景,它们主要在 java.util.function 包中被提供。
java.util.function.Supplier<T> 接口仅包含一个无参的方法: T get() 。用来获取一个泛型参数指定类型的对象数据。
下面通过2个Demo列出疑问点和最终问题的解答
- 首先定义一个线程池,严谨点不要使用JDK自带的工具类
/**
* OMS订单线程池
*/
public static final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
new ThreadPoolExecutor(
50,
100,
60L,
TimeUnit.SECONDS,new LinkedBlockingQueue(20000),
new DefaultThreadFactory("omsOrder"),
new ThreadPoolExecutor.CallerRunsPolicy()));
}
- 为了方便验证,我重写了DefaultThreadFactory线程池工厂自定义线程名称,这样方便测试
public class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
/**
* 自定义线程名称
* @param threadName
*/
DefaultThreadFactory(String threadName) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-"+threadName;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
- Demo1:
@Override
public CommonResp sendSms() {
//准备基础数据,一个订单集合,多少个订单就发送多个条短信
List<OmsOrderDO> omsOrderDOList = iOmsOrderRepository.queryListOmsOrderDO();
//批量发送
List<Pair<Long,String>> resultList = new CopyOnWriteArrayList<>();
CompletableFuture[] completableFutures = omsOrderDOList.stream().map(omsOrderDO ->
CompletableFuture.supplyAsync(() -> test(omsOrderDO, this::send), ExecutorConfig.executorService).whenCompleteAsync((val, e) -> {
if (val != null) {
resultList.add(val);
}
}).exceptionally(e -> {
log.error("com.formssi.mall.order.application.impl.OmsOrderItemServiceImpl.sendSms:e{}", e);
return null;
})
).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(completableFutures).join();
System.out.println(Thread.currentThread().getName()+"主线程执行完成");
resultList.forEach(System.out::println);
return CommonResp.ok(resultList);
}
上面几行代码的逻辑是:到数据库查询订单集合,遍历通过CompletableFuture线程池批量调用send方法发送短信,然后打印主线程的线程名,send方面也打印了当前执行的线程名称,代码如下:
private Pair<Long,String> send(OmsOrderDO omsOrderDO){
//调用短信服务发送短信
MessageCmd messageCmd = new MessageCmd();
messageCmd.setTos(Arrays.asList(omsOrderDO.getBillReceiverEmail()));
messageCmd.setSubject(omsOrderDO.getOrderSn());
messageCmd.setText(omsOrderDO.getReceiverName());
CommonResp smsCaptcha = commonClient.sendBatchSMS(messageCmd);
//打印当前线程的名称
System.out.println("我是线程:"+Thread.currentThread().getName());
return new Pair(omsOrderDO.getId(),smsCaptcha.getCode());
}
然后执行,看结果:
我是线程:pool-1-thread-omsOrder1
我是线程:pool-1-thread-omsOrder2
我是线程:pool-1-thread-omsOrder3
http-nio-18086-exec-3主线程执行完成
- 下面来看Demo2:
@Override
public CommonResp sendSms2() {
//基础数据
List<OmsOrderDO> omsOrderDOList = iOmsOrderRepository.queryListOmsOrderDO();
//遍历基础数据的同事去调用send方法发送短信,将返回的结果放到集合
List<Supplier<Pair<Long,String>>> omsOrderDOSupplierList = new ArrayList<>();
for (OmsOrderDO omsOrderDO:omsOrderDOList){
omsOrderDOSupplierList.add(()->send(omsOrderDO));
}
//遍历集合,将结果放入到CompletableFuture,这里我就有一个疑问了,这里放入的是一个发送结果,
// 那是不是代码走到这里的时候就sen方法就已经同步的执行完了,没有走线程池呢?
List<Pair<Long,String>> resultList = new CopyOnWriteArrayList<>();
CompletableFuture[] completableFutures = omsOrderDOSupplierList.stream().map(sup ->
CompletableFuture.supplyAsync(sup, ExecutorConfig.executorService).whenCompleteAsync((r, e) -> {
if (r != null){
resultList.add(r);
}
}).exceptionally(e -> {
log.error("com.formssi.mall.order.application.impl.OmsOrderItemServiceImpl.sendSms2:e{}", e);
return null;
})
).toArray(CompletableFuture[]::new);
System.out.println(Thread.currentThread().getName()+"主线程执行完成");
CompletableFuture.allOf(completableFutures).join();
return CommonResp.ok(resultList);
}
上面注释我写明我的疑问点,我一直认为这样写send方法是没有经过线程池的,为了证明send方式是走的同步还是异步,执行输出线程的名称就能确定了,输入结果如下:
我是线程:pool-1-thread-omsOrder4
http-nio-18086-exec-1主线程执行完成
我是线程:pool-1-thread-omsOrder5
我是线程:pool-1-thread-omsOrder6
从上面执行的结果来看,send方法就是通过线程池执行的,都快自闭了,怎么都想不明白,后来请教我同事龙哥,他给我说了情况,我才理解:首先Supplier是一个函数,这个函数有一个特征类似延迟加载,也可以理解成一个回调函数,只有执行Supplier.get()的时候这个函数才是真正的执行,所以我上面 omsOrderDOSupplierList.add(()->send(omsOrderDO));这行代码只是放了一个函数进去,没有真正的执行send方法,为了验证我同事的这个说法,我把线程池那一段代码注释掉在执行,看下send方法里面的日志是否会打印,代码如下:
@Override
public CommonResp sendSms2() {
//基础数据
List<OmsOrderDO> omsOrderDOList = iOmsOrderRepository.queryListOmsOrderDO();
//遍历基础数据的同事去调用send方法发送短信,将返回的结果放到集合
List<Supplier<Pair<Long,String>>> omsOrderDOSupplierList = new ArrayList<>();
for (OmsOrderDO omsOrderDO:omsOrderDOList){
omsOrderDOSupplierList.add(()->send(omsOrderDO));
}
//遍历集合,将结果放入到CompletableFuture,这里我就有一个疑问了,这里放入的是一个发送结果,
// 那是不是代码走到这里的时候就sen方法就已经同步的执行完了,没有走线程池呢?
List<Pair<Long,String>> resultList = new CopyOnWriteArrayList<>();
/* CompletableFuture[] completableFutures = omsOrderDOSupplierList.stream().map(sup ->
CompletableFuture.supplyAsync(sup, ExecutorConfig.executorService).whenCompleteAsync((r, e) -> {
if (r != null){
resultList.add(r);
}
}).exceptionally(e -> {
log.error("com.formssi.mall.order.application.impl.OmsOrderItemServiceImpl.sendSms2:e{}", e);
return null;
})
).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(completableFutures).join();*/
System.out.println(Thread.currentThread().getName()+"主线程执行完成");
return CommonResp.ok(resultList);
}
- 重启服务,然后看下结果:
2022-05-11 17:21:00.908 INFO 1300 --- [ main] c.a.c.n.registry.NacosServiceRegistry : nacos registry, DEFAULT_GROUP oms-server 10.31.3.154:18086 register finished
2022-05-11 17:21:00.995 INFO 1300 --- [ main] com.formssi.mall.order.OrderApplication : Started OrderApplication in 9.394 seconds (JVM running for 10.426)
2022-05-11 17:21:13.794 INFO 1300 --- [io-18086-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2022-05-11 17:21:13.794 INFO 1300 --- [io-18086-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2022-05-11 17:21:13.804 INFO 1300 --- [io-18086-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 10 ms
http-nio-18086-exec-1主线程执行完成
果然没有打印send里面的线程日志,说明send方法确实没有执行,最后在看下omsOrderDOSupplierList.add(()->send(omsOrderDO));中的这个omsOrderDOSupplierList这个对象里面是不是放的对应的函数,如图:
从上图来看,omsOrderDOSupplierList确实存放3个函数,好了收工!