Java 8 引入了一些为了提高编码的速度与间接性的新特性。其中最重要的特性之一,便是“Lambda 表达式”,它打开了使用java进行函数是编程的大门。Lambda表达式允许直接实现与传递函数,而不需要定义一个额外的(匿名)类。
注意:Flink Java API的所有操作符都支持lambda表达式,然而,不论何时,一个lambda表达式使用了Java的泛型,你需要明确的声明类型信息。
这篇文档接受信号如何使用lambda表达式以及解释了当前的使用限制。
示例与限制
下面的代码展示了如何实现一个简单的map函数,使用lambda表达式将输入进行乘方操作。map函数的输入i与输出不必须要声明类型,因为它们可以通过java编译器推断出来。
env.fromElements(1, 2, 3)
// returns the squared i
.map(i -> i*i)
.print();
Flink能自动的从实现类的方法签名中 OUT map(IN value)提取出输出的类型信息,因为 OUT 不是一个泛型,而是一个 Integer(注:通过java 编译器推断)
不幸的是,类似flatMap这样的函数,他们的方法签名 void flatMap(IN value,Collector<OUT> out) 会被java 编译器编译为 void flatMap<IN value,Collector out)。这样flink就不能自动推断输出值的类型了。
这种情况下,Flink会抛出如下异常:
org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.
In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.
Otherwise the type has to be specified explicitly using type information.
这种情况下,类型信息需要被明确的指定,否则输出会被看做是Object,导致不高效的序列化。
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.util.Collector;
DataSet<Integer> input = env.fromElements(1, 2, 3);
// collector type must be declared
input.flatMap((Integer number, Collector<String> out) -> {
StringBuilder builder = new StringBuilder();
for(int i = 0; i < number; i++) {
builder.append("a");
out.collect(builder.toString());
}
})
// provide type information explicitly
.returns(Types.STRING)
// prints "a", "a", "aa", "a", "aa", "aaa"
.print();
相同的问题发生在map函数使用含泛型的返回类型时。方法签名 Tuple2<Integer,Integer> map(Integer value) 会被擦除为: Tuple2 map(Integer value),如:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
env.fromElements(1, 2, 3)
.map(i -> Tuple2.of(i, i)) // no information about fields of Tuple2
.print();
通常来说,这种问题可以通过如下几种方式解决:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
// use the explicit ".returns(...)"
env.fromElements(1, 2, 3)
.map(i -> Tuple2.of(i, i))
.returns(Types.TUPLE(Types.INT, Types.INT))
.print();
// use a class instead
env.fromElements(1, 2, 3)
.map(new MyTuple2Mapper())
.print();
public static class MyTuple2Mapper extends MapFunction<Integer, Tuple2<Integer, Integer>> {
@Override
public Tuple2<Integer, Integer> map(Integer i) {
return Tuple2.of(i, i);
}
}
// use an anonymous class instead
env.fromElements(1, 2, 3)
.map(new MapFunction<Integer, Tuple2<Integer, Integer>> {
@Override
public Tuple2<Integer, Integer> map(Integer i) {
return Tuple2.of(i, i);
}
})
.print();
// or in this example use a tuple subclass instead
env.fromElements(1, 2, 3)
.map(i -> new DoubleTuple(i, i))
.print();
public static class DoubleTuple extends Tuple2<Integer, Integer> {
public DoubleTuple(int f0, int f1) {
this.f0 = f0;
this.f1 = f1;
}
}