FutureTask源码解析

简介

FutureTask是一种可以取消的异步的计算任务。它的计算是通过Callable实现的,可以把它理解为是可以返回结果的Runnable。

使用FutureTask的优势有:

1、可以获取线程执行后的返回结果;
2、提供了超时控制功能。

缺陷:
1、虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果
2、想完成一些复杂的任务可能就比较难。比如下面一些例子:
①将两个异步计算合成一个异步计算,这两个异步计算互相独立,同时第二个又依赖第一个的结果。
②当Future集合中某个任务最快结束时,返回结果。
③等待Future结合中的所有任务都完成。

实现的接口

实现了Runnable接口和Future接口:

例子

public class FutureTaskTest {

    public static void main(String[] args) {
        Callable<Integer> call = new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                System.out.println("process ...");
                Thread.sleep(3000);
                return 1;
            }
        };
        FutureTask<Integer> task = new FutureTask<>(call);

        Thread thread = new Thread(task);
        thread.start();
        Thread thread1 = new Thread(task);
        thread1.start();

        System.out.println("do something...");

        try {
            Integer result = task.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

}
输出:
process ...
do something...
1

上面代码在在任务执行时,不需要一直等待其运行结束返回结果,而是可以先去处理其他的事情,然后再获取返回结果。

原理

FutureTask构造器

 public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
 }
public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

可以看出FutureTask提供两种方式来执行任务:
第一种是直接通过callable。
第二种构造方法传入一个Runnable对象和一个返回值对象,然后把他们封装为callable(RunnableAdapter),所以本质上还是通过callable来调用的。另因为Runnable是没有返回值的,所以要通过result参数在执行完之后返回结果。

核心成员变量

volatile int state;//表示对象状态,volatile关键字保证了内存可见性。futureTask中定义了7种状态,代表了7种不同的执行状态
private static final int NEW          = 0; //任务新建和执行中
private static final int COMPLETING   = 1; //任务将要执行完毕
private static final int NORMAL       = 2; //任务正常执行结束
private static final int EXCEPTIONAL  = 3; //任务异常
private static final int CANCELLED    = 4; //任务取消
private static final int INTERRUPTING = 5; //任务线程即将被中断
private static final int INTERRUPTED  = 6; //任务线程已中断

run方法

 public void run() {
        /*
     * 首先判断状态,如果不是初始状态,说明任务已经被执行或取消;
     * runner是FutureTask的一个属性,用于保存执行任务的线程,
     * 如果不为空则表示已经有线程正在执行,这里用CAS来设置,失败则返回。
     */
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //这里调用目标方法
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    //设置返回结果对象
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
          // 无论是否执行成功,把runner设置为null
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            // 如果被中断,则说明调用的cancel(true),
          // 这里要保证在cancel方法中把state设置为INTERRUPTED
          // 否则可能在cancel方法中还没执行中断,造成中断的泄露
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
   //设置返回结果对象
     protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            //当前任务执行完成(不管成功还是失败),唤醒等待队列中的下一个任务
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

总结一下run方法的执行过程

1、只有state为NEW的时候才执行任务;
2、执行前要设置runner为当前线程,使用CAS来设置是为了防止竞争;
3、如果任务执行成功,任务状态从NEW转换为COMPLETING,如果执行正常,设置最终状态为NORMAL;如果执行中出现了异常,设置最终状态为EXCEPTIONAL
4、任务不管成功或者失败都唤醒并删除Treiber Stack中的所有节点;
5、如果调用了cancel(true)方法进行了中断,要确保在run方法执行结束前的状态是INTERRUPTED

finishCompletion方法

private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

在调用get方法时,如果任务还没有执行结束,则会阻塞调用的线程,然后把调用的线程放入waiters中,这时,如果任务执行完毕,也就是调用了finishCompletion方法,waiters会依次出栈并逐个唤醒对应的线程。

get方法

 public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

1、这两个方法类似,首先判断状态,如果s <= COMPLETING,说明任务已经执行完毕,但set方法或setException方法还未执行结束(还未设置状态为NORMALEXCEPTIONAL),这时需要将当前线程添加到waiters中并阻塞。

2、第二种get提供了超时功能,如果在规定时间内任务还未执行完毕或者状态还是COMPLETING,则获取结果超时,抛出TimeoutException。而第一种get会一直阻塞直到state > COMPLETING
3、两个方法最终都调用report方法返回结果:如果state==NORMAL,将返回结果,否则抛异常

cancel方法

 public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        // mayInterruptIfRunning参数表示是否要进行中断
        if (mayInterruptIfRunning) {
            try {
                // runner保存着当前执行任务的线程
                Thread t = runner;
                // 中断线程
                if (t != null)
                    t.interrupt();
            } finally { // final state
                // 设置最终状态为INTERRUPTED
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

如果状态不是NEW,或者设置状态为INTERRUPTINGCANCELLED失败,则取消失败,返回false。

简单来说有一下两种情况:

1、如果当前任务还没有执行,那么state == NEW,那么会尝试设置状态,如果设置状态失败会返回false,表示取消失败;
2、如果当前任务已经被执行了,那么state > NEW,也就是!state == NEW为true,直接返回false。也就是说,如果任务一旦开始执行了(state != NEW),那么就不能被取消。
如果mayInterruptIfRunning为true,要中断当前执行任务的线程。

总结

1、FutureTask是线程安全的,在多线程下任务也只会被执行一次;
2、get方法调用时,如果任务没有结束,要阻塞当前线程,阻塞的线程将保存在一个Treiber Stack中;
3、get方法超时功能如果超时未获取成功,会抛出TimeoutException;

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,923评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,154评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,775评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,960评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,976评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,972评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,893评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,709评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,159评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,400评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,552评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,265评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,876评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,528评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,701评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,552评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,451评论 2 352

推荐阅读更多精彩内容