【并发进阶】Future掌控未来之Callable跨线程返回结果和抛出异常的原理分析

大家好,上一篇我们讲到Future掌控未来之Java这个傻儿子Runnable的缺陷。我们知道线程的发起Thread.start() 实质上是,start()调用native方法 start0(), 然后唤起系统线程,在系统线程中回调 Runnable中的run()方法。而且整个过程是异步的,导致在Runnable的两个致命缺陷,第一个是不能返回结果,第二个是不能抛出异常。所以Java爸爸后来引入了Callable这个接口,这个接口旨在解决这两个缺陷。

至于怎么用Callable, 相信都已经轻车熟路了,作为一个高级程序员只知道怎么用是不够的,我们还需要知道为什么?那么我们先思考下面几个问题:

  1. call()方法是否也是和run()方法一样通过系统线程直接调用的?
  2. Callable是怎么把结果返回给主线程?
  3. Callable是怎么把异常抛出给主线程的?

相信在接下来的几分钟里,你会对Callable有全新的认识。

其实实现这些功能单单靠Callable一个接口是办不到的,还需要借助Future、FutureTask类来完成这个功能。

三个臭皮匠Future、FutureTask、Callable介绍

在介绍之前我们先来想一下,如果让你实现线程返回结果的功能你要怎么办?

如下图所示:


image.png

如图main线程中异步启动一个线程 Thread.start(), 事实上等线程执行完后,main线程早就结束了。所以说如果让我们来实现线程返回结果的功能我们得需要通过曲线救国的方式来实现,什么意思呢?

因为线程是异步的,要想获取结果,我们是不是需要阻塞主线程,然后等待线程结束后把结果回调到主线程上,自 Java 1.5 , Java爸爸给我们提供了一个接口Future,简单地说,Future类代表异步线程的未来结果。这个结果最终会在处理完成后出现在Future中。Future.get()方法就实现了阻塞的功能,具体可以看一下这个图

image.png

如图,main线程把任务FutureTask传给Thread, 并启动Thread.start(), 然后在run()方法中调用Callable.call()方法,得到返回值后通过Future.get()方法返回给主线程。

看一下整体的类关系图:

image.png

由类图可以看到,FutureTask是Future和Runnable的实现类,同时持有Thread和Callable实例,FutureTask实现了Future的功能,也就是说FutureTask不仅管理着线程的阻塞获取结果(get()),线程取消中断(cancel())等功能。

注意FutureTask中定义了个Object outcome变量来存放结果。
定义了个链表waiters来存放所有等待获取结果的线程

事实上我们可以把FutureTask看成一个任务管理中心,他融合了Thread, Callable的功能(这里算是一个适配器模式了)。那么我们结合着代码来分析一下具体的逻辑。

实现代码分析

带着上面的关系图和流程图还有问题看以下代码

首先我们来看一下简单的使用流程,这里不多做赘述

  1. 创建FutureTask实例。
  2. 把实例传递给Thread并启动
  3. 阻塞获取结果
public class FutureTest {
    public static void main(String[] args) {
        //1\. 创建FutureTask并传入callable实例
        Future<String> stringFutureTask = new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(3000);
                return "FutureTask";
            }
        });
        //2\. 把实例传递给Thread并启动
        new Thread(stringFutureTask).start();

        String s = "";
        try {
            //3\. 阻塞获取结果
            s = stringFutureTask.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.printf(s);
    }
}

接下来我们结合源码分析一下这三步。

1. 创建FutureTask并传入callable实例

public FutureTask(Callable<V> callable) {
    ...
    this.callable = callable;
    this.state = NEW;
}

把callable传给给FutureTask.callable变量。

2. 把实例传递给Thread并启动并调用run()方法

执行new Thread(stringFutureTask).start(),这里启动后实际上调用的是Runnable.run()方法,具体为啥是调用run(),可参照线程的实现方式, 我们看一下FutureTask.run() 源码

@Override
public void run() {
    //把当前线程赋值给FutureTask.runner 实例
    runner = Thread.currentThread();
    try {
      // 赋值变量
      Callable<V> c = callable;
      //启动状态为NEW
      if (c != null && state == NEW) {
        // 定义结果变量
        V result;
        boolean ran;
        try {
          //1\. 这里调用 callable.call() 方法,也就是第一步传入的callable. 并返回结果
          result = c.call();
          ran = true;
        } catch (Throwable ex) {
          //如果抛出异常,
          result = null;
          ran = false;
          //改变线程状态为 COMPLETING
          if (STATE.compareAndSet(this, NEW, COMPLETING)) {
                //2\. 把异常赋给 outcome 变量
              outcome = ex;
              // 改变线程状态为 EXCEPTIONAL
              STATE.setRelease(this, EXCEPTIONAL);
            }
        }
        if (ran) {
          //设置结果并通知所有等待的线程
          set(result);
        }
      }
    } finally {
      ...
    }
}

protected void set(V v) {
    // 改变线程状态为 EXCEPTIONAL
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {
      //3\. 把结果赋给 outcome 变量
      outcome = v;
      // 改变线程状态为 NORMAL
      STATE.setRelease(this, NORMAL);
            // 4\. 遍历阻塞等待的获取锁的线程,通知他们锁已释放
      for (WaitNode q; (q = waiters) != null;) {
        if (WAITERS.weakCompareAndSet(this, q, null)) {
          for (;;) {
            Thread t = q.thread;
            if (t != null) {
              q.thread = null;
              // 通知锁已释放
              LockSupport.unpark(t);
            }
            FutureTask.WaitNode next = q.next;
            if (next == null) {
              break;
            }
            q.next = null;
            q = next;
          }
          break;
        }
      }
      callable = null;
    }
}

上面的就是线程运行的源码,核心点有4个,

  1. 就是在这里调用 call()方法。
  2. 如果抛出异常把异常存到 Object outcome变量里面
  3. 如果正常返回结果,把结果存到 Object outcome中。至此线程运行完毕。
  4. 遍历阻塞等待的获取锁的线程,通知他们锁已释放

其实就是线程运行完后 把正常结果或者异常结果存到 Object outcome 对像中,释放锁并通知所有等待的线程。

到这里就可以回答开篇的第一个问题 1\. call()方法是否也是和run()方法一样通过系统线程直接来调用的?,调用流程是:
Thread.start() --> native start0() --> run() -> call()
可以看出,call() 方法是通过 run() 来调用的,当然这也是在线程中。

3. 阻塞获取结果

这一步是调用 Future.get()方法阻塞线程,等待结果,我们看一下源码:

public V get() throws Exception {
    int s = state;
    if (s <= COMPLETING) {
        // 1\. 如果线程还在执行,就就到waiters 链表里面阻塞等待结果。
        s = awaitDone();
    }
    // 获取结果
    if (s == NORMAL) {
        // 2\. 如果正常就返回正常的结果 outcome
        return (V)outcome;
    }
    // 3\. 如果异常就直接抛出 outcome
    throw new Exception((Throwable)outcome);
}

// 这个方法的意思就是,如果线程还在执行,就到waiters 链表里面等待,
// 一直到被 LockSupport.unpark() 唤醒
private int awaitDone() {
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        int s = state;
        if (s > COMPLETING) {
          return s;
        } else if (s == COMPLETING) {
          Thread.yield();
        } else if (q == null) {
          q = new WaitNode();
        }  if (!queued) {
          queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
        } else {
          LockSupport.park(this);
        }
    }
}

上面的就是阻塞获取结果的源码,核心点有3个,

  1. 如果线程还在执行,就就到waiters 链表里面阻塞等待结果。
  2. 如果线程执行完并正常,就返回正常的结果 outcome
  3. 如果异常就直接抛出 outcome。

看到这里,我们再来回顾一下开篇的几个问题,你是不是有了答案了。

最后

到这里,Callable,Future 相关的都分析完了,源码解析都比较枯燥,写这么多也不容易,感谢大家看到这里,有什么意见或者建议可以留言一起讨论,看到后第一时间回复,也希望大家能给个赞,你的赞就是我写文章的动力,再次感谢。

作者:TodoCoder
链接:https://juejin.cn/post/7117031279178842148
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容