前面分析了一些java并发控制的源码,接下来换个口味分析一下并发运行的一些相关类。
废话少说,发车,今天分析一下FutureTask的源码,之所以分析他是因为这个类我个人认为这个类跟很多接口有一定关联,因此可以方便一起梳理出来。
说到FutureTask应该不少小伙伴都很熟悉了,他的一个特色其实是能够提供阻塞的线程执行返回结果的,既然说到了返回结果那就不得不说一下Callable接口,Callable接口是Runnable的补充。提供线程执行结果的返回。
先贴一下类关系图
通过类关系图我们就能看得很清楚
这里面为了方便梳理,简单写一下为什么会多线程时候定义Runnable接口,为什么Callable接口的存在。(假装走进了大哥们的内心)
很多人在谈到接口时候会说接口的作用是为了制定一个标准方便其他开发者按照自己的意图去开发相应实现。在并发中Runnable还有Callable的出现的价值也是这样,这里面可能不得不说一下设计模式里面的代理模式了,因为确实有点像先看一个图
1、通过这个图就能看到Callable其实就是帮忙定义了一个规范,当有Callable类型实现类传入时候,程序通过调用run()方法 得以调用实现类里面的代码。通过这种模式将程序实现的逻辑开放给开发人员。Runnable接口类似。
2、结合这张图的几个类成员变量做一个简单介绍,
callable持有一个Callable类型实现类的引用是执行的主要逻辑代码的载体。
outcome 负责接收线程执行结果,可能会使异常所以使用Object类型,而不是V泛型
runner 执行当前代码的线程
waiters 阻塞在当前线程执行结果返回的线程队列,通俗点的一种场景就是 有一个task启动,同时有十几个线程调用get()方法,此时负责计算的线程没有返回结果,那么这十几个线程就要进行阻塞并排队。waiters就是负责排队的一个指向队列头的引用。
waiters队列是doug lea优化后新增的之前用的是AQS的形式进行线程阻塞的,贴一下老版本的实现代码以备温习
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* This file is available under and governed by the GNU General Public
* License version 2 only, as published by the Free Software Foundation.
* However, the following notice accompanied the original version of this
* file:
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/licenses/publicdomain
*/
package java.util.concurrent;
import java.util.concurrent.locks.*;
/**
* A cancellable asynchronous computation. This class provides a base
* implementation of {@link Future}, with methods to start and cancel
* a computation, query to see if the computation is complete, and
* retrieve the result of the computation. The result can only be
* retrieved when the computation has completed; the <tt>get</tt>
* method will block if the computation has not yet completed. Once
* the computation has completed, the computation cannot be restarted
* or cancelled.
*
* <p>A <tt>FutureTask</tt> can be used to wrap a {@link Callable} or
* {@link java.lang.Runnable} object. Because <tt>FutureTask</tt>
* implements <tt>Runnable</tt>, a <tt>FutureTask</tt> can be
* submitted to an {@link Executor} for execution.
*
* <p>In addition to serving as a standalone class, this class provides
* <tt>protected</tt> functionality that may be useful when creating
* customized task classes.
*
* @since 1.5
* @author Doug Lea
* @param <V> The result type returned by this FutureTask's <tt>get</tt> method
*/
public class FutureTask<V> implements RunnableFuture<V> {
/** Synchronization control for FutureTask */
private final Sync sync;
/**
* Creates a <tt>FutureTask</tt> that will, upon running, execute the
* given <tt>Callable</tt>.
*
* @param callable the callable task
* @throws NullPointerException if callable is null
*/
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
sync = new Sync(callable);
}
/**
* Creates a <tt>FutureTask</tt> that will, upon running, execute the
* given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
* given result on successful completion.
*
* @param runnable the runnable task
* @param result the result to return on successful completion. If
* you don't need a particular result, consider using
* constructions of the form:
* <tt>Future<?> f = new FutureTask<Object>(runnable, null)</tt>
* @throws NullPointerException if runnable is null
*/
public FutureTask(Runnable runnable, V result) {
sync = new Sync(Executors.callable(runnable, result));
}
public boolean isCancelled() {
return sync.innerIsCancelled();
}
public boolean isDone() {
return sync.innerIsDone();
}
public boolean cancel(boolean mayInterruptIfRunning) {
return sync.innerCancel(mayInterruptIfRunning);
}
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
return sync.innerGet();
}
/**
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return sync.innerGet(unit.toNanos(timeout));
}
/**
* Protected method invoked when this task transitions to state
* <tt>isDone</tt> (whether normally or via cancellation). The
* default implementation does nothing. Subclasses may override
* this method to invoke completion callbacks or perform
* bookkeeping. Note that you can query status inside the
* implementation of this method to determine whether this task
* has been cancelled.
*/
protected void done() { }
/**
* Sets the result of this Future to the given value unless
* this future has already been set or has been cancelled.
* This method is invoked internally by the <tt>run</tt> method
* upon successful completion of the computation.
* @param v the value
*/
protected void set(V v) {
sync.innerSet(v);
}
/**
* Causes this future to report an <tt>ExecutionException</tt>
* with the given throwable as its cause, unless this Future has
* already been set or has been cancelled.
* This method is invoked internally by the <tt>run</tt> method
* upon failure of the computation.
* @param t the cause of failure
*/
protected void setException(Throwable t) {
sync.innerSetException(t);
}
// The following (duplicated) doc comment can be removed once
//
// 6270645: Javadoc comments should be inherited from most derived
// superinterface or superclass
// is fixed.
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
public void run() {
sync.innerRun();
}
/**
* Executes the computation without setting its result, and then
* resets this Future to initial state, failing to do so if the
* computation encounters an exception or is cancelled. This is
* designed for use with tasks that intrinsically execute more
* than once.
* @return true if successfully run and reset
*/
protected boolean runAndReset() {
return sync.innerRunAndReset();
}
/**
* Synchronization control for FutureTask. Note that this must be
* a non-static inner class in order to invoke the protected
* <tt>done</tt> method. For clarity, all inner class support
* methods are same as outer, prefixed with "inner".
*
* Uses AQS sync state to represent run status
*/
private final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -7828117401763700385L;
/** State value representing that task is ready to run */
private static final int READY = 0;
/** State value representing that task is running */
private static final int RUNNING = 1;
/** State value representing that task ran */
private static final int RAN = 2;
/** State value representing that task was cancelled */
private static final int CANCELLED = 4;
/** The underlying callable */
private final Callable<V> callable;
/** The result to return from get() */
private V result;
/** The exception to throw from get() */
private Throwable exception;
/**
* The thread running task. When nulled after set/cancel, this
* indicates that the results are accessible. Must be
* volatile, to ensure visibility upon completion.
*/
private volatile Thread runner;
Sync(Callable<V> callable) {
this.callable = callable;
}
private boolean ranOrCancelled(int state) {
return (state & (RAN | CANCELLED)) != 0;
}
/**
* Implements AQS base acquire to succeed if ran or cancelled
*/
protected int tryAcquireShared(int ignore) {
return innerIsDone() ? 1 : -1;
}
/**
* Implements AQS base release to always signal after setting
* final done status by nulling runner thread.
*/
protected boolean tryReleaseShared(int ignore) {
runner = null;
return true;
}
boolean innerIsCancelled() {
return getState() == CANCELLED;
}
boolean innerIsDone() {
return ranOrCancelled(getState()) && runner == null;
}
V innerGet() throws InterruptedException, ExecutionException {
acquireSharedInterruptibly(0);
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
}
V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
if (!tryAcquireSharedNanos(0, nanosTimeout))
throw new TimeoutException();
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
}
void innerSet(V v) {
for (;;) {
int s = getState();
if (s == RAN)
return;
if (s == CANCELLED) {
// aggressively release to set runner to null,
// in case we are racing with a cancel request
// that will try to interrupt runner
releaseShared(0);
return;
}
if (compareAndSetState(s, RAN)) {
result = v;
releaseShared(0);
done();
return;
}
}
}
void innerSetException(Throwable t) {
for (;;) {
int s = getState();
if (s == RAN)
return;
if (s == CANCELLED) {
// aggressively release to set runner to null,
// in case we are racing with a cancel request
// that will try to interrupt runner
releaseShared(0);
return;
}
if (compareAndSetState(s, RAN)) {
exception = t;
releaseShared(0);
done();
return;
}
}
}
boolean innerCancel(boolean mayInterruptIfRunning) {
for (;;) {
int s = getState();
if (ranOrCancelled(s))
return false;
if (compareAndSetState(s, CANCELLED))
break;
}
if (mayInterruptIfRunning) {
Thread r = runner;
if (r != null)
r.interrupt();
}
releaseShared(0);
done();
return true;
}
void innerRun() {
if (!compareAndSetState(READY, RUNNING))
return;
runner = Thread.currentThread();
if (getState() == RUNNING) { // recheck after setting thread
V result;
try {
result = callable.call();
} catch (Throwable ex) {
setException(ex);
return;
}
set(result);
} else {
releaseShared(0); // cancel
}
}
boolean innerRunAndReset() {
if (!compareAndSetState(READY, RUNNING))
return false;
try {
runner = Thread.currentThread();
if (getState() == RUNNING)
callable.call(); // don't set result
runner = null;
return compareAndSetState(RUNNING, READY);
} catch (Throwable ex) {
setException(ex);
return false;
}
}
}
}
回过头来分析新版本的代码
报告执行结果以及抛出异常report方法
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
report方法主要负责反馈执行结果,如果执行正常就通过类型转化将执行结果返回,否则抛出异常,这也是为什么outcome使用object类型,因为不确定是异常还是正常执行的返回结果
两个构造方法进行成员变量赋值
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
构造方法初始化成员变量,如果传入Runnable类型参数,则转化为callable并进行初始化
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
返回当前task是否被取消 4、5、6
public boolean isCancelled() {
return state >= CANCELLED;
}
返回当前任务是否执行了,不保证执行完 真正执行完且返回正常结果是NORMAL
public boolean isDone() {
return state != NEW;
}
核心方法run()方法
public void run() {
如果当前任务状态不为新任务或者不能为任务添加当前线程为执行者则返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
调用执行逻辑进行代码执行
result = c.call();
ran = true;
} catch (Throwable ex) {
执行异常,将异常赋值给outcome
result = null;
ran = false;
setException(ex);
}
if (ran)
将执行结果赋值给成员变量outcome
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
关联方法 set(result);setException(ex)
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
这里一个小细节是两次原子操作,中间包含着赋值操作,李大爷这样做的原因是为了保障在状态判断的时候能够得到outcome的正确引用,(在state修改为COMPLETING后并不一定能保障outcome值被赋值成功)所以多加一个原子操作,然后通过第二个原子操作确保outcome被赋值成功,保程序得到预期结果
取消任务方法cancel(boolean mayInterruptIfRunning)
public boolean cancel(boolean mayInterruptIfRunning) {
当前任务状态为新任务,并且修改为终端中或者取消状态失败
意味着其他线程执行状态修改,那么将无法阻止线程执行,返回false
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
当前线程为新任务并且能够成功修改任务状态,那么就将任务的执行线程打上中断标记
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
阻塞等待结果的方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
将这三个方法放到一起比较简单,当线程调用get()方法时候如果当前任务状态未完成则进入等到完成方法中如果执行完成则调用report方法进行结果返回(就是对象内成员变量的一个赋值)。
执行并不返回执行结果
具体作用没get到,有懂得可以帮忙留言赐教
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
让已经取消的任务让出cpu时间片,通过循环判断任务状态,调用线程yield方法进行时间片让度
private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
}
任务执行完调用方法 finishCompletion()
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
将阻塞在waiters队列中的线程唤醒,唤醒后当然是执行get()操作调用的代码喽,进行结果的获取。这里里面支持子类扩展done()方法,设计自己的逻辑。
多线程通过get阻塞逻辑实现代码
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
默认没有入队列
boolean queued = false;
死循环进行调用
for (;;) {
当前执行任务线程如果被标记为中断,也就是任务取消
从队列中移除当前节点并抛出异常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
当前线程可能执行完成,或者取消等状态,将线程赋值为空
帮助gc
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
任务执行完了可能还没完成结果的赋值操作,先让取数线程让出时间片
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
如果当前排队节点是空则创建一个新的等待节点准备入队
else if (q == null)
q = new WaitNode();
入队进行排队等待
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
如果是设置了超时,超时则取消排队
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
未设置超时等待,进行park线程挂起
LockSupport.park(this);
}
}
取消队列中等待线程
private void removeWaiter(WaitNode node) {
if (node != null) {
释放节点对应的线程
node.thread = null;
分散执行,顺带检查其他节点中thread为空的,进行释放
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
最后做个简单总结,FutureTask之所以能够做到多线程实行是因为实现了Runnable接口,但是实现run() 方法我们日常变成是要程序员自行定义逻辑的,这就不灵活了,所以持有一个Callable引用,让程序员将执行逻辑封装到,call方法之中。并且在对象内部定义了outcome对象,负责接收call()方法返回的结果。为了提升并发,做到一个线程运算,其他线程也能阻塞到当前get()方法,李大爷设计了一个waiters队列,用于进行结果获取线程的排队。当计算线程完成计算,进行结果赋值时候会唤醒等到队列,等待队列中的线程回去尝试获取结果并返回。