多线程编程CompletableFuture与parallelStream

一、简介

平常在页面中我们会使用异步调用$.ajax()函数,如果是多个的话他会并行执行相互不影响,实际上Completable我理解也是和它类似,是java

8里面新出的异步实现类,CompletableFuture类实现了Future接口,CompletableFuture与Stream的设计都遵循了类似的设计模式:使用Lambda表达式以及流水线的思想,从这个角度可以说CompletableFuture与Future的关系类似于Stream与Collection的关系。

二、代码

直接上代码,运行之后可以看出CompletableFuture是调用的时候就开始执行,当后续代码调到get的取值方法时,如果内部已经返回结果则直接拿到,如果还没有返回将阻塞线程等待结果,可以设置超时时间避免长时间等待。

以下是模拟并行调用多个方法的场景,比如查询页可能会有多个条件选择,这些条件需要后台数据相互之间有没有联系的场景,就不需要串行执行,异步执行可以节省大量时间

import org.joda.time.LocalDateTime; import org.junit.Test; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; publicclass AppTest { publicvoid printlnConsole(String msg) {

        String time =newLocalDateTime().toString("yyyy-MM-dd HH:mm:ss");

        System.out.println(String.format("[%s]%s", time, msg));

    } /** * 多任务单次异步执行 */ @Test publicvoid testManyFunAsync() { longstart = System.nanoTime();//程序开始时间try { intid = 1;//模拟一个参数,如学校IdprintlnConsole("调用异步任务..."); //使用异步方式调用方法【调用时就会开始执行方法】CompletableFuture futureClassCount = CompletableFuture.supplyAsync(() -> getClassCount(id));

            CompletableFuture futureStudentCount = CompletableFuture.supplyAsync(() -> getStudentCount(id)); //do something 做了一些其他的事情超过了异步任务执行的时间printlnConsole("做一些其他的事情...");

            Thread.sleep(3000);

            printlnConsole("其他事情完成"); //下面获取异步任务的结果,就会立即拿到返回值printlnConsole("获取异步任务结果...");

            Object classCount = futureClassCount.get(); //Object classCount = futureClassCount.get(2, TimeUnit.SECONDS);//可以设置超时时间,超过这个时间时将不再等待,返回异常Object studentCount = futureStudentCount.get(); //Object studentCount = futureStudentCount.get(2, TimeUnit.SECONDS);printlnConsole("异步任务结果获取完成");

            printlnConsole("ClassCount:" + classCount);

            printlnConsole("StudentCount:" + studentCount);

        } catch (Exception e) {

            e.printStackTrace();

        } longend = System.nanoTime();//程序结束时间longtime = (end - start) / 1000000;//总耗时System.out.println("运行时间:" + time);

    } publicintgetClassCount(int id) { try {

            Thread.sleep(2000);

            printlnConsole("getClassCount(" + id + ")执行完毕");

        } catch (InterruptedException e) {

            e.printStackTrace();

        } return20;

    } publicintgetStudentCount(int id) { try {

            Thread.sleep(1000);

            printlnConsole("getStudentCount(" + id + ")执行完毕");

        } catch (InterruptedException e) {

            e.printStackTrace();

        } return100;

    }

}

http://www.developcls.com

http://www.developcls.com/qa/03b9b0dcebcb47f39a782a15b2b5bd5e.html

anyOf()为任意一个子任务线程执行完毕后返回

allOf()为等待所有子任务线程全部执行完毕后返回

getNow()表示我需要立即拿到结果,如果当前的线程并未执行完成,则使用我传入的值进行任务调用,参数为无法获取结果时使用我传入的值

get()获取子线程运算的结果,会抛出检查到的异常

join()获取子线程运算的结果,不会抛出异常

package com.ysl; import org.joda.time.LocalDateTime; import org.junit.Test; import java.util.Arrays; import java.util.List; importjava.util.concurrent.*; import java.util.stream.Collectors; publicclass AppTest { publicvoid printlnConsole(String msg) {

        String time =newLocalDateTime().toString("yyyy-MM-dd HH:mm:ss");

        System.out.println(String.format("[%s]%s", time, msg));

    } /** * 并行执行等待全部结果或等待任意结果 */ @Test publicvoid testAllOfAnyOf() { longstart = System.nanoTime(); try {

            printlnConsole("调用异步任务...");

            List ids = Arrays.asList(1, 3, 5);//准备的请求参数 //创建异步方法数组CompletableFuture[] futures = ids.stream().map(id -> CompletableFuture.supplyAsync(() -> getClassName(id))).toArray(size ->new CompletableFuture[size]); //指定该异步方法数组的子任务线程等待类型CompletableFuture.anyOf(futures).join();//anyOf()为任意一个子任务线程执行完毕后返回 //CompletableFuture.allOf(futures).join();//allOf()为等待所有子任务线程全部执行完毕后返回 printlnConsole("做一些其他的事情...");

            Thread.sleep(2000);

            printlnConsole("其他事情完成");

            printlnConsole("获取异步任务结果:"); for (CompletableFuture f : futures) { //Object obj = f.getNow(1);//getNow()表示我需要立即拿到结果,如果当前的线程并未执行完成,则使用我传入的值进行任务调用,参数为无法获取结果时使用我传入的值Object obj = f.get();//get()获取子线程运算的结果,会抛出检查到的异常 //Object obj = f.join();//join()获取子线程运算的结果,不会抛出异常 printlnConsole(String.valueOf(obj));

            }

        } catch (Exception e) {

            e.printStackTrace();

        } longend = System.nanoTime(); longtime = (end - start) / 1000000;

        System.out.println("运行时间:" + time);

    } publicString getClassName(int id) { try {

            Thread.sleep(id * 1000);

            printlnConsole("getClassName(" + id + ")执行完毕");

        } catch (InterruptedException e) {

            e.printStackTrace();

        } return"taiyonghai-" + id;

    }

}

下面是并行流的演示parallelStream也是java 8新特性

 ids.stream()转化为流.map()映射每个元素对应的结果.collect(Collectors.toList)把结果归纳为List;还有.filter()过滤元素.sorted()对元素进行排序.limit()获取指定数量元素;也可以toArray(size

-> new Class[size])转化为数组

以下是模拟根据Id查询学生名称的场景,接收到的是一个集合又都是调用同一个方法获取,就可以使用并行流同时异步请求等待返回结果

import org.joda.time.LocalDateTime; import org.junit.Test; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; publicclass AppTest { publicvoid printlnConsole(String msg) {

        String time =newLocalDateTime().toString("yyyy-MM-dd HH:mm:ss");

        System.out.println(String.format("[%s]%s", time, msg));

    } /** * 单任务多次并行流执行 */ @Test publicvoid testParallelStream() { longstart = System.nanoTime(); try {

            printlnConsole("调用异步任务...");

            List ids = Arrays.asList(1, 2, 3, 4, 5);//准备的请求参数 //串行执行会等待每一个方法执行完毕后在继续执行下一个 //List<String> names = ids.stream().map(id -> getStudentName(id)).collect(Collectors.toList()); //并行执行会同时调用多个方法待全部执行完毕后一起返回(parallelStream是非线程安全的,配合collect达到线程安全,后续验证一下)List names = ids.parallelStream().map(id -> getStudentName(id)).collect(Collectors.toList()); //无论stream()或者parallelStream()调用时均会阻断线程执行printlnConsole("做一些其他的事情...");

            Thread.sleep(3000);

            printlnConsole("其他事情完成");

            printlnConsole("获取异步任务结果:");

            names.forEach(item -> printlnConsole(item));

        } catch (Exception e) {

            e.printStackTrace();

        } longend = System.nanoTime(); longtime = (end - start) / 1000000;

        System.out.println("运行时间:" + time);

    } publicString getStudentName(int id) { try {

            Thread.sleep(2000);

            printlnConsole("getStudentName(" + id + ")执行完毕");

        } catch (InterruptedException e) {

            e.printStackTrace();

        } return"taiyonghai-" + id;

    }

}

 上面能看到并行流虽然是并行执行但等待结果是阻塞线程的,所以可以利用异步CompletableFuture配合串行流来实现

以下是采用串行流配合异步实现的并发处理

import org.joda.time.LocalDateTime; import org.junit.Test; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; publicclass AppTest { publicvoid printlnConsole(String msg) {

        String time =newLocalDateTime().toString("yyyy-MM-dd HH:mm:ss");

        System.out.println(String.format("[%s]%s", time, msg));

    } /** * 单任务多次异步执行 */ @Test publicvoid testOneFunAsync() { longstart = System.nanoTime(); try {

            printlnConsole("调用异步任务...");

            List ids = Arrays.asList(1, 2, 3, 4, 5);//准备的请求参数 //ids.stream()转化为流.map()映射每个元素对应的结果.collect()把结果归纳为List;还有.filter()过滤元素.sorted()对元素进行排序.limit()获取指定数量元素;List> futures = ids.stream().map(id -> CompletableFuture.supplyAsync(() -> getStudentName(id))).collect(Collectors.toList()); //不用并行流parallelStream()调用时就不会阻断线程执行printlnConsole("做一些其他的事情...");

            Thread.sleep(3000);

            printlnConsole("其他事情完成");

            printlnConsole("获取异步任务结果:");

            futures.forEach(f -> { try {

                    Object obj = f.get();

                    printlnConsole(String.valueOf(obj));

                } catch (InterruptedException e) {

                    e.printStackTrace();

                } catch (ExecutionException e) {

                    e.printStackTrace();

                }

            });

        }catch (Exception e){

            e.printStackTrace();

        } longend = System.nanoTime(); longtime = (end - start) / 1000000;

        System.out.println("运行时间:" + time);

    } publicString getStudentName(int id) { try {

            Thread.sleep(2000);

            printlnConsole("getStudentName(" + id + ")执行完毕");

        } catch (InterruptedException e) {

            e.printStackTrace();

        } return"taiyonghai-" + id;

    }

}

 当我的并行任务数量超过了我机器的核心数就会产生等待,我电脑是8核使用并行流执行数量就可以开8个子线程,当多余这个数量时剩下的就需要等待前面线程执行完再执行

当需要并行执行的任务数量大于核心数的时候,产生的等待是我们不想看到的,这时CompletableFuture就更加适用,它可以手动这只线程池大小,避免并行任务过多时的等待

我们将代码做些修正

以下是源码,这样就可以提高对多任务并行处理的支持了

import org.joda.time.LocalDateTime; import org.junit.Test; import java.util.Arrays; import java.util.List; importjava.util.concurrent.*; import java.util.stream.Collectors; publicclass AppTest { publicvoid printlnConsole(String msg) {

        String time =newLocalDateTime().toString("yyyy-MM-dd HH:mm:ss");

        System.out.println(String.format("[%s]%s", time, msg));

    } /** * 手动配置线程执行器的线程池大小 */privatefinalExecutor myExecutor = Executors.newFixedThreadPool(20,new ThreadFactory() {

        @Override public Thread newThread(Runnable r) {

            Thread t =new Thread(r); //使用守护线程保证不会阻止程序的关停t.setDaemon(true); return t;

        }

    }); /** * 单任务多次异步执行 */ @Test publicvoid testOneFunAsync() { longstart = System.nanoTime(); try {

            printlnConsole("调用异步任务...");

            List ids = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);//准备的请求参数 //ids.stream()转化为流.map()映射每个元素对应的结果.collect()把结果归纳为List;还有.filter()过滤元素.sorted()对元素进行排序.limit()获取指定数量元素;List> futures = ids.stream().map(id -> CompletableFuture.supplyAsync(() -> getStudentName(id), myExecutor)).collect(Collectors.toList()); //不用并行流parallelStream()调用时就不会阻断线程执行printlnConsole("做一些其他的事情...");

            Thread.sleep(3000);

            printlnConsole("其他事情完成");

            printlnConsole("获取异步任务结果:");

            futures.forEach(f -> { try {

                    Object obj = f.get();

                    printlnConsole(String.valueOf(obj));

                } catch (InterruptedException e) {

                    e.printStackTrace();

                } catch (ExecutionException e) {

                    e.printStackTrace();

                }

            });

        } catch (Exception e) {

            e.printStackTrace();

        } longend = System.nanoTime(); longtime = (end - start) / 1000000;

        System.out.println("运行时间:" + time);

    } publicString getStudentName(int id) { try {

            Thread.sleep(2000);

            printlnConsole("getStudentName(" + id + ")执行完毕");

        } catch (InterruptedException e) {

            e.printStackTrace();

        } return"taiyonghai-" + id;

    }

}


java 8的新特性也只做到了会用,很多深入的还不了解,还望指导谢谢,下面备份一下别人的总结我觉得挺有用的:

选择正确的线程池大小

《Java并发编程实战》中给出如下公式:

Number = NCpu * Ucpu * ( 1 + W/C)

Number : 线程数量

NCpu : 处理器核数

UCpu : 期望cpu利用率

W/C : 等待时间与计算时间比

我们这里:99%d的时间是等待商店响应 W/C = 99 ,cpu利用率期望 100% ,NCpu = 9,推断出 number = 800。但是为了避免过多的线程搞死计算机,我们选择商店数与计算值中较小的一个。

并行流与CompletableFuture

目前,我们对集合进行计算有两种方式:1.并行流 2.CompletableFuture;

1、而CompletableFuture更加的灵活,我们可以配置线程池的大小确保整体的计算不会因为等待IO而发生阻塞。

书上给出的建议如下:如果是计算密集型的操作并且没有IO推荐stream接口,因为实现简单效率也高,如果所有的线程都是计算密集型的也就没有必要创建比核数更多的线程。

2、反之,如果任务涉及到IO,网络等操作:CompletableFuture灵活性更好,因为大部分线程处于等待状态,需要让他们更加忙碌,并且再逻辑中加入异常处理可以更有效的监控是什么原因触发了等待。

参考地址:https://segmentfault.com/a/1190000012833183

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,063评论 6 510
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,805评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,403评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,110评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,130评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,877评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,533评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,429评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,947评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,078评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,204评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,894评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,546评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,086评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,195评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,519评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,198评论 2 357

推荐阅读更多精彩内容