Java 8函数式编程学习笔记

Java 8函数式编程学习笔记

author:Gavin

date:2018/11/09

什么是函数式编程

在思考问题时,使用不可变值和函数,函数对一个值进行处理,映射成另一个值。

函数接口

函数接口是只有一个抽象方法的接口,用作Lambda 表达式的类型。

Java中重要的函数接口

接口 参数 返回类型 示例
Predicate<T> T boolean 今天下雨吗?
Consumer<T> T void 输出一个值
Function<T, R> T R 获得Artist对象的名字
Supplier<T> None T 工厂方法
UnaryOperator<T> T T 逻辑非(!)
BinaryOperator<T> (T, T) T 求两个数的乘积(*)

常用的流操作

collect(toList())

collect(toList()) 方法由 Stream 里的值生成一个列表,是一个及早求值操作。

List<String> collected = Stream.of("a", "b", "c")
                                .collect(Collectors.toList());
assertEquals(Arrays.asList("a", "b", "c"), collected);

map

如果一个函数可以将一种类型的值转换成另外一种类型,map操作就可以使用该函数,将一个流中的值转换成一个新的流。

List<String> collected = Stream.of("a", "b", "hello")
                                .map(string -> string.toUpperCase())
                                .collect(toList());
assertEquals(asList("A", "B", "HELLO"), collected);
map操作

传给map的 Lambda 表达式只接受一个 String 类型的参数,返回一个新的 String。参数和返回值不必属于同一种类型,但是 Lambda 表达式必须是 Function 接口的一个实例,Function 接口是只包含一个参数的普通函数接口。

Function 接口

filter

遍历数据并检查其中的元素时,可尝试使用 Stream 中提供的新方法 filter。

filter操作
List<String> beginningWithNumbers 
    = Stream.of("a", "1abc", "abc1")
            .filter(value -> isDigit(value.charAt(0)))
            .collect(toList());
assertEquals(asList("1abc"), beginningWithNumbers);

和map很像,filter 接受一个函数作为参数,该函数用 Lambda 表达式表示。该函数和前面示例中 if 条件判断语句的功能一样,如果字符串首字母为数字,则返回 true。若要重构遗留代码,for 循环中的 if 条件就是一个很强的信号,可用 filter 方法替代。

由于此方法和 if 条件语句的功能相同,因此其返回值肯定是 true 或者 false。经过过滤, Stream 中符合条件的,即 Lambda 表达式值为 true 的元素被保留下来。该 Lambda 表达式的函数接口正是前面的 Predicate。

Predicate接口

flatMap

flatMap 方法可用 Stream 替换值,然后将多个 Stream 连接成一个 Stream。

flatMap操作

stream_reduce.png

前面的 map 操作可以用一个新的值替代 Stream 中的值。有时,希望让 map 操作有点变化,生成一个新的 Stream 对象取而代之。用户通常不希望结果是一连串的流,此时 flatMap 最能排上用场。

List<Integer> together = Stream.of(asList(1,2), asList(3,4))
                                .flatMap(numbers -> numbers.stream())
                                .collect(toList());
assertEquals(asList(1,2,3,4), together);

调用 stream 方法,将每个列表转换成 Stream 对象,其余部分由 flatMap 方法处理。flatMap 方法的相关函数接口和 map 方法的一样,都是 Function 接口,只是方法的返回值限定为 Stream 类型罢了。

max 和 min

Stream 上常用的操作之一是求最大值和最小值。Stream API 中的 max 和 min 操作足以解决这一问题。

List<Track> tracks = asList(new Track("Bakai", 524),
                            new Track("Violets for Your Furs", 378),
                            new Track("Time Was", 451));
Track shortestTrack = tracks.stream()
                            .min(Comparator.comparing(track -> track.getLength())).get();

assertEquals(tracks.get(1), shortestTrack);

查找 Stream 中最大或最小元素,首先要考虑的是用什么作为排序的指标。

为了让 Stream 对象按照曲目长度进行排序,需要传给它一个 Comparator 对象。Java 8 提供了一个新的静态方法 comparing,使用它可以方便地实现一个比较器。

reduce

reduce 操作可以实现一组值中生成一个值。在上述例子中用到的 count、min 和 max 方法,因为常用而被纳入标准库中。事实上,这些方法都是 reduce 操作。

下图展示了如何通过 reduce 操作对 Stream 中的数字求和。以0作七点-一个空 Stream 的求和结果, accumulator 的值就是所有元素的和。

使用reduce操作实现累加

Lambda 表达式就是 reducer,它执行求和操作,有两个参数:传入 Stream 中的当前元素和 acc。将两个参数相加,acc 是累加器,保存着当前的累加结果。

int count = Stream.of(1, 2, 3).reduce(0, (acc, ele) -> acc + ele);

assertEquals(6, count);

Lambda 表达式的返回值是最新的 acc,是上一轮 acc 的值和当前元素相加的结果。reducer 的类型是 BinaryOperator。

展开 reduce 操作:

BinaryOperator<Integer> accumulator = (acc, ele) -> acc + ele;
int count = accumulator.apply(
        accumulator.apply(
            accumulator.apply(0, 1),
        2),     
3);

reduce 过程的中间值:

元素 acc 结果
N/A N/A 0
1 0 1
2 1 3
3 3 6

高阶函数

高阶函数是指接受另外一个函数作为参数,或返回一个函数的函数。高阶函数不难辨认:看函数签名就够了。如果函数的参数列表里包含了函数接口,或该函数返回一个函数接口,那么该函数就是高阶函数。

map 是一个高阶函数,因为它的 mapper 参数是一个函数。事实上 Stream 接口中几乎所有的函数都是高阶函数。之前的排序用到了 comparing 函数,它接受一个函数作为参数,获取相应的值,同时返回一个 Comparator。Comparator 可能会被误以为是一个对象,但它有且只有一个抽象方法,所以实际上是一个函数接口。

事实上,可以大胆断言,Comparator 实际上应该是个函数,但是那时的 Java 只有对象,因为才造出了一个类,一个匿名类。

正确使用 Lambda 表达式

明确要达成什么转化,而不是说明如何转化的另外一层含义在于写出的函数没有副作用。这一点非常重要,这样只通过函数的返回值就能充分理解函数的全部作用。

没有副作用的函数不会改变程序或外界的状态。在 Lambda 表达式中使用局部变量,可以不使用 final 关键字,但局部变量在既成事实上必须是 final 的。

不论何时,将 Lambda 表达式传给 Stream 上的高阶函数,都应该尽量避免副作用。唯一的例外是 forEach 方法,它是一个终结方法。

基本类型

Java 的泛型是基于对泛型参数类型的擦除-换句话说,假设它是 Object 对象的实例,因此只有装箱类型才能作为泛型参数。这就解释了为什么在 Java 中想要一个包含整型值的列表 List<int>,实际上得到的却是一个包含整型对象的列表List<Integer>。

麻烦的是,由于装箱类型是对象,因此在内存中存在额外的开销。比如,整型在内存中占用4字节,整型对象却要占用16字节。这一情况在数组上更加严重,整型数组中的每个元素只占用基本类型的内存,而整型对象数组中,每个元素都是内存中的一个指针,指向 Java 堆中的某个对象。在最坏的情况下,同样大小的数组,Integer[]要比 int[] 多占用 6 倍内存。

将基本类型转换为装箱类型,称为装箱,反之则称为拆箱,两者都需要额外的计算开销。对于需要大量数值运算的算法来说,装箱和拆箱的计算开销,以及装箱类型占用的额外内存,会明显减缓程序的运行速度。

为了减少这些开销,Stream 类的某些方法对基本类型和装箱类型做了区分。下图的高阶函数 mapToLong 和其它类似函数即为该方面的一个尝试。在 Java 8 中,仅对整型、长整型和双浮点型做了特殊处理,因为它们在数值计算中用的最多,特殊处理后的系统性能提升效果最明显。

ToLongFunction

对基本类型做特殊处理的方法在命名上有明确的规范。如果方法返回类型为基本类型,则在基本类型前面加 To,如上图所示的 ToLongFunction。如果参数类型是基本类型,则不加前缀只需类型名即可,如下图中的 LongFunction。如果高阶函数使用基本类型,则在操作后加后缀 To 再加基本类型,如 mapToLong。

LongFunction

这些基本类型都有与之对应的 Stream,以基本类型名为前缀,如 LongStream。事实上,mapToLong 方法返回的不是一个一般的 Stream,而是一个特殊处理的 Stream。在这个特殊的 Stream 中,map 方法的实现方式也不同,它接受一个 LongUnaryOperator 函数,将一个长整型值映射成另一个长整型值,如下图所示。通过一些高阶函数装箱方法,如 mapToObj,也可以从一个基本类型的 Stream 得到一个装箱后的 Stream,如 Stream<Long>。

LongUnaryOperator

使用 summaryStatistics 方法统计曲目长度

public static void printTrackLengthStatistics(Album album) {
    IntSummaryStatistics trackLengthStats 
            = album.getTracks()
                    .mapToInt(track -> track.getLength())
                    .summaryStatistics();
    System.out.println("Max: %d, Min: %d, Ave: %f, Sum: %d",
                      trackLengthStats.getMax(),
                      trackLengthStats.getMin(),
                      trackLengthStats.getAverage(),
                      trackLengthStats.getSum());
}

这些统计在所有特殊处理的 Stream,如 DoubleStream、LongStream 中都可以得出。

@FunctionInterface

为了提供 Stream 对象可操作性而引入各种新接口,都需要有 Lambda 表达式可以实现它。它们存在的意义在于将代码块作为数据打包起来。因此,它们都添加了 @FunctionInterface注释。

该注释会强制 javac 检查一个接口是否符合函数接口的标准。如果该注释添加给一个枚举类型、类或另外一个注释,或者接口包含不止一个抽象方法,javac 就会报错。

默认方法

Collection 接口中增加了新的 stream 方法,如果让其它的类在不知道该方法的情况下通过编译?Java 8 通过如下方式解决该问题:Collection 接口告诉它所有的子类:“如果你没有实现 stream 方法,就使用我的吧。” 接口中这样的方法叫做默认方法,在任何接口中,无论函数接口还是非函数接口,都可以使用该方法。

默认方法示例:forEach 实现方式

default void forEach(Consumer<? extends t> action) {
    for (T t : this) {
        action.accpet(t);
    }
}

Optional

reduce 方法的一个重点尚未体积:reduce 方法有两种形式,一种如前面出现的需要有一个初始值,另一种便是则不需要有初始值。没有初始值的情况下,reduce 的第一步使用 Stream 中的前两个元素。有时,reduce 操作不存在有意义的初始值,这样做就是有意义的,此时,reduce方法返回一个 Optional 对象。

使用 Optional 对象有两个目的:首先,Optional 对象鼓励程序员适时检查变量是否为空,以避免代码缺陷;其次,它将一个类的 API 中可能为空的值文档化,这比阅读实现代码要简单很多。

创建某个值的 Optional 对象

Optional<String> a = Optional.of("a");
assertEquals("a", a.get());

Optional 对象也可能为空,因此还有一个对应的工厂方法 empty,另外一个工厂方法 ofNullable 则可以将一个空值转换成 Optional 对象。

创建一个空的 Optional 对象,并检查其是否有值

Optional emptyOptional = Optional.empty();
Optional alsoEmpty = Optional.ofNullable(null);

assertEquals(emptyOptional.isPresent());

assertTrue(a.isPresent());

使用 Optional 对象的方式之一是在调用 get() 方法前,先使用 isPresent 检查 Optional 对象是否有值。使用 orElse 方法则更简洁,当 Optional 对象为空时,该方法提供了一个备选值。如果计算备选值在计算上太过繁琐,即可使用 orElseGet 方法。该方法接受一个 Supplier 对象,只有在 Optional 对象真正为空时才会调用。

使用 orElse 和 orElseGet 方法

assertEquals("b", emptyOptional.orElse("b"));
assertEquals("c", emptyOptional.orElseGet(() -> "c"));

高级集合类和收集器

方法引用

Lambda 表达式经常调用参数。比如想得到艺术家的名字,Lambda 的表达式如下:

artist -> artist.getName()

Java 8 提供了一个简写语法,叫做方法引用,帮助程序员重用已有方法。代码如下:

Artist::getName

标准语法为 Classname::methodName。虽然这是一个方法,但是不需要在后面加括号,因为这里并不调用该方法。我们只是提供了和 Lambda 表达式等价的一种结构,在需要时才会调用。凡是使用 Lambda 表达式的地方,就可以使用方法引用。

构造函数同样也有缩写形式,如下:

(name, nationality) -> new Artist(name, nationality)
// 使用方法引用,上述代码可写为:
    Artist::new

另外一个要注意的地方是方法引用自动支持多个参数,前提是选对了正确的函数接口。

还可以用这种方式创建数组,下面的代码创建了一个字符串型的数组:

String[]::new

使用收集器

标准类库已经提供了一些有用的收集器:java.util.stream.Collectors。

转换成其它集合

有一些收集器可以生成其它集合。比如 toList,生成了 java.util.List 类的实例。还有 toSet 和 toCollection,分别生成 Set 和 Collection类的实例。

通常情况下,创建集合时需要调用适当的构造函数指明集合的具体类型:

List<Artist> artiests = new ArrayList<>();

但是调用 toList 或者 toSet 方法时,无需指定具体的类型。Stream 类库在背后自动为你挑选了合适的类型。你可能希望使用 TreeSet,而不是由框架在背后自动为你指定一种类型的 Set。此时就可以使用 toCollection,它接受一个函数作为参数,来创建集合。

使用 toCollection,用定制的集合收集元素

stream.collect(toCollection(TreeSet::new));

转换成值

还可以利用收集器让流生成一个值。maxBy 和 minBy 允许用户按某种特定的顺序生成一个值。下例展示了如何找出成员最多的乐队。它使用了一个 Lambda 表达式,将艺术家映射为成员变量,然后定义了一个比较器,并将比较器传入 maxBy 收集器。

找出成员最多的乐队

public Optional<Artist> biggestGroup(Stream<Artist> artists) {
    Function<Artist, Long> getCount = artist -> artist.getMembers().count();
    return artists.collect(maxBy(comparing(getCount)));
}

找出一组专辑上曲目的平均数

public double averageNumberOfTracks(List<Artist> albums) {
    return albums.stream()
                .collect(averagingInt(album -> album.getTrackList().size()));
}

事实上,Java 8 也提供了能完成类似功能的收集器,如 averagingInt。可以使用 summingInt 及其重载方法求和。 SummaryStatistics也可以使用 summingInt 及其组合收集。

数据分块

另外一个常用的操作是将其分解成两个集合。假设有一个艺术家组成的流,你可能希望将其分成两个部分,一部分是独唱歌手,另一部分是有多人组成的乐队。可以使用 partitioningBy 收集器,它接受一个流,并将其分成两部分,如下图所示。它使用 Predicate 对象判断一个元素应该属于哪部分,并根据布尔值返回一个 Map 到列表。因此,对于 true List 中的元素, Predicate 返回 true;对其它 List 中的元素,Predicate 返回 false。

partitioningBy 收集器

将艺术家组成的流分成乐队和独唱歌手两部分

public Map<Boolean, List<Artist>> bandsAndSolo(Stream<Artist> artists) {
    // 使用方法引用代替 Lambda 表达式
    // return artists.collect(partitioningBy(Artist::isSolo));
    return artists.collect(partitioningBy(artist -> artist.isSolo()));
}

数据分组

数据分组是一种更自然的分割数据操作,与将数据分成 true 和 false 两部分不同,可以使用任意值对数据分组。比如现在有一由专辑组成的流,可以按专辑当中的主唱对专辑分组。

使用主唱对专辑分组

public Map<Artist, List<Album>> albumsByArtist(Stream<Album> albums) {
    return albums.collect(groupingBy(album -> album.getMainMusician()));
}

调用流的 collect 方法,传入一个收集器。groupingBy 收集器接受一个分类函数,用来对数据分组,就像 partitioningBy 一样,接受一个 Predicate 对象将数据分成 true 和 false 两部分。我们使用的分类器是一个 Function 对象,和 map 操作用到的一样。

groupingBy 收集器

字符串

很多时候,收集流中的数据都是为了在最后生成一个字符串。

使用流和收集器格式化艺术家姓名

String result = 
    artists.stream()
            .map(Artist::getName)
            .collect(Collectors.joining(", ", "[", "]"));

这里使用 map 操作提取出艺术家的姓名,然后使用 Collectors.joining 收集流中的值,该方法可以方便地从一个流得到一个字符串,允许用户提供分隔符(用以分割元素)、前缀和后缀。

组合收集器

之前我们使用主唱将专辑分组,现在来考虑如何计算一个艺术家的专辑数量。一个简单的方案是使用前面的方法对专辑先分组后计数。

使用收集器计算每个艺术家的专辑数

public Map<Artist, Long> numberOfAlbums(Stream<Album> albums) {
    return albums.collect(groupingBy(album -> album.getMainMusician(), counting()));
}                 

groupingBy 先将元素分成块,每块都与分类函数 getMainMusician 提供的键值相关联,然后使用下游的另一个收集器收集每块中的元素,最好将结果映射为一个 Map。

使用收集器求每个艺术家的专辑名

public Map<Artist, List<String>> nameOfAlbums(Stream<Album> albums) {
    return albums.collect(groupingBy(Album::getMainMusician, 
                                    mapping(Album::getName, toList())));
}

每个收集器都是生成最终值的一剂良方。这里需要两剂配方,一个传给另一个。Oracle 提供了 mapping 收集器。

mapping 允许在收集器的容器上执行类似 map 的操作。但是需要指明使用什么样的集合类存储结果,比如 toList。这些收集器就像乌龟叠罗汉,龟龟相驮以致无穷。

mapping 收集器和 map 方法一样,接受一个 Function 对象作为参数。

这两个例子我们都用到了第二个收集器,用以收集最终结果的一个子集。这些收集器叫作下游收集器。收集器是生成最终结果的一剂配方,下游收集器则是生成部分结果的配方,主收集器中会用到下游收集器。这种组合使用收集器的方式,使得它们在 Stream 类库中的作用更加强大。

那些作为基本类型特殊定制的函数,如 averagingInt、summarizingLong等,事实上和调用特殊 Stream 上的方法是等价的,加上它们是为了将它们当做下游收集器来使用的。

重构和定制收集器

使用 reduce 和 StringBuilder 格式化艺术家姓名

StringBuilder reduced = 
    artists.stream()
            .map(Artist::getName)
            .reduce(new StringBuilder(), (builder, name) -> {
                if (builder.length() > 0) 
                    builder.append(", ");
                
                builder.append(name);
                return builder;
            }, (left, right) -> left.append(right));
reduced.insert(0, "[");
reduced.append("]");
String result = reduced.toString();

使用 reduce 和 StringCombiner 类格式化艺术家姓名

StringCombiner combined = 
        artists.stream()
                .map(Artist::getName)
                .reduce(new StringCombiner(", ", "[", "]"),
                        StringCombiner::add,
                        StringCombiner::merge);
String result = combined.toString();

代码大相径庭,背后工作是一样的。我们使用 reduce 操作将姓名和分隔符连接成一个 StringCombiner 对象。不过这次连接姓名操作被代理到了 StringCombiner.add 方法,而连接两个连接器操作被 StringCombiner.merge 方法代理。

add 方法返回连接新元素后的结果

public StringCombiner add(String element) {
    if (areAtStart()) {
        builder.append(prefix);
    } else {
        builder.append(delim);
    }
    builder.append(element);
    return this;
}

add 方法在内部将操作代理给一个 StringBuilder 对象。如果刚开始进行连接,则在最前面添加前缀,否则添加分隔符,然后再添加新的元素。这里返回一个 StringCombiner 对象,因为这是传给 reduce 操作所需的类型。合并也是同样的道理,内部操作代理给 StringBuilder 对象。

merge 方法连接两个 StringCombiner 对象

public StringCombiner merge(StringCombiner other) {
    builder.append(other.builder);
    return this;
}

使用 reduce 操作,将工作代理给 StringCombiner 对象

String result = 
        artists.stream()
            .map(Artist::getName)
            .reduce(new StringJoiner(", ", "[", "]"),
                   StringJoiner::add,
                   StringJoiner::merge)
            .toString();

因为现有的逻辑在程序中不能重用,因此,我们想将 reduce 操作重构为一个收集器,在程序中任何地方都能使用。不妨将这个收集器叫做 StringCollector。

使用定制的收集器 StringCollector 收集字符串

String result = artists.stream()
                    .map(Artist::getName)
                    .collect(new StringCollect(", ", "[", "]"));

既然已经将所有对字符串的连接操作代理给了定制收集器,应用程序就无需关心 StringCollector 对象的任何内部细节,它和框架中其它 Collector 对象用起来是一样的。

先来实现 Collector 接口,由于 Collector 接口支持泛型,因此先得确定一些具体的类型:

  • 待收集元素的类型,这里是 String;
  • 累加器的类型 StringJoiner;
  • 最终结果的类型,这里依然是 String。

定义字符串收集器

/**
 * 自定义StringCollector收集器,需要三个参数:
 * 
 * <pre>
 *      <li>delimiter:分隔符</li>
 *      <li>prefix:前缀</li>
 *      <li>suffix:后缀</li>
 * </pre>
 * 
 * @author Gavin
 *
 */
public class StringCollector implements Collector<String, StringJoiner, String> {
    private final String delimiter;
    private final String prefix;
    private final String suffix;

    /**
     * 默认构造函数
     * 
     * @param delimiter
     *            分隔符
     * @param prefix
     *            前缀
     * @param suffix
     *            后缀
     */
    public StringCollector(String delimiter, String prefix, String suffix) {
        this.delimiter = delimiter;
        this.prefix = prefix;
        this.suffix = suffix;
    }

    /**
     * accumulator 是一个函数,将当前元素叠加到收集器
     */
    @Override
    public BiConsumer<StringJoiner, String> accumulator() {
        return StringJoiner::add;
    }

    /**
     * characteristics 返回一个不可变的Set收集器.
     */
    @Override
    public Set<java.util.stream.Collector.Characteristics> characteristics() {
        return new HashSet<>();
    }

    /**
     * combiner 合并两个容器
     */
    @Override
    public BinaryOperator<StringJoiner> combiner() {
        return StringJoiner::merge;
    }

    /**
     * finisher 方法返回收集操作的最终结果
     */
    @Override
    public Function<StringJoiner, String> finisher() {
        return StringJoiner::toString;
    }

    /**
     * Supplier 是创建容器的工厂
     */
    @Override
    public Supplier<StringJoiner> supplier() {
        return () -> new StringJoiner(delimiter, prefix, suffix);
    }
}

收集器的每一个组件都是函数,因此我们使用箭头表示,流中的值用圆圈表示,最终生成的值用椭圆表示。收集操作一开始,Supplier 先创建出新的容器。

Supplier

收集器的 accumulator 的作用和 reduce 操作的第二个参数一样,它结合之前操作的结果和当前值,生成并返回新的值。这一逻辑已经在 StringJoiner 的 add 方法中得以实现,直接引用就好了。

accumulator 是一个函数,它将当前元素叠加到收集器

public BiConsumer<StringJoiner, String> accumulator() {
    return StringJoiner::add;
}

这里的 accumulator 用来将流中的值叠加入容器中。

Accumulator.png

combine 方法很像 reduce 操作的第三个方法。如果有两个容器,我们需要将其合并。同样,在前面重构中我们已经实现了该功能,直接使用 StringJoiner.merge 方法就行了。

combiner 合并两个容器

public BinaryOperator<StringJoiner> combiner() {
    return StringJoiner::merge;
}

在收集阶段,容器被 combiner 方法成对合并进一个容器,直到最后只剩一个容器为止(如下图所示)。

Combiner

在使用收集器之前,重构的最后一步将 toString 方法内联到方法链的末端,这就将 StringJoiner 转换成了我们想要的字符串。

Finisher

收集器的 finisher 方法作用相同。我们已经将流中的值叠加入一个可变容器中,但这还不是我们想要的最终结果。这里调用了 finisher 方法,以便进行转换。在我们想创建字符串等不可变的值时特别有用,这里容器是可变的。

为了实现 finisher 方法,只需将该操作代理给已经实现的 toString 方法即可。

finisher 方法返回收集器操作的最终结果

public Function<StringJoiner, String> finisher() {
    return StringJoiner::toString();
}

从最后剩下的容器中得到最终结果。

关于收集器,还有一点一直没有提及,那就是特征特征是一组描述收集器的对象,框架可以适当对其优化。characteristic 方法定了特征。

此时,finisher 方法不需要对容器做任何操作。更正式地说,此时的 finisher 方法其实就是 identity 函数:它返回传入参数的值。如果这样,收集器就展现了 IDENTITY_FINISH 的特征,需要使用 characteristics 方法声明。

对收集器的归一化处理

就像之前看到的那样,定制收集器其实不难,但如果你想为自己领域内的类定制一个收集器,不妨考虑一下其他替代方案。最容易想到的方案是构建若干个集合对象,作为参数传给领域内类的构造函数。如果领域内的类包含多种集合,这种方式又简单又适用。

reducing 是一种定制收集器的简便方式

String result = 
        artist.stream()
                .map(Artist::getName)
                .collect(Collectors.reducing(
                    new StringJoiner(", ", "[", "]"),
                    name -> new StringJoiner(", ", "[", "]").add(name),
                    StringJoiner::merge))
                    .toString();

这种方式非常低效,这也是我们要定制收集器的原因之一。

练习

  1. 方法引用

    • [x] 转换大写的 map 方法;

      Stream.of("hello", "world").map(String::toUpperCase);
      
    • [x] 使用 reduce 实现 count 方法;

      Collectors.reducing(0L, e -> 1L, Long::sum);
      
    • [ ] 使用 flatMap 连接列表。

      
      
  2. 收集器

    • [x] 找出名字最长的艺术家,分别使用收集器和 reduce 高阶函数实现。然后对比二者的异同:哪一种方式写起来更简单,哪一种凡是读起来更简单?以下面的参数为例,该方法的正确返回值为 “Stuart Sutcliffe”:

      Stream<String> names = Stream.of("John Lennon", "Paul McCartney",
           "George Harrison", "Ringo Starr", "Pete Best", "Stuart Sutcliffe");
      
      private static Comparator<Artist> byNameLength = comparing(artist -> artist.getName().length());
      
         public static Artist byReduce(List<Artist> artists) {
             return artists.stream().reduce((acc, artist) -> {
                 return (byNameLength.compare(acc, artist) >= 0) ? acc : artist;
             }).orElseThrow(RuntimeException::new);
         }
      
         public static Artist byCollecting(List<Artist> artists) {
             return artists.stream().collect(Collectors.maxBy(byNameLength)).orElseThrow(RuntimeException::new);
         }
      
    • [x] 假设一个元素为单词的流,计算每个单词出现的次数。假设输入如下,则返回值为一个形如[John -> 3, Paul -> 2, George -> 1] 的 Map:

      Stream<String> names = Stream.of("John", "Paul", "George", "John", 
                                      "Paul", "John");
      
       public static Map<String, Long> countWords(Stream<String> names) {
           return names.collect(groupingBy(name -> name, Collectors.counting()));
       }
      
    • [x] 用一个定制的收集器实现 Collectors.groupingBy 方法,不需要提供一个下游收集器,只需实现一个最简单的即可。别看 JDK 的源码,这是作弊!提示:可从下面这行代码开始:

      public class GroupingBy<T, K> implements Collector<T, Map<K, List<T>>, Map<K, List<T>>>
      

      这是一个进阶练习,不妨最后再尝试这道习题。

      public class GroupingBy<T, K> implements Collector<T, Map<K, List<T>>, Map<K, List<T>>> {
         private final static Set<Characteristics> characteristics = new HashSet<>();
      
         static {
             characteristics.add(Characteristics.IDENTITY_FINISH);
         }
      
         private final Function<? super T, ? extends K> classifier;
      
         public GroupingBy(Function<? super T, ? extends K> classifier) {
             this.classifier = classifier;
         }
      
         @Override
         public Supplier<Map<K, List<T>>> supplier() {
             return HashMap::new;
         }
      
         @Override
         public BiConsumer<Map<K, List<T>>, T> accumulator() {
             return (map, ele) -> {
                 K key = classifier.apply(ele);
                 List<T> elements = map.computeIfAbsent(key, k -> new ArrayList<>());
                 elements.add(ele);
             };
         }
      
         @Override
         public BinaryOperator<Map<K, List<T>>> combiner() {
             return (left, right) -> {
                 right.forEach((key, value) -> {
                     left.merge(key, value, (leftValue, rightValue) -> {
                         leftValue.addAll(rightValue);
                         return leftValue;
                     });
                 });
                 return left;
             };
         }
      
         @Override
         public Function<Map<K, List<T>>, Map<K, List<T>>> finisher() {
             return map -> map;
         }
      
         @Override
         public Set<Characteristics> characteristics() {
             return characteristics;
         }
      }
      
  3. 改进Map

    使用 Map 的 computeIfAbsent 方法高效计算斐波那契数列。这里的 “高效” 是指避免将那些较小的序列重复计算多次。

    public class Fibonacci {
        private final Map<Integer, Long> cache;
        
        public Fibonacci() {
            cache = new HahMap<>();
            cache.put(0, 0L);
            cache.put(1, 1L);
        }
        
        public long fibonacci(int x) {
            return cache.computeIfAbsent(x, n -> fibonacci(n - 1) + fibonacci(n - 2));
        }
    }
    

数据并行化

并行和并发

并发是两个任务共享时间段,并行则是两个任务在同一个时间发生,比如运行在多核 CPU 上。如果一个程序要运行两个任务,并且只有一个 CPU 给它们分配了不同的时间片,那么这就是并发,而不是并行。两者区别如下图所示。

并行与并发的区别

并行化是指为缩短任务执行时间,将一个任务分解成几部分,然后并行执行。这和顺序执行的任务的任务量是一样的,区别就像用更多的马来拉车,花费的时间自然减少了。实际上,和顺序执行相比,并行化执行任务时,CPU承载的工作量更大。

数据并行化:数据并行化是指将数据分成块,为每块数据分配单独的处理单元。还是拿马拉车那个例子打比方,就像从车里取出一些货物,放到另一辆车上,两辆马车都沿着同样的路径到达目的地。

当需要在大量数据上执行同样的操作时,数据并行化很管用。它将问题分解为可在多块数据上求解的形式,然后对每块数据执行运算,最后将各数据块上得到的结果汇总,从而获得最终答案。

并行化流操作

并行化操作流只需改变一个方法调用。如果已经有一个 Stream 对象,调用它的 paraller方法就能让其拥有并行操作的能力。如果想从一个集合类创建一个流,调用 parallelStream就能立即获得一个拥有并行能力的流。

并行化计算专辑曲目长度

public int parallelArraySum() {
    return albums.parallelStream()
                .flatMap(Album::getTracks)
                .mapToInt(Track::getLength)
                .sum();
}

性能

影响并行流的主要因素有5个,依次分析如下:

  • 数据大小

    输入数据的大小会影响并行化处理对性能的提升。

  • 源数据结构

    每个管道的操作都基于一些初始数据源,通常是集合,将不同的数据源分割相对容易,这里的开销影响了在管道中并行处理数据时到底能带来多少性能上的提升。

  • 装箱

    处理基本类型比处理装箱类型要快。

  • 核的数量

    极端情况下,只有一个核,因此完全没必要并行化。显然,拥有的核越多,获得潜在性能提升的幅度就越大。在实际中,核的数量不单指你的机器上有多少核,更是指运行时你的机器能使用多少核。这也就是说同时运行的其它进程,或者线程相关性(强制线程在某些核或 CPU 上运行)会影响性能。

  • 单元处理开销

    比如数据大小,这是一场并行执行花费时间和分解合并操作开销之间的战争。花在流中每个元素身上的时间越长,并行操作带来的性能提升越明显。

并行求和

private int addIntegers(List<Integer> values) {
    return values.parallelStream()
                .mapToInt(i -> i)
                .sum();
}

在底层,并行流还是沿用了 for/join 框架。fork 递归式地分解问题,然后每段并行执行,最终由 join 合并结果,返回最后的值。

使用 fork/join 分解合并问题

假设并行流将我们的工作分解开,在一个四核的机器上并行执行。

  1. 数据被分成四块。
  2. 如上图所示,计算工作在每个线程里并行执行。这包括将每个 Integer 对象映射为 int 值,然后在每个线程里将 1/4 的数字相加。理想情况下,我们希望在这里花的时间越多越好,因为这里时并行操作的最佳场所。
  3. 然后合并结果。就是sum操作,但这也可能是reducecollect或其它终结操作。

根据问题的分解方式,初始化的数据源的特性变得尤其重要,它影响了分解的性能。直观上看,能重复将数据结构对半分解的难易程度,决定了分解操作的快慢。能对半分解同时意味着待分解的值能够被等量地分解。

我们可以根据性能的好坏,将核心类库提供的通用数据结构分成以下 3 组。

  • 性能好

    ArrayList、数组或 IntStream.range,这些数据结构支持随机读取,也就是说它们能轻而易举地被任意分解。

  • 性能一般

    HashSet、TreeSet,这些数据结构不易公平地被分解,但是大多数时候分解是可能的。

  • 性能差

    有些数据结构难易分解,比如,可能要花 O(N)的时间复杂度来分解问题。其中包括 LinkedList,对于半分解太难了。还有 Streams.iterateBufferedReader.lines,它们长度未知,因此很难预测该在哪里分解。

初始的数据结构影响巨大。举一个极端的例子,对比对 10 000 个整数并行求和,使用 ArrayList 要比使用 LinkedList 快 10 倍。这不是说业务逻辑的性能情况也会如此,只是说明了数据结构对于性能的影响之大。形如使用 LinkedList 这样难于分解的数据结构并行运行可能更慢。

单独操作每一块的种类时,可以分成两种不同的操作:无状态的有状态的

如果能避开有状态的,选用无状态操作,就能获得更好的并行性能。无状态操作包括 mapfilterflatMap,有操作包括 sorteddistinctlimit

并行化数组操作

这些操作都在工具类 Arrays 中。

方法名 操作
parallelPrefix 任意给定一个函数,计算数组的和
parallelSetAll 使用 Lambda 表达式更新数组元素
parallelSort 并行化对数组元素排序

使用 Lambda 表达式编写并发程序

Future

构建复杂并行操作的另外一种方案是使用 Future。Future 像一张欠条,方法不是返回一个值,而是返回一个 Future 对象,该对象第一次创建时没有值,但以后能拿它 "换回" 一个值。

调用 Future 对象的 get方法获取值,它会阻塞当前线程,直到返回值。

使用 Future 从外部网站下载专辑信息

@Override
public Album lookupByName(String albumName) {
    Future<Credentials> trackLogin = loginTo("track");
    Future<Credentials> artistLogin = loginTo("artist");
    try {
        Future<List<Track>> tracks = lookupTracks(albumName, trackLogin.get());
        Future<List<Track>> artists = lookupArtists(albumName, artistLogin.get());
    } catch(InterruptedException | ExecutionException e) {
        throw new AlbumLookupException(e.getCause());
    }
}

如果要将 Future 对象的结果传给其它任务,会阻塞当前线程的执行。这会成为一个性能问题,任务不是平行执行了,而是串行执行了。

这意味着在登录两个服务之前,我们无法启动任何查找任务。没必要这样:lookupTracks 只需要自己的登录凭证,lookupArtists 也是一样。我们将理想的行为图描述出来。

查询操作不必等待所有操作完成后才能执行

可以将对 get 的调用放到 lookupTracks 和 lookupArtist 方法的中间,这能解决问题,但是代码丑陋,而且无法再多次调用之间重用登录凭证。

我们真正需要的是不必调用 get 方法阻塞当前线程,就能操作 Future 对象返回的结果。我们需要将 Future 和回调结合起来使用。

CompletableFuture

这些问题的解决之道 CompletableFuture,它结合了 Future 对象打欠条的注意和使用回调处理事件驱动的任务。其要点是可以组合不同的实例,而不用担心末日金字塔问题。

使用 CompletableFuture 从外部网站下载专辑信息

public Album lookupByName(String albumName) {
    CompletableFuture<List<Artist>> artistLookup 
            = loginTo("artist")
            .thenCompose(artistLogin -> lookupArtists(albumName, artistLogin));
    
    return loginTo("track")
            .thenCompose(trackLogin -> lookupTracks(albumName, trackLogin))
            .themCombine(artistLookup, (tracks, artist)
                        -> new Album(albumName, tracks, artists))
        .join();
}

loginTo、lookupArtists 和 lookupTracks 方法均返回 CompletableFuture ,而不是 Future。CompletableFuture API 的技巧是注册 Lambda 表达式,并且把高阶函数链接起来。方法不同,但道理和 Stream API 的设计是相同的。

在调用最终的方法之前,无法保证 CompletableFuture 对象已经生成结构。CompletableFuture 对象实现了 Future 接口,可以调用 get 方法获取值。CompletableFuture 对象包含 join 方法,我们在上面调用了该方法,它的作用和 get 方法是一样的,而且它没有使用 get 方法时令人倒胃口的检查异常。

CompletableFuture 的常用情景之一是异步执行一段代码,该段代码计算并返回一个值。有一个工厂方法 supplyAsnc,用来创建 CompletableFuture 实例。

异步创建 CompletableFuture 实例的示例代码

CompletableFuture<Track> lookupTrack(String id) {
    return CompletableFuture.supplyAsync(() -> {
        // 这里会做一些繁重的工作 1
       // ...
        return track; // 2
    }, service); // 3
}

supplyAsync 方法接受一个 Supplier 对象作为参数,然后执行它。这里的要点是要执行一些耗时的任务,同时不会阻塞当前线程 - 这就是方法名中 Async 的含义。3 处的返回值用来完成 COMP了tableFuture。在 2 处我们提供了一个叫做 service 的 Executor,告诉 COMP了tableFuture 对象在哪里执行任务。如果没有提供 Executor,就会使用相同的 fork/join 线程池并行执行。

CompletableFuture 提供了 completeExceptionnally,用于处理异常情况。该方法可以视作 complete 方法的备选项,但不能同时调用 complete 和 completeExceptionally 方法。

出现错误时完成 Future

future.completeExceptionanlly(new AlbumLookupException("Unable to find " + name));

CompletableFuture 实例:

  • 如果想在链的末端执行一些代码而不返回任何值,比如 Comsuper 和 Runnable,那就看看 thenAccept 和 thenRun 方法。
  • 可使用 thenApply 方法转换 CompletableFuture 对象的值,有点像 Stream 的 map 方法。
  • 在 CompletableFuture 对象出现异常时,可使用 exceptionally 方法恢复,可以将一个函数注册到该方法,返回一个代替值。
  • 如果你想有一个 map,包含异常情况和正常,请使用 handle 方法。
  • 要找出 CompletableFuture 对象到底出了什么问题,可使用 isDone 和 isCompletedExceptionally 方法辅助调查。

CompletableFuture 对于处理并发任务非常有用,但这并不是唯一的方法。

响应式编程

CompletableFuture 背后的概念可以从单一的返回值推广到数据流,这就是响应式编程。响应式编程其实是一种声明式编程方法,它让程序员以自动的变化和数据流来编程。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,347评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,435评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,509评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,611评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,837评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,987评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,730评论 0 267
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,194评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,525评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,664评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,334评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,944评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,764评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,997评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,389评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,554评论 2 349

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,633评论 18 139
  • [TOC] 简介 为何修改Java java8的改变是为了让代码在多核CPU上高效运行。 面向对象编程 vs. 函...
    挑灯剑客阅读 591评论 0 0
  • 一如飘飞的长发 散散落落 ...
    姜妮_向华阅读 188评论 0 0
  • 最近,几所学校的重整组合如火如荼地进行着。新整合后的学校面临的第一大问题就是这一届的新生生源问题。 原老三中的几位...
    楼顶上的小蚂蚁阅读 313评论 0 0
  • 不慌不张,不离不弃 在你的方圆几里 坚守着,努力着 直到某一天遇见你 我能够说出那句: 谢谢你薛之谦,让我成为更好...
    谦之友阅读 485评论 0 0