import static org.junit.Assert.assertEquals;

import java.util.concurrent.*;
import lombok.extern.slf4j.Slf4j;
import org.junit.After;
import org.junit.Test;
/**
 * Runnable walkthrough of the main {@link CompletableFuture} composition APIs.
 *
 * <p>Every asynchronous stage is submitted to {@link #EXECUTOR}. Each test blocks on the last
 * stage of its pipeline, so the demonstrated output is produced (and any failure surfaces)
 * before the test method returns.
 */
@Slf4j
public class CompletableFutureTest {

    /** How long the simulated slow tasks sleep, in milliseconds. */
    private static final long SLOW_TASK_MILLIS = 3000L;

    /** Upper bound, in seconds, that the teardown waits for still-running tasks to finish. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 5L;

    /**
     * Pool on which all asynchronous stages run: 5 core threads, at most 20 threads, and a bounded
     * queue of 200 tasks. When the pool is saturated the submitting thread runs the task itself
     * ({@link ThreadPoolExecutor.CallerRunsPolicy}).
     *
     * <p>This is an instance field, so each test-class instance owns its own pool; it is released
     * in {@link #shutdownExecutor()}. The upper-case name is kept for compatibility.
     */
    final ExecutorService EXECUTOR = new ThreadPoolExecutor(5, 20, 0L,
            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(200), new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * Releases the pool after each test so its worker threads are not leaked.
     *
     * <p>Tasks that are still running (for example the slow task in the "either" tests) get up to
     * {@link #SHUTDOWN_TIMEOUT_SECONDS} seconds to finish before they are interrupted.
     */
    @After
    public void shutdownExecutor() {
        EXECUTOR.shutdown();
        try {
            if (!EXECUTOR.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                EXECUTOR.shutdownNow();
            }
        } catch (InterruptedException e) {
            // Stop waiting, cancel what is left, and keep the interrupt visible to the caller.
            EXECUTOR.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * {@code runAsync}: runs a task that produces no result.
     *
     * <p>When an executor is passed, the task runs on it; otherwise it runs in
     * {@code ForkJoinPool.commonPool()}.
     */
    @Test
    public void runAsyncMethod() {
        CompletableFuture<Void> completableFuture =
                CompletableFuture.runAsync(() -> divideAndPrint(10, 2), EXECUTOR);
        // Wait for the task; otherwise the test could return before anything is printed.
        completableFuture.join();
    }

    /**
     * {@code supplyAsync}: runs a task that produces a result.
     *
     * <p>When an executor is passed, the task runs on it; otherwise it runs in
     * {@code ForkJoinPool.commonPool()}.
     *
     * <ul>
     *   <li>{@code whenComplete} observes both the result and the exception, but cannot replace
     *       the outcome.</li>
     *   <li>{@code exceptionally} observes only the exception and supplies a fallback value that
     *       is used when the previous stage failed.</li>
     * </ul>
     *
     * @throws ExecutionException   if the pipeline completes exceptionally
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void supplyAsyncMethod() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture
                // Dividing by zero is deliberate: it drives the failure path.
                // Use a divisor of 2 to see the success path instead.
                .supplyAsync(() -> divideAndPrint(10, 0), EXECUTOR)
                .whenComplete((res, exception) ->
                        System.out.println("async task end,result:" + res + ";exception:" + exception))
                .exceptionally(throwable -> 10);
        Integer result = future.get();
        System.out.println("result:" + result);
        assertEquals(10, result.intValue());
    }

    /**
     * {@code handle}: receives both the result and the exception of the previous stage and
     * returns the value the pipeline continues with.
     *
     * <p>For the {@code *Async} variants in general: when an executor is passed, the stage runs
     * on it; otherwise the default pool is used.
     *
     * @throws ExecutionException   if the pipeline completes exceptionally
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void supplyAsyncMethodByHandle() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture
                .supplyAsync(() -> divideAndPrint(10, 4), EXECUTOR)
                .handle((res, exception) -> exception != null ? 0 : res * 2);
        Integer result = future.get();
        System.out.println("result:" + result);
        assertEquals(4, result.intValue());
    }

    // ------------------------------------------------------------------
    // Sequential chaining: one stage after another
    // ------------------------------------------------------------------

    /**
     * {@code thenRunAsync}: the follow-up stage neither receives the previous result nor returns
     * a value.
     *
     * @throws InterruptedException declared for compatibility; not thrown by the current body
     */
    @Test
    public void taskSerial() throws InterruptedException {
        CompletableFuture
                .supplyAsync(() -> divideAndPrint(10, 4), EXECUTOR)
                .thenRunAsync(() -> System.out.print("task 2 start...."), EXECUTOR)
                .join();
    }

    /**
     * {@code thenAcceptAsync}: the follow-up stage receives the previous result but returns no
     * value.
     */
    @Test
    public void taskSerialAccept() {
        CompletableFuture
                .supplyAsync(() -> divideAndPrint(10, 4), EXECUTOR)
                .thenAcceptAsync(res -> System.out.print("task 2 start...." + res), EXECUTOR)
                .join();
    }

    /**
     * {@code thenApplyAsync}: the follow-up stage receives the previous result and returns a new
     * value.
     *
     * @throws ExecutionException   if the pipeline completes exceptionally
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void taskSerialApply() throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture
                .supplyAsync(() -> divideAndPrint(10, 4), EXECUTOR)
                .thenApplyAsync(res -> {
                    System.out.println("task 2 start...." + res);
                    return "hello:" + res + "\n";
                }, EXECUTOR);
        String result = future.get();
        System.out.print("result:" + result);
        assertEquals("hello:2\n", result);
    }

    // ------------------------------------------------------------------
    // Combining two tasks: task 3 runs after BOTH have completed
    // ------------------------------------------------------------------

    /**
     * {@code runAfterBothAsync}: task 3 runs once task 1 and task 2 have both completed; it sees
     * neither result and returns no value.
     *
     * @throws InterruptedException declared for compatibility; not thrown by the current body
     */
    @Test
    public void taskBianPai() throws InterruptedException {
        CompletableFuture<Integer> future1 =
                CompletableFuture.supplyAsync(CompletableFutureTest::runTaskOne, EXECUTOR);
        CompletableFuture<String> future2 =
                CompletableFuture.supplyAsync(CompletableFutureTest::runTaskTwo, EXECUTOR);
        future1.runAfterBothAsync(future2, () -> System.out.println("task 3 start..."), EXECUTOR)
                .join();
    }

    /**
     * {@code thenAcceptBothAsync}: task 3 runs once task 1 and task 2 have both completed; it
     * receives both results but returns no value.
     */
    @Test
    public void taskBianPaiAccept() {
        CompletableFuture<Integer> future1 =
                CompletableFuture.supplyAsync(CompletableFutureTest::runTaskOne, EXECUTOR);
        CompletableFuture<String> future2 =
                CompletableFuture.supplyAsync(CompletableFutureTest::runTaskTwo, EXECUTOR);
        CompletableFuture<Void> completableFuture = future1.thenAcceptBothAsync(future2, (f1, f2) -> {
            System.out.println(f1);
            System.out.println(f2);
            System.out.println("task 3 start...,f1:" + f1 + ",f2:" + f2);
        }, EXECUTOR);
        completableFuture.join();
    }

    /**
     * {@code thenCombineAsync}: task 3 runs once task 1 and task 2 have both completed; it
     * receives both results and returns its own value.
     *
     * @throws ExecutionException   if the pipeline completes exceptionally
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void taskBianPaiCombine() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future1 =
                CompletableFuture.supplyAsync(CompletableFutureTest::runTaskOne, EXECUTOR);
        CompletableFuture<String> future2 =
                CompletableFuture.supplyAsync(CompletableFutureTest::runTaskTwo, EXECUTOR);
        CompletableFuture<String> completableFuture = future1.thenCombineAsync(future2,
                (f1, f2) -> "task 3 start...,f1:" + f1 + ",f2:" + f2, EXECUTOR);
        String result = completableFuture.get();
        System.out.println(result);
        assertEquals("task 3 start...,f1:2,f2:hello", result);
    }

    // ------------------------------------------------------------------
    // Combining two tasks: task 3 runs as soon as EITHER has completed
    // ------------------------------------------------------------------

    /**
     * {@code runAfterEitherAsync}: task 3 runs as soon as either task completes; it sees no
     * result and returns no value.
     *
     * @throws ExecutionException   if the pipeline completes exceptionally
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void taskRun() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future1 =
                CompletableFuture.supplyAsync(CompletableFutureTest::runTaskOne, EXECUTOR);
        CompletableFuture<String> future2 =
                CompletableFuture.supplyAsync(CompletableFutureTest::runTaskTwo, EXECUTOR);
        CompletableFuture<Void> completableFuture = future1.runAfterEitherAsync(future2,
                () -> System.out.println("task 3 start....."), EXECUTOR);
        completableFuture.get();
    }

    /**
     * {@code acceptEitherAsync}: task 3 runs as soon as either task completes; it receives that
     * task's result but returns no value. Both tasks must therefore have the same result type.
     */
    @Test
    public void taskAccept() {
        CompletableFuture<String> future1 =
                CompletableFuture.supplyAsync(() -> String.valueOf(runTaskOne()), EXECUTOR);
        CompletableFuture<String> future2 =
                CompletableFuture.supplyAsync(CompletableFutureTest::runTaskTwo, EXECUTOR);
        CompletableFuture<Void> completableFuture = future1.acceptEitherAsync(future2,
                res -> System.out.println("task 3 start....." + res), EXECUTOR);
        completableFuture.join();
    }

    /**
     * {@code applyToEitherAsync}: task 3 runs as soon as either task completes; it receives that
     * task's result and returns its own value.
     *
     * @throws ExecutionException   if the pipeline completes exceptionally
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void taskApply() throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 =
                CompletableFuture.supplyAsync(() -> String.valueOf(runTaskOne()), EXECUTOR);
        CompletableFuture<String> future2 =
                CompletableFuture.supplyAsync(CompletableFutureTest::runTaskTwo, EXECUTOR);
        CompletableFuture<String> completableFuture = future1.applyToEitherAsync(future2, res -> {
            System.out.println("task 3 start....." + res);
            return "task 3 end...";
        }, EXECUTOR);
        String result = completableFuture.get();
        System.out.println(result);
        assertEquals("task 3 end...", result);
    }

    // ------------------------------------------------------------------
    // Combining many tasks
    // ------------------------------------------------------------------

    /**
     * {@code allOf}: completes once all of the given tasks have completed.
     *
     * @throws ExecutionException   if any of the tasks completes exceptionally
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void multiTaskCombine() throws ExecutionException, InterruptedException {
        CompletableFuture<String> productInfo =
                CompletableFuture.supplyAsync(() -> announce("product info"), EXECUTOR);
        CompletableFuture<String> productProperty =
                CompletableFuture.supplyAsync(() -> announce("product property"), EXECUTOR);
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(CompletableFutureTest::queryProductIntroduction, EXECUTOR);
        CompletableFuture<Void> allOf = CompletableFuture.allOf(productInfo, productProperty, future);
        allOf.get();
    }

    /**
     * {@code anyOf}: completes as soon as any one of the given tasks has completed.
     *
     * @throws ExecutionException   if the first task to complete did so exceptionally
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void multiTaskCombineAny() throws ExecutionException, InterruptedException {
        CompletableFuture<String> productInfo =
                CompletableFuture.supplyAsync(() -> announce("product info"), EXECUTOR);
        CompletableFuture<String> productProperty =
                CompletableFuture.supplyAsync(() -> announce("product property"), EXECUTOR);
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(CompletableFutureTest::queryProductIntroduction, EXECUTOR);
        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(productInfo, productProperty, future);
        anyOf.get();
    }

    // ------------------------------------------------------------------
    // Shared task bodies
    // ------------------------------------------------------------------

    /**
     * Prints the current thread id, performs the integer division, prints and returns the
     * quotient.
     *
     * @throws ArithmeticException if {@code divisor} is zero (after the thread id was printed)
     */
    private static int divideAndPrint(int dividend, int divisor) {
        System.out.println("current thread:" + Thread.currentThread().getId());
        int quotient = dividend / divisor;
        System.out.println("result:" + quotient);
        return quotient;
    }

    /** Fast "task 1": announces its thread and returns {@code 10 / 4}. */
    private static int runTaskOne() {
        System.out.println("task 1:" + Thread.currentThread().getId());
        int quotient = 10 / 4;
        System.out.println("task 1 end...");
        return quotient;
    }

    /**
     * Slow "task 2": announces its thread, sleeps, and returns {@code "hello"}. The end message
     * is printed only when the sleep was not interrupted.
     */
    private static String runTaskTwo() {
        System.out.println("task 2:" + Thread.currentThread().getId());
        if (pause(SLOW_TASK_MILLIS)) {
            System.out.println("task 2 end...");
        }
        return "hello";
    }

    /** Fast product lookup stand-in: prints {@code label} and returns it. */
    private static String announce(String label) {
        System.out.println(label);
        return label;
    }

    /**
     * Slow product lookup stand-in: sleeps, prints a message unless interrupted, and returns a
     * fixed value.
     */
    private static String queryProductIntroduction() {
        if (pause(SLOW_TASK_MILLIS)) {
            System.out.println("查询商品介绍信息");
        }
        return "华为...";
    }

    /**
     * Sleeps for {@code millis} milliseconds.
     *
     * @return {@code true} if the full sleep elapsed; {@code false} if the thread was interrupted,
     *         in which case the interrupt flag is set again for the caller
     */
    private static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
CompletableFuture
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- CompletableFuture常见用法,CompletableFuture使用示例,CompletableFu...
- ThreadPool############################################## ...
- CompletableFuture实现异步并阻塞获取返回结果,巧用CompletableFuture返回值解决性能...
- 前提概要 在java8以前,我们使用java的多线程编程,一般是通过Runnable中的run方法来完成,这种方式...