jdk中的异步编程--探究Runnable、Callable、Future以及FutureTask的原理

最近在用nodejs写一个操作数据库的脚本,nodejs中充斥着大量的异步调用,也踩了不少坑,毕竟主语言还是java,所以来探究下java中的异步编程的视线。

所谓异步编程,简单理解就是把耗时的I/O操作和快速的cpu执行代码的操作分离开来,应用程序执行过程中遇到耗时的操作就将这个操作扔给I/O线程去做,自己继续执行后面的代码逻辑;举一个简单的例子:学生A在1点30分的时候从家里出发去学校,在路上A突然想起来要去买一支铅笔,此时A通知学生B去帮忙去买这支铅笔,自己则继续去往学校。
把上面A看成主线程,B看成I/O线程,这就是简单的异步编程思路。

  1. 简单看一段代码先:
    private static ExecutorService executorService = Executors.newFixedThreadPool(5);
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Callable<String> task = new Callable<String>() {
            @Override
            public String call() {
                try {
                    Thread.sleep(3000);
                    LogUtil.log("Hello word");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "test";
            }
        };
        long syncStart = System.currentTimeMillis();
        doSyncWork(task);
        long syncEnd = System.currentTimeMillis();
        LogUtil.log("同步执行: "+(syncEnd-syncStart));

        long asynStart = System.currentTimeMillis();
        doAsynWork(task);
        long asynEnd = System.currentTimeMillis();
        LogUtil.log("异步执行: "+(asynEnd-asynStart));
    }
    private static void doAsynWork(Callable work) throws ExecutionException, InterruptedException {
        Future future = executorService.submit(work);
        // future.get();
    }
    private static void doSyncWork(Callable work) {
        try {
            work.call();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
image.png

代码中可以看出,task里面值做了一件事情,就是阻塞线程3s,同步执行直接调用task的run方法,此时先打印出了hello word,再打印了前后执行的时间差,从结果可以看出是先sleep了3s,在继续往下执行;相对的,异步执行代码直接执行到了代码结尾,最后才打印了task里面的hello word。

这里得出一个显而易见的结论,同步执行过程中始终只有一个master thread在执行,因此同步执行过程中sleep的是主线程,而异步执行过程中sleep的不是主线程,而是由executorService从线程池里面取出来的用来执行task的线程,这也说明了直接调用run方法是不会产生新的线程的。这里不探究线程池的原理和实现,先看一下这个简单例子中使用到的几个类和接口。

  1. FutureTask


    image.png

    去掉阻塞的future.get()方法的注释,上面代码从debug的结果可以看出,线程池执行task的结果是返回一个FutureTask对象,在详细学习这个对象之前,先做一下几个准备工作:
    a. 关于Runnable, Callable和Future接口
    先看一下Runnable:

public interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();
}

注释不难理解,Runnable 接口是用来创建一个线程的,线程在启动过程中会调用run方法,为什么验证这个说法,就顺便简单看一下Thread类的start方法:

public class Thread implements Runnable {
    /* What will be run. */
    private Runnable target;

    /**
     * Causes this thread to begin execution; the Java Virtual Machine
     * calls the <code>run</code> method of this thread.
     * <p>
     * The result is that two threads are running concurrently: the
     * current thread (which returns from the call to the
     * <code>start</code> method) and the other thread (which executes its
     * <code>run</code> method).
     * <p>
     * It is never legal to start a thread more than once.
     * In particular, a thread may not be restarted once it has completed
     * execution.
     *
     * @exception  IllegalThreadStateException  if the thread was already
     *               started.
     * @see        #run()
     * @see        #stop()
     */
    public synchronized void start() {
        /**
         * This method is not invoked for the main method thread or "system"
         * group threads created/set up by the VM. Any new functionality added
         * to this method in the future may have to also be added to the VM.
         *
         * A zero status value corresponds to state "NEW".
         */
        if (threadStatus != 0)
            throw new IllegalThreadStateException();

        /* Notify the group that this thread is about to be started
         * so that it can be added to the group's list of threads
         * and the group's unstarted count can be decremented. */
        group.add(this);

        boolean started = false;
        try {
            start0();
            started = true;
        } finally {
            try {
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
                /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
            }
        }
    }
    private native void start0();

    @Override
    public void run() {
        if (target != null) {
            target.run();
        }
    }
}

可以看到,Thread本身实现了Runnable接口并保存了一个Runnable对象的引用,其执行run方法其实就是执行该对象引用的run方法,经常的就是我们自己实现的run方法,如上面代码中的task。

再看看我们常用的Thread.start()方法做了哪些事情,注释很详细:首先,说明start的行为是jvm会调用该线程的run方法;其次,start的结果是会有两个线程并发运行,一个是执行start的线程,另一个是执行run方法的线程,上面executorService.submit(work) 其实调用顺序就是 submit-->execute-->start-->run, 真正执行run方法的线程已经不是最开始submit的线程,因此这种情况下如果不使用future.get()阻塞等待异步执行结果,主线程会一直往下执行。

真正创建线程的操作是使用jdk封装的native方法start0()来调用操作系统的创建线程的操作,当线程得到CPU的时间片后,便会执行线程的run()方法(具体如何联系起来,这里不作详细研究),这也就说明了为什么上面doSyncWork直接调用call(Callable和Runnable类似)方法会阻塞主线程的原因,因为该过程并未产生新的线程。

再看下Callable:

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

Callable和Runnable功能非常类似,不同在于:
Runnable属于撒手没,你开启另一个线程去执行run方法之后,主线程就完全不用再关心run方法里面具体执行了什么;
Callable接口注释表明了接口设计的目的,Computes a result 计算得到一个接口并会抛出计算过程中所有可能的异常,再看到接口里面call方法返回的泛型结果,可以很清楚作者的意图,就是要返回一个结果,至于如何得到返回的结果,就要看下Future这个接口了。

最后看下Future这个接口:

public interface Future<V> {

    /**
     * 尝试去取消任务的执行,一下几种情况无法取消
     * the task has already completed, has already been cancelled,
     * or could not be cancelled for some other reason. 
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * Returns {@code true} if this task was cancelled before it completed
     */
    boolean isCancelled();

    /**
     * Returns {@code true} if this task completed.
     */
    boolean isDone();

    /**
     * 阻塞等待task执行的结果
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     *  阻塞等待task执行的结果,最大等待timeout的时间
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

提供了五个方法,简单概括来说就是:取消当前任务执行,跟踪当前任务的完成状态,获取任务执行结果(阻塞)。

b. FutureTask概括


image.png

FutureTask实现了Runnable和Future接口,这就表示它可以提交给executorService线程池去执行,并且它可以跟踪task的状态以及get task执行的结果。

c. FutureTask的具体实现

先看一下Future的成员变量:

    /**
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

一共7个状态,其中New为初始状态,COMPLETING和INTERRUPTING 为中间状态,NORMAL(正常结束)、EXCEPTIONAL(异常结束)、CANCELLED(调用cancel结束)以及INTERRUPTED(中断结束)为终点状态。
再来看一下这段简单的程序:

  public class FutureWorkPrincipleCase {

    private static ExecutorService executorService = new ThreadPoolExecutor(5, 5, 120, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

    static class TimeConsumingTask<String> implements Callable<java.lang.String>{

        @Override
        public java.lang.String call() throws Exception {
            Thread.sleep(100000);
            return "hello future!";
        }
    }

    public static void main(String[] args){
        doTimeConsumingTask(new TimeConsumingTask());
        LogUtil.log("main end");
    }

    private static void doTimeConsumingTask(TimeConsumingTask timeConsumingTask) {
        try {
            // 线程池executorService执行submit方法,初始化一个FutureTask对象,
            // 并调用execute方法尝试将该task投入到工作队列中, 最终会调用FutureTask的run()方法
            // 此处会初始化FutureTask对象中的callable成员变量
            Future<String> future = executorService.submit(timeConsumingTask);
            // future.get(), 阻塞等待线程池执行task的结果,所有调用future.get()的线程会被add到FutureTask对象的成员变量waiters中
            // waiters是一个单链表结构, 在run完成之后会调用finishCompletion方法,唤醒waiters中所有等待future结果的线程
            String result = future.get();
            LogUtil.log("doTimeConsumingTask get sync result "+result);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
  }

先简单看下executorService的submit过程,如下新建了一个Futureask对象,并调用了execute方法

  public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
  }
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

这里不对线程池的原理详细做探究,简单概括一下execute的行为,
若当前工作线程数量<线程池配置的最大线程数量:尝试去新建一个线程并执行当前入队task;
如果入队成功,线程池还会做一次double-check,因为自上次检查后现有的线程已经死亡或者自从进入此方法后池关闭了;
如果入队失败,线程池会再次尝试创建一个新的线程,如果创建失败,则拒绝该task。

铺垫了那么多,线程池最终会执行到FutureTask的run方法,那么就看下FutureTask方法的具体实现:

  public void run() {
        if (state != NEW ||
            // 将runner线程设置为currentThread
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // 这里调用我们自己实现的call方法执行具体的业务计算逻辑
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // 每次执行结束将runner置位null,主要是为了每次在执行run方法的时候都会做原子判断
            // 防止线程执行多次自己的task
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

可以看到,在call方法执行完之后得到结果result会调用内部的set方法,赋值给成员变量outcome

  protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            // 将outcome通知给waiters链上的所有线程
            finishCompletion();
        }
    }

最后,对FutureTask的四个成员变量做一个小小的总结:
Callable<V> callable: run方法中具体执行task的业务逻辑;
Object outcome: 楼上Callable的返回结果;
Thread runner:调用run方法的当前线程;
WaitNode waiters:所有等待outcome的等待线程队列(单链表结构)。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343

推荐阅读更多精彩内容

  • 本文是我自己在秋招复习时的读书笔记,整理的知识点,也是为了防止忘记,尊重劳动成果,转载注明出处哦!如果你也喜欢,那...
    波波波先森阅读 11,235评论 4 56
  • 进程和线程 进程 所有运行中的任务通常对应一个进程,当一个程序进入内存运行时,即变成一个进程.进程是处于运行过程中...
    胜浩_ae28阅读 5,084评论 0 23
  • 进程和线程 进程 所有运行中的任务通常对应一个进程,当一个程序进入内存运行时,即变成一个进程.进程是处于运行过程中...
    小徐andorid阅读 2,797评论 3 53
  •   一个任务通常就是一个程序,每个运行中的程序就是一个进程。当一个程序运行时,内部可能包含了多个顺序执行流,每个顺...
    OmaiMoon阅读 1,662评论 0 12
  • 只有做错事的人才会哭呢 因为他们没有底气 就只能软弱
    Venchin_阅读 201评论 0 0