组合式异步编程

构造同步和异步操作

// 定义线程池。第一个参数是线程池的大小,第2个参数是线程工厂
Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
        // 线程工厂 ThreadFactory.newThread返回线程变量,参数Runnable定义了线程的业务逻辑
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        }
    });

List<CompletableFuture<String>> priceFutures = shops.stream()
            // 异步方式调用shop.getPrice()方法,第二个参数executor是线程池
            .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(), executor))
            // thenApply 同步调用上一个流的结果,因为Quote::parse是本地方法耗时低,函数参数是个同步方法
            .map(future -> future.thenApply(Quote::parse))
            // thenCompose 异步调用上一个流的结果,参数函数要异步包装
            .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(
                () -> Shop.applyDiscount(quote), executor)))
            .collect(toList());

合并两个独立线程的任务

// 线程2的任务输入不依赖于线程1的任务输出,可以线程1执行的同时,线程2不用等待直接执行
CompletableFuture.supplyAsync(() -> shop.task1(xxx))
  .thenCombine(
    CompletableFuture.supplyAsync(
      () -> service.task2(xxx)),
    // 线程1和线程2的输出在第二个参数执行
    (result1, result2) -> ...
  );

响应事件

CompletableFuture[] futures =
            // thenApply方法不会等待上一个流所有结果都出来才开始处理,而是只要有一个结果出来就开始处理
            (CompletableFuture[])findPrices2(shops).map(f -> f.thenAccept(
               s -> System.out.println(s + "done in " +
            ((System.nanoTime() - start) / 1_000_000))))
            .toArray(size -> new CompletableFuture[size]);
        CompletableFuture.allOf(futures).join();
        System.out.println("end");

CompletableFuture.allOf(futures).join(); // 等待所有结果出来才往下走
CompletableFuture.anyOf(futures).join(); // 只要有一个结果出来就往下走
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

友情链接更多精彩内容