概述
CompletableFuture是1.8加入的异步处理的Future,我们知道之前有个Future接口,但调用future.get方法时,主线程会阻塞,有时还是影响性能,能不能给你个回调函数处理完后接着处理就好。CompletableFuture就这样诞生了。
类结构
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
volatile Object result; // 异步处理的结果或者异常数据
volatile Completion stack; // Top of Treiber stack of dependent actions
。。。
}
上面简单看了CompletableFuture的类结构,可以看到CompletableFuture实现了Future、CompletionStage接口,我们着重看下CompletionStage中的方法,提供的异步处理能力。
例子
private static void testUseAsyncMethod() throws InterruptedException, ExecutionException {
CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> {
sleepOneSecond();
return "CC";
}).thenApply(name -> {
return "Hello " + name;
}).thenApply(greeting -> {
return greeting + ", Welcome to the World";
});
System.out.println(result.get());
}
例子仍然能够在github中下载,这里贴出来一部分方便我们串讲源码。
我们先看第一个使用的supplyAsync方法。
//静态方法,如果没有传入线程池,使用ForkJoinPool的common线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
Supplier<U> f) {
if (f == null) throw new NullPointerException();
//新建CompletableFuture对象
CompletableFuture<U> d = new CompletableFuture<U>();
//构造AsyncSupply对象,线程池提交AsyncSupply任务
e.execute(new AsyncSupply<U>(d, f));
//将CompletableFuture对象返回
return d;
}
//可以看到AsyncSupply是一个Runnable对象
static final class AsyncSupply<T> extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
CompletableFuture<T> dep; Supplier<T> fn;
AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
this.dep = dep; this.fn = fn;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
public final boolean exec() { run(); return true; }
//重写exec方法、 run方法
public void run() {
CompletableFuture<T> d; Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
//CompletableFuture对象的result为空时
if (d.result == null) {
try {
//调用传入的supplier的get方法,并将结果放入result字段
//注意:这是在线程池中提交的,所以是异步处理的
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
//处理完依赖方法后,处理栈顶方法,会和后面的回调方法入栈呼应
d.postComplete();
}
}
}
final void postComplete() {
//弹出依赖的栈顶数据进行处理
CompletableFuture<?> f = this; Completion h;
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d; Completion t;
if (f.casStack(h, t = h.next)) {
if (t != null) {
if (f != this) {
pushStack(h);
continue;
}
h.next = null; // detach
}
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
再看下例子中thenApply方法,异步处理完supplyAsync后的回调方法。
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
//线程池为空,则尝试看依赖的方法有没有处理完成,或发生异常
//线程池不为空或者依赖的方法还没有处理完成,则将回调方法构造成UniApply,push到等待栈里面
if (e != null || !d.uniApply(this, f, null)) {
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
push(c);
//尝试处理目标回调方法
c.tryFire(SYNC);
}
return d;
}
一起看看uniApply方法依赖的方法处理完成则尝试处理回调方法
final <S> boolean uniApply(CompletableFuture<S> a,
Function<? super S,? extends T> f,
UniApply<S,T> c) {
Object r; Throwable x;
//a是依赖方法,result为空,说明还没有处理完成返回false
if (a == null || (r = a.result) == null || f == null)
return false;
//如果依赖的方法发生了异常,处理异常并返回
tryComplete: if (result == null) {
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
completeThrowable(x, r);
break tryComplete;
}
r = null;
}
try {
//判断是否可以执行,不可以执行则直接返回false
if (c != null && !c.claim())
return false;
@SuppressWarnings("unchecked") S s = (S) r;
//到了这里说明依赖的方法处理完成了,调用回调方法调用用依赖方法的结果result做参数
//注意这里都是线程池中的线程在执行,所以是异步执行
completeValue(f.apply(s));
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}
push 方法
final void push(UniCompletion<?,?> c) {
if (c != null) {
//将回调方法C push到栈顶
while (result == null && !tryPushStack(c))
lazySetNext(c, null); // clear on failure
}
}
final boolean tryPushStack(Completion c) {
Completion h = stack;
lazySetNext(c, h);
return UNSAFE.compareAndSwapObject(this, STACK, h, c);
}
tryFire方法
//再次尝试判断依赖方法是否处理完成,处理完成则调用目标回调方法
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d; CompletableFuture<T> a;
if ((d = dep) == null ||
!d.uniApply(a = src, fn, mode > 0 ? null : this))
return null;
dep = null; src = null; fn = null;
//成功处理完依赖方法和回调方法后,进行处理,可能唤醒其他的回调方法或者清理栈
return d.postFire(a, mode);
}
CompletableFuture其他还有很多方法,但都类似,主要都用了上面的方法。