FutureTask 可获取结果的异步请求

1. FutureTask是什么?

FutureTask是一个可取消的异步计算。FutureTask实现了RunnableFuture接口,间接实现了Future和Runnable接口。在Future的基本实现中,提供了开始和取消一个计算的方法,同时可以查询计算是否完成并获取到计算结果。 仅当完成计算之后才能获取到计算结果,否则get方法将阻塞,直到计算完成。 计算完成之后这个计算将不能够重启或者取消,除非调用RunAndReset方法。
Runnable的实现,赋予了FutureTask包裹Runnable或Callable的能力,因此FutureTask可以被提交到Executor中执行。

interface RunnableFuture<V> extends Runnable, Future<V>{}
class FutureTask<V> implements RunnableFuture<V>{}

FutureTask的状态转换

FutureTask任务的运行状态,最初为NEW。运行状态仅在set、setException和cancel方法中转换为终端状态。在完成过程中,状态可能呈现出瞬时值INTERRUPTING(仅在中断运行程序以满足cancel(true)的情况下)或者COMPLETING(在设置结果时)状态时。从这些中间状态到最终状态使用 cheaper ordered/lazy writes(无锁同步CAS)策略, 原因是这些状态值是固定不可修改的。

  • NEW -> COMPLETING -> NORMAL
  • NEW -> COMPLETING -> EXCEPTIONAL
  • NEW -> CANCELLED
  • NEW -> INTERRUPTING -> INTERRUPTED

2. FutureTask实现

这个类在实现上jdk 1.8 版本前后版本实现方式有些不同,之前使用的策略是AbstractQueuedsynchronizer也就是和ReentrantLock、ThreadPoolExecutor中的worker策略相同。 1.8版本是使用Usafe包中的Compare-And-Swap(CAS)特性实现。

Usafe包?
sun.misc.Unsafe 是sun公司的一个未开源的组件包,提供了些可以绕开JVM的一些方法,其中就包含了同步相关的一些方法。比如monitorEnter(),tryMonitorEnter(),monitorExit(),compareAndSwapInt(),putOrderedInt()。

什么是CAS?
CAS是CPU提供的 用于解决多线程并行情况下使用锁造成性能损耗 的机制。

如此优秀的特性,是不是尽情用就好了,当然不会尽善尽美,首先在Unsafe包中所有操作绕开了JVM意味着不会有人替你管理内存。 而且CAS会引入ABA问题。 更多内容请参考资料[1]

毋庸置疑根据前面描述的特性,FutureTask至少有一个cancel(), V get() 的public方法。

public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              U.compareAndSwapInt(this, STATE, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    U.putOrderedInt(this, STATE, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

 /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }


cance()方法很好理解,就做量两件事1. 还能不能取消了,要不要终止线程取消。 2. 设置对应的状态。
get()方法是去获取执行结果,但是现在如果没执行完呢? 前面说如果没执行完会阻塞等待,显然这是s = awaitDone(false, 0L);干的事情。

好像也不复杂。 对喽,稍微复杂的点地方在怎么去处理这个等待,怎么正确的同步处理状态。

/**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion or at timeout
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // 下面的代码巧妙的实现了一下目标:
        // 每次请求到来的时候获取下精确的纳秒时间 nanoTime. (纳秒随机开始可能获取到负数)
        // 因此针对不同的值不同处理。
        // 被意外唤醒那么等一会。
        // The code below is very delicate, to achieve these goals:
        // - call nanoTime exactly once for each call to park
        // - if nanos <= 0L, return promptly without allocation or nanoTime
        // - if nanos == Long.MIN_VALUE, don't underflow
        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
        //   and we suffer a spurious wakeup, we will do no worse than
        //   to park-spin for a while
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            else if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q == null) {
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();  //用户方传入等待和时间两个参数 且时间>0 创建等待节点
            }
            else if (!queued)
                queued = U.compareAndSwapObject(this, WAITERS,
                                                q.next = waiters, q);
            else if (timed) {        // 等待 传入参数nanos
                final long parkNanos;
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else { 
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking 
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            else
                LockSupport.park(this);
        }
    }

这一堆看着是很长,发现注释只有一句话。
Awaits completion or aborts on interrupt or timeout.
这个函数就干了这个事情, 等完成、等取消、等取消、等超时。 有人可能会说胡说,明明只有三个return。
throw new InterruptedException();这不是还有一个抛出异常。
然后removeWaiter()又做了什么?

    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage.  Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers.  To avoid effects of unsplicing from already
     * removed nodes, the list is retraversed in case of an apparent
     * race.  This is slow when there are a lot of nodes, but we don't
     * expect lists to be long enough to outweigh higher-overhead
     * schemes.
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    else if (!U.compareAndSwapObject(this, WAITERS, q, s))
                        continue retry;
                }
                break;
            }
        }
    }

3. FutureTask使用案例

ExecutorService service = Executors.newSingleThreadExecutor();
FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
     @Override
     public String call() throws Exception {
         return "futureTask say HelloWorld!!!";
     }
});
service.execute(futureTask);
System.out.println(futureTask.get());

参考资料

[1] Unsafe : http://mishadoff.com/blog/java-magic-part-4-sun-dot-misc-dot-unsafe/
[2] synchronized 实现方式 : https://blog.csdn.net/qq_36520235/article/details/81176536
[3] objectMonitor.hpp 源码 : https://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/da3a1f729b2b/src/share/vm/runtime/objectMonitor.hpp
[4] usafe https://blog.csdn.net/zyzzxycj/article/details/89877863

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,193评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,306评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,130评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,110评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,118评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,085评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,007评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,844评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,283评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,508评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,395评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,985评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,630评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,797评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,653评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,553评论 2 352

推荐阅读更多精彩内容