手写理解Callable,Future,Executor

前言

Callable,Future,Executor都是java.util.concurrent包下的工具类,作者李二狗,为了彻底吃透它们的概念,今天就假设这些类都不存在,自己通过实际场景封装出这些工具的山寨版

需求

假设你需要写一个简单的方法,两个值求和,非常简单

public int sum(int x, int y) {
    return x + y;
}

但需求增加了,需要计算的过程在一个新线程中执行,这代码该怎么写?就会出现以下两个问题:

  • 怎么获取到线程执行的结果?
  • 怎么知道新线程什么时候执行完?

实现

首先第一个问题,如何获取新线程结果,这个也好解决,虽然新线程里的变量我取不到,但内存是线程共享的啊,只要提前定义一个结果变量即可

private volatile int outcome;

public int sum(int x, int y) {
    new Thread(()->{
        outcome = x + y;
    }).start();
    return outcome;
}

很明显,最终结果肯定不对,因为返回的时候新线程可能都没开始运行,这就是第二个问题:怎么知道新线程什么时候执行完?

解决的方案当然也很多,只要线程之间进行通讯一下即可,我们用LockSupport方法来实现通讯

public class OperationTest {

    private volatile int outcome;

    private Thread waitThread;

    public int sum(int x, int y) throws InterruptedException {
        waitThread = Thread.currentThread();
        new Thread(() -> {
            outcome = x + y;
            // 计算完成通知等待线程
            LockSupport.unpark(waitThread);
        }).start();
        // 等待计算完成
        LockSupport.park(this);
        return outcome;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(new OperationTest().sum(2,3));
    }
}

此时我们就完成了这个需求,但看一下代码真的好麻烦啊,明明就是一个1+1等于几的事,写了这么多代码,如果明天再来个写减法的需求,我还要重写这么一堆

封装

其实上面解决的两个问题,完全就是通用性的问题,加法这样处理,减法一样也是这个解决思路,那么我们就可以把加法、减法等有返回值的方法用函数式接口给抽象化

// 使用泛型兼容各种类型返回
@FunctionalInterface
public interface Callable<V> {
    V call();
}

接下来我们就要封装一个开启新线程执行它的工具,提供如下功能:

  • 只要传入一个方法作为参数,就可以开启一个新线程去执行这个方法,并返回执行结果
  • 也可以开启新线程执行普通的Runable方法

这个工具命名为ExecutorService

public interface ExecutorService {
    /**
     * 执行callable并返回执行结果
     * @param task
     * @param <T>
     */
    <T> T submit(Callable<T> task);

    /**
     * 也可以执行Runnable
     * @param runnable
     */
    void execute(Runnable runnable);
}

然后开始实现这个工具,暂时叫做NewThreadExecutor(新线程执行器)

public class NewThreadExecutor implements ExecutorService {

    private volatile Object outcome;

    private Thread waitThread;

    @Override
    public <T> T submit(Callable<T> task) {
        waitThread = Thread.currentThread();
        execute(()->{
            outcome = task.call();
            // 计算完成通知等待线程
            LockSupport.unpark(waitThread);
        });
        // 等待计算完成
        LockSupport.park(this);
        return (T) outcome;
    }

    @Override
    public void execute(Runnable runnable) {
        // 执行方式就是开启一个线程去执行
        new Thread(runnable).start();
    }

}

这时我们再实现上面的需求就轻而易举了

int x = 2;
int y = 3;
Integer sub = new NewThreadExecutor().submit(() -> {
    return x + y;
});
System.out.println(sub);

这样通过我们的逻辑和执行解耦,可以方便使用工具执行减法、乘法或其它复杂运算逻辑

多线程

再回头看一下我们这个工具,实际上非常不合理,开了一个新线程去执行函数,整个过程主线程却全程傻等

相当于一个主管带一个员工干活,而员工干活时,主管干不了别的事只能等着,那干脆主管自己干得了呗,何必聘请这么一个员工

而我们希望开启新线程后主线程可以去干别的(比如分配新任务给其它线程执行),等全分配完任务再统一获取结果,这样才算是多线程并行作业

那么如何改造代码呐?

首先,调用submit方法不能阻塞,应该直接返回一个对象,主线程再想要获取的时候,才通过这个对象阻塞获取结果

这个对象不是运行结果,但通过它可以获得结果,他就像一个未来的约定,我们先使用代码给它抽象出来,命名为Future

public interface Future<V> {
    // 是否运行完成
    boolean isDone();
    // 获取运行结果
    V get();
}

此时我们的ExecutorService返回结果变为Future对象

public interface ExecutorService {
    /**
     * 执行callable并返回future
     * @param task
     * @param <T>
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * 也可以执行Runnable
     * @param runnable
     */
    void execute(Runnable runnable);
}

那么此时如何改造NewThreadExecutor这个实现呐?

首先要实现Future抽象,这个对象可以获取到执行结果,那么它肯定可以访问到存储执行结果的对象(outcome)和等待线程对象(waitThread),那不妨就把这两个对象放入Future实现中,同时最终执行的Runable方法也要可以访问到这两个对象,那不妨就让Future的实现同时就是最终执行的Runable,即可执行的Future,取名为FutureTask

public class FutureTask<V> implements Future<V>, Runnable {

    private Callable<V> callable; // 要执行的方法

    private volatile Object outcome; // 执行结果

    private Thread waitThread; // 等待的线程

    public FutureTask(Callable<V> callable) {
        this.callable = callable;
    }

    @Override
    public boolean isDone() {
        return outcome!=null;
    }

    @Override
    public V get() {
        waitThread = Thread.currentThread();
        if (isDone()) { // 如果已经执行完直接返回
            return (V) outcome;
        }
        // 否则等待
        LockSupport.park(this);
        return (V) outcome;
    }

    @Override
    public void run() {
        // 开始执行
        outcome = callable.call();
        // 计算完成通知等待线程
        LockSupport.unpark(waitThread);
    }
}

此时NewThreadExecutor改造如下

public class NewThreadExecutor implements ExecutorService {

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        FutureTask<T> futureTask = new FutureTask<>(task);
        execute(futureTask);
        return futureTask; // 直接返回
    }

    @Override
    public void execute(Runnable runnable) {
        // 执行方式就是开启一个线程去执行
        new Thread(runnable).start();
    }

}

这时我们就可以让两个子线程分别同时计算两个结果,最终主线程求和(真正的做到多线程计算)

Future<Integer> future1 = new NewThreadExecutor().submit(() -> {
    return 3 + 4;
});
Future<Integer> future2 = new NewThreadExecutor().submit(() -> {
    return 1 + 2;
});
System.out.println(future1.get()+future2.get());

扩展

以上封装的工具,达到了传入一个方法开启一个新线程计算的功能,并且使用future概念避免了阻塞

但工具还能再扩展一下,比如有一天领导让实现传入一个方法指定某一线程执行,或传入方法从几个固定线程中选一个空闲的去执行(线程池)

由于我们做到了逻辑和执行的分离解耦,所以只要重写一下execute就可以了,而无论如何执行submit的逻辑是不变的,我们可以继续给它抽象出来,命名为AbstractExecutorService

public abstract class AbstractExecutorService implements ExecutorService {

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        FutureTask<T> futureTask = new FutureTask<>(task);
        execute(futureTask);
        return futureTask; // 直接返回
    }

}

此时它的继承者就可以传入方法并返回future,只需关注如何执行即可,比如我们的开启新线程执行工具

public class NewThreadExecutor extends AbstractExecutorService {

    @Override
    public void execute(Runnable runnable) {
        // 执行方式就是开启一个线程去执行
        new Thread(runnable).start();
    }

}

再比如使用线程池去执行

public class ThreadPoolExecutor extends AbstractExecutorService {
    @Override
    public void execute(Runnable runnable) {
        // 从线程池中选一个线程去执行
    }
}

最后

以上代码的命名基本参照jdk的源码,可以自行对照,相信再看源码就会非常清晰,也可以结合Executor源码详解解读源码

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容