第四章 flink特性 - 流计算乱序延迟数据问题(Watermark)

应用场景

需求:需要按小时统计用户在某个网页上的浏览次数,观察一天内网站活跃度分布情况。

编程思路: 利用flink来计算这个问题的话, 可以采用聚合窗口,将每条采集到的用户浏览日志累加起来, 每隔1小时生成一个窗口输出就可以了。

可能存在的问题: 正常情况下采集到的日志是按照顺序来的, 但是如果遇上数据采集传输时候网络抖动、或者flink程序资源处理数据过慢产生反压,特别是使用Kafka的时候,多个分区之间的数据无法保证有序,就可能出现用户14:58的时候浏览了网页,但是等flink开始处理的时候已经是15:03了,这个时候本应该将这个次数统计到14点这个区间中, 却由于数据“迟到”导致统计到了下一个区间中。

解决方案

  • Watermark : 防止数据乱序 / 指定时间内获取不到全部数据。
  • allowLateNess:将窗口关闭时间再延迟一段时间。
  • sideOutPut :当指定窗口已经彻底关闭后,就会把所有过期延迟数据放到侧输出流,让用户决定如何处理(最后兜底的操作)。

Watermark是什么

Watermark可以翻译为水位线,他的作用是基于已经收集的消息来估算是否还有消息未到达。
从源码可以看出Watermark本质上是一个时间戳。时间戳反映的是事件发生的时间,而不是事件处理的时间。


image.png

Watermark原理

将用读取进入系统的最新事件时间减去固定的时间间隔作为Watermark, 时间间隔为用户外部配置的支持最大延迟到达的时间长度, 理论上不会有时间超过该时间间隔到达,否则就认为是迟到事件或异常事件。

Watermark是一种告诉Flink一个消息延迟多少的方式。它定义了什么时候不再等待更早的数据。可以把Watermarks理解为一个水位线,这个Watermarks在不断的变化表示水位在升高。Watermark实际上作为数据流的一部分随数据流流动。 当Flink中的运算符接收到Watermarks时,它明白早于该时间的消息已经完全抵达计算引擎,即假设不会再有时间小于水位线的事件到达。 这个假设是触发窗口计算的基础,只有水位线越过窗口对应的结束时间,窗口才会关闭和进行计算。

通俗的来讲, 比如对于late element,我们不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。 可以把Watermark看作是一种告诉Flink一个消息延迟多少的方式。定义了什么时候不再等待更早的数据。

WaterMark设定方法

  • 设定位置
    通常情况下,在接收到Source的数据后,应该立刻生成Watermark,但是也可以在应用简单的Map或者Filter操作后再生成Watermark。如果指定多次Watermark,后面指定的值会覆盖前面的值。

  • WaterMark分类

定期水位线(Periodic Watermark)
周期性地触发Watermark的生成和发送,默认是100ms。每隔N秒自动向流里注入一个Watermark,可以定义一个最大允许乱序的时间,这种比较常用。

在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。

标点水位线(Punctuated Watermark)
基于某些事件触发Watermark的生成和发送。基于事件向流里注入一个Watermark,每一个元素都有机会判断是否生成一个Watermark。如果得到的Watermark不为空并且比之前的大,就注入流中。

在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。

迟到事件处理

处理方法

  • 重新激活已经关闭的窗口并重新计算以修正结果(Allowed Lateness)。
  • 将迟到事件收集起来另外处理(Side Output)。
  • 将迟到事件视为错误消息并丢弃(flink默认)。

Side Output机制
将迟到事件单独放入一个数据流分支,这会作为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。

Allowed Lateness机制
允许用户设置一个允许的最大迟到时长。Flink 会在窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间的迟到事件不会被丢弃,而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存,并且如果窗口计算使用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量计算,代价比较大,所以允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该考虑降低水位线提高的速度或者调整算法。

总结

  • 窗口window 的作用是为了周期性的获取数据。
  • watermark的作用是防止数据出现乱序(经常),事件时间内获取不到指定的全部数据,而做的一种保险方法。
  • allowLateNess是将窗口关闭时间再延迟一段时间。
  • sideOutPut是最后兜底操作,所有过期延迟数据,指定窗口已经彻底关闭了,就会把数据放到侧输出流。

EventTime+Watermark解决乱序数据的案例

程序说明

首先通过自定义数据源模拟一些带有时间的数据,将所有的数据都聚合到5s一个的Tumbling窗口中, 在将进入到每个窗口的数据上所带的时间戳加起来, 通过最终的输出结果分析进入到每个窗口内参与计算的数据是否符合预期

  • 将并行度设置为1, 让所有的数据全部使用1个并发处理, 保证数据处理不会因为多并发导致乱序。
  • 设置时间类型为事件事件, 使系统按照数据上所带的时间进行处理

结果解析

  1. outOfOrder = 0, 表示允许数据迟到的时间为0ms, 也就是不允许迟到,乱序的数据就会被丢弃。
    (1)第一个5s窗口(0-4s)内参与计算的数据有 0L, 1000L, 2000L, 3000L, 3000L, 4000L。 后面那个迟到的4000L数据就会被丢弃,所以结果为(key,13000)。
    (2)第二个5s窗口(5s - 9s)内参与计算的数据有 5000L, 6000L, 6000L, 7000L, 8000L。 后面还有2个迟到的8000L和9000L数据就会被丢弃,所以结果为(key,32000)。
    (3)第三个5s窗口同理可算出结果为(key,10000)。

  2. outOfOrder = 3000, 表示允许数据迟到的时间为3000ms,也就是乱序的数据如果不超过3000ms到达, 仍然会参与窗口计算。
    (1)第一个5s窗口(0-4s)内参与计算的数据有 0L, 1000L, 2000L, 3000L, 3000L, 4000L, 4000L。 后面那个迟到的4000L数据因为没超过3000ms也会参与计算,所以结果为(key,17000)。
    (2)第二个5s窗口(5s - 9s)内参与计算的数据有 5000L, 6000L, 6000L, 7000L, 8000L,8000L,9000L。 后面还有2个迟到的8000L和9000L数据因为没超过3000ms也会参与计算,所以结果为(key,49000)。
    (3)第三个5s窗口同理可算出结果为(key,10000)。

代码

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import scala.Tuple2;

import java.util.concurrent.TimeUnit;

/**
 * Created By qiuzhi. Description: Date: 2020-07-09 Time: 4:29 PM
 *
 * @author zhanghaichao
 * @date 2020/07/09
 */
class SourceOperator implements SourceFunction<Tuple2<String, Long>> {
    @Override
    public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
        ctx.collect(new Tuple2<>("key", 0L));
        ctx.collect(new Tuple2<>("key", 1000L));
        ctx.collect(new Tuple2<>("key", 2000L));
        ctx.collect(new Tuple2<>("key", 3000L));
        ctx.collect(new Tuple2<>("key", 3000L));
        ctx.collect(new Tuple2<>("key", 4000L));
        ctx.collect(new Tuple2<>("key", 5000L));
        // out of ornew Tuple2<>(der);
        ctx.collect(new Tuple2<>("key", 4000L));
        ctx.collect(new Tuple2<>("key", 6000L));
        ctx.collect(new Tuple2<>("key", 6000L));
        ctx.collect(new Tuple2<>("key", 7000L));
        ctx.collect(new Tuple2<>("key", 8000L));
        ctx.collect(new Tuple2<>("key", 10000L));
        // out of ornew Tuple2<>(der);
        ctx.collect(new Tuple2<>("key", 8000L));
        ctx.collect(new Tuple2<>("key", 9000L));
    }

    @Override
    public void cancel() {

    }
}

public class OutOfOrderCase {
    public static void main(String[] args) throws Exception {

        //获取运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.addSource(new SourceOperator())
          // 每个元素都有机会生成watermark
        .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<String, Long>>() {
          
           // 设置允许迟到的时间(ms)
            private int outOfOrder = 0;
            // (key,13000)
            // (key,32000)
            // (key,10000)
            //private int outOfOrder = 3000;
            // (key,17000)
            // (key,49000)
            // (key,10000)

            @Override
            public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
                return stringLongTuple2._2;
            }

            @Override
            public Watermark checkAndGetNextWatermark(Tuple2<String, Long> stringLongTuple2, long l) {
                return new Watermark(stringLongTuple2._2 - outOfOrder);
            }


        }).keyBy(new KeySelector<Tuple2<String,Long>, String>() {
            @Override
            public String getKey(Tuple2<String, Long> stringLongTuple2) throws Exception {
                return stringLongTuple2._1;
            }
        })
            .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
            .reduce((ReduceFunction<Tuple2<String, Long>>)(stringLongTuple2, t1) -> new Tuple2<>(stringLongTuple2._1,stringLongTuple2._2 + t1._2))
            .print();

        env.execute("Out of order");
    }
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,837评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,551评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,417评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,448评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,524评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,554评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,569评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,316评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,766评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,077评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,240评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,912评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,560评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,176评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,425评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,114评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,114评论 2 352

推荐阅读更多精彩内容