在JDK1.8中 提供了CompletableFuture类来进行异步编程,下面我们一起看看怎么实现
1.创建异步任务
package com.wwj.test.thread;
import java.util.Optional;
import java.util.concurrent.*;
public class CompletableFutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(100);
//创建有返回值的异步线程
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
//具体的业务逻辑
int a = 10 / 5;
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
return a;
}, executorService).handle((value,thr)-> {
//使用handle进行处理也可以用其它方法
//CompletableFuture.whenComplete():用于接收带有返回值的CompletableFuture对象,无法修改返回值。
//CompletableFuture.exceptionally():用于处理异常,只要异步线程中有抛出异常,则进入该方法,修改返回值。
//CompletableFuture.handle():用于处理返回结果,可以接收返回值和异常,可以对返回值进行修改。
if (thr != null) {
return -1;
}
System.out.println("后续线程处理");
System.out.println(Thread.currentThread().getName());
//handle对线程的后续处理
Optional<Integer> value1 = Optional.ofNullable(value);
Integer integer = value1.get();
return integer;
});
//创建无返回值线程任务
CompletableFuture.runAsync(()->{
//具体的业务逻辑
int a = 10 / 2;
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
System.out.println(a);
},executorService);
System.out.println("main 线程工作");
int result = integerCompletableFuture.get();
System.out.println(result);
}
}
2.多异步任务进行组合
2.1多异步任务串行
package com.wwj.test.thread;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureTest1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(100);
//创建有返回值的异步线程
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
//具体的业务逻辑
System.out.println(Thread.currentThread().getName());
System.out.println("任务1执行");
return 0;
});
//在上一个异步线程完成后执行下一个异步线程(异步线程的串行)
CompletableFuture<Integer> integerCompletableFuture1 = integerCompletableFuture.thenApplyAsync((value -> {
System.out.println(Thread.currentThread().getName());
System.out.println("任务1的返回值" + value);
return 2;
}), executorService);
System.out.println("main 线程工作");
int result = integerCompletableFuture1.get();
System.out.println(result);
//注:
// 使线程串行执行,无入参,无返回值
//public CompletableFuture<Void> thenRun(Runnable action);
//public CompletableFuture<Void> thenRunAsync(Runnable action);
//public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor);
//// 使线程串行执行,有入参,无返回值
//public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
//public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
//public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);
//// 使线程串行执行,有入参,有返回值
//public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
//public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
//public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
}
}
2.2两个任务并行执行完成后再执行下一个任务
package com.wwj.test.thread;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureTest2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(100);
// 任务1
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1线程:" + Thread.currentThread().getName());
int i = 10 / 2;
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务1结束:");
return i;
}, executorService);
// 任务2
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2线程:" + Thread.currentThread().getName());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2结束");
return 0;
}, executorService);
// 线程并行执行完成,并且执行新任务action,新任务无入参,无返回值
//public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
//public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action);
//public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor);
// 线程并行执行完成,并且执行新任务action,新任务有入参,无返回值
//public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
//public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
//public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor);
// 线程并行执行完成,并且执行新任务action,新任务有入参,有返回值
//public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
//public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
//public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);
CompletableFuture<Integer> integerCompletableFuture = completableFuture1.thenCombineAsync(completableFuture2, (value1, value2) -> {
System.out.println("接收前任务参数:" + value1 + ":" + value2);
System.out.println("任务3线程:" + Thread.currentThread().getName());
return 3;
},executorService);
System.out.println("main 线程工作");
int result = integerCompletableFuture.get();
System.out.println(result);
}
}
2.3 两个异步任务只要其中一个执行完就执行下一个异步任务
package com.wwj.test.thread;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureTest3 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(100);
// 任务1
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1线程:" + Thread.currentThread().getName());
int i = 10 / 2;
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务1结束:");
return i;
}, executorService);
// 任务2
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2线程:" + Thread.currentThread().getName());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2结束");
return 0;
}, executorService);
// 任务并行执行,只要其中有一个执行完,就开始执行新任务action,新任务无入参,无返回值
//public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action);
//public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action);
//public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor);
// 任务并行执行,只要其中有一个执行完,就开始执行新任务action,新任务有入参(入参类型为Object,因为不确定是哪个任务先执行完成),无返回值
//public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
//public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action);
//public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor);
// 任务并行执行,只要其中有一个执行完,就开始执行新任务action,新任务有入参(入参类型为Object,因为不确定是哪个任务先执行完成),有返回值
//public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
//public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn);
//public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor);
CompletableFuture<Integer> integerCompletableFuture = completableFuture1.applyToEitherAsync(completableFuture2, (value) -> {
System.out.println("接收前任务参数:" + value.toString());
System.out.println("任务3线程:" + Thread.currentThread().getName());
return 3;
},executorService);
System.out.println("main 线程工作");
int result = integerCompletableFuture.get();
System.out.println(result);
}
}
2.4 多任务组合(超过两个任务)
package com.wwj.test.thread;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureTest4 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(100);
// 任务1
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> 1, executorService);
// 任务2
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> 2, executorService);
// 任务3
CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(() -> 3, executorService);
//返回3个任务中最快执行任务完成的结果
CompletableFuture<Object> anyOf = completableFuture1.anyOf(completableFuture1, completableFuture2, completableFuture3);
Object o1 = anyOf.get();
Object o2 = anyOf.join();
//等待3个任务全部执行完毕,在逐一拿回返回结果
//completableFuture1.allOf(completableFuture1, completableFuture2, completableFuture3);
System.out.println("main 线程工作");
System.out.println(o1+":"+o2);
}
}