1. 涉及组件
FlinkKafkaConsumer是用户使用Kafka作为Source进行编程的入口,它有一个核心组件KafkaFetcher,用来消费kafka中的数据,并向下游发送接收到的数据,如果调用了FlinkKafkaConsumer#assignTimestampsAndWatermarks
,还负责WaterMark的发送,WaterMark是本篇文章的重点。
我们先看下KafkaFetcher的组成
- 消费线程,用来构建KafkaConsumer客户端,向Kafka请求指定的分区数据,将获取的批量数据ConsumerRecords放入到Handover,Handerover可以看成一个同步队列,生成一个必须等到被消费后才能再生产
- 任务线程,用来消费Handerover中的数据,将ConsumerRecords反序列化为一条条的数据,然后存储在队列ArrayDeque中,然后同一个循环来消费该队列中的消息,用来做三件事情(看图吧,这里不写了)
- 在创建KafkaFetcher时,会根据watermark的发送间隔,向timeService提交一个定时任务,定时的更新每个partition的watermark,然后取各个partition中最小的watermark,作为任务的候选watermark进行更新,如果更新成功则会向下游发送
2. WaterMark的传播
下面是调用了FlinkKafkaConsumer#assignTimestampsAndWatermarks
之后,KafkaFetcher中管理WaterMark的示意图
- 只是一个Task,该Task消费2个分区
- 更新每个分区的WaterMark:KafkaTopicPartitionStateWithWatermarkGenerator用来执行WatermarkGenerator.onPeriodicEmit方法,并通过多路复用器WatermarkOutputMultiplexer将每个partition生成的WaterMark存储到OutputState中,当新生成的WaterMark大于存储在OutputState中的WaterMark时,则更新OutputState中的WaterMark
-
更新Task WaterMark:通过多路复用器WatermarkOutputMultiplexer遍历所有非IDLE状态的OutputState的Watermark,取最小的作为最新的Task的WaterMark,如果该值大于老的Task WaterMark,则更新并向下游发送
2.1 WaterMark传播可能产生的问题:Window算子不被触发
如图,假设partition1没有数据了,它的watermark就不更新,则Task1由于Task WaterMark得不到更新,不往下面发送WM,而Task2发送WM(30),下游任务接收后,也会取最小,还是10,这样会导致下游的Window计算不会被触发。
解决办法是assignTimestampsAndWatermarks.withIdleness(Duration.ofMinutes(1)),上面的是示例,表示如果某个partition在1分钟内没有数据可供消费了,则将该partition置为IDLE,在更新Task WaterMark将该partition的WaterMark忽略。当所有的partition都IDLE了,则会向下游发送StreamStatus.IDLE事件,接下来发生的事情可以参考flink解析:EventTime与Watermark
2.2 API使用不当产生的问题:丢失数据
final FlinkKafkaConsumer<String> producer = new FlinkKafkaConsumer<>(sourceTopic, new SimpleStringSchema(), properties);
env.addSource(producer).assignTimestampsAndWatermarks(getWatermarkStrategy()));
不是调用的FlinkKafkaConsumer#assignTimestampsAndWatermarks而是调用DataStreamSource#assignTimestampsAndWatermarks,可能会产生数据丢失的问题
- 代码那样写,consumer与assignTimestampsAndWatermarks就是2个operator了,WaterMark直接按照规则往下发了,当40发过去后,20过去就被当成迟到数据了,这需要注意