使用 Flink Tuples
当你使用类似于 groupBy, join, 或者 keyBy 算子时,Flink 提供了多种用于在你的数据集上选择 key 的方法。你可以使用 key 选择函数,如下:
// Join movies and ratings datasets
movies.join(ratings)
// Use movie id as a key in both cases
.where(new KeySelector() {
@Override
public String getKey(Movie m) throws Exception {
return m.getId();
}
})
.equalTo(new KeySelector() {
@Override
public String getKey(Rating r) throws Exception {
return r.getMovieId();
}
})
你甚至可以指定 POJO 类型中一个 field 的名字:
movies.join(ratings)
// Use same fields as in the previous example
.where("id")
.equalTo("movieId")
但是如果你现在使用的是 Flink 元组类型(tuple types)的数据,你可以简单地指定将要作为 key 的字段在元组中的位置:
DataSet> movies = ...
DataSet> ratings = ...
movies.join(ratings)
// Specify fields positions in tuples
.where(0)
.equalTo(1)
这种方法在 Flink 中将会获得最佳的性能,但是可读性方面呢?这是不是意味着你的代码看起来像下面那样:
DataSet> result = movies.join(ratings)
.where(0)
.equalTo(0)
.with(new JoinFunction, Tuple2,
Tuple3>() {
// What is happening here?
@Override
public Tuple3 join(Tuple2 first,
Tuple2 second) throws Exception {
// Some tuples are joined with some other tuples and some fields are returned???
return new Tuple3<>(first.f0, first.f1, second.f1);
}
});
在这种情况下,提高可读性的常见方法是创建一个继承自 TupleX 的类,并且实现其中的 getters 和 setters。下面是 Flink Gelly 类库中 Edge 类的实现,其中有三个 fileds,所以它直接继承了 Tuple3 类:
public class Edge extends Tuple3 {
public Edge(K source, K target, V value) {
this.f0 = source;
this.f1 = target;
this.f2 = value;
}
// Getters and setters for readability
public void setSource(K source) {
this.f0 = source;
}
public K getSource() {
return this.f0;
}
// Also has getters and setters for other fields
...
}
重用 Flink 对象
另外一种可以提升 Flink 应用程序性能的方法是在用户自定义函数返回数据时使用可变对象(mutable objects),请看看下面的例子:
stream
.apply(new WindowFunction, String, TimeWindow>() {
@Override
public void apply(String userName, TimeWindow timeWindow,
Iterable iterable,
Collector> collector) throws Exception {
long changesCount = ...
// A new Tuple instance is created on every execution
collector.collect(new Tuple2<>(userName, changesCount));
}
}
正如你所看到的,在我们每次调用 apply 函数的时候,我们都会创建一个 Tuple2 类型的实例,这将会给垃圾回收造成很大的压力。解决这个问题的一种方法就是反复使用相同的实例:
stream
.apply(new WindowFunction, String, TimeWindow>() {
// Create an instance that we will reuse on every call
private Tuple2 result = new Tuple<>();
@Override
public void apply(String userName, TimeWindow timeWindow,
Iterable iterable,
Collector> collector) throws Exception {
long changesCount = ...
// Set fields on an existing object instead of creating a new one
result.f0 = userName;
// Auto-boxing!! A new Long value may be created
result.f1 = changesCount;
// Reuse the same Tuple2 object
collector.collect(result);
}
}
上面的代码性能会好些。虽然我们在每次调用的时候只创建了一个 Tuple2 实例,但是我们还间接地创建了 Long 类型的实例。为了解决这个问题, Flink 内部提供了一系列 value classes,比如:IntValue, LongValue, StringValue, FloatValue 等。这些类的重点是为内置类型提供了可变版本,所以我们可以在用户自定义函数中重用这些类型,下面就是如何使用的例子:
stream
.apply(new WindowFunction, String, TimeWindow>() {
// Create a mutable count instance
private LongValue count = new IntValue();
// Assign mutable count to the tuple
private Tuple2 result = new Tuple<>("", count);
@Override
// Notice that now we have a different return type
public void apply(String userName, TimeWindow timeWindow,
Iterable iterable,
Collector> collector) throws Exception {
long changesCount = ...
// Set fields on an existing object instead of creating a new one
result.f0 = userName;
// Update mutable count value
count.setValue(changesCount);
// Reuse the same tuple and the same LongValue instance
collector.collect(result);
}
}
上面这些使用习惯在 Flink 类库中被普遍使用,比如 Flink Gelly。
使用函数注解
另一种优化 Flink 应用程序的方法是提供一些关于用户自定义函数如何对输入数据进行处理的信息。由于 Flink 无法解析和理解你的代码,所以你提供一些关键的信息将会帮助 Flink 创建一个更加高效的执行计划。我们可以使用三种注解:
@ForwardedFields – 指定输入数据中哪些字段保持不变并且在输出值中使用(specifies what fields in an input value were left unchanged and are used in an output value.)。
@NotForwardedFields – 指定在输出中相同位置未保留的字段(specifies fields which were not preserved in the same positions in the output.)。
@ReadFields – 指定哪些字段在计算结果的时候用到。你只能指定那些在计算中使用的字段,而不是仅仅将数据拷贝到输出中的字段。(specifies what fields were used to compute a result value. You should only specify fields that were used in computations and not merely copied to the output.)
我们来看看如何使用 ForwardedFields 注释:
// Specify that the first element is copied without any changes
@ForwardedFields("0")
class MyFunction implements MapFunction, Tuple2> {
@Override
public Tuple2 map(Tuple2 value) {
// Copy first field without change
return new Tuple2<>(value.f0, value.f1 + 123);
}
}
上面的注释意味着输入元组的第一个元素将不会改变,而且在返回元组中同样处在第一个位置。
如果你没有改变一个元素,只不过简单地将它移到不同的位置上,你同样可以使用 ForwardedFields 注释来实现。下面例子中,我们简单地将输入元组的位置互相交换,并且直接返回:
// 1st element goes into the 2nd position, and 2nd element goes into the 1st position
@ForwardedFields("0->1; 1->0")
class SwapArguments implements MapFunction, Tuple2> {
@Override
public Tuple2 map(Tuple2 value) {
// Swap elements in a tuple
return new Tuple2<>(value.f1, value.f0);
}
}
上面例子中提到的注释只能应用到只有一个输入参数的函数中,比如 map 或者 flatMap。如果你有两个输入参数的函数,你可以分别使用 ForwardedFieldsFirst 和 ForwardedFieldsSecond 注释来为第一和第二个参数指定一些信息。
下面我们使用 ForwardedFieldsFirst 和 ForwardedFieldsSecond 注释来为实现 JoinFunction 接口的类指定相关的信息:
// Two fields from the input tuple are copied to the first and second positions of the output tuple
@ForwardedFieldsFirst("0; 1")
// The third field from the input tuple is copied to the third position of the output tuple
@ForwardedFieldsSecond("2")
class MyJoin implements JoinFunction, Tuple2, Tuple3>() {
@Override
public Tuple3 join(Tuple2 first, Tuple2 second) throws Exception {
return new Tuple3<>(first.f0, first.f1, second.f1);
}
})
Flink 同样提供了 NotForwardedFieldsFirst, NotForwardedFieldsSecond, ReadFieldsFirst, 和 ReadFirldsSecond 注释来实现相同的功能。
选择 Join 类型
如果你为 Flink 提供了一些信息,可以使你的 Join 操作更快,在讨论这个是如何工作之前,让我们先了解 Fliink 是如何运行 Join 操作的。
当 Flink 处理批量数据时,集群中的每台机器只存储了部分的数据。为了执行 Join 操作, Apache Flink 需要找到两个数据集所有 key 相同的数据。为了做到这一点,Flink 首先必须将两个数据集拥有相同 key 的数据放在同一台机器上。这里有两种实现策略:
Repartition-Repartition strategy:在这种场景下,Join 的两个数据集分别对它们的 key 使用相同的分区函数进行分区,并经过网络发送数据。这就意味着如果数据集非常大,这将花费相当一部分时间将数据分发出去。
Broadcast-Forward strategy:这种场景下,大的数据集R不做处理,另一个比较小的数据集S将全部复制到集群中所有拥有R的一部分数据的机器上。
如果你使用一个比较小的数据集和一个比较大的数据集进行 join 操作,你可以使用 Broadcast-Forward 策略,这个很容易实现:
ds1.join(ds2, JoinHint.BROADCAST_HASH_FIRST)
这种写法表示第一个数据集要比第二个数据集小的多。
Flink 支持的其他 join 提示有以下几种:
BROADCAST_HASH_SECOND – 表示第二个数据集比较小
REPARTITION_HASH_FIRST – 表示第一个数据集比较小
REPARTITION_HASH_SECOND – 表示第二个数据集有点小
REPARTITION_SORT_MERGE – 表示重新分区两个数据集并使用排序和合并策略(sorting and merging strategy)
OPTIMIZER_CHOOSES – Flink 优化器将决定如何连接数据集