Future三重奏第二章:FutureTask源码解析

Future系列文章

Future三重奏第一章:Future设计模式及代码示例
Future三重奏第二章:FutureTask源码解析
Future三重奏第三章:FutureTask中一些知识点的总结与补充

FutureTask是做什么的

futureTask是一种可取消的异步任务,通过调用get()方法获取异步执行的返回结果,如果异步任务还没有完成,get()方法将会阻塞调用线程,将当前线程挂起,直到任务执行结束,将会唤醒被挂起的线程,完成整个调用过程

FutureTask是如何贯彻future模式的思想

在上一章节中,我们有几个基础类来共同实现了future设计模式,实际上这些过程在futureTask类的源码中都帮我们做了

  • Main类:发起请求,构建一个Client类并且返回一个date对象
  • Client类:启动一个线程,并且创建一个业务类,该业务类返回实际获取的数据,在获取实际数据后,Client创建futureData类通过set方法将返回数据保存进futureData对象中,并返回futureData对象
  • RealData类:实际业务类
  • FutureData类:实现得到数据和设置数据的中间类,里面实现了set()方法和get()方法,set()方法将数据写入并返回,get()方法获取装载的数据

如果要实现future模式,需要通过这些类来实现,但是futureTask的源码中已经帮我们实现了这套繁杂的逻辑,下面通过源码的解析来说下futureTask是如何做的

FutureTask源码解析

futureTask使用方式

  • FutureTask + Thread
        EventOrderTask task=new EventOrderTask(eventId);//实际的业务实现类
        FutureTask futureTask=new FutureTask(task);//将任务构建一个futureTask对象
        Thread thread =new Thread(futureTask);//构建并启动线程
        thread.start();

  • Future + ExecutorService
//step1 ......
//step2:创建计算任务
Task task = new Task();
//step3:创建线程池,将Callable类型的task提交给线程池执行,通过Future获取子任务的执行结果
ExecutorService executorService = Executors.newCachedThreadPool();
final Future<Boolean> future = executorService.submit(task);
//step4:通过future获取执行结果
boolean result = (boolean) future.get();
  • FutureTask + ExecutorService
//step1 ......
//step2 ......
//step3:将FutureTask提交给线程池执行
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(futureTask);
//step4 ......

FutureTask源码的基本构成

  1. FutureTask实现了RunnableFuture接口
  • RunnableFuture继承Runnable, Future接口,所以FutureTask是这两个接口的实现类,具备future接口和runnable的特性,需要重写callable接口
  1. 在变量的组成中state代表了当前任务的状态的转变:
  • NEW -> COMPLETING -> NORMAL 初始化 -> 计算完成(还没有将计算结果赋值给) -> 执行完成
  • NEW -> COMPLETING -> EXCEPTIONAL 初始化 -> 计算完成(还没有将计算结果赋值给) -> 抛出异常
  • NEW -> CANCELLED 初始化 -> 取消任务
  • NEW -> INTERRUPTING -> INTERRUPTED 初始化 -> 打断
public class FutureTask<V> implements RunnableFuture<V> {
        private volatile int state;
        private static final int NEW          = 0;//初始化时是NEW
        private static final int COMPLETING   = 1;//任务完成但是尚未赋值给outcome
        private static final int NORMAL       = 2;//任务完成且赋值给outcome
        private static final int EXCEPTIONAL  = 3;//异常状态
        private static final int CANCELLED    = 4;//任务取消
        private static final int INTERRUPTING = 5;//任务打断
        private static final int INTERRUPTED  = 6;//


        // The underlying callable; nulled out after running
        // 是构造函数中传入的计算数据的任务类,该类实现了Callable接口
        private Callable<V> callable;

        // The result to return or exception to throw from get()
        // 实际数据值
        private Object outcome; // non-volatile, protected by state reads/writes

        // The thread running the callable; CASed during run()
        // 运行callable的线程(实际计算的线程)
        private volatile Thread runner;


        //FutureTask的构造方法,传入实现了callable的数据类,对callable进行赋值,在run方法中进行调用
        public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW;       // ensure visibility of callable
    }
    }

解析future实现异步执行任务并获取任务结果

在主函数中启动线程,将会调用futureTask中的run方法启动异步线程,调用数据类,并返回计算结果,将数据封装进futureTask方法中
run方法的执行逻辑
构造futureTask对象的时候,构造函数中传入实现了callable接口的任务类,该类重写了call方法,在run方法中调用重写的call方法并返回计算结果,如果数据获取成功,调用set(result)方法将获取到的数据赋值给outcome变量

public void run() {
        //检查当前future线程的任务执行状态是否是可执行状态,如果不是则直接返回
        if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                        null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;   //获取callable数据类
            if (c != null && state == NEW) {    //如果数据类不是null且状态为NEW
                V result;   //实际返回数据类
                boolean ran;
                try {
                    result = c.call();  //调用实际业务类重写的call方法,获取实际数据
                    ran = true;         //将ran设置为true
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);    //获取到实际参数后调用set方法将数据存进futureTask中
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

在这里我们要重点看下set(result)这个进行赋值的操作
这个方法比较简单,首先是找到当前state在内存中的地址,然后将state的状态变成COMPLETING,注意的是state是volatile变量类型,所以多个子线程在运行的过程中看到的值都是同一个
当state状态被改成COMPLETING成功后,则将异步任务的计算结果赋值给变量outcome,赋值成功后,将state更改成为NORMAL
最后调用方法finishCompletion()唤醒等待返回结果的线程,释放资源

protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;//将最终结果赋值给outcome
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();//
        }
    }

调用finishCompletion()的主要作用是唤醒所有等待链表中的等待节点,让所有被挂起的线程可以继续执行awaitDone方法
当waiters不为空的时候,首先将其置为null,然后在获取内部的线程,将所有链表中的线程全部唤醒,在该方法中,唤醒的是该futureTask对象中的所有被挂起的子线程

private void finishCompletion() {
        // assert state > COMPLETING;
        //waiters不为空的时候,waiters是封装该futureTask对象的线程
        for (WaitNode q; (q = waiters) != null;) {
            //如果当前等待线程就是当前线程则置为null
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                //当前循环体唤醒了当前futureTask中所有等待队列中的节点(挂起的线程),目的是什么呢,被唤醒的线程做什么呢
                //被唤醒的线程继续执行各自线程栈中的awaitDone方法,尝试获取对应的结果
                for (;;) {
                    Thread t = q.thread;//获取当前节点中的线程
                    if (t != null) {//如果线程不为空
                        q.thread = null;//将等待节点的线程设置为null
                        LockSupport.unpark(t);//唤醒等待节点中的线程
                    }
                    WaitNode next = q.next;//获取等待节点的下一个节点
                    if (next == null)//如果等待队列已经没有等待节点了,则直接跳出当前循环
                        break;
                    q.next = null; // unlink to help gc
                    q = next;   //将下一个等待节点赋值给q节点
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

至此,futureTask对象则完成了获取任务,计算任务,并将任务进行赋值的过程,在上面的方法中,我们会将被挂起的线程进行唤醒操作,线程被挂起是因为在计算结果没出来之前,通过futureTask对象调用get()方法去获取执行结果了,当执行线程发现结果还没有出来,线程就会被挂起,代码如下:

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);   //p2-1:等待计算结果
        return report(s);//返回实际数据
    }

当有线程访问get方法的时候,首先会去判断当前任务状态,如果任务还没有执行完成,则会调用awaitDone对当前线程进行阻塞

private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;//阻塞队列元素,被挂起的线程都会被封装成为该对象
        boolean queued = false;
        //1:如果一旦该线程被中断了,将会移除阻塞队列中的任务,因为futureTask设计有取消任务的操作

        //情景:任务还没有被完成
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) { //2:如果任务完成或是已经取消了
                if (q != null) //如果队列元素有值,则将队列元素设置为null,并返回当前线程任务的状态
                    q.thread = null;
                return s;
            }

            else if (s == COMPLETING) // 3:如果任务完成了(正常/异常)还没有赋值给指定接受元素(不能超时)
                Thread.yield(); //重新选择可执行的线程,emmmmmm
            else if (q == null) //4:如果等待节点为空,则创建一个等待队列元素
                q = new WaitNode();
            //5:如果这个节点还没有加入到等待队列中,则将这个节点加入到队列的头部
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                        q.next = waiters, q);//waiters是等待队列的头结点

            //6:如果设置了超时时间等待,则在指定的时间内移除等待对垒
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            //7:否则阻塞线程,等待唤醒
            else
                LockSupport.park(this);
        }
    }

代码看起来很多,但其实简单点分析主要做了三步

  • 第一次循环,这时候s<COMPLETING,此时q==null,这时应该创建一个WaitNode对象,节点对象中的线程就是当前线程,继续进行循环
  • 第二次循环,进入方法5,将创建的q加入到链表首位置当中
  • 第三次循环,是否设置了超时时间,然后将线程挂起

至此,就是整个futureTask完成异步任务的执行过程,值得注意的是,futureTask通过cas+volatile的方式,进行数据的同步和通知

  • 如果我的文章确实有帮助到你,请不要忘了点一下文末的喜欢
  • 技术理解不到或有错误请直(bu)接(yao)指(ma)出(wo)
  • 写作不易!


    恒大三叉戟
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352