Stream知识手册

1.流(Stream)

    流是Java API的新成员,它允许你以声明性方式处理数据集合(通过查询语句来表达,而不是临时编写一个实现)。就现在来说,你可以把它们看成遍历数据集的高级迭代器。此外,流还可以透明地并行处理。

    流到底是什么呢?简短的定义就是“从支持数据处理操作的源生成的元素序列”

    1.元素序列——就像集合一样,流也提供了一个接口,可以访问特定元素类型的一组有序值。

    2.源——流会使用一个提供数据的源,如集合、数组或输入/输出资源。

    3.数据处理操作——常用操作,如filter、map、reduce、find、match、sort等且支持并行

    4.流水线——很多流操作本身会返回一个流,这样多个操作就可以链接起来,形成一个大的流水线

    5.内部迭代——与使用迭代器显式迭代的集合不同,流的迭代操作是在背后进行的


    概念代码:


流示例

    在本例中,我们先是对menu调用stream方法,由菜单得到一个流。数据源是菜肴列表(菜单),它给流提供一个元素序列。接下来,对流应用一系列数据处理操作:filter、map、limit和collect。除了collect之外,所有这些操作都会返回另一个流,这样它们就可以接成一条流水线,于是就可以看作对源的一个查询。最后,collect操作开始处理流水线,并返回结果(它和别的操作不一样,因为它返回的不是流,在这里是一个List)。在调用collect之前,没有任何结果产生,实际上根本就没有从menu里选择元素。请注意,和迭代器类似,流只能遍历一次


2.流操作

    Stream的操作可以分为两大类:

        1是filter、map和limit可以连成一条流水线。

         2是collect触发流水线执行并关闭它。

    流的使用一般包括三件事:

        1.一个数据源(如集合)来执行一个查询;

        2.一个中间操作链,形成一条流的流水线;

        3.一个终端操作,执行流水线,并能生成结果。

筛选和切片

    filter: 该操作会接受一个谓词(一个返回boolean的函数)作为参数,并返回一个包括所有符合谓词的元素的流

    distinct:它会返回一个元素各异(根据流所生成元素的hashCode和equals方法实现)的流。

    limit(n):,该方法会返回一个不超过给定长度的流。所需的长度作为参数传递给limit。

    skip(n):返回一个扔掉了前n个元素的流。如果流中元素不足n个,则返回一个空流。

映射

    map:它会接受一个函数(Function)作为参数。这个函数会被应用到每个元素上,并将其映射成一个新的元素(使用映射一词,是因为它和转换类似,但其中的细微差别在于它是“创建一个新版本”而不是去“修改”).。

    flatMap:将各个生成流扁平,即扁平化多个流化为单个流。

查找和匹配

    另一个常见的数据处理套路是看看数据集中的某些元素是否匹配一个给定的属性。StreamAPI通过allMatch、anyMatch、noneMatch、findFirst和findAny方法提供了这样的工具。

归约

    归约操作使用reduce方法,reduce接受两个参数,一个是初始值,一个是BinaryOperator<T>来将两个元素结合起来产生一个新值。

    int sum = numbers.stream().reduce(0, Integer::sum);    //求合

    Optional<Integer> max = numbers.stream().reduce(Integer::max);    求最大值

    reduce还有一个重载的变体,它不接受初始值,但是会返回一个Optional对象

    Optional<Integer> sum = numbers.stream().reduce((a, b) -> (a + b));

无状态和有状态

    诸如map或filter等操作会从输入流中获取每一个元素,并在输出流中得到0或1个结果。这些操作一般都是无状态的

    但诸如reduce、sum、max等操作需要内部状态来累积结果。不管流中有多少元素要处理,内部状态都是有界的。我们把这些操作叫作有状态操作。

中间操作与终端操作

构建流

    1:Stream静态方法Stream.of(T t),通过显式值创建一个流。它可以接受任意数量的参数。

    Stream<String> stream = Stream.of("Java 8 ", "Lambdas ", "In ", "Action");

    2:可以使用empty得到一个空流,如下所示:

    Stream<String> emptyStream = Stream.empty();

   :3:数组创建流,Arrays的静态方法Arrays.stream从数组创建一个流。它接受一个数组作为参数:

    int[] numbers = {2, 3, 5, 7, 11, 13};

    int sum = Arrays.stream(numbers).sum();

    4:文件生成流,Java中用于处理文件等I/O操作的NIO API(非阻塞 I/O)已更新,以便利用Stream API。java.nio.file.Files中的很多静态方法都会返回一个流。


   5: 函数生成流:Stream API提供了两个静态方法来从函数生成流:Stream.iterate和Stream.generate。这两个操作可以创建所谓的无限流:不像从固定集合创建的流那样有固定大小的流。

    iterate示例:

    Stream.iterate(0, n -> n + 2).limit(10).forEach(System.out::println);

    iterate方法接受一个初始值(在这里是0),还有一个依次应用在每个产生的新值上的Lambda(UnaryOperator<t>类型)这里,我们使用Lambda n -> n + 2。。请注意,此操作将生成一个无限流——这个流没有结尾,因为值是按需计算的,可以永远计算下去。需要使用limit方法来显式限制流的大小。

    generate示例:

    Stream.generate(Math::random).limit(5).forEach(System.out::println);

    它接受一个Supplier<T>类型的Lambda提供新的值。


3.收集器

    预定义收集器

    预定义收集器,也就是那些可以从Collectors类提供的工厂方法(例如groupingBy)创建的收集器。它们主要提供了三大功能:

    1.归约和汇总    2.元素分组    3.元素分区 

      计数可以使用Collectors.counting()方法,Collectors.maxBy和Collectors.minBy,来计算流中的最大或最小值。汇总后的返回值为Optional<T>它是一个容器,可以包含也可以不包含值。

    Collectors类专门为汇总提供了一个工厂方法:Collectors.summingInt。它可接受一个把对象映射为求和所需int的函数,并返回一个收集器。此外还有averagingInt求平均数,summarizing求总和、平均值、最大值和最小值。

    示例:list.stream().collect(Collectors.summingInt(Item::getNo)    //汇总

    连接字符串,joining工厂方法返回的收集器会把对流中每一个对象应用toString方法得到的所有字符串连接成一个字符串。

    示例:list.stream().map(Item::getName).collect(Collectors.joining())    //连接 


广义的归约汇总(reducing)

    Collectors.reducing工厂方法需要三个参数:

    1.第一个参数是起始值,也是流中没有元素时的返回值,所以很显然对于数值和而言0是一个合适的值。

    2.第二个参数是转换函数

    3.第三个参数是累积函数BinaryOperator,将两个项目累积成一个同类型的值。

    字符串连接改造示例1: 

    list.stream().collect(Collectors.reducing(new String() , Item::getName , (a,b)->{return a+b;}))                                                                                  初始值            转换函数              累加器

    字符串连接改造示例2:

    list.stream().map(Item::getName).collect(Collectors.reducing((a,b)->{return a + b;}))                                                                                                     只有累加器,当不需要转换时使用


分组(groupingBy)

    Collectors.groupingBy有三个重载方法 :

    groupingBy(Function classifier) 

    groupingBy(Function classifier,Collector downstream)

    groupingBy(Function classifier,Supplier mapFactory,Collector downstream)


简单分组 groupingBy(Function classifier) ,

    Function称为分类函数,它用来把流中的元素分成不同的组。分组操作的结果是一个Map,把分组函数返回的值作为映射的键,把流中所有具有这个分类值的项目的列表作为对应的映射值。

    示例:以Item的name进行分组

    Map<String, List<Item>> collect = list.stream().collect(Collectors.groupingBy(Item::getName));


二级分组 groupingBy(Function classifier,Collector downstream)

        要实现多级分组,我们可以使用一个由双参数版本的Collectors.groupingBy工厂方法创建的收集器,它除了普通的分类函数之外,还可以接受collector类型的第二个参数。

    示例:以item的no为奇偶数进行分组,再以item的name进行分组

    Map>> collect = list.stream().collect(Collectors.groupingBy(a -> {

        if (a.getNo() %2 ==0) {

            return "偶数组";

       }else return "奇数组";

   }, Collectors.groupingBy(Item::getName))); 


把收集器的结果转换为另一种类型 groupingBy(Function classifier,Supplier mapFactory,Collector downstream)

 groupingBy(Function classifier,Supplier mapFactory,Collector downstream)

 使用示例:将str=‘aaabbbccdd’进行分组

TreeMap result = Arrays.stream(str.split(""))

.sorted()

.collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));           

                                                    分类函数(Function)收集器 (Supliter)  转换函数(Collector)


分区partitioningBy

    分区是分组的特殊情况:由一个谓词(返回一个布尔值的函数)作为分类函数,它称分区函数。分区函数返回一个布尔值,这意味着得到的分组Map的键类型是Boolean,于是它最多可以分为两组——true是一组,false是一组.

    示例:按照Item.No是否为偶数进行分区

    list.stream().collect(Collectors.partitioningBy(a -> {

         if (a.getNo() %2 ==0) {

    `        return true;

        }else return false;

    }));


4.并行流

    介绍:通过对收集源调用parallelStream方法来把集合转换为并行流(内部通过fork/join)。并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。

    下面通过给定N生成1到给定值N的累积值

改造前
改造后

    配置并行流使用的线程池

        并行流用的线程是从哪儿来的?有多少个?怎么自定义这个过程呢?

        并行流内部使用了默认的ForkJoinPool,它默认的线程数量就是你的处理器数量, 这个值是由Runtime.getRuntime().available-Processors()得到的。

       可以通过系统属性 java.util.concurrent.ForkJoinPool.common.parallelism来改变线程池大小             如:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");

    使用并行流不能发生数据争用的情况,如改变了某些共享状态


错误的使用并行流

5.fork/join(分支/合并框架)

    分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。它是ExecutorService接口的一个实现,它把子任务分配给线程池(称为ForkJoinPool)中的工作线程。

    使用fork/join框架必须创建要RecursiveTask<R>的一个子类,其中R是并行化任务(以及所有子任务)产生的结果类型,或者如果任务不返回结果,则是RecursiveAction类型(当然它可能会更新其他非局部机构)。要定义RecursiveTask,只需实现它唯一的抽象方法compute。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容