FutureTask源码分析

对于java的并发编程来说,我们都知道Thread和runnable,这是创建一个线程最基本的两种方法,但是这两种方法创建的线程是不支持对线程的执行结果进行返回的。虽然我们可以通过传递引用的方式实现,但是实现起来未免太复杂。这个时候我们可能要用到Callable,callable是一个JDK提供的一个支持线程返回结果的一个接口,通过实现call方法,能返回指定泛型的变量。

class CallableTask implements Callable<Integer>{

    @Override
    public Integer call() throws Exception {
        System.out.println("call runing");
        Thread.sleep(5000);
        return 1;
    }
}
public class CallableTest {
    
    public static void main(String args[]){
        
        CallableTask task = new CallableTask();
        try {
            System.out.println("call start");
            ExecutorService service = Executors.newSingleThreadExecutor();
            Future fu = service.submit(task);
            System.out.println(fu.get());
            service.shutdown();
            System.out.println("call end");
        } catch (Exception e) {
            e.printStackTrace();
        }
}
}

可以通过线程池去实现任务的提交,任务提交后会返回future对象,通过get方法即可获得返回值。
注意:这里其实是不推荐调用call方法的,实际上直接调用call方法和runnable的run方法效果是一样的。

其实JDK提供了一种更好的提交方式,它可以将Runnable和Callable进行封装,以便于提交到线程池。并且可以对线程有更好的控制,比如取消线程的执行,它就是FutureTask。

FutureTask只是简单的对Callable以及Runnable进行了封装,提供了额外的对线程控制的功能以及阻塞获取请求结果的功能,其实对于线程池的submit方法,对于每一个任务都会封装成一个FutureTask来运行。

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    /**
     * Returns a <tt>RunnableFuture</tt> for the given callable task.
     *
     * @param callable the callable task being wrapped
     * @return a <tt>RunnableFuture</tt> which when run will call the
     * underlying callable and which, as a <tt>Future</tt>, will yield
     * the callable's result as its result and provide for
     * cancellation of the underlying task.
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

那么FutureTask到底是怎么实现的呢?

首先看构造方法:

    /**
     * Creates a <tt>FutureTask</tt> that will, upon running, execute the
     * given <tt>Callable</tt>.
     *
     * @param  callable the callable task
     * @throws NullPointerException if callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        sync = new Sync(callable);
    }

    /**
     * Creates a <tt>FutureTask</tt> that will, upon running, execute the
     * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
     * given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
     * @throws NullPointerException if runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        sync = new Sync(Executors.callable(runnable, result));
    }

FutureTask可以接受Runnable以及Callable两种类型的参数,在初始化的时候内部构造了一个Sync的AQS实现类的实例,对于runnable类型的线程需要转化成Callable,同时可以指定返回值。
当我们再观察其他方法的时候,几乎都是委托Sync去处理的,那么重点就放在了Sync上。
首先看看Sync里面有几个状态:

        /** State value representing that task is ready to run */
        private static final int READY     = 0;//准备就绪
        /** State value representing that task is running */
        private static final int RUNNING   = 1;//正在运行
        /** State value representing that task ran */
        private static final int RAN       = 2;//运行完毕
        /** State value representing that task was cancelled */
        private static final int CANCELLED = 4;//任务取消

一个FutureTask的实例就在上面几个状态之间进行轮转,当执行线程时调用run方法,run方法又委托Syn的innerRun方法:

    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    public void run() {
        sync.innerRun();
    }
   //首先CAS将status置为RUNING,可以防止结束前重复提交
        void innerRun() {
            if (!compareAndSetState(READY, RUNNING))
                return;

            runner = Thread.currentThread();
            //double check 防止在此之前被cancel
            if (getState() == RUNNING) { // recheck after setting thread
                V result;
                try {
                    result = callable.call();
                } catch (Throwable ex) {
                    setException(ex);
                    return;
                }
                //设置结果
                set(result);
            } else {
                //清除runner,唤醒阻塞线程
                releaseShared(0); // cancel
            }
        }

当执行线程的时候,首先做的是将AQS的状态由READY变成RUNNING,因为Sync是AQS的实现类,这个也是改变AQS的状态,改变状态之后进行double check,此时是为了防止在这之前有Cancel的请求。如果Cancel了,那么releaseShared清除状态并且唤醒get等待的线程。如果为Running状态,接下来调用call方法,这里也就是为什么要提交到线程池执行了,注意call方法调用只是一个方法调用,而不像Thread.start那样会直接返回,并且开启新线程执行。当执行完毕之后,调用Set,Set其实也是委托给Sync的innerSet:

    /**
     * Sets the result of this Future to the given value unless
     * this future has already been set or has been cancelled.
     * This method is invoked internally by the <tt>run</tt> method
     * upon successful completion of the computation.
     * @param v the value
     */
    protected void set(V v) {
        sync.innerSet(v);
    }

        void innerSet(V v) {
            for (;;) {
                int s = getState();
                if (s == RAN)
                    return;
                //收到取消信号,不设置结果,直接返回
                if (s == CANCELLED) {
                    // aggressively release to set runner to null,
                    // in case we are racing with a cancel request
                    // that will try to interrupt runner
                    releaseShared(0);
                    return;
                }
                //设置结果,并设置当前的状态为RAN
                if (compareAndSetState(s, RAN)) {
                    //设置内容
                    result = v;
                    //唤醒阻塞线程
                    releaseShared(0);
                    done();
                    return;
                }
            }
        }

这里在Set的时候呢,首先也是判断状态如果是RAN直接返回,如果取消了,那么唤醒get等待的线程,并且返回。如果都没有,那么设置FutureTask状态为RAN,表示线程执行完了,同时设置restult为返回值,唤醒所有的等待线程。
上面其实在执行前和执行后都做了Cancel的检查,如果取消,无论执行前后都是没有结果set给result的。
接下来看看是怎么实现阻塞等待结果的,首先看get方法:

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get() throws InterruptedException, ExecutionException {
        return sync.innerGet();
    }

            V innerGet() throws InterruptedException, ExecutionException {
            //共享锁,没有完成会阻塞在这
            acquireSharedInterruptibly(0);
            //如果已经取消,那么抛出异常
            if (getState() == CANCELLED)
                throw new CancellationException();
            if (exception != null)
                throw new ExecutionException(exception);
            return result;
        }

同样是委托机制,其实关键在于acquireSharedInterruptibly方法。

    /**
     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.  Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0) //如果目前是RAN状态或者是Cancel状态的话标识已经完成或者结束
            doAcquireSharedInterruptibly(arg);//等待Task运行结束,唤醒阻塞队列
    }
       /**
         * Implements AQS base acquire to succeed if ran or cancelled
         */
       protected int tryAcquireShared(int ignore) {
            return innerIsDone() ? 1 : -1;
        }
        boolean innerIsDone() {
            return ranOrCancelled(getState()) && runner == null;
        }
        private boolean ranOrCancelled(int state) {
            return (state & (RAN | CANCELLED)) != 0;
        }

其实这里还是使用了委托的机制,同时呢采用了一个共享锁去实现同步,共享锁有一个特点就是允许多个线程获取锁,其实这里对于get操作,其实多个线程同时get是没有问题的,并且如果使用独占锁会降低性能,这里引入共享锁感觉是比较巧妙的。

上面代码将的是,首先线程回去check当前FutureTask的状态,如果是RAN或者Cancel,表示线程已经结束,那么直接返回,如果当前不是上面状态,证明此时线程没执行或者没执行完,那么需要阻塞等待,所以执行doAcquireSharedInterruptibly,让线程等待,等待innerSet之后或者Cancel之后的releaseShared。releaseShared会逐步的唤醒所有阻塞在get上的线程,这样所以线程都能get到结果。提高了效率。

FutureTask实现不但简单而且巧妙(比如巧妙的运用了共享锁),最重要的是使用的也是十分广泛:

  1. 做异步处理,对于下载,或者生成PDF这种比较重的场景,我们可以通过将请求异步化,抽象成FutureTask提交到线程池中运行,从而避免占用大量的Worker线程(Tomcat或者RPC框架),导致后面的请求阻塞。

  2. 对于服务的同步调用,我们可以利用FutureTask进行服务的并行调用,而在最后进行结果的汇总,这样就能变串行调用为并行调用,大大的减小请求的时间(类似于Fork-Join)。

最后,异步线程处理和并行处理是个好东西,需要用起来!!!。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容