上一节主要是大致介绍了下 flink 的窗口组成, 以及如何去划分窗口的. 那么这一篇文章主要是对剩下的内容做一下总结, 说一下如何对窗口内的数据做处理.
Window Function
Window Assigner 的作用是划分窗口的, 而 Window Function 就是对窗口内的数据做处理的一个过程. Flink 提供了 4 种类型的 Window Function, 分别是 ReduceFunction / AggregateFunction / FoldFunction / ProcessWindowFunction. 另外, 这四类还根据计算原理的不同分为增量聚合函数和全量窗口函数. 增量的计算性能比较高, 主要是基于中间状态的计算结果, 窗口中只维护中间结果的状态值.
1. ReduceFunction (增量)
对输入的两个相同类型的元素按照指定的计算方式进行聚合, 通过实现 ReduceFunction 接口就可以在reduce( ) 函数内部进行聚合操作了.
// 将Tuple2 按照 f1 进行 keyBy, 之后将 f0字符合并起来
input.keyBy(x -> x.f1)
.timeWindow(Time.seconds(10), Time.seconds(1))
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> t1, Tuple2<String, Long> t2) throws Exception {
return new Tuple2<>(t1.f0 + t2.f0, t1.f1);
}
});
当然也可以使用匿名函数的方式,写起来会更加简洁.上述代码可以改为:
input.keyBy(x -> x.f1).timeWindow(Time.seconds(10), Time.seconds(1))
.reduce((t1,t2) -> new Tuple2<>(t1.f0 + t2.f0, t1.f1));
2. AggregateFunction (增量)
AggregateFunction 相对于ReduceFunction更加灵活,但是实现起来也更复杂, AggregateFunction有 4 个需要复写的方法, 其中createAccumulator( ) 定义累加器, add( ) 定义数据的添加逻辑, getResult( ) 定义了根据 accumulator 计算结果的逻辑, merge()方法定义合并 accumulator 的逻辑.
input.keyBy(x -> x.f1)
.timeWindow(Time.seconds(10), Time.seconds(1))
// 自定义一个AggregateFunciton, 将相同标号 f1 的数据的 f0字符串字段合并在一起
// ("hello", 1L) + ("world", 1L) = ("hello world", 1L)
.aggregate(new MyAggregateFunction());
通过自定义的 MyAggregateFunction() 来实现 AggregateFunction 接口
public static class MyAggregateFunction implements AggregateFunction<Tuple2<String, Long>, String, String>{
@Override
public String createAccumulator() {
// 初始化累加器
return "";
}
@Override
public String add(Tuple2<String, Long> t, String s) {
// 输入数据与累加器的合并
return s + " " +t.f0;
}
@Override
public String getResult(String s) {
// 得到累加器的结果
return s.trim();
}
@Override
public String merge(String s, String acc1) {
// 合并累加器
return s + " " + acc1;
}
}
3. FoldFunction (增量)
FoldFunction定义了如何将窗口中的输入元素与外部的元素合并的逻辑
input.keyBy(x -> x.f1)
.timeWindow(Time.seconds(10), Time.seconds(1)).fold("flink", (acc, t) ->t.f0 + acc);
FoldFunction在新版本已经被标记@Deprecated了, 建议使用AggregateFunction代替
4. ProcessWindowFunction (全量)
ProcessWindowFunction 相较于其他的 Window Function, 可以实现一些更复杂的计算, 比如基于整个窗口做某些指标计算 或者需要操作窗口中的状态数据和窗口元数据. Flink 提供了 ProcessWindowFunction 这个抽象类, 继承此类就可以实现ProcessWindowFunction, 其中, 必须要实现 process( ) 方法, 这是处理窗口数据的主要方法.还在一下跟窗口数据相关的方法可以有选择的实现.
public static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple3<String, Long, Long>, String, Long, TimeWindow> {
@Override
public void process(Long s, Context context, Iterable<Tuple3<String, Long, Long>> elements, Collector<String> out) throws Exception {
// 统计每个窗口内的所有数据的 f0字段加起来共有多少个单词
// 也就做单个窗口的 wordcount
Long count = 0L;
for (Tuple3<String, Long, Long> element : elements) {
count += element.f0.split(" ").length;
}
out.collect("window: " + context.window() + " word count: " + count);
}
}
5. 增量与全量共同使用
增量聚合函数虽然性能好, 但是灵活性不如全量函数, 例如对窗口状态数据的操作以及对窗口中的元数据信息的获取. 但是如果用 ProcessWindowFunction 去完成一些基础的增量计算相对比较浪费资源, 因此可以两者结合的方式来实现.
input.keyBy(x -> x.f1)
.timeWindow(Time.seconds(10), Time.seconds(1))
// 第一个Function为 ReduceFunction, 取窗口的最小值
.reduce((r1, r2) -> {
return r1.f0 < r2.f0 ? r1 : r2;
// 第二个Function为 ProcessWindowFunction, 获取窗口的时间信息
}, new ProcessWindowFunction<Tuple2<Long, Long>, String, Long, TimeWindow>() {
@Override
public void process(Long aLong, Context context, Iterable<Tuple2<Long, Long>> elements, Collector<String> out) throws Exception {
out.collect("window: " + context.window());
}
}).print();
Flink 窗口中的其他组件
除了 Window Assigner 和 Window Function外,Flink的窗口中还有 Triger窗口触发器, 其负责判断何时将窗口中的数据取出做计算, flink已经默认为各种类型的窗口实现了 triger. 用户也可以自己手动指定. Evictors 是数据剔除器, 目的是把窗口中的数据按照需求做一定的剔除. Flink也有 API 针对延迟数据做处理, 延迟的数据可以丢弃也可以通过sideOutputLateDate( ) 方法处理.
1. Triger 窗口触发器
EventTimeTrigger: 通过对比 watermark 和窗口 EndTime 确定是否触发窗口
ProcessTimeTrigger: 通过对比 ProcessTime 和窗口 EndTime 确定是否触发窗口
ContinuousEventTimeTrigger: 根据间隔时间周期性触发窗口
ContinuousEventTimeTrigger: 同上, 区别是使用ProcessTime
CountTrigger: 根据接入数量是否超过阈值
DeltaTrigger: 根据计算出来的 Delta 指标是否超过指定的 Threshold
PurgingTrigger: 可以将任意触发器作为参数转换为Purge类型触发器
2. Evictors触发器
CountEvictor: 保持固定数量的数据, 超过的剔除
DeltaEvictor: 通过定义 delta 和 threshold , 计算两个数据之间的 delta 值, 超过则剔除
TimeEvictor: 指定时间间隔, 将当前窗口中的最新元素的时间减去Interval, 然后将小于该结果的数据全部剔除