Flink 使用之WatermarkStrategy(含源码分析)

Flink 使用介绍相关文档目录

Flink 使用介绍相关文档目录

前言

概括来说,watermark用于基于event time的流计算系统数据流可能发生乱序的情况。对于event time数据流,接收到数据的时间和上游产生数据的时间是不相关的,因此可能会出现产生时间较早的数据由于网络抖动等原因到达Flink系统较晚的情形。Watermark用于应对数据乱序的情况。Watermark是数据流中的一种特殊数据,由Flink内部周期(可自定义)产生。下游接收到watermark的时候,会认为timestamp在watermark之前的数据已经到齐。针对这些数据的运算过程可以开始。Watermark之后的数据没有到齐,需要在缓存的同时,等待后续数据的到来。Watermark的生成策略可以实现数据乱序的兼容。例如将watermark发送的时间设置为当前接收到的数据的最大timestamp(记为t)减去5s。这样下游认为t-5s之前的数据已经到齐。t-5s之后的数据先缓存起来等待。从而实现容忍“乱序的程度”不超过5s的情形。

更为详细的Flink watermark讲解参见以下文章:

https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/concepts/time/

Flink 源码之时间处理

接下来讲解如何配置使用watermark生成策略(WatermarkStrategy)。

配置watermark自动发送周期

Flink默认的watermark自动发送周期为200ms。Flink支持全局方式和局部方式配置自动发送周期。

全局配置方式:修改flink-conf.yaml文件,增加或修改
pipeline.auto-watermark-interval(对应PipelineOptions.AUTO_WATERMARK_INTERVAL)配置项。

作业局部修改方式:调用ExecutionConfig.setAutoWatermarkInterval(...)方法。

env.getConfig().setAutoWatermarkInterval(100);

WatermarkStrategy使用

forMonotonousTimestamps

生成单调递增的watermark。数据流元素到来的时候不发送watermark,仅在到达自动发送周期的时候才发送。

这里递增的意思并非仅仅是watermark时间戳数值的严格递增,每次发送的watermark都是最近接收到的元素携带的timestamp。(从元素提取出携带的timestamp过程由TimestampAssigner负责,后面分析)。一种例外情况是如果遇到迟到的数据(watermark比前一个元素小),这个元素的watermark会被MonotonousTimestamps排除不做记录,可以保证向下游发送的watermark是递增的。

// 使用env创建数据源
source = ...

source.assignTimestampsAndWatermarks(WatermarkStrategy.<元素类型>forMonotonousTimestamps();

forBoundedOutOfOrderness

上面的单调递增方式无法解决元素乱序的问题。这里的BoundedOutOfOrderness是专门为数据存在乱序这种场景考虑的。使用时候需要指定一个参数,即最大可容忍的数据迟到时间。如果乱序数据迟到超过这个时间限制,该数据将被忽略。当然还可以配置为旁路输出,参见Flink 使用之数据分流

BoundedOutOfOrderness实现并不复杂,基本和上面单调递增的方式一致。区别是在周期发送watermark的时候,发送的watermark需要减去最大可容忍的数据迟到时间。从而实现了数据计算的触发时刻向后拖延,在拖延的时间段内“等待”乱序数据到来。

使用方法如下:

source.assignTimestampsAndWatermarks(WatermarkStrategy.<元素类型>forBoundedOutOfOrderness(Duration.ofSeconds(30));

forGenerator

前面两种watermark generator能够满足绝大多数使用场景。如果仍不能满足要求,Flink提供了创建自定义watermark generator的方式。

这里以Integer类型的数据源为例,说明自定义generator的写法。

.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WatermarkGeneratorSupplier<Integer>() {
    @Override
    public WatermarkGenerator<Integer> createWatermarkGenerator(Context context) {
        return new WatermarkGenerator<Integer>() {
            @Override
            public void onEvent(Integer integer, long eventTimestamp, WatermarkOutput watermarkOutput) {
                
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput watermarkOutput) {

            }
        };
    }
}));

在自定义WatermarkGenerator的时候按需实现两个方法:

  • onEvent:接收到元素的时候触发。参数分别为输入的元素,元素提取出来的timestamp(event time)和控制输出watermark的对象(后面解释)。
  • onPeriodicEmit:到达自动发送watermark周期的时候触发。参数只有一个,和前面的相同。

WatermarkOutput用来像下游发送watermark,或者控制数据输出。它有三个方法:

  • emitWatermark: 发送watermark到下游。
  • markIdle: 标记output为空闲状态。
  • markActive: 标记output为活动状态。如果output在空闲状态发送了watermark,也会自动标记为活动状态。

Flink中一个计算步骤可能有多个上游(双数据流或更多),计算步骤会考虑到所有上游的watermark。设想如果一个流一直不产生watermark,需要等待这个流的数据呢还是这个流目前就没有数据可以忽略?Flink不好判断。为了解决这个问题引入了idle(空闲)机制。如果一个数据源标记了空闲状态,下游计算的时候不会考虑这个数据源的watermark。未能正确处理数据源的idle状态会导致Flink整个计算过程的阻塞。务必要注意这一点。

noWatermarks

不生成任何watermark。

source.assignTimestampsAndWatermarks(WatermarkStrategy.noWatermarks());

withTimestampAssigner

该方法用来配置如何从元素中抽取出watermark。例如到来的数据包含数据生成时的timestamp,格式为Tuple2<String, Long>类型,值为("Hello", 1690437024)。我们可以获取元素的第二个字段作为Flink内部的timestamp使用。

withTimestampAssigner使用方法如下所示:

source.assignTimestampsAndWatermarks(WatermarkStrategy.<元素类型>forMonotonousTimestamps().withTimestampAssigner((element, timestamp) -> {
    // element: 到来的元素
    // timestamp:上游为元素指定的timestamp,通常为数据源产生的timestamp
    // 需要编写自己的抽取逻辑
    // 返回抽取出的timestamp
}));

withIdleness

上面forGenerator章节提到了idle问题。大家可能会问:有没有一种常见的策略可以自动标记idle状态?比如数据流持续一段时间没有数据到来的时候自动标记为idle状态。withIdleness正好是这种策略。它对watermark generator做了包装。用户在其中不需要再去编写何时标记idle的逻辑。

withIdleness的用法如下所示:

.assignTimestampsAndWatermarks(WatermarkStrategy.<元素类型>forBoundedOutOfOrderness(Duration.ofSeconds(30)).withIdleness(Duration.ofSeconds(5)));

WatermarkStrategy源代码分析

forBoundedOutOfOrderness

static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
    return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
}

上面创建了BoundedOutOfOrdernessWatermarks。继续查看它的代码。分析内容在注释中。

@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

    /** The maximum timestamp encountered so far. */
    // 暂存最大的timestamp
    // 发送的timestamp一定是递增(或者大小不变)的
    private long maxTimestamp;

    /** The maximum out-of-orderness that this watermark generator assumes. */
    // 最大可容忍的数据迟到的时间范围(乱序程度)
    private final long outOfOrdernessMillis;

    /**
     * Creates a new watermark generator with the given out-of-orderness bound.
     *
     * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
     */
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();

        // start so that our lowest watermark would be Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // ------------------------------------------------------------------------

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // 接收到元素的时候,更新maxTimestamp
        // 如果接收到迟到的元素(eventTimestamp比maxTimestamp小),忽略不更新,确保timestamp递增
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // 在发送watermark时间周期到来的时候,发送watermark
        // 发送的watermark需要减去outOfOrdernessMillis
        // 含义是让下游认为maxTimestamp - outOfOrdernessMillis - 1之前的数据已经到齐
        // 只有认为到齐的数据参会参与计算,未到齐的数据会缓存等待
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}

forMonotonousTimestamps

static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
    return (ctx) -> new AscendingTimestampsWatermarks<>();
}

这里创建了AscendingTimestampsWatermarks。它继承了BoundedOutOfOrdernessWatermarks,代码如下所示。

@Public
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {

    /** Creates a new watermark generator with for ascending timestamps. */
    public AscendingTimestampsWatermarks() {
        super(Duration.ofMillis(0));
    }
}

看到构造方法很容易明白,AscendingTimestampsWatermarks是一种不容忍任何数据迟到的BoundedOutOfOrdernessWatermarks

withIdleness

default WatermarkStrategy<T> withIdleness(Duration idleTimeout) {
    checkNotNull(idleTimeout, "idleTimeout");
    checkArgument(
            !(idleTimeout.isZero() || idleTimeout.isNegative()),
            "idleTimeout must be greater than zero");
    return new WatermarkStrategyWithIdleness<>(this, idleTimeout);
}

继续查看WatermarkStrategyWithIdlenesscreateWatermarkGenerator方法:

@Override
public WatermarkGenerator<T> createWatermarkGenerator(
        WatermarkGeneratorSupplier.Context context) {
    return new WatermarksWithIdleness<>(
            baseStrategy.createWatermarkGenerator(context), idlenessTimeout);
}

创建出的watermark generator为WatermarksWithIdleness。该类使用了装饰器模式。在不改变原有watermark generator的基础之上增加了标记idle的能力。它有三个成员变量。

// 包装的watermark生成器
private final WatermarkGenerator<T> watermarks;
// idle定时器,用来判断是否idle
private final IdlenessTimer idlenessTimer;
// 状态标记,目前是否处于idle状态
private boolean isIdleNow = false;

继续分析onEventonPeriodicEmit方法:

@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
    // 调用被包装watermark generator的onEvent方法
    watermarks.onEvent(event, eventTimestamp, output);
    // 告知idlenessTimer有活动发生
    idlenessTimer.activity();
    // 标记空闲状态为false
    isIdleNow = false;
}

@Override
public void onPeriodicEmit(WatermarkOutput output) {
    if (idlenessTimer.checkIfIdle()) {
        // 检查空闲状态,如果为空闲
        if (!isIdleNow) {
            // 如果当前状态不是空闲
            // 说明刚从活动状态变为空闲状态
            // 标记为idle状态
            output.markIdle();
            // 记录空闲状态为true
            isIdleNow = true;
        }
    } else {
        // 如果不是idle状态,调用包装的watermark generator的onPeriodicEmit方法
        watermarks.onPeriodicEmit(output);
    }
}

最后的问题就是IdlenessTimer是怎么判断是否idle的。我们继续分析它的构造函数,activity方法和checkIfIdle方法。

IdlenessTimer(Clock clock, Duration idleTimeout) {
    // 获取时钟
    this.clock = clock;

    long idleNanos;
    // 将idle超时时间转换为纳秒保存
    try {
        idleNanos = idleTimeout.toNanos();
    } catch (ArithmeticException ignored) {
        // long integer overflow
        idleNanos = Long.MAX_VALUE;
    }

    this.maxIdleTimeNanos = idleNanos;
}

public void activity() {
    // 内部有个计数器,只要有活动,该计数器自增1
    // counter是long类型,即便是自增溢出了,也不会影响
    counter++;
}

public boolean checkIfIdle() {
    if (counter != lastCounter) {
        // lastCounter为最近一次counter计数
        // 如果不等,说明期间有活动
        // 这里不写if (counter > lastCounter)的原因是兼容counter溢出的情况
        // activity since the last check. we reset the timer
        // 更新lastCounter,重设计时器
        lastCounter = counter;
        startOfInactivityNanos = 0L;
        return false;
    } else // timer started but has not yet reached idle timeout
    if (startOfInactivityNanos == 0L) {
        // first time that we see no activity since the last periodic probe
        // begin the timer
        // 首次发现counter没有更新,即没有活动,启用计时器
        startOfInactivityNanos = clock.relativeTimeNanos();
        return false;
    } else {
        // 如果当前时间和计时器时间差超过maxIdleTimeNanos,说明处于空闲状态
        return clock.relativeTimeNanos() - startOfInactivityNanos > maxIdleTimeNanos;
    }
}
    

本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容