Watermark
- watermark用于指示事件的进展,一个时间戳为T的watermark表示事件时间已经推进到了T,而且不会有比T更早的事件出现
- watermark一般在数据流的源头创建,并通过流和操作符传播
-
当到达流的末尾是,会发出一个Long.MAX_VALUE表示终止水印
image.png
前提
所有算子包括Source和Sink都是逻辑数据流图,需要StreamGraphGenerator
和JobGraphGenerator
生成物理执行计划,根据information通过translator将算子转为operator。也就是说,flink job的编码是两条线,一条明线,即用户编写的逻辑,一条暗线是flink内部真正封装了算子行为,水位线传递,Checkpoint生成逻辑的物理执行计划。
watermark的源头
-
TimestampsAndWatermarks
是source从事件提取时间戳生成watermark的重要接口 -
WatermarkUpdateListener
接口用于监听 -
createMainOutput
创建ReaderOutput,用于Source Reader输出数据,包含内部处理时间戳逻辑和watermark生成逻辑 -
startPeriodicWatermarkEmits
和stopPeriodicWatermarkEmits
启动和停止周期性watermark生成 -
emitImmediateWatermark
立即发出watermark - 两个工厂方法
createProgressiveEventTimeLogic
和createNoOpEventTimeLogic
,会根据WatermarkStrategy
生成TimestampsAndWatermarks
实例-
createProgressiveEventTimeLogic
创建一个符合渐进式事件时间逻辑的TimestampsAndWatermarks
实例,用于流处理 -
createNoOpEventTimeLogic
创建一个无操作的TimestampsAndWatermarks
实例,用于批处理,跳过watermark生成逻辑
-
image.png
- 接口
TimestampsAndWatermarks
被ProgressiveTimestampsAndWatermarks
和NoOpTimestampsAndWatermarks
实现,他们唯一的创建方式是调用TimestampsAndWatermarks
的工程方法创建 - 那么,是谁创建了
TimestampsAndWatermarks
了呢,是SourceOperator
,Source在物理计划的实现。在SourceOperator
的open
方法创建了eventTimeLogic
,可以看到watermarkStrategy
代表用户指定的生成策略,我们指定的BoundedOutOfOrdernessWatermarks
策略就是在这里被传递进物理执行计划的
image.png
- 话分两头,
SourceOperator
需要具有将record和watermark输出的能力,Source的输出由ReaderOutput
定义,他实现了SourceOutput
能力,是SourceReader
和下游算子交互的核心接口之一 - 数据记录发送
-
collect(T record)
发送不带时间戳的记录 -
collect(T record, long timestamp)
发送带时间戳的记录
-
- watermark发送,
emitWatermark(Watermark watermark)
发送watermark,标记事件进展 - idle状态管理,
markIdle()
标记当前输出为idle状态,表示下游算子无需等待该输出的watermark - split管理
-
createOutputForSplit(String splitId)
为特定分片创建一个独立的SourceOutput
,用于设置各个分片的创建逻辑,比如创建水位线 -
releaseOutputForSplit(String splitId)
释放特定分片的SourceOutput
-
image.png
-
SourceOperator
中,output就是成员对象currentMainOutput
,他被上文eventTimeLogic
所创建,eventTimeLogic
是TimestampsAndWatermarks
类型,当我们使用流处理时,eventTimeLogic
的类型是ProgressiveTimestampsAndWatermarks
-
ProgressiveTimestampsAndWatermarks
实现了createMainOutput
方法,在这个方法中currentMainOutput
被赋值为StreamingReaderOutput
-
StreamingReaderOutput
继承自SourceOutputWithWatermarks
,SourceOutputWithWatermarks
实现了SourceOutput
- 现在我们就知道了一些事情,
SourceOperator
有两个成员变量,类型为TimestampsAndWatermarks
和ReaderOutput
,TimestampsAndWatermarks
用于创建流处理或者批处理的Watermark,根据用户指定watermark生成策略激发新的watermark,以及创建ReaderOutput
,ReaderOutput
用于向下游发送record和watermark
watermark生成
-
WatermarkGenerator
有两个方法onEvent
和onPeriodicEmit
-
onEvent
表示record到达时,检查或者记住这个时间戳,参数里包含WatermarkOutput
,可以决定是否发出watermark -
onPeriodicEmit
周期性发射水印 - 以
BoundedOutOfOrdernessWatermarks
为例,record到达之后会调用onEvent
记住record的时间戳,然后当onPeriodicEmit
触发的时候发射record时间戳减延迟时间作为watermark
watermark传递(KeyedCoProcessFunction
为例)
- 一个用户可以使用或者继承的算子都有他对应的运行时operator,负责执行用户算子的逻辑,并管理状态,传递watermark。比如
KeyedCoProcessFunction
对应KeyedCoProcessOperator
。 -
KeyedCoProcessOperator
implementTwoInputStreamOperator
,TwoInputStreamOperator
里面具有接口函数processWatermark1
和processWatermark2
,但是KeyedCoProcessOperator
并没有实现这两个的方法,他的默认实现被放到了父类的父类中。
image.png
-
AbstractUdfStreamOperator
是用户定义function对应的operator的基类,KeyedCoProcessOperator
继承了AbstractUdfStreamOperator
。
image.png
-
AbstractUdfStreamOperator
继承了AbstractStreamOperator
,AbstractStreamOperator
是所有stream operator的基类。里面定义了两个方法processWatermark1
和processWatermark2
- 处理watermark的逻辑是,处理指定的输入流的watermark,对于这个两输入的算子,就有两个索引,然后更新这个算子的watermark状态,更新时间服务管理器并发射watermark到下游
- 其中
updateWatermark
是关键的方法,他会遍历所有输入方向的watermark,然后取其中小的watermark,更新为这个算子的combine watermark。如果所有输入都是idle,那这个算子也会变为idle状态 - 如果成功更新了combine watermark,会将这个新的combine watermark更新时间管理服务以及发射这个watermark。这涉及两个东西
InternalTimeServiceManager
和Output<StreamRecord<OUT>>
-
InternalTimeServiceManager
太大了,就不说了 - 这里的
Output
其实是WatermarkOutput
,他有多种实现,有非链式算子输出和链式算子输出- 如果是非链式算子会将watermark写入网络缓冲区,通过网络发给下游算子
- 如果是链式算子会直接将watermark传递给下游算子
image.png
image.png
-
IndexedCombinedWatermarkStatus
通过索引管理多个输入流的PartialWatermark
,利用CombinedWatermarkStatus
计算全局watermark(也就是这个算子对应的operator),它是对CombinedWatermarkStatus
的封装,添加了按索引管理输入流的功能
image.png
-
CombinedWatermarkStatus
组合多个PartialWatermark
对象,计算全局watermark值和状态,他维护了一个PartialWatermark
列表-
updateWatermark
是更新watermark的关键方法,取小的那个watermark
-
image.png
image.png
-
PartialWatermark
表示单个输入流的watermark状态- 每个
PartialWatermark
跟踪一个输入流的watermark值和idle,active状态。 -
onWatermarkUpdate
表示收到watermark变动时应该做什么动作,这个动作在PartialWatermark
初始化的时候会指定
- 每个
image.png
-
WatermarkOutputMultiplexer
(watermark输出多路复用器)-
WatermarkUpdateListener
用来监听某个输入流的watermark值,发生变化时会调用onWatermarkUpdate
方法 - 在
WatermarkOutputMultiplexer
中当某个输入流watermark更新时,WatermarkOutputMultiplexer
会通过WatermarkUpdateListener
将更新传递到底层的WatermarkOutput
-
WatermarkOutputMultiplexer
将多个输入流的watermark更新组合成一个全局的watermark更新。 - 支持两种类型的输入:立即输出和延迟输出
-
image.png