从上图可以看出,FutureTask实现了RunnableFuture接口,而RunnableFuture则继承了Runnable和Future接口。由于Runnalbe接口大家都比较熟悉,因此,咱们就从Future说起。
Future初探
Future是什么?从java语言本身来讲,它就是一个简单的接口,定义了如下几个方法。
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get()throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
那么这个接口的用途是什么呢?从这个类的注释就可以看出:Future代表了异步计算的结果。
也就是说,可以通过Future的实现类封装异步计算的结果,然后可以通过get方法获取计算结果、通过cancle取消异步计算,也可以通过isCanclled、isDone等方法来判断此次计算是否已经被取消或者执行完毕。
FutureTask
FutureTask是目前Future接口的实现类,下面我们看下FutureTask的实现。
主要看一下get方法,其用来获取计算结果,当计算没完成时,则等待程序执行完成。
public V get()throws InterruptedException, ExecutionException {
int s =state;
if (s <=COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
逻辑很简单,如果状态<=COMPLETING,则等待任务执行完成,然后获取执行结果,返回。
重点看一下awaitDone方法:
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos :0L;
WaitNode q =null;
boolean queued =false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s =state;
if (s >COMPLETING) {
if (q !=null)
q.thread =null;
return s;
}
else if (s ==COMPLETING)// cannot time out yet
Thread.yield();
else if (q ==null)
q =new WaitNode();
else if (!queued)
queued =UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next =waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <=0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
暂且不关心任务被中断的情况。上述代码根据state字段进行逻辑控制:
如果>COMPLETING,表明任务执行成功,除了返回状态外,如果WaitNode不为空,还将其thread置为null,为何这么操作,后面再来分析;
如果当前任务正在COMPLETING状态,执行yield方法,释放cpu时间片。之所以执行yield方法,而不是sleep方法,是因为COMPLETING到NOMAL时间较短,通过使线程转为就绪状态,相对于sleep方法使线程变为阻塞状态,效率更高。
如果状态为未完成,则进行一次循环,如果一次循环后,还是未完成状态,则通过UNSAFE.compareAndSwapObject方法,将当前线程加入到WaitNode中:
UNSAFE.compareAndSwapObject(this, waitersOffset, q.next =waiters, q);
compareAndSwapObject的意思为:通过cas方法,当前对象waiters属性等于waiters,让waiters属性等于q。
如果入队后,还是未完成状态,则调用LockSupport.park方法,阻塞当前线程。
简单介绍一下LockSupport。park方法会阻塞当前线程,unpark函数则是将指定线程唤醒。与Object的wait/notify相比,优势在于操作更精准,可以准确地唤醒某一个线程。
既然状态这么重要,咱们就看看FutureTask的状态是怎么变更的。
首先是新建任务:
public FutureTask(Callable callable) {
if (callable ==null)
throw new NullPointerException();
this.callable = callable;
this.state =NEW; // ensure visibility of callable
}
不如是通过哪个构造方法,state都置为NEW。
再看看run方法执行对于state的影响:
public void run() {
if (state !=NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable c =callable;
if (c !=null &&state ==NEW) {
V result;
boolean ran;
try {
result = c.call();
ran =true;
}catch (Throwable ex) {
result =null;
ran =false;
setException(ex);
}
if (ran)
set(result);
}
}finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner =null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s =state;
if (s >=INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
如果非NEW状态,则直接返回;
如果执行线程不为空,表明已经有线程在执行,直接返回,否则将runner置为当前线程
调用Callable的call方法,执行成功之后,将状态置为COMPLETING,然后将outcome设置为执行结果,再讲状态置为NORMAL。
下面重点来了:
finishCompletion方法,会遍历watiers,逐个调用unpark方法,唤醒上面get方法中被阻塞的线程。
总结
到这里,核心流程就分析完了。总结一下。
Callable通过增加返回值,解决了以往线程无法有返回值的问题。而Future接口,则是为了获取Callable的响应值。其核心逻辑为,调用get方法时,如果Callable任务没执行完,则阻塞get方法,直至任务执行完成。FutureTask为Future的实现类,其通过LockSupport的park和unpark,提升get的效率。FutureTask还实现了Runnable接口,这样其除了可以通过ExecuteService执行,还可以用过Thread来进行执行。