应用场景
需求:需要按小时统计用户在某个网页上的浏览次数,观察一天内网站活跃度分布情况。
编程思路: 利用flink来计算这个问题的话, 可以采用聚合窗口,将每条采集到的用户浏览日志累加起来, 每隔1小时生成一个窗口输出就可以了。
可能存在的问题: 正常情况下采集到的日志是按照顺序来的, 但是如果遇上数据采集传输时候网络抖动、或者flink程序资源处理数据过慢产生反压,特别是使用Kafka的时候,多个分区之间的数据无法保证有序,就可能出现用户14:58的时候浏览了网页,但是等flink开始处理的时候已经是15:03了,这个时候本应该将这个次数统计到14点这个区间中, 却由于数据“迟到”导致统计到了下一个区间中。
解决方案:
- Watermark : 防止数据乱序 / 指定时间内获取不到全部数据。
- allowLateNess:将窗口关闭时间再延迟一段时间。
- sideOutPut :当指定窗口已经彻底关闭后,就会把所有过期延迟数据放到侧输出流,让用户决定如何处理(最后兜底的操作)。
Watermark是什么
Watermark可以翻译为水位线,他的作用是基于已经收集的消息来估算是否还有消息未到达。
从源码可以看出Watermark本质上是一个时间戳。时间戳反映的是事件发生的时间,而不是事件处理的时间。
Watermark原理
将用读取进入系统的最新事件时间减去固定的时间间隔作为Watermark, 时间间隔为用户外部配置的支持最大延迟到达的时间长度, 理论上不会有时间超过该时间间隔到达,否则就认为是迟到事件或异常事件。
Watermark是一种告诉Flink一个消息延迟多少的方式。它定义了什么时候不再等待更早的数据。可以把Watermarks理解为一个水位线,这个Watermarks在不断的变化表示水位在升高。Watermark实际上作为数据流的一部分随数据流流动。 当Flink中的运算符接收到Watermarks时,它明白早于该时间的消息已经完全抵达计算引擎,即假设不会再有时间小于水位线的事件到达。 这个假设是触发窗口计算的基础,只有水位线越过窗口对应的结束时间,窗口才会关闭和进行计算。
通俗的来讲, 比如对于late element,我们不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。 可以把Watermark看作是一种告诉Flink一个消息延迟多少的方式。定义了什么时候不再等待更早的数据。
WaterMark设定方法
设定位置
通常情况下,在接收到Source的数据后,应该立刻生成Watermark,但是也可以在应用简单的Map或者Filter操作后再生成Watermark。如果指定多次Watermark,后面指定的值会覆盖前面的值。WaterMark分类
定期水位线(Periodic Watermark)
周期性地触发Watermark的生成和发送,默认是100ms。每隔N秒自动向流里注入一个Watermark,可以定义一个最大允许乱序的时间,这种比较常用。
在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。
标点水位线(Punctuated Watermark)
基于某些事件触发Watermark的生成和发送。基于事件向流里注入一个Watermark,每一个元素都有机会判断是否生成一个Watermark。如果得到的Watermark不为空并且比之前的大,就注入流中。
在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。
迟到事件处理
处理方法:
- 重新激活已经关闭的窗口并重新计算以修正结果(Allowed Lateness)。
- 将迟到事件收集起来另外处理(Side Output)。
- 将迟到事件视为错误消息并丢弃(flink默认)。
Side Output机制:
将迟到事件单独放入一个数据流分支,这会作为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。
Allowed Lateness机制:
允许用户设置一个允许的最大迟到时长。Flink 会在窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间的迟到事件不会被丢弃,而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存,并且如果窗口计算使用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量计算,代价比较大,所以允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该考虑降低水位线提高的速度或者调整算法。
总结:
- 窗口window 的作用是为了周期性的获取数据。
- watermark的作用是防止数据出现乱序(经常),事件时间内获取不到指定的全部数据,而做的一种保险方法。
- allowLateNess是将窗口关闭时间再延迟一段时间。
- sideOutPut是最后兜底操作,所有过期延迟数据,指定窗口已经彻底关闭了,就会把数据放到侧输出流。
EventTime+Watermark解决乱序数据的案例
程序说明
首先通过自定义数据源模拟一些带有时间的数据,将所有的数据都聚合到5s一个的Tumbling窗口中, 在将进入到每个窗口的数据上所带的时间戳加起来, 通过最终的输出结果分析进入到每个窗口内参与计算的数据是否符合预期
- 将并行度设置为1, 让所有的数据全部使用1个并发处理, 保证数据处理不会因为多并发导致乱序。
- 设置时间类型为事件事件, 使系统按照数据上所带的时间进行处理
结果解析
outOfOrder = 0, 表示允许数据迟到的时间为0ms, 也就是不允许迟到,乱序的数据就会被丢弃。
(1)第一个5s窗口(0-4s)内参与计算的数据有 0L, 1000L, 2000L, 3000L, 3000L, 4000L。 后面那个迟到的4000L数据就会被丢弃,所以结果为(key,13000)。
(2)第二个5s窗口(5s - 9s)内参与计算的数据有 5000L, 6000L, 6000L, 7000L, 8000L。 后面还有2个迟到的8000L和9000L数据就会被丢弃,所以结果为(key,32000)。
(3)第三个5s窗口同理可算出结果为(key,10000)。outOfOrder = 3000, 表示允许数据迟到的时间为3000ms,也就是乱序的数据如果不超过3000ms到达, 仍然会参与窗口计算。
(1)第一个5s窗口(0-4s)内参与计算的数据有 0L, 1000L, 2000L, 3000L, 3000L, 4000L, 4000L。 后面那个迟到的4000L数据因为没超过3000ms也会参与计算,所以结果为(key,17000)。
(2)第二个5s窗口(5s - 9s)内参与计算的数据有 5000L, 6000L, 6000L, 7000L, 8000L,8000L,9000L。 后面还有2个迟到的8000L和9000L数据因为没超过3000ms也会参与计算,所以结果为(key,49000)。
(3)第三个5s窗口同理可算出结果为(key,10000)。
代码
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import scala.Tuple2;
import java.util.concurrent.TimeUnit;
/**
* Created By qiuzhi. Description: Date: 2020-07-09 Time: 4:29 PM
*
* @author zhanghaichao
* @date 2020/07/09
*/
class SourceOperator implements SourceFunction<Tuple2<String, Long>> {
@Override
public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
ctx.collect(new Tuple2<>("key", 0L));
ctx.collect(new Tuple2<>("key", 1000L));
ctx.collect(new Tuple2<>("key", 2000L));
ctx.collect(new Tuple2<>("key", 3000L));
ctx.collect(new Tuple2<>("key", 3000L));
ctx.collect(new Tuple2<>("key", 4000L));
ctx.collect(new Tuple2<>("key", 5000L));
// out of ornew Tuple2<>(der);
ctx.collect(new Tuple2<>("key", 4000L));
ctx.collect(new Tuple2<>("key", 6000L));
ctx.collect(new Tuple2<>("key", 6000L));
ctx.collect(new Tuple2<>("key", 7000L));
ctx.collect(new Tuple2<>("key", 8000L));
ctx.collect(new Tuple2<>("key", 10000L));
// out of ornew Tuple2<>(der);
ctx.collect(new Tuple2<>("key", 8000L));
ctx.collect(new Tuple2<>("key", 9000L));
}
@Override
public void cancel() {
}
}
public class OutOfOrderCase {
public static void main(String[] args) throws Exception {
//获取运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.addSource(new SourceOperator())
// 每个元素都有机会生成watermark
.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<String, Long>>() {
// 设置允许迟到的时间(ms)
private int outOfOrder = 0;
// (key,13000)
// (key,32000)
// (key,10000)
//private int outOfOrder = 3000;
// (key,17000)
// (key,49000)
// (key,10000)
@Override
public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
return stringLongTuple2._2;
}
@Override
public Watermark checkAndGetNextWatermark(Tuple2<String, Long> stringLongTuple2, long l) {
return new Watermark(stringLongTuple2._2 - outOfOrder);
}
}).keyBy(new KeySelector<Tuple2<String,Long>, String>() {
@Override
public String getKey(Tuple2<String, Long> stringLongTuple2) throws Exception {
return stringLongTuple2._1;
}
})
.window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
.reduce((ReduceFunction<Tuple2<String, Long>>)(stringLongTuple2, t1) -> new Tuple2<>(stringLongTuple2._1,stringLongTuple2._2 + t1._2))
.print();
env.execute("Out of order");
}
}