作者: 一字马胡
转载标志 【2017-12-08】
更新日志
日期 | 更新内容 | 备注 |
---|---|---|
2017-12-08 | 学习Future的总结 | 关于Future的深入学习内容 |
导入
深度学习Java Future 系列:
第一篇文章基于FutureTask的Future基本实现来分析了Java Future的基本原理,FutureTask只是Future接口的一个基本实现,并且是作为一个Task对象存在的,FutureTask本身并不管理执行线程池相关的内容,我们生成一个FutureTask对象的动机是我们希望将我们的task包装成一个FutureTask对象,使得我们可以借助FutureTask的特性来控制我们的任务。虽然FutureTask较为简单,但是可以从FutureTask的具体实现中学习一些Future的知识,至少对于Future的定位应该是更进一步的,在进行接下来的内容之前,需要再次重申的是,Future是一个可以代表异步计算结果的对象,并且Future提供了一些方法来让调用者控制任务,比如可以取消任务的执行(当然可能取消会失败),或者设置超时时间来取得我们的任务的运行结果。本文是深度学习Java Future 系列的第二篇文章,和第一篇文章借助FutureTask的具体实现来学习一样,本文也将借助一个具体的Future实现来分析总结,因为CompletableFuture在平时的开发中使用的频率较高,所以本文将选择使用CompletableFuture的具体实现来继续分析Future,试图通过分析CompletableFuture的某些方法的实现来学习关于Future更为深层次的知识。
下面的图片展示了CompletableFuture的类图关系:
可以看到,CompletableFuture同时实现了两个接口,分别为Future和CompletionStage,CompletionStage是CompletableFuture提供的一些非常丰富的接口,可以借助这些接口来实现非常复杂的异步计算工作,基于本文的主题是Future,所以本文不会过多的分析关于CompletionStage的内容,如果想要了解CompletableFuture中关于CompletionStage的一些细节内容,可以参考文章Java CompletableFuture,该文章详细完整的描述了CompletableFuture中关于CompletionStage接口的实现情况。
CompletableFuture
首先来分析一下CompletableFuture的get方法的实现细节,CompletableFuture实现了Future的所有接口,包括两个get方法,一个是不带参数的get方法,一个是可以设置等待时间的get方法,首先来看一下CompletableFuture中不带参数的get方法的具体实现:
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}
result字段代表任务的执行结果,所以首先判断是否为null,为null则表示任务还没有执行结束,那么就会调用waitingGet方法来等待任务执行完成,如果result不为null,那么说明任务已经成功执行结束了,那么就调用reportGet来返回结果,下面先来看一下waitingGet方法的具体实现细节:
/**
* Returns raw result after waiting, or null if interruptible and
* interrupted.
*/
private Object waitingGet(boolean interruptible) {
Signaller q = null;
boolean queued = false;
int spins = -1;
Object r;
while ((r = result) == null) {
if (spins < 0)
spins = (Runtime.getRuntime().availableProcessors() > 1) ?
1 << 8 : 0; // Use brief spin-wait on multiprocessors
else if (spins > 0) {
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}
else if (q == null)
q = new Signaller(interruptible, 0L, 0L);
else if (!queued)
queued = tryPushStack(q);
else if (interruptible && q.interruptControl < 0) {
q.thread = null;
cleanStack();
return null;
}
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
if (q != null) {
q.thread = null;
if (q.interruptControl < 0) {
if (interruptible)
r = null; // report interruption
else
Thread.currentThread().interrupt();
}
}
postComplete();
return r;
}
这个方法的实现时比较复杂的,方法中有几个地方需要特别注意,下面先来看一下spins是做什么的,根据注释,可以知道spins是用来在多核心环境下的自旋操作的,所谓自旋就是不断循环等待判断,从代码可以看出在多核心环境下,spins会被初始化为1 << 8,然后在自旋的过程中如果发现spins大于0,那么就通过一个关键方法ThreadLocalRandom.nextSecondarySeed()来进行spins的更新操作,如果ThreadLocalRandom.nextSecondarySeed()返回的结果大于0,那么spins就减1,否则不更新spins。ThreadLocalRandom.nextSecondarySeed()方法其实是一个类似于并发环境下的random,是线程安全的。
接下来还需要注意的一个点是Signaller,从Signaller的实现上可以发现,Signaller实现了ForkJoinPool.ManagedBlocker,下面是ForkJoinPool.ManagedBlocker的接口定义:
public static interface ManagedBlocker {
/**
* Possibly blocks the current thread, for example waiting for
* a lock or condition.
*
* @return {@code true} if no additional blocking is necessary
* (i.e., if isReleasable would return true)
* @throws InterruptedException if interrupted while waiting
* (the method is not required to do so, but is allowed to)
*/
boolean block() throws InterruptedException;
/**
* Returns {@code true} if blocking is unnecessary.
* @return {@code true} if blocking is unnecessary
*/
boolean isReleasable();
}
ForkJoinPool.ManagedBlocker的目的是为了保证ForkJoinPool的并行性,具体分析还需要更为深入的学习Fork/Join框架。继续回到waitingGet方法中,在自旋过程中会调用ForkJoinPool.managedBlock(ForkJoinPool.ManagedBlocker)来进行阻塞工作,实际的效果就是让线程等任务执行完成,CompletableFuture中与Fork/Join的交叉部分内容不再本文的描述范围,日后再进行分析总结。总得看起来,waitingGet实现的功能就是等待任务执行完成,执行完成返回结果并做一些收尾工作。
现在来看reportGet方法的实现细节,在判断任务执行完成之后,get方法会调用reportGet方法来获取结果:
/**
* Reports result using Future.get conventions.
*/
private static <T> T reportGet(Object r)
throws InterruptedException, ExecutionException {
if (r == null) // by convention below, null means interrupted
throw new InterruptedException();
if (r instanceof AltResult) {
Throwable x, cause;
if ((x = ((AltResult)r).ex) == null)
return null;
if (x instanceof CancellationException)
throw (CancellationException)x;
if ((x instanceof CompletionException) &&
(cause = x.getCause()) != null)
x = cause;
throw new ExecutionException(x);
}
@SuppressWarnings("unchecked") T t = (T) r;
return t;
}
如果result为null,说明任务时被中断的,抛出中断异常,如果result类型为AltResult,代表执行过程中出现异常了,那么就抛出相应的异常,否则,返回result。
分析完了不带参数的get方法(阻塞等待)之后,现在来分析一下带超时参数的get方法的具体实现细节:
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
Object r;
long nanos = unit.toNanos(timeout);
return reportGet((r = result) == null ? timedGet(nanos) : r);
}
和不带参数的get方法一样,还是会判断任务是否已经执行完成了,如果完成了会调用reportGet方法来返回最终的执行结果(或者抛出异常),否则,会调用timedGet来进行超时等待,timedGet会等待一段时间,然后抛出超时异常(或者执行结束返回正常结果),下面是timedGet方法的具体细节:
private Object timedGet(long nanos) throws TimeoutException {
if (Thread.interrupted())
return null;
if (nanos <= 0L)
throw new TimeoutException();
long d = System.nanoTime() + nanos;
Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
boolean queued = false;
Object r;
// We intentionally don't spin here (as waitingGet does) because
// the call to nanoTime() above acts much like a spin.
while ((r = result) == null) {
if (!queued)
queued = tryPushStack(q);
else if (q.interruptControl < 0 || q.nanos <= 0L) {
q.thread = null;
cleanStack();
if (q.interruptControl < 0)
return null;
throw new TimeoutException();
}
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
if (q.interruptControl < 0)
r = null;
q.thread = null;
postComplete();
return r;
}
在timedGet中不再使用spins来进行自旋,因为现在可以确定需要等待多少时间了。timedGet的逻辑和waitingGet的逻辑类似,毕竟都是在等待任务的执行结果。
除了两个get方法之前,CompletableFuture还提供了一个方法getNow,代表需要立刻返回不进行阻塞等待,下面是getNow的实现细节:
public T getNow(T valueIfAbsent) {
Object r;
return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
}
getNow很简单,判断result是否为null,如果不为null则直接返回,否则返回参数中传递的默认值。
分析完了get部分的内容,下面开始分析CompletableFuture最为重要的一个部分,就是如何开始一个任务的执行。下文中将分析supplyAsync的具体执行流程,supplyAsync有两个版本,一个是不带Executor的,还有一个是指定Executor的,下面首先分析一下不指定Executor的supplyAsync版本的具体实现流程:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
Supplier<U> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
e.execute(new AsyncSupply<U>(d, f));
return d;
}
可以看到supplyAsync会调用asyncSupplyStage,并且指定一个默认的asyncPool来执行任务,CompletableFuture是管理执行任务的线程池的,这一点是和FutureTask的区别,FutureTask只是一个可以被执行的task,而CompletableFuture本身就管理者线程池,可以由CompletableFuture本身来管理任务的执行。这个默认的线程池是什么?
private static final boolean useCommonPool =
(ForkJoinPool.getCommonPoolParallelism() > 1);
/**
* Default executor -- ForkJoinPool.commonPool() unless it cannot
* support parallelism.
*/
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
首先会做一个判断,如果条件满足就使用ForkJoinPool的commonPool作为默认的Executor,否则会使用一个ThreadPerTaskExecutor来作为CompletableFuture来做默认的Executor。
接着看asyncSupplyStage,我们提交的任务会被包装成一个AsyncSupply对象,然后交给CompletableFuture发现的Executor来执行,那AsyncSupply是什么呢?
static final class AsyncSupply<T> extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
CompletableFuture<T> dep; Supplier<T> fn;
AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
this.dep = dep; this.fn = fn;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
public final boolean exec() { run(); return true; }
public void run() {
CompletableFuture<T> d; Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
}
观察到AsyncSupply实现了Runnable,而Executor会执行Runnable的run方法来获得结构,所以主要看AsyncSupply的run方法的具体细节,可以看到,run方法中会试图去获取任务的结果,如果不抛出异常,那么会调用CompletableFuture的completeValue方法,否则会调用CompletableFuture的completeThrowable方法,最后会调用CompletableFuture的postComplete方法来做一些收尾工作,主要来看前两个方法的细节,首先是completeValue方法:
/** Completes with a non-exceptional result, unless already completed. */
final boolean completeValue(T t) {
return UNSAFE.compareAndSwapObject(this, RESULT, null,
(t == null) ? NIL : t);
}
completeValue方法会调用UNSAFE.compareAndSwapObject来讲任务的结果设置到CompletableFuture的result字段中去。如果在执行任务的时候抛出异常,会调用completeThrowable方法,下面是completeThrowable方法的细节:
/** Completes with an exceptional result, unless already completed. */
final boolean completeThrowable(Throwable x) {
return UNSAFE.compareAndSwapObject(this, RESULT, null,
encodeThrowable(x));
}
指定Executor的supplyAsync方法和没有指定Executor参数的supplyAsync方法的唯一区别就是执行任务的Executor,所以不再赘述。
到这里,可以知道Executor实际执行的代码到底是什么了,回到asyncSupplyStage方法,接着就会执行Executor.execute方法来执行任务,需要注意的是,asyncSupplyStage方法返回的是一个CompletableFuture,并且立刻返回的,具体的任务处理逻辑是有Executor来执行的,当任务处理完成的时候,Executor中负责处理的线程会将任务的执行结果设置到CompletableFuture的result字段中去。
本文的内容到此也就结束了,上文中提到,CompletableFuture提供了大量实用的方法来支持我们的异步任务,具体提供的方法可以参考上文中提供的链接,或者直接参考jdk源码、javadoc来获取更为详细的内容,本文的目的是解析CompletableFuture的任务处理流程,并且试图分析Future在CompletableFuture中的使用,以更深入的理解Future,结合第一篇深度学习Java Future系列的文章,希望可以更加深入的理解Future,并且知道Future在java并发编程、异步计算中的重要作用。