Future&Callable

使用场景

当需要获取另一个线程的的执行结果的时候,可以用Future + callable结合来使用。

获取线程执行结果使用案例
public class FutureTaskDemo implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("执行 callable");
        TimeUnit.SECONDS.sleep(2);
        return "callable";
    }
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1. 定义任务
        Callable<String> callable = new FutureTaskDemo();
        //2. 定义一个可以取消的异步计算过程
        FutureTask<String> future = new FutureTask<>(callable);
        //3. 通过线程执行异步计算过程
        new Thread(future).start();
        //4. 获取返回结果
        try {
            String result = future.get();
            System.out.println(result);
        } catch (ExecutionException | CancellationException e ) {
            System.out.println("任务被取消或者抛出了异常");
        }catch (InterruptedException e){
             //当前线程被中断了
            System.out.println("当前线程被中断了");
        }
        
    }
}
Future&Callable源码流程图
image-20200807111146446.png

Callable 与 Runable区别

Callable 一个能够返回结果的任务,只要一个没有参数的方法v call()。

Runable 只有一个run()方法,能被线程执行。

2个方法类似,不过一个有返回值一个没有返回值。 Runable没有抛出异常,Callable抛出了异常,需要自己处理。

一个线程执行了一个任务,因为线程的run()方法是没有返回结果的,所以我们要想得到返回结果需要在定义一个接口并实现这个接口。我们要想异步执行且获取执行的返回结果,这里有2个需求 1.通过线程执行 2. 获取返回数据。

看一段伪代码。

public class Test implements Runnable {

    private String item;

    public static void main(String[] args) {
        Test test = new Test();
        //线程执行
        new Thread(test).run();
        //2 获取返回结果
        System.out.println(test.getItem());
    }

    private String getItem(){
        return item;
    }
    @Override
    public void run() {
        item = "1";
    }

}

以上就是通过简单的伪代码来实现异步线程执行计算并获取计算的结果。那么在java中是通过一个RunableFuture 接口定义这种行为。

RunableFuture

一个接口继承了 Runable 接口与 Future接口。RunableFuture extends Runnable, Future<V> ** 其中Runable不描述了 。Future**接口定义了异步计算的结果。

FutureTask

RunableFuture接口的实现类。线程执行任务并获取任务的返回结果。这个类的方法可以开启和停止计算。get方法将阻塞知道计算完成。

image-20200805182656697.png

FutureTask的使用看获取线程执行结果使用案例。他有2个构造方法!

image-20200806110810326.png

  1. FutureTask(Callable<v>) 入参是Callable。
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
  1. public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
    //Executors
    public static <T> Callable<T> callable(Runnable task, T result) {
            if (task == null)
                throw new NullPointerException();
            return new RunnableAdapter<T>(task, result);
        }
    //适配器模式 对Runable的封装。
    static final class RunnableAdapter<T> implements Callable<T> {
            final Runnable task;
            final T result;
            RunnableAdapter(Runnable task, T result) {
                this.task = task;
                this.result = result;
            }
            public T call() {
                task.run();
                return result;
            }
        }
    

2个构造函数入参不管是Runable 还是 Callable最终都会适配成Callable接口。并赋值给类的属性。

private Callable<V> callable;

FutureTask源码

FutrueTask肯定会涉及到多线程问题,包括线程的中断与唤醒。多线程问题肯定会有一个变量作为临界值,通过cas去更改这个临界值来实现多线程环境下的业务逻辑。FutureTask中通过 state 作为临界属性。下面是state中几种类型的变化。

*  状态的几种变化
 * 新建    完成           正常
 * NEW -> COMPLETING -> NORMAL  正常执行完成
 * 新建    完成          抛出异常  call() 抛出异常
 * NEW -> COMPLETING -> EXCEPTIONAL
 * 新建    取消   cancel(false)    
 * NEW -> CANCELLED
 *  新建   中断中           中断完成 cancel(true)      
 * NEW -> INTERRUPTING -> INTERRUPTED
 */
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

在案例中的第三步骤,//3. 通过线程执行异步计算过程。把FutureTask交给了线程去执行。我们知道线程一定会执行Runable的run方法,而FutureTask是RunableFuture的实现类,RunableFuture继承Runable接口。所以FutureTask一定会有一个实现了Runable接口的run()方法。且线程执行的时候一定会执行这个方法。

public void run() {
    //cas 设置属性  runner 为当前线程
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        //FutureTask 构造函数中state=NEW代表状态是初始值。
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //1. 执行Callable的call方法 callable是内部属性,由构造方法传入。
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            //如果执行完 
            if (ran)
                //2.设置返回结果
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

这里的逻辑比较简单,就是执行内部的属性Callable接口的call方法。执行完成之后,步骤2设置返回值

protected void set(V v) {
     //1. 通过cas设置state状态
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
       //2. 属性outcome 指向返回值
        outcome = v;
        //3. 设置state为 NORMAL(正常)
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}
private void finishCompletion() {
    //1. 循环获取 WaitNode
    for (WaitNode q; (q = waiters) != null;) {
        //2. cas设置WaitNode = null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
           //3. 自旋单项链表,一个个的唤醒Thread
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                   //4 唤醒
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    done();
    callable = null;        // to reduce footprint
}

<span style='color:red'> 这里我有2个疑问步骤1通过cas设置state状态,因为state是用 volatitle修饰的索引修改对所有内存有效,如果不是呢?是否还对所有内存有效。换句话说cas修改的是主内存还是高速缓存。第2个问题。 cas没有自旋的操作,也就是说加入compareAndSwapInt失败,直接返回false,state状态就不会是完成时了 第3个问题 步骤3 的设置为什么不用 cas呢?。 </span>

如果这个时候call()被阻塞了。而主线程执行future.get();方法会阻塞直到被上面的步骤4唤醒。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        //等待
        s = awaitDone(false, 0L);
    return report(s);
}
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    //1. 判断是否设置了超时时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        //当前线程处于中断状态的情况下,返回true且线程复位。
        if (Thread.interrupted()) {
           //移除当前节点
            removeWaiter(q);
           //抛出异常
            throw new InterruptedException();
        }
    
        int s = state;
        //2. 结束自旋条件 业务执行结束或者已取消
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            //告诉cup参与竞争
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            //3. cas设置 属性waiters。相当于一个单项链表
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                //移除当前节点
                removeWaiter(q);
                return state;
            }
            //6. 当前线程阻塞nanos秒
            LockSupport.parkNanos(this, nanos);
        }
        else
            //5. 阻塞当前线程
            LockSupport.park(this);
    }
}
cancel(boolean flag) 取消或者中断执行的任务
public boolean cancel(boolean mayInterruptIfRunning) {
    //任务执行完不可以取消  cas设置state状态为 中断或者取消
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    //如果入参是ture 则发起中断标识
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        //通知node链表,唤醒等待线程
        finishCompletion();
    }
    return true;
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,406评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,732评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,711评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,380评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,432评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,301评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,145评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,008评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,443评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,649评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,795评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,501评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,119评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,731评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,865评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,899评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,724评论 2 354