Future系列文章
Future三重奏第一章:Future设计模式及代码示例
Future三重奏第二章:FutureTask源码解析
Future三重奏第三章:FutureTask中一些知识点的总结与补充
FutureTask是做什么的
futureTask是一种可取消的异步任务,通过调用get()方法获取异步执行的返回结果,如果异步任务还没有完成,get()方法将会阻塞调用线程,将当前线程挂起,直到任务执行结束,将会唤醒被挂起的线程,完成整个调用过程
FutureTask是如何贯彻future模式的思想
在上一章节中,我们有几个基础类来共同实现了future设计模式,实际上这些过程在futureTask类的源码中都帮我们做了
- Main类:发起请求,构建一个Client类并且返回一个date对象
- Client类:启动一个线程,并且创建一个业务类,该业务类返回实际获取的数据,在获取实际数据后,Client创建futureData类通过set方法将返回数据保存进futureData对象中,并返回futureData对象
- RealData类:实际业务类
- FutureData类:实现得到数据和设置数据的中间类,里面实现了set()方法和get()方法,set()方法将数据写入并返回,get()方法获取装载的数据
如果要实现future模式,需要通过这些类来实现,但是futureTask的源码中已经帮我们实现了这套繁杂的逻辑,下面通过源码的解析来说下futureTask是如何做的
FutureTask源码解析
futureTask使用方式
- FutureTask + Thread
EventOrderTask task=new EventOrderTask(eventId);//实际的业务实现类
FutureTask futureTask=new FutureTask(task);//将任务构建一个futureTask对象
Thread thread =new Thread(futureTask);//构建并启动线程
thread.start();
- Future + ExecutorService
//step1 ......
//step2:创建计算任务
Task task = new Task();
//step3:创建线程池,将Callable类型的task提交给线程池执行,通过Future获取子任务的执行结果
ExecutorService executorService = Executors.newCachedThreadPool();
final Future<Boolean> future = executorService.submit(task);
//step4:通过future获取执行结果
boolean result = (boolean) future.get();
- FutureTask + ExecutorService
//step1 ......
//step2 ......
//step3:将FutureTask提交给线程池执行
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(futureTask);
//step4 ......
FutureTask源码的基本构成
- FutureTask实现了RunnableFuture接口
- RunnableFuture继承Runnable, Future接口,所以FutureTask是这两个接口的实现类,具备future接口和runnable的特性,需要重写callable接口
- 在变量的组成中state代表了当前任务的状态的转变:
- NEW -> COMPLETING -> NORMAL 初始化 -> 计算完成(还没有将计算结果赋值给) -> 执行完成
- NEW -> COMPLETING -> EXCEPTIONAL 初始化 -> 计算完成(还没有将计算结果赋值给) -> 抛出异常
- NEW -> CANCELLED 初始化 -> 取消任务
- NEW -> INTERRUPTING -> INTERRUPTED 初始化 -> 打断
public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
private static final int NEW = 0;//初始化时是NEW
private static final int COMPLETING = 1;//任务完成但是尚未赋值给outcome
private static final int NORMAL = 2;//任务完成且赋值给outcome
private static final int EXCEPTIONAL = 3;//异常状态
private static final int CANCELLED = 4;//任务取消
private static final int INTERRUPTING = 5;//任务打断
private static final int INTERRUPTED = 6;//
// The underlying callable; nulled out after running
// 是构造函数中传入的计算数据的任务类,该类实现了Callable接口
private Callable<V> callable;
// The result to return or exception to throw from get()
// 实际数据值
private Object outcome; // non-volatile, protected by state reads/writes
// The thread running the callable; CASed during run()
// 运行callable的线程(实际计算的线程)
private volatile Thread runner;
//FutureTask的构造方法,传入实现了callable的数据类,对callable进行赋值,在run方法中进行调用
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
}
解析future实现异步执行任务并获取任务结果
在主函数中启动线程,将会调用futureTask中的run方法启动异步线程,调用数据类,并返回计算结果,将数据封装进futureTask方法中
run方法的执行逻辑
构造futureTask对象的时候,构造函数中传入实现了callable接口的任务类,该类重写了call方法,在run方法中调用重写的call方法并返回计算结果,如果数据获取成功,调用set(result)方法将获取到的数据赋值给outcome变量
public void run() {
//检查当前future线程的任务执行状态是否是可执行状态,如果不是则直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable; //获取callable数据类
if (c != null && state == NEW) { //如果数据类不是null且状态为NEW
V result; //实际返回数据类
boolean ran;
try {
result = c.call(); //调用实际业务类重写的call方法,获取实际数据
ran = true; //将ran设置为true
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result); //获取到实际参数后调用set方法将数据存进futureTask中
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
在这里我们要重点看下set(result)这个进行赋值的操作
这个方法比较简单,首先是找到当前state在内存中的地址,然后将state的状态变成COMPLETING,注意的是state是volatile变量类型,所以多个子线程在运行的过程中看到的值都是同一个
当state状态被改成COMPLETING成功后,则将异步任务的计算结果赋值给变量outcome,赋值成功后,将state更改成为NORMAL
最后调用方法finishCompletion()唤醒等待返回结果的线程,释放资源
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;//将最终结果赋值给outcome
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();//
}
}
调用finishCompletion()的主要作用是唤醒所有等待链表中的等待节点,让所有被挂起的线程可以继续执行awaitDone方法
当waiters不为空的时候,首先将其置为null,然后在获取内部的线程,将所有链表中的线程全部唤醒,在该方法中,唤醒的是该futureTask对象中的所有被挂起的子线程
private void finishCompletion() {
// assert state > COMPLETING;
//waiters不为空的时候,waiters是封装该futureTask对象的线程
for (WaitNode q; (q = waiters) != null;) {
//如果当前等待线程就是当前线程则置为null
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
//当前循环体唤醒了当前futureTask中所有等待队列中的节点(挂起的线程),目的是什么呢,被唤醒的线程做什么呢
//被唤醒的线程继续执行各自线程栈中的awaitDone方法,尝试获取对应的结果
for (;;) {
Thread t = q.thread;//获取当前节点中的线程
if (t != null) {//如果线程不为空
q.thread = null;//将等待节点的线程设置为null
LockSupport.unpark(t);//唤醒等待节点中的线程
}
WaitNode next = q.next;//获取等待节点的下一个节点
if (next == null)//如果等待队列已经没有等待节点了,则直接跳出当前循环
break;
q.next = null; // unlink to help gc
q = next; //将下一个等待节点赋值给q节点
}
break;
}
}
done();
callable = null; // to reduce footprint
}
至此,futureTask对象则完成了获取任务,计算任务,并将任务进行赋值的过程,在上面的方法中,我们会将被挂起的线程进行唤醒操作,线程被挂起是因为在计算结果没出来之前,通过futureTask对象调用get()方法去获取执行结果了,当执行线程发现结果还没有出来,线程就会被挂起,代码如下:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L); //p2-1:等待计算结果
return report(s);//返回实际数据
}
当有线程访问get方法的时候,首先会去判断当前任务状态,如果任务还没有执行完成,则会调用awaitDone对当前线程进行阻塞
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;//阻塞队列元素,被挂起的线程都会被封装成为该对象
boolean queued = false;
//1:如果一旦该线程被中断了,将会移除阻塞队列中的任务,因为futureTask设计有取消任务的操作
//情景:任务还没有被完成
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) { //2:如果任务完成或是已经取消了
if (q != null) //如果队列元素有值,则将队列元素设置为null,并返回当前线程任务的状态
q.thread = null;
return s;
}
else if (s == COMPLETING) // 3:如果任务完成了(正常/异常)还没有赋值给指定接受元素(不能超时)
Thread.yield(); //重新选择可执行的线程,emmmmmm
else if (q == null) //4:如果等待节点为空,则创建一个等待队列元素
q = new WaitNode();
//5:如果这个节点还没有加入到等待队列中,则将这个节点加入到队列的头部
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);//waiters是等待队列的头结点
//6:如果设置了超时时间等待,则在指定的时间内移除等待对垒
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
//7:否则阻塞线程,等待唤醒
else
LockSupport.park(this);
}
}
代码看起来很多,但其实简单点分析主要做了三步
- 第一次循环,这时候s<COMPLETING,此时q==null,这时应该创建一个WaitNode对象,节点对象中的线程就是当前线程,继续进行循环
- 第二次循环,进入方法5,将创建的q加入到链表首位置当中
- 第三次循环,是否设置了超时时间,然后将线程挂起
至此,就是整个futureTask完成异步任务的执行过程,值得注意的是,futureTask通过cas+volatile的方式,进行数据的同步和通知
- 如果我的文章确实有帮助到你,请不要忘了点一下文末的喜欢
- 技术理解不到或有错误请直(bu)接(yao)指(ma)出(wo)
-
写作不易!