Flink 1.19 Watermark源码分析

Watermark

  • watermark用于指示事件的进展,一个时间戳为T的watermark表示事件时间已经推进到了T,而且不会有比T更早的事件出现
  • watermark一般在数据流的源头创建,并通过流和操作符传播
  • 当到达流的末尾是,会发出一个Long.MAX_VALUE表示终止水印


    image.png

前提

所有算子包括Source和Sink都是逻辑数据流图,需要StreamGraphGeneratorJobGraphGenerator生成物理执行计划,根据information通过translator将算子转为operator。也就是说,flink job的编码是两条线,一条明线,即用户编写的逻辑,一条暗线是flink内部真正封装了算子行为,水位线传递,Checkpoint生成逻辑的物理执行计划。

watermark的源头

  • TimestampsAndWatermarks是source从事件提取时间戳生成watermark的重要接口
  • WatermarkUpdateListener接口用于监听
  • createMainOutput 创建ReaderOutput,用于Source Reader输出数据,包含内部处理时间戳逻辑和watermark生成逻辑
  • startPeriodicWatermarkEmitsstopPeriodicWatermarkEmits 启动和停止周期性watermark生成
  • emitImmediateWatermark 立即发出watermark
  • 两个工厂方法createProgressiveEventTimeLogiccreateNoOpEventTimeLogic ,会根据 WatermarkStrategy 生成TimestampsAndWatermarks实例
    • createProgressiveEventTimeLogic 创建一个符合渐进式事件时间逻辑的TimestampsAndWatermarks 实例,用于流处理
    • createNoOpEventTimeLogic 创建一个无操作的TimestampsAndWatermarks 实例,用于批处理,跳过watermark生成逻辑
image.png
  • 接口TimestampsAndWatermarksProgressiveTimestampsAndWatermarksNoOpTimestampsAndWatermarks实现,他们唯一的创建方式是调用TimestampsAndWatermarks的工程方法创建
  • 那么,是谁创建了TimestampsAndWatermarks了呢,是SourceOperator,Source在物理计划的实现。在SourceOperatoropen方法创建了eventTimeLogic,可以看到watermarkStrategy代表用户指定的生成策略,我们指定的BoundedOutOfOrdernessWatermarks策略就是在这里被传递进物理执行计划的
image.png
  • 话分两头,SourceOperator需要具有将record和watermark输出的能力,Source的输出由ReaderOutput 定义,他实现了SourceOutput 能力,是SourceReader和下游算子交互的核心接口之一
  • 数据记录发送
    • collect(T record) 发送不带时间戳的记录
    • collect(T record, long timestamp) 发送带时间戳的记录
  • watermark发送,emitWatermark(Watermark watermark) 发送watermark,标记事件进展
  • idle状态管理,markIdle() 标记当前输出为idle状态,表示下游算子无需等待该输出的watermark
  • split管理
    • createOutputForSplit(String splitId) 为特定分片创建一个独立的SourceOutput ,用于设置各个分片的创建逻辑,比如创建水位线
    • releaseOutputForSplit(String splitId) 释放特定分片的SourceOutput
image.png
  • SourceOperator 中,output就是成员对象currentMainOutput,他被上文eventTimeLogic所创建,eventTimeLogicTimestampsAndWatermarks类型,当我们使用流处理时,eventTimeLogic的类型是ProgressiveTimestampsAndWatermarks
  • ProgressiveTimestampsAndWatermarks实现了createMainOutput方法,在这个方法中currentMainOutput被赋值为StreamingReaderOutput
  • StreamingReaderOutput继承自SourceOutputWithWatermarksSourceOutputWithWatermarks实现了SourceOutput
  • 现在我们就知道了一些事情,SourceOperator有两个成员变量,类型为TimestampsAndWatermarksReaderOutputTimestampsAndWatermarks用于创建流处理或者批处理的Watermark,根据用户指定watermark生成策略激发新的watermark,以及创建ReaderOutputReaderOutput用于向下游发送record和watermark

watermark生成

  • WatermarkGenerator有两个方法onEventonPeriodicEmit
  • onEvent表示record到达时,检查或者记住这个时间戳,参数里包含WatermarkOutput,可以决定是否发出watermark
  • onPeriodicEmit 周期性发射水印
  • BoundedOutOfOrdernessWatermarks为例,record到达之后会调用onEvent记住record的时间戳,然后当onPeriodicEmit触发的时候发射record时间戳减延迟时间作为watermark

watermark传递(KeyedCoProcessFunction为例)

  • 一个用户可以使用或者继承的算子都有他对应的运行时operator,负责执行用户算子的逻辑,并管理状态,传递watermark。比如KeyedCoProcessFunction对应KeyedCoProcessOperator
  • KeyedCoProcessOperatorimplementTwoInputStreamOperatorTwoInputStreamOperator里面具有接口函数processWatermark1processWatermark2,但是KeyedCoProcessOperator并没有实现这两个的方法,他的默认实现被放到了父类的父类中。
image.png
  • AbstractUdfStreamOperator 是用户定义function对应的operator的基类,KeyedCoProcessOperator继承了AbstractUdfStreamOperator
image.png
  • AbstractUdfStreamOperator继承了AbstractStreamOperatorAbstractStreamOperator是所有stream operator的基类。里面定义了两个方法processWatermark1processWatermark2
    • 处理watermark的逻辑是,处理指定的输入流的watermark,对于这个两输入的算子,就有两个索引,然后更新这个算子的watermark状态,更新时间服务管理器并发射watermark到下游
    • 其中updateWatermark是关键的方法,他会遍历所有输入方向的watermark,然后取其中小的watermark,更新为这个算子的combine watermark。如果所有输入都是idle,那这个算子也会变为idle状态
    • 如果成功更新了combine watermark,会将这个新的combine watermark更新时间管理服务以及发射这个watermark。这涉及两个东西InternalTimeServiceManagerOutput<StreamRecord<OUT>>
    • InternalTimeServiceManager 太大了,就不说了
    • 这里的Output 其实是WatermarkOutput,他有多种实现,有非链式算子输出和链式算子输出
      • 如果是非链式算子会将watermark写入网络缓冲区,通过网络发给下游算子
      • 如果是链式算子会直接将watermark传递给下游算子
image.png
image.png
  • IndexedCombinedWatermarkStatus 通过索引管理多个输入流的PartialWatermark,利用CombinedWatermarkStatus计算全局watermark(也就是这个算子对应的operator),它是对CombinedWatermarkStatus的封装,添加了按索引管理输入流的功能
image.png
  • CombinedWatermarkStatus 组合多个PartialWatermark对象,计算全局watermark值和状态,他维护了一个PartialWatermark列表
    • updateWatermark是更新watermark的关键方法,取小的那个watermark
image.png
image.png
  • PartialWatermark表示单个输入流的watermark状态
    • 每个PartialWatermark跟踪一个输入流的watermark值和idle,active状态。
    • onWatermarkUpdate表示收到watermark变动时应该做什么动作,这个动作在PartialWatermark初始化的时候会指定
image.png
  • WatermarkOutputMultiplexer(watermark输出多路复用器)
    • WatermarkUpdateListener用来监听某个输入流的watermark值,发生变化时会调用onWatermarkUpdate方法
    • WatermarkOutputMultiplexer中当某个输入流watermark更新时,WatermarkOutputMultiplexer会通过WatermarkUpdateListener将更新传递到底层的WatermarkOutput
    • WatermarkOutputMultiplexer将多个输入流的watermark更新组合成一个全局的watermark更新。
    • 支持两种类型的输入:立即输出和延迟输出
image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容