作为一个java为主语言的后端开端,实际使用异步的场景也不多,对于Futrue等仅仅在会使用的阶段,内部的原理一知半解,借着学习CompletableFutrue的机会,重新整理自己的知识体系
一、Future
参考资料
Future、FutureTask实现原理浅析[https://blog.csdn.net/u012881584/article/details/85121144]
1.1 Future和FutureTask
Future只是一个接口,且只有一个get方法
FutureTask是真正的内部实现,他是一个实现Runnable, Future<V>的类
- 本质上是调用线程,调用后,会park住,而真正的执行线程完成执行后,会将这些线程全部唤醒
1.2 Future.get时发生了什么?
对于使用者来说,我们基本上会用future.get即可,那么此时到底发什么了什么?
查看FutureTask.get方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
大致上,可以看到FutureTask有一个state的标记位,当state满足要求时,返回结果。
当不满足时,会awaitDown,等待执行完成或超时
- awaitDown是如何实现的
FutureTask内部有一个单向链表,可以认为value为当前的线程
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
可以得知,主流程就是第一次进来创建一个节点,然后将当前线程赋值,如果状态还是执行中,就调用park方法将当前线程暂停。如果有多个线程还要get,则将这些线程放到单向列表中
这就起到了阻塞的作用
1.3 FutureTask如何异步获得结果
上文的get方法最终会阻塞,那么肯定会有一个方法执行后去唤醒暂停的线程
查看FutureTask的run方法,前面都是常规的调用实际方法,获得result,关键看获得result后干了什么
这里有一个状态的改变state从NEW->COMPLETING
可以看到,这里会通过一个for循环,将所有waitnode里的线程全部唤醒,异步调用完成
二、CompletableFuture
了解了Future的使用,这里就要谈谈Future的局限性。Future很难直接表述多个Future 结果之间的依赖性,开发中,我们经常需要达成以下目的:
Future vs CompletableFuture
Futrue在Java里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个Futrue,在Future里面有isDone方法来 判断任务是否处理结束,还有get方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。
Future的主要缺点如下:
- 不支持链式调用
这个指的是对于Future的执行结果,我们想继续传到下一个Future处理使用,从而形成一个链式的pipline调用,这在Future中是没法实现的。 - 不支持多个Future合并
比如我们有10个Future并行执行,我们想在所有的Future运行完毕之后,执行某些函数,是没法通过Future实现的。 - 不支持异常处理
Future的API没有任何的异常处理的api,所以在异步运行时,如果出了问题是不好定位的。
2.1 CompletableFuture的使用
1 .因为CompletableFuture实现了Future接口,所以你自然可以把他当future使用,实现原理基本相同,只是没有利用futuretask,而是在类自身就完成了
2.因为CompletableFuture实现了CompletionStage,所以可以完成链式调用
如
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {
System.out.println("hello world");
return "result";
}).thenApply(r -> {
System.out.println("r");
return r + "2";
});
System.out.println(a.get());
}
2.2 CompletableFuture的实现
异步执行的思路主要是以下3步:
1.执行任务
2.添加任务完成之后的动作(回调方法)
3.执行回调
2.2.1执行任务
和future相同,supplyAsync
public final boolean exec() {
CompletableFuture<U> d; U u; Throwable ex;
if ((d = this.dst) != null && d.result == null) {
try {
//执行任务的代码
u = fn.get();
ex = null;
} catch (Throwable rex) {
ex = rex;
u = null;
}
d.internalComplete(u, ex);
}
return true;
}
2.2.2执行回调
再看这个肯定会有一个执行回调的方法,就是在d.internalComplete(u, ex)里最终调用的postComplete,顾名思义就是在执行完成后调用
/**
* Removes and signals all waiting threads and runs all completions.
*/
final void postComplete() {
WaitNode q; Thread t;
while ((q = waiters) != null) {
if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
(t = q.thread) != null) {
q.thread = null;
LockSupport.unpark(t);
}
}
!从任务链的头部开始执行回调任务,这里的语法比较精炼,nodes不断遍历,h只是一个保存中间状态的节点
CompletionNode h; Completion c;
while ((h = completions) != null) {
if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
(c = h.completion) != null)
c.run();
}
}
可以看到waitNode和future是一样的,唤醒所有调用get方法park住的线程。但后面还有个CompletionNode ,这个node就是一个任务链,属性completion实现了Runnable,是一个可以调度的任务
static final class CompletionNode {
final Completion completion;
volatile CompletionNode next;
CompletionNode(Completion completion) { this.completion = completion; }
}
// Opportunistically subclass AtomicInteger to use compareAndSet to claim.
@SuppressWarnings("serial")
abstract static class Completion extends AtomicInteger implements Runnable {
}
2.2.3添加回调任务
既然已经看懂了回调任务的执行,那么肯定会有一个环节添加任务完成之后的动作
例子中thenApply()就是一个异步任务执行完后需要完成的动作,即回调,我们可以看看内部实现,是什么时候添加进去的
private <U> CompletableFuture<U> doThenApply
(Function<? super T,? extends U> fn,
Executor e) {
if (fn == null) throw new NullPointerException();
CompletableFuture<U> dst = new CompletableFuture<U>();
!ThenApply是一个继承Completion的任务
ThenApply<T,U> d = null;
Object r;
!如果当前任务没完成,则创建一个任务节点,添加任务,
!将任务节点追加的当前completableFuture的任务链中,可以看到这里追加是追加在头部的
if ((r = result) == null) {
CompletionNode p = new CompletionNode
(d = new ThenApply<T,U>(this, fn, dst, e));
while ((r = result) == null) {
if (UNSAFE.compareAndSwapObject
(this, COMPLETIONS, p.next = completions, p))
break;
}
}
!如果当时任务已经完成了,我们主要看没有报错的情况,直接执行 u = fn.apply(t);直接当前线程就执行了
if (r != null && (d == null || d.compareAndSet(0, 1))) {
T t; Throwable ex;
if (r instanceof AltResult) {
ex = ((AltResult)r).ex;
t = null;
}
else {
ex = null;
@SuppressWarnings("unchecked") T tr = (T) r;
t = tr;
}
U u = null;
if (ex == null) {
try {
if (e != null)
execAsync(e, new AsyncApply<T,U>(t, fn, dst));
else
u = fn.apply(t);
} catch (Throwable rex) {
ex = rex;
}
}
if (e == null || ex != null)
dst.internalComplete(u, ex);
}
helpPostComplete();
return dst;
}
从以上分析可以看出,其实你的任务真正是哪个线程执行的,还和你前序任务的执行时间有关,我们做个测试
- 第一种情况,前序的任务执行很快
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {
System.out.println("ThreadName:"+Thread.currentThread().getName());
return "result";
}).thenApply(r -> {
System.out.println("ThreadName:"+Thread.currentThread().getName());
return r + "2";
});
System.out.println(a.get());
}
结果为,可以看到是主线程执行的
ThreadName:ForkJoinPool.commonPool-worker-1
ThreadName:main
result2
- 第二种情况,前序的任务执行比较慢
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {
System.out.println("ThreadName:"+Thread.currentThread().getName());
try {
Thread.sleep(3*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result";
}).thenApply(r -> {
System.out.println("ThreadName:"+Thread.currentThread().getName());
return r + "2";
});
System.out.println(a.get());
}
结果为,可以看到是第一个异步任务的线程执行了
ThreadName:ForkJoinPool.commonPool-worker-1
ThreadName:ForkJoinPool.commonPool-worker-1
result2
2.3 CompletableFuture的异常处理
多线程的异常总是比较复杂的,当你真正要在生产运行时,你肯定得考虑到各种异常分支,特别是当你大量使用链式调用后,此时如果某一个任务出现了异常,后续会如何处理?
先看执行结果为result,可以看到要么是真正的result,要么是一个 AltResult,里面其实就是exception
volatile Object result; // Either the result or boxed AltResult
static final class AltResult {
final Throwable ex; // null only for NIL
AltResult(Throwable ex) { this.ex = ex; }
}
- 当程序出现异常发生时,result会变成什么?即最终的CompletableFuture会变成什么?
这里三元运算符比较多,得慢慢理。我们来理一下有异常,即ex!=null
1.先判断ex是否为null,此处不为null
2.取 new AltResult((ex instanceof CompletionException) ? ex :
new CompletionException(ex)));
3.此处ex假设不是CompletionException,则转化为CompletionException
4.最终包装成AltResult
/**
* Triggers completion with the encoding of the given arguments:
* if the exception is non-null, encodes it as a wrapped
* CompletionException unless it is one already. Otherwise uses
* the given result, boxed as NIL if null.
*/
final void internalComplete(T v, Throwable ex) {
if (result == null)
UNSAFE.compareAndSwapObject
(this, RESULT, null,
(ex == null) ? (v == null) ? NIL : v :
new AltResult((ex instanceof CompletionException) ? ex :
new CompletionException(ex)));
postComplete(); // help out even if not triggered
}
- 现在我们知道了异常最终也会包含在CompletaFuture中,那么实际业务代码要如何编写呢?
public T get() throws InterruptedException, ExecutionException {
Object r; Throwable ex, cause;
if ((r = result) == null && (r = waitingGet(true)) == null)
throw new InterruptedException();
if (!(r instanceof AltResult)) {
@SuppressWarnings("unchecked") T tr = (T) r;
return tr;
}
if ((ex = ((AltResult)r).ex) == null)
return null;
if (ex instanceof CancellationException)
throw (CancellationException)ex;
if ((ex instanceof CompletionException) &&
(cause = ex.getCause()) != null)
ex = cause;
throw new ExecutionException(ex);
}
当你最后调用get方法时,会抛出异常。这时候你可以通过handle进行处理,他的功能时最强大的
参考文档:https://mincong.io/2020/05/30/exception-handling-in-completable-future/
如:
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {
System.out.println("ThreadName:" + Thread.currentThread().getName());
int b=1/0;
return "result";
}).thenApply(r -> {
System.out.println("ThreadName:" + Thread.currentThread().getName());
return r + "2";
}).handle((result,ex)->{
if (ex!=null){
System.out.println("Error:"+ex.toString());
return null;
}else {
System.out.println("success:"+result);
return result;
}
});
System.out.println(a.get());
Thread.sleep(10*1000);
}