java 异步编程

前言

在 java 中你不了解异步编程,crud 完全没有问题,但是有的需求你无法优雅的实现。

js 也存在异步编程,当你理解了用同步的思维编写异步的代码时,相信你在编程上的造诣又更进一步。

大多人都在追捧微服务,可能他们只会用 RibbonFeign。微服务是一个架构上的选择,当你没有达到架构层次时,我认为你应该更加注重业务上的代码编写,即微服务中单体服务代码的编写。

单体服务性能极差,你的微服务整体性能也好不到哪里去,只能通过限流、熔断外加多部署机器来解决并发低的问题。在你想玩微服务之前,并发玩好了再考虑高并发。先把 java 中 juc 包下的并发相关的知识整的明明白白再进行下一步,这花不了几个时间。微服务是你进阶之后再学的。

本来打算继续写 Mysql,但实在提不起来我的兴致(还需要看书研究,毕竟是个黑盒研究),只好拿这篇完成任务了。

本文内容

  • js 中 Promise 和 async await 的一个列子
  • SpringBoot 中异步编程
  • Future
  • CompletableFuture

js 异步编程

要习惯使用 Promise ,避免把 fn 当成参数传递,避免回调地狱。这不仅仅是 api 调用的问题,这是你编程思想转变。

const awaitFunc = function _awaitFunc() {
    return Promise.resolve('awaitFunc').then(data => {
        console.log(data);
        return 'awaitFunc-then-return-data';
    });
};

const async = async function _async() {
    setTimeout(() => {
        console.log('验证加入了宏任务队列---1');
    }, 0);
    // 加不加 await 有什么区别?
    await awaitFunc().then(data => {
        console.log(data);
        setTimeout(() => {
            console.log('验证加入了宏任务队列---2');
        }, 0);
    });
    console.log('awaitFunc 执行完在打印');
};
async();

SpringBoot 中异步编程

在 SpringBoot @EnableAsync@Async 就可以助你异步编程。底层原理就是 ThreadPoolExecutorFuture 的封装。

<img src="http://oss.mflyyou.cn/blog/20201108120510.png?author=zhangpanqin" alt="1583165-20200710095540896-1284477865" style="zoom: 33%;" />

我们拿这个烧水举例子,当你同步串行执行,需要消耗 20 分钟。同步编程思维模型较简单,容易实现。

当你多线程异步执行,只需要消耗 16 分钟。异步编程思维模型稍微复杂一点,多线程之间通信异步转同步是一个挑战。

@GetMapping("/tea/async")
public RetUtil makeTeaAsync() throws InterruptedException, ExecutionException {
    // Stopwatch 用于计算代码执行时间
    final Stopwatch started = Stopwatch.createStarted();
    final Future asyncResult = makeTeaService.boilWater();
    final Future asyncResult1 = makeTeaService.washTeaCup();
    asyncResult.get();
    asyncResult1.get();
    final long elapsed = started.elapsed(TimeUnit.SECONDS);
    String str = StrUtil.format("任务执行了 {} 秒", elapsed);
    final MakeTeaVO makeTeaVO = new MakeTeaVO();
    makeTeaVO.setMessage(str);
    return RetUtil.success(makeTeaVO);
}

@Service
public class IMakeTeaServiceImpl implements IMakeTeaService {
    @Override
    @Async
    public AsyncResult<String> boilWater() throws InterruptedException {
        System.out.println("洗水壶");
        TimeUnit.SECONDS.sleep(1);
        System.out.println("烧开水");
        TimeUnit.SECONDS.sleep(15);
        return new AsyncResult("洗水壶->烧开水");
    }

    @Override
    @Async
    public AsyncResult<String> washTeaCup() throws InterruptedException {
        System.out.println("洗茶杯");
        System.out.println("洗茶壶");
        System.out.println("拿茶叶");
        TimeUnit.SECONDS.sleep(4);
        return new AsyncResult("洗茶杯,洗茶壶,拿茶叶");
    }
}

AsyncResultFuture 的实现类,当调用 Future.get 会阻塞等待结果的返回。@Async 也可以指定在那个线程池中执行任务。

final Future asyncResult = makeTeaService.boilWater();
final Future asyncResult1 = makeTeaService.washTeaCup();
asyncResult.get();
asyncResult1.get();

这个 Demo 的实现,需要调用两次 Furute.get() 算是个不优雅的实现。

@Override
public String makeTea() throws InterruptedException {
    final CountDownLatch count = new CountDownLatch(2);
    THREAD_POOL_EXECUTOR.execute(() -> {
        System.out.println("洗水壶");
        System.out.println("烧开水");
        try {
            TimeUnit.SECONDS.sleep(16);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            count.countDown();
        }
    });
    THREAD_POOL_EXECUTOR.execute(() -> {
        System.out.println("洗茶杯");
        System.out.println("洗茶壶");
        System.out.println("拿茶叶");
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            count.countDown();
        }
    });
    count.await();
    System.out.println("泡茶");
    return "";
}
@GetMapping("/tea/async2")
public RetUtil makeTeaAsync2() throws InterruptedException, ExecutionException {
    final Stopwatch started = Stopwatch.createStarted();
    makeTeaService.makeTea();
    final long elapsed = started.elapsed(TimeUnit.SECONDS);
    String str = StrUtil.format("任务执行了 {} 秒", elapsed);
    final MakeTeaVO makeTeaVO = new MakeTeaVO();
    makeTeaVO.setMessage(str);
    return RetUtil.success(makeTeaVO);
}

使用 CountDownLatch 将异步代码转换为同步返回,这只是另一个实现

Future

public interface Future<V> {
    /**
     * 尝试取消这个任务的执行.
     * 如果任务执行完成之后,调用 cancel 返回 false.
     * 如果任务已经被取消了,调用 cancel 也会返回 false
     *
     * 如果任务已经执行了, mayInterruptIfRunning 标志是否中断执行任务的线程.
     * mayInterruptIfRunning 为 true 会触发线程的中断(当线程睡眠,会抛出异常 InterruptedException),
     * 为 false 时不中断任务执行,只改变 Future 的状态
     * 
     * 调用了 cancel 方法,调用 get 方法会抛出异常
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * 任务完成之前调用 cancel ,此方法返回 true
     */
    boolean isCancelled();

    /**
     * 任务完成返回 true
     */
    boolean isDone();

    /**
     * 等待任务完成,然后返回其结果
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException    if the computation threw an    exception
     * @throws InterruptedException  if the current thread was interrupted while waiting
     */

    V get() throws InterruptedException, ExecutionException;
    /**
     * 等待任务完成,然后返回其结果.超时没有返回,抛出异常 TimeoutException
     */
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

Future.cancel(true) 会触发线程休眠的中断,即 TimeUnit.SECONDS.sleep(10); 会抛出异常。

Future.cancel(true) 或者 Future.cancel(false) 都会触发 Future.get() 异常。

public static void main(String[] args) throws ExecutionException, InterruptedException {
    final Future<String> submit = THREAD_POOL_EXECUTOR.submit(() -> {
        System.out.println("任务开始执行");
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务执行完毕");
        return "ok";
    });

    THREAD_POOL_EXECUTOR.execute(() -> {
        System.out.println("执行 submit.cancel");
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        submit.cancel(false);
    });

    // submit.get();
    System.out.println("整个流程执行结束");
}

JDK 提供 Future 的实现 FutureTask 源码相对较简单,不再展开。

CompletableFuture

由于 Future 使用的局限性:不能链式调用、多个异步计算的结果不能传递下一个异步任务(可以做到,但是编程稍微复杂),异步执行异常的捕获处理

JDK 1.8 开始,大佬 Doug Lea 带来了更加容易的异步编程模型,CompletableFuture。

CompletableFuture 可以做到

1、获取异步执行的结果链式传递下一个异步去执行

2、异步执行时,你有机会处理异步执行时发生的异常

总之,CompletableFuture 很想。

CompletableFuture 实现比较复杂,有的地方不是那么容易理解,当你理解其实现思想,你也算是一只脚迈入了响应式编程中去了。

开胃小菜

public class CompletableFutureBlog1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final Stopwatch started = Stopwatch.createStarted();

        // 洗水壶,烧水
        CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("洗水壶");
            System.out.println("烧水");
            try {
                TimeUnit.SECONDS.sleep(16);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "洗水壶 -> 烧水";
        });
        
        // 洗茶壶,洗茶杯 -> 拿茶叶
        CompletableFuture<String> completableFuture2 = 
                CompletableFuture.supplyAsync(() -> {
                System.out.println("洗茶壶");
                System.out.println("洗茶杯");
                System.out.println("拿茶叶");
                try {
                    TimeUnit.SECONDS.sleep(4);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "洗茶壶,洗茶杯 -> 拿茶叶";
            });
        
        // 组合二者异步运算的结果,传递给方法计算
        final CompletableFuture<String> completableFuture = completableFuture2.thenCombine(completableFuture1, (result2, result1) -> {

            System.out.println(StrUtil.format("result2 是 洗茶壶,洗茶杯 -> 拿茶叶: {}", result2));
            System.out.println(StrUtil.format("result1 是 洗水壶 -> 烧水: {}", result1));


            System.out.println("泡茶");
            return "泡茶";
        });
        completableFuture.get();
        System.out.println("执行时间: " + started.elapsed(TimeUnit.SECONDS));
    }
}

runAsync 和 supplyAsync 的区别

runAsyncsupplyAsync 区别就是你是否需要获取异步计算的结果。当你需要异步处理的结果,你需要 supplyAsync

public class CompletableFutureBlog2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final Stopwatch started = Stopwatch.createStarted();

        final CompletableFuture<Integer> ret = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始进行耗时的异步计算,消耗 3 秒");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 10;
        });
        final Integer integer = ret.get();

        System.out.println(StrUtil.format("异步执行的结果: {}", integer));

        System.out.println("执行时间: " + started.elapsed(TimeUnit.SECONDS));
    }
}

thenApplyAsyncthenAcceptAsyncthenRunAsync

thenXX 都是为了在上一个异步计算的结束之后执行。

我们对异步计算的结果分为以下几个情况:

  • 需要依赖异步计算的结果,并且依赖异步计算的结果计算返回另个一个结果 thenApplyAsync

  • 依赖异步计算的结果,但是不会产生新的结果,thenAcceptAsync

  • 不依赖计算计算的结果,并且没有返回值 thenRunAsync

public class CompletableFutureBlog3 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final Stopwatch started = Stopwatch.createStarted();

        final CompletableFuture<Integer> ret = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始进行耗时的异步计算,消耗 3 秒");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 10;
        });

        final Integer result = ret.thenApplyAsync(data -> {
            System.out.println("依赖上一个异步计算,消耗 5 秒");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return data + 12;
        }).get();

        System.out.println(StrUtil.format("异步执行的结果: {}", result));

        System.out.println("执行时间: " + started.elapsed(TimeUnit.SECONDS));
    }
}

thenCombineAsync

结合另一个 CompletableFuture 异步计算,当两个异步计算执行完了,执行回调。

计算一个耗时的计算。将这个耗时计算拆成两个耗时的异步计算,当两个异步计算结束,在合并最终的结果

public class CompletableFutureBlog4 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final Stopwatch started = Stopwatch.createStarted();

        final CompletableFuture<Integer> ret1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始进行耗时的异步计算,消耗 3 秒");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 10;
        });
        final CompletableFuture<Integer> ret2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始进行耗时的异步计算,消耗 5 秒");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 10;
        });

        final CompletableFuture<Integer> integerCompletableFuture = ret2.thenCombineAsync(ret1, (result1, result2) -> result1 + result2);
        final Integer result = integerCompletableFuture.get();
        
        System.out.println(StrUtil.format("异步执行的结果: {}", result));

        System.out.println("执行时间: " + started.elapsed(TimeUnit.SECONDS));
    }
}

allOfanyOf

可以组合多个 CompletableFuture ,当每个 CompletableFuture 都执行完,执行后续逻辑。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
   return andTree(cfs, 0, cfs.length - 1);
}

可以组合多个 CompletableFuture ,当任何一个 CompletableFuture 都执行完,执行后续逻辑。

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
    return orTree(cfs, 0, cfs.length - 1);
}

future,future2,future3 执行完之后,再执行后续逻辑。

public class CompletableFutureBlog5 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final Stopwatch started = Stopwatch.createStarted();

        final CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(1);
        });

        final CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(1);
        });

        final CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(4);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(1);
        });

        final CompletableFuture<Void> future1 = CompletableFuture.allOf(future3, future2, future);
        future1.get();
        System.out.println("执行时间: " + started.elapsed(TimeUnit.SECONDS));
    }
}

将上述 demo 中 allOf 替换为 anyOf,当任一 CompletableFuture 执行完毕,future1.get(); 就会返回结果。

别的方法看参数和注释就学会了。就不再一一列举了。

当使用的时候,先考虑要不要依赖异步计算的结果,要不要处理异常,要不要返回新的异步计算结果,从这几个方面就可以知道选择哪个 api 了。


本文由 张攀钦的博客 http://www.mflyyou.cn/ 创作。 可自由转载、引用,但需署名作者且注明文章出处。

如转载至微信公众号,请在文末添加作者公众号二维码。微信公众号名称:Mflyyou

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342