JDK中提供了一个类Future,代表一个异步计算后的值,但是它实现还比较简陋,一般还要做一次异步值得轮询操作,或者说这个API是阻塞式操作,想要拿到值会阻塞当前的线程。
监听异步结果的值然后执行下一步操作会不会更好? 这样就可以减少等待的时间。
有一些常用的工具已经提供了这个功能,使用起来也很方便
- ListenableFuture 依赖Guava包
- CompletableFuture JDK自带
ListenableFuture
顾名思义,就是可以监听的Future,它主要有两个常用的功能:
- 监听Future返回的结果,成功或者失败都有回调
- 将异步结果串联起来,异步3需要异步2的结果,异步2需要异步1的结果,通过它可以很好的处理这种关系。
CompletableFuture
更完善的Future,它是对Future接口的扩充,并且实现了CompletionStage接口。Future代表一个异步的结果,CompletionStage代表着这个异步当前的完成阶段。同上也具备两个主要功能:
- 监听异步结果的值
- 串联异步操作
Demo
上面提到它们两个的功能点,贴下代码:
package me.aihe;
import com.google.common.util.concurrent.*;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class ListenableFutureDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// futureDemo();
// addListner();
// chaninFuture();
// completeFuture();
}
private static void futureDemo() throws InterruptedException, ExecutionException {
ExecutorService service = Executors.newFixedThreadPool(2);
Future<String> future = service.submit(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("计算中");
Thread.sleep(1000);
return "计算后的值";
}
});
String result = future.get();
System.out.println(result);
Thread.sleep(3000);
service.shutdown();
}
private static void completeFuture() {
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
System.out.println("计算中");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "计算后的值";
}
}).thenAccept(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
});
}
private static void chaninFuture() throws InterruptedException {
ListeningExecutorService service = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(5)
);
ListenableFuture<String> futureResult = service.submit(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("计算中");
Thread.sleep(1000);
return "计算后的值";
}
});
ListenableFuture<String> nextFuture = Futures.transformAsync(futureResult, new AsyncFunction<String, String>() {
@Override
public ListenableFuture<String> apply(@Nullable String input) throws Exception {
System.out.println("async future");
return service.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return "input" + "nextFuture";
}
});
}
}, service);
Futures.addCallback(nextFuture, new FutureCallback<String>() {
@Override
public void onSuccess(@Nullable String result) {
System.out.println(result);
}
@Override
public void onFailure(Throwable t) {
System.out.println(t.getMessage());
}
},service);
Thread.sleep(3000);
service.shutdown();
}
private static void addListner() throws InterruptedException {
ListeningExecutorService service = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(5)
);
ListenableFuture<String> futureResult = service.submit(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("计算中");
Thread.sleep(1000);
return "计算后的值";
}
});
Futures.addCallback(futureResult, new FutureCallback<String>() {
@Override
public void onSuccess(@Nullable String result) {
System.out.println(result);
System.out.println("完成回调");
}
@Override
public void onFailure(Throwable t) {
System.out.println(t.getMessage());
}
},service);
Thread.sleep(3000);
service.shutdown();
}
}