Map: 对于简单的map(i -> i * i) flink 可以猜测其 类型。复杂的则需要指定return type,或者构造一个MapFunction,或者extends 自 Tuple2<Integer, Integer>。
flatMap:对于flatMap 的支持是无法猜测出来 类型的,必须通过returns(Types.STRING) 指定具体的返回值类型。
package myflink.learn.lambda;
/**
* Created by:
* date: 2019-02-17.
*/
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
// https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/java_lambdas.html#java-lambda-expressions
@Slf4j
@ToString
public class LambdaDemo {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2, 3).map(i -> i * i).print();
DataSet<Integer> input = env.fromElements(1, 2, 3);
input.flatMap((Integer number, Collector<String> out) -> {
StringBuilder builder = new StringBuilder();
for (int i = 0; i < number; i++) {
builder.append("a");
out.collect(builder.toString());
}
})
.returns(Types.STRING)
.print();
// env.fromElements(1, 2, 3)
// .map(i -> Tuple2.of(i, i)) // no information about fields of Tuple2
// .print(); // Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing.
// // In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
// // An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.MapFunction' interface.
// // Otherwise the type has to be specified explicitly using type information.
env.fromElements(1, 2, 3)
.map(i -> Tuple2.of(i, i))
.returns(Types.TUPLE(Types.INT, Types.INT))
.print();
// 或者 使用一个Mapper
env.fromElements(1, 2, 3)
.map(new MyTuple2Mapper())
.print();
// 或者 使用一个 匿名类
env.fromElements(1, 2, 3).map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> map(Integer value) throws Exception {
return Tuple2.of(value, value);
}
}).print();
// or in this example use a tuple subclass instead
env.fromElements(1, 2, 3)
.map(i -> new DoubleTuple(i, i))
.print();
}
public static class DoubleTuple extends Tuple2<Integer, Integer> {
// 必须加上 无参构造函数,newInstance 时需要
/*
Caused by: java.lang.NoSuchMethodException: myflink.learn.lambda.LambdaDemo$DoubleTuple.<init>()
at java.lang.Class.getConstructor0(Class.java:3082)
at java.lang.Class.newInstance(Class.java:412)
*/
public DoubleTuple() {
}
public DoubleTuple(int f0, int f1) {
this.f0 = f0;
this.f1 = f1;
}
}
}