Callable实现子线程获取函数返回值

Callable接口

Java中的子线程通常是通过Thread或者Runnable的方式实现的,但是这种方式只能通过回调,或者共享变量等方式来传递数据,而Callable则是可以获取返回结果的一种子线程实现方式。

Callable是一个接口,源码如下:

public interface Callable<V> {
    V call() throws Exception;
}

非常简单,只有一个方法,和一个泛型V,所以我们创建Callable对象的时候,也只需要指定返回类型并实现call方法就可以了。

Future接口

看完了Callable接口,会发现它非常简单,没有办法在子线程中直接通过它来获取到返回结果的,这时候就需要Future发挥作用了。源码如下:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • boolean cancel(boolean mayInterruptIfRunning) 在任务开始前取消,传入值表示任务开始后是否允许打断,返回值表示是否取消成功(任务已经开始不允许打断,已经运行结束,已经取消等等状态会返回失败)
  • boolean isCancelled() 是否已经被取消
  • boolean isDone() 是否完成任务
  • V get() 尝试获取返回结果,阻塞方法
  • V get(long timeout, TimeUnit unit) 同上,可以指定超时时间

可以看到,Future实际上可以理解为Callable的管理类。

在线程池中执行任务时,除了execute方法之外,还有一个submit方法:

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

它返回的就是一个Future对象,可以通过它来回去Callable任务的执行结果:


Callable<Integer> callable = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("Sub Thread is calculating...");
                Thread.sleep(10000);
                return 10;
            }
        };
        
Future<Integer> future = Executors.newCachedThreadPool().submit(callable);
    try {
            System.out.println("Main Thread start waiting result... ");

            int res = future.get();
            System.out.println("Main Thread get result: " + res);
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

//运行结果:
Main Thread start waiting result... 
Sub Thread is calculating...
Main Thread get result: 10

FutureTask

如果不用Java提供的线程池,直接用Thread怎样在子线程中运行Callable呢? 这时候就要用到FutureTask类了。

FutureTask实现了FutureRunnable接口,这就意味着,它既可以放在Thread中去运行,又能够对任务进行管理,下面是源码:


    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

可以看到,构造函数中传入了待运行的任务Callable对象或者Runnable对象和指定的返回结果,V result用来指定运行完成后的返回值,如果不想用指定值,可以用Future<?> f = new FutureTask<Void>(runnable, null)来返回null。采用Callable构造方法创建的FutureTask对象,执行完毕返回的是实际运算结果,而Runnable 构造函数返回值是传入的result。

task的状态

FutureTask中持有的任务对象,有以下几种状态:

    private static final int NEW          = 0; //新建或运行中
    private static final int COMPLETING   = 1;//任务运行结束,正在处理一些后续操作
    private static final int NORMAL       = 2;//任务已经完成,COMPLETING的下一个状态
    private static final int EXCEPTIONAL  = 3;//任务抛出异常,COMPLETING的下一个状态
    private static final int CANCELLED    = 4;//任务被取消
    private static final int INTERRUPTING = 5;//收到打断指令,还没有执行interrupt
    private static final int INTERRUPTED  = 6;//收到打断指令,也执行了interrupt

可能的状态变化主要有以下几种:

     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED

task的执行过程

FutureTask实现了Runnable接口,是可以直接放在Thread中执行的,实际上运行的就是它的run方法:

public void run() {
    //r如果当前状态不是NEW,说明任务已经执行完成了,直接返回
    //如果当前状态是NEW,尝试用CAS方式将当前线程赋值给RUNNER,赋值前RUNNER的值应该是null,否则赋值失败
    //赋值失败表示已经有线程执行了run方法,直接返回
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    //运行中抛出异常
                    setException(ex);
                }
                //ran为true,说明正常运行结束,得到了返回结果
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

执行结果其实是比较简单的,通过RUNNER来记录执行任务的线程,从而保证只有一个线程可以执行该任务。运行结束后有两个出口:

  • setException(ex); 运行中出错,抛出异常
  • set(result); 任务执行完毕,获取到返回值
    protected void setException(Throwable t) {
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = t;
            U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

    protected void set(V v) {
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = v;
            U.putOrderedInt(this, STATE, NORMAL); // final state
            finishCompletion();
        }
    }

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (U.compareAndSwapObject(this, WAITERS, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        //唤醒等待线程
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

这两个方法实际上是一样的,都是将状态赋值为COMPLETING,然后保存结果(运行结果或错误信息),再执行finishCompletion方法,通知WAITERS里记录的等待线程继续执行,并清空WAITERS

获取返回结果

获取返回结果是通过get()方法:

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // The code below is very delicate, to achieve these goals:
        // - call nanoTime exactly once for each call to park
        // - if nanos <= 0L, return promptly without allocation or nanoTime
        // - if nanos == Long.MIN_VALUE, don't underflow
        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
        //   and we suffer a spurious wakeup, we will do no worse than
        //   to park-spin for a while
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            else if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q == null) {
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)
            //cas机制,将新建节点q的next指向原来的节点workers,然后将workers更新为新建的节点。workers(WAITERS)实际上就是持有了所有等待线程的一个链表
                queued = U.compareAndSwapObject(this, WAITERS,
                                                q.next = waiters, q);
            else if (timed) {
                final long parkNanos;
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            else
                LockSupport.park(this);
        }
    }

    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

get()方法比较简单,先判断当前状态,如果状态 < COMPLETING,说明任务没有执行完毕,直接调用awaitDone方法。

awaitDone方法可以接受两个参数,用来指定是否设置超时时间。它内部是一个无限for循环。下面是awaitDone方法的执行步骤(忽略超时设置):

  1. 进入awaitDone方法时,state一定是小于COMPLETING的,第一次会走else if (q == null)分支,创建一个WaitNode()对象用来保存当前线程

  2. 第二次循环q已经不是null了,如果任务仍然没有结束,会执行else if (!queued)分支,queued表示创建的WaitNode()是否已经添加到链表里,如果没有尝试添加,直到添加成功为止。

  3. 等待线程添加成功以后进入下一个循环,此时如果任务仍然没有结束,会走到else分支,挂起当前线程(阻塞)

  4. 此处阻塞的是等待结果的线程,也就是调用FutureTaskget()方法的线程,而不是执行任务的线程。阻塞线程用的是LockSupport.park(this)方法,唤醒的方法是LockSupport.unpark(),该方法在上边的finishCompletion()中出现了,也就是说,任务执行结束(运行完,抛出异常,被取消)时,等待的线程才会被唤醒,继续下一次循环。

  5. 任务结束以后,如果state是COMPLETING状态,说明一些清理任务还没有执行完,等待的线程会让出cpu,让其他线程优先执行

  6. 直到state 大于COMPLETING,说明FutureTask已经完全结束了,此时会会执行(s > COMPLETING)分支,把节点置空,并返回。

awaitDone返回以后,说明任务已经执行完成了,会进入report方法:

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

可以看到,如果是正常结束,或者抛出异常结束,会返回结果,而如果是被取消,则会抛出异常。

使用

Callable<Integer> call = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("正在计算结果...");
                Thread.sleep(3000);
                return 1;
            }
        };
        FutureTask<Integer> task = new FutureTask<>(call);
        Thread thread = new Thread(task);
        thread.start();
        Integer result = task.get();
        System.out.println("结果为:" + result);

总结

  1. FutureTask可以视为一个管理Callable任务的工具类,执行Callable任务的是FutureTaskrun方法,所以,可以通过 new Thread(futuretask)的方法来实现子线程执行任务

  2. 获取执行结果是通过FutureTaskget方法,调用该方法后,如果线程会被挂起,知道任务结束为止

  3. 获取结果的线程数量没有限定,可以是任意个线程

  4. 获取结果的线程被挂起以后,可以通过取消,超时等方法在任务执行完毕以前结束挂起状态。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,063评论 6 510
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,805评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,403评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,110评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,130评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,877评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,533评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,429评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,947评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,078评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,204评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,894评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,546评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,086评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,195评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,519评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,198评论 2 357

推荐阅读更多精彩内容