【翻译】Java 8 Stream API 教程

本文翻译自The Java 8 Stream API Tutorial

1. 简介

  本教程志在细致入微、深入底层,你将体验从Stream的创建开始(creation)到并行执行(parallel execution)的完整过程,以此体会Stream API的实际用处。

  为了理解下面的文章,读者需要掌握Java 7基础知识(Lambda表达式、Optional、方法引用)以及熟悉Stream API,如果你并不熟悉它们甚至一无所知,建议你先阅读我们之前的文章-Java8 新特性 以及 Java 8 Streams 介绍

2. 创建Stream

  创建一个Stream实例有多种方式,每种创建方式对应Stream的一个来源。但单个Stream实例每次创建之后,其来源将无法修改,这意味着Stream实例具备源头不可变性,不过我们却可以从单个源创建多个Stream实例。

2.1 Empty Stream - 空Stream

  方法empty()被用于创建一个Empty Stream:

  Stream<String> streamEmpty  = Stream.empty;

  上述代码段创建的Empty Stream通常被用于避免null对象或零元素对象的streams(streams with no element)返回结果为null:

public Stream<String> streamOf(List<String> list){
  return lsit == null || list.isEmpty() ? Stream.empty() : list.streams();
}

2.2 Stream of Collection - 集合Steram

  我们可以创建任意Collection接口衍生类(Collection->List、Set、Queue)的Streams:

Collections<String> collection = Arrays.asList("a", "b", "c");
Stream<Stirng> streamOfCollection = collection.stream();

2.3 Stream of Array - 数组Stream

  接下来的这段代码展示的是数组Stream:

Stream<String> streamOfArray = Stream.of("a", "b", "c");

  当然我们可以先创建熟悉的数组类型,再以它为源创建Stream,而且我们可以选择Stream中包含的元素数量:

String[] arr = new String[]{"a", "b", "c"};
Stream<String> streamOfArrayFull = Arrays.stream(arr);
Stream<String> streamOfArrayPart = Arrays.stream(arr, 1, 3);

2.4 Stream.builder() - 构建器

  当builder被用于指定参数类型时,应被额外标识在声明右侧,否则方法build()将创建一个Stream(Object)实例:

Stream<String> streamBuilder = Stream.<String>builder().add("a").add("b").add("c").build();

2.5 Stream.generator() - 生成器

  方法generator()接受一个供应器Supplier<T>用于元素生成。由于生产流(resulting stream)被定义之后属于无限流(即无止境地不断生产),开发者必须指定stream拥有流的目标大小,否则方法generator()将持续生产直到jvm内存到达顶值(memory limit):

Stream<String> streamOfGenerated = Stream.generate( () -> "element").limit(10);

  上述代码将创建十个内容为“element”的生成流。

2.6 Stream.iterate() - 迭代器

  另一种创建无限流的方法是通过调用方法iterate(),同样的它也需要使用方法limit()对目标流的元素大小进行限制:

Stream<Integer> streamItreated = Stream.iterate(40, n -> n + 2).limit(20);

  迭代流即采用迭代的方法作为元素生产方式,类似于高中数学中的f(x),f(f(x)),etc。上述例子中,生成流的第一个元素是迭代器iterate()中的第一个元素40,从第二个元素开始的每个新元素都与上个元素有关,在此例中,生成流中的元素为:40、42、44、...78、80。

2.7 Stream of Primitives - 基元流

  Java8提供了创建三大基础数据类型(int、long、double)stream的方式。由于Stream<T>是一个类接口,我们无法采用泛型传参的方式声明基础数据类型的stream,因此三个特殊的接口就被创造出来了:IntStream、LongStream、DoubleStream。
使用它们能够避免不必要的自动装箱1以提高生产效率。

IntStream intStream = IntStream.range(1, 3);
LongStream longStream = LongStream.rangeClosed(1, 3);

  方法range(int startInclusive, int endInclusive)创建了一个有序流(从startInclusive到endInclusive)。它使后面的值每个增加1,但却不包括最后一个参数,即此方法的结果是具备上限的。方法rangeClosed(int startInclusive, int endInclusive)与range()大致相同,但它却包含了最后一个值。
这两个方法用于生成三大基本数据类型的stream。

  此外,Java8之后,类Random也提供了拓展方法用于生成基础数据类型的stream。例如,下述代码创建了一个含有三个随机值的DoubleStream:

Random random = new Random();
DoubleStream doubleStream = random.doubles(3);

2.8 Stream of String - 字符串流

  String类型也可以作为生成stream的源,这得益于方法chars()的帮助,此外由于JDK中没有CharStream接口,IntStream也被用来表示字符流(stream of chars)

IntStream streamOfChars = "abc".chars();

  下例中通过特征的正则表达式将一个字符串割裂成(break into)其子串。

Stream<String> streamOfString = 
  Pattern.compile(", ").spitAsStream("a", "b", "c");

2.9 Stream of File - 文件流

  Java NIO2类文件允许通过方法lines()生成文本文件的Stream<String>。文本的每一行都会变成stream的一个元素:

Path path = Paths.get("C:\\file.txt");
Stream<String> streamOfString = Files.lines(path);
Stream<String> streamWithCharset = Files.lines(path, Charset.forName("utf-8"));

ps:在方法lines()中也可以通过Charset设置文件编码。

3. Referencing a Stream - 引用stream

  只要调用生成操作(中间操作)就会实例化一个stream并生成一个可获取的引用,但执行终端操作会使得stream无法访问。为了证明这一点,我们不妨先忘记它,毕竟实践是检验真理的唯一标准。
以下代码如果不考虑冗长的话将是有效的:

Stream<String> stream = Stream.of("a", "b", "c").filter(element -> element.contains("b"));
Optional<String> anyElement = stream.findAny();

  但是倘若我们在执行终端操作后重新使用相同的引用,则会不可避免的触发IllegalStateException。

Optional<String> firstElement = stream.findFirst();

  IllegalStateException是一个运行时异常(RuntimeException),即编译器将不会提示此错误。因此必须记得,JAVA8 不允许重复使用stream
这一设计是合乎逻辑的,因为stream从设计上旨在提供一个将有限操作(指函数体中元素的相关操作)的序列,而不是存储元素。
因此想让以前的代码正常工作我们得先改一改:

List<String> elements =
  Stream.of("a", "b", "c").filter(element -> element.contains("b"))
    .collect(Collectors.toList());
Optional<String> anyElement = elements.stream().findAny();
Optional<String> firstElement = elements.stream().findFirst();

4. Stream Pipeline - 流的管道

  想要执行源数据集的操作集并聚合它们,你需要以下三个部分——源(Source)、中间操作(Intermediate operations)和终结操作(terminal operation)。
中间操作返回的是一个新的可操作stream。举个例子,为了在一个包含少量元素Stream的基础之上新建Stream,我们可以调用方法skip():

Stream<String> oneModifiedStream = Stream.of("abcd", "bbcd", "cbcd").skip(1);

  如果需要多次修改,则可以采用多次中间操作。假如我们还需要将Stream<String>中每个字符串替换为其子串subString(0, 3),则可以使用skip()和map()相连的方式完成:

Stream<String> twiceModifiedStream = stream.skip(1).map(element -> element.subString(0, 3));

  正如你所见,上例中map()使用Lambda表达式作为其参数对stream中的各元素进行处理。
stream本身是毫无价值的,编程人员最感兴趣的其实是终结操作(terminal operation),它可以是一个元素也可以是一个行为。只有在终结操作里才能对每个stream进行使用。正确的且最方便的stream操作方式就是Stream Pipeline,即stream源->中间操作->终结操作。如例:

List<String> list = Arrays.asList("abc1", "abc2", "abc3");
long size = list.stream().skip(1)
  .map(element -> element.substring(0, 3)).sorted().count();

5. Lazy Invocation - 懒式调用

  中间操作是懒式调用的,这意味着只有在终结操作需要它们的时候中间操作才会被唤醒。
为了证明这个事实,假象我们有个方法wasCalled(),每当它被唤醒时使内部变量counter自增。

private long counter;
private void wasCalled() {
  counter++;
}

  接下来让我们在filter()操作中唤起wasCalled():

List<String> list = Arrays.asList(“abc1”, “abc2”, “abc3”);
counter = 0;
Stream<String> stream = list.stream().filter(element -> {
    wasCalled();
    return element.contains("2");
});

  由于有三个变量,想象中filter()中的代码块将被执行三次,wasCalled()执行三次之后counter的值应为3,但是执行之后counter并未发生改变,仍然为0,也就是说filter()一次也没有被唤醒,这个原因就是缺失了终结操作(terminal operation)。
那接下来我们不妨再上述代码的基础之上添加一次map()操作和一个终结操作——findFirst(),并采用打日志的方式帮助我们了解方法调用时机及顺序。

Optional<String> stream = list.stream().filter( element -> {
    log.info("filter() was called!");
    return element.contains("2");
  }).map(element -> {
    log.info("map() was called!");
    return element.toUpperCase();
  }).findFirst();

  日志结果显示filter()被唤醒了两次,而map()仅仅被调用一次,这是由于管道流是垂直执行的。在此例中第一个元素不满足filter()的要求,因此filter()被调用第二次以查找合适的结果,通过之后即进行map()操作,此时就没有第三次机会执行filter()操作了。findFirst()就能找出源数据集中第一个含有“2”的字符串的全大写字符串了。因此,懒调用使得不必相继调用两个中间操作(filter()和map())才能完成任务了。

6. Order of Execution - 执行顺序

  从性能的角度考虑,正确的执行顺序是采用上文提到的流式管道(Stream Pipeline):

long size = list.stream().map(element -> {
    wasCalled();
    return element.substring(0, 3);
}).skip(2).count();

  执行这段代码将使counter自增长3次,这意味着stream的方法map()将被调用3次,但最终size的值为1。这意味着结果流(resulting stream)中仅仅只有一个元素,毫无疑问在三次消息处理中程序跳过了两次处理。
如果我们改变skip()和map()的执行顺序,counter将只自增长一次。也即是map()只被调用一次:

long size = list.stream().skip(2).map(element -> {
    wasCalled();
    return element.substring(0, 3);
}).count();

  以上示例告诉我们一个规则:用于减少流中元素数量的中间操作,应当放置在处理操作之前。因此,保证在你的Stream Pipeline规则中按照这样的顺序编码:skip() --> filter() --> distinct()

7. Stream Reduction - 流的聚合

  API提供了大量的终端操作用以聚合一个stream为一种数据类型或变量。比如:count()、max()、min()、sum(),但是这些方法都是预定义的。但如果用户需要自定义一个stream的聚合操作呢?官方提供了两个方法用以实现此类需求:reduce() 和 collect()。

7.1 reduce()方法

  此方法提供了三种变种,不同之处是它们的签名以及返回类型。reduce()方法具有下列参数:
identify(标识器) - 累积器的初始值或当stream为空时的默认值。
accumulator(累积器) - 提供设定聚合元素之逻辑的功能,每次规约(reducing)累积器都会创建一个新的值,新值的大小等于stream的大小,并且只有上一个值是可用的。这非常有助于提升性能。
combiner(组合器) - 提供聚合accumulator(累积器)中元素的功能,combiner是唯一一个能从不同线程以并行模式聚合累积器中结果的方法。
好,让我们来实战一下吧:

OptionalInt reduced =
    IntStream.range(1, 4).reduce((a, b) -> a + b);

reduced = 6 = 1 + 2 + 3。

int reducedTwoParams =
    IntStream.range(1, 4).reduce(10, (a, b) -> a + b);

reducedTwoParams = 16 = 10 + 1 + 2 + 3。

int reducedParams = Stream.of(1, 2, 3)
  .reduce(10, (a, b) -> a + b, (a, b) -> {
     log.info("combiner was called");
     return a + b;
  });

  这一结果与上文中的16一样,并且不会打出日志,因为combiner没有被唤起。为了唤醒combiner,stream应当是并行的:

int reducedParallel = Arrays.asList(1, 2, 3).parallelStream()
    .reduce(10, (a, b) -> a + b, (a, b) -> {
       log.info("combiner was called");
       return a + b;
    });

  此时,结果变为36,并且combiner被唤起了两次。规约(reduce)运转的算法为:每当stream中的元素通过identify(标识器)时accumulator(累积器)均被调用,最终累积器调用了3次。上述行为是并行完成的,因此造成了(10+1=11; 10+2=12; 10+3=13;)。最终combiner(组合器)混合了三次的结果,通过两次迭代完成运算(12+13=25; 25+11=36;)。

7.2 collect()方法

  stream的规约也可以被其他的终结方法执行——collect()。它接收了一个名为collector的参数,此参数注明规约的流程。官方已经创建了预定义的收集器,我们可以在这些收集器的帮助下访问它们。
下面我们将看到使用List作为所有stream的来源:

List<Product> productList = Arrays.asList(new Product(23, "potatoes"),
  new Product(14, "orange"), new Product(13, "lemon"),
  new Product(23, "bread"), new Product(13, "sugar"));

转换一个stream为Collection集合(Collection、List、Set、Queue、etc)。

List<String> collectorCollection = 
  productList.stream().map(Product::getName).collect(Collectors.toList());

规约为String类型:

String listToString = productList.stream().map(Product::getName)
  .collect(Collectors.joining(", ", "[", "]"));

  join()方法拥有三个参数(delimiter, prefix, suffix),使用join()最便捷之处在于程序员不需要考虑stream的起始与结束甚至界定符,Collector会考虑到这些的。
计算stream中所有数字元素的平均值

double averagePrice = productList.stream()
  .collect(Collectors.averagingInt(Product::getPrice));

计算stream中所有数字元素的和

int summingPrice = productList.stream()
    .collect(Collectors.summingInt(Product::getPrice));

  方法averagingXX()、summingXX()和summarizingXX()适用于基础数据类型(int,long,double),也适用于它们的封装类( Integer,Long,Double)。一个很有效的功能技术提供映射,因此开发者也不是一定需要在collect()方法之后使用map()操作才能完成映射的。
收集stream元素集的统计信息

IntSummaryStatistics statistics = productList.stream()
    .collect(Collectors.summarizingInt(Product::getPrice));

  通过使用IntSummaryStatistics的生成实例,开发者能够通过请求toString()方法创建一个统计报告,结果将是一系列显而易见的结果:IntSummaryStatistics{count=5, sum=86, min=13, average=17,200000, max=23}。通过调用上述方法getCount()、getSum()、getMin()、getAverage()、getMax(),我们也很容易从对象中提取出count、sum、min、average的值,这是因为所有的值均可以从单个管道中获取。

采用指定方法组合stream中的元素:

Map<Integer, List<Product>> collectorMapOfLists = productList.stream()
  .collect(Collectors.groupingBy(Product::getPrice));

  此例中stream将根据group规则将所有元素规约成一个map。
根据一些描述对stream进行分组:

Set<Product> unmodifiableSet = productList.stream()
  .collect(Collectors.collectingAndThen(Collectors.toSet(),
  Collections::unmodifiableSet));

  这种相对特殊的情况里,collection将stream转化为一个Set,之后在此基础上创建了一个不可变的Set。

Custome collector(自定义收集器):
  假若我们因为一些特定的原因需要创建自定义的收集器,那更简介轻快的方法是采用Collection的of()方法:

Collector<Product, ?, LinkedList<Product>> toLinkedList =
  Collector.of(LinkedList::new, LinkedList::add, 
    (first, second) -> { 
       first.addAll(second); 
       return first; 
    });
 
LinkedList<Product> linkedListOfPersons =
  productList.stream().collect(toLinkedList);

  在上例中,Collection的实例被规约成了一个LinkedList<Person>。

Parallel Streams - 并行流

  在Java8之前,并行化十分复杂。ExecutorServiceFornJoin的出现大大降低了并行开发的复杂度,但它们都无不避免的关注在如何创建一个特征鲜明的executor,以及如何去运行它等等。Java8提倡了一种新的方式用于在函数类型中实现并行化。
  API提供并行流用以并行化执行操作。当stream的源是一个数组或者Collection时,在parallelStream()方法的帮助下可以实现并行化:

Stream<Product> streamOfCollection = productList.parallelStream();
boolean isParallel = streamOfCollection.isParallel();
boolean bigPrice = streamOfCollection
  .map(product -> product.getPrice() * 12)
  .anyMatch(price -> price > 200);

但如果stream的源不是数组或者集合类型时,parallel()方法就应该被使用了:

IntStream intStreamParallel = IntStream.range(1, 150).parallel();
boolean isParallel = intStreamParallel.isParallel();

上例中,Stream API自动使用了ForkJoin框架去完成并行操作。默认情况下,公共线程池将被使用,不会(至少暂时不会)给它单独分配线程。当stream处于并行状态时,应当注意可能产生阻塞的操作,当对时间效率有所追求且操作可并行时应当转换为并行stream(理由是假如某个任务大小远远多于其他任务,那它将更加耗时)。当然啦,并行模式也可以转换回串行模式,只要使用sequential()方法就能做到这点:

IntStream intStreamSequential = intStreamParallel.sequential();
boolean isParallel = intStreamSequential.isParallel();

Conclusions - 结论

  Stream API在对链式数据进行操作时体现了其强大性,但也易于理解。它通过引用的方法规约大容量的数据,构建了更健壮的程序,最主要的是提升了项目开发的生产力。
  在本文中stream均是未被关闭的(我们没有调用close()方法或者其他的终结操作),但在实际项目中,不要这样无节制的放纵stream的存在,这将逐步耗尽你的内存,造成内存泄漏程序崩溃的风险
最后,本文所对应的示例代码你可以在github-core-java-8上获取到。祝福你身体健康,编码顺利!

附录

  1. 自动装箱: 编译器自动为语句进行语法解析,如类型补充等。详见Java 自动装箱与拆箱(Autoboxing and unboxing)
  2. JAVA NIO: New I/O的简称,与旧式的基于流的I/O方法相对,从名字看,它表示新的一套Java I/O标准。详见NIO与AIO
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,580评论 18 139
  • 了解Stream ​ Java8中有两个最为重要的改变,一个是Lambda表达式,另一个就是Stream AP...
    龙历旗阅读 3,301评论 3 4
  • Int Double Long 设置特定的stream类型, 提高性能,增加特定的函数 无存储。stream不是一...
    patrick002阅读 1,267评论 0 0
  • 想做一个安静的人,不要那么浮躁。从小到大都被老师说我浮躁,静不下心来。我还记得书法老师说的那句话,能沉着练字的,一...
    不过逆旅阅读 154评论 0 0
  • 别人思考出来的道理,你不能直接用,你也要思考他思考出来的道理。不断的想,思考对应的事例,相反的事例,你就能更加明白...
    羊咩的内心拥有一头狼阅读 169评论 0 0