Generating Timestamps / Watermarks

原文链接

本节与在事件时间上运行的程序相关。有关事件时间处理时间摄入时间的介绍,请参见事件时间简介

为了处理事件时间,流程序需要设置相应的时间特性

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

分配时间戳

为了处理事件时间,Flink需要知道事件的时间戳,这意味着流中的每个元素都需要分配它的事件时间戳。这通常是通过访问/提取元素中的某个字段的时间戳来完成的。

时间戳的分配与生成的水印是同步的,它告诉系统在事件时间内的进展。

有两种分配时间戳和生成水印的方法:

  • 直接在数据流源中。
  • 通过timestamp assigner / watermark generator:在Flink时间戳生成器中,也定义了要发出的水印。

注意:时间戳和水印都被指定为自1970-01- 01T00:00 - 00Z的Java时代以来的毫秒数。

具有时间戳和水印的原函数

流Source可以为它们生成的元素直接分配时间戳,并且发送水印。当这样做时,不需要时间戳生成器。注意,如果使用了时间戳生成器,由Source生成的时间戳和水印都会被覆盖。

要直接为源中的元素分配时间戳,Source必须使用SourceContext的collectWithTimestamp(...) 方法。为了生成水印,Source必须使用emitWatermark(Watermark)方法。

@Override
public void run(SourceContext<MyType> ctx) throws Exception {
    while (/* condition */) {
        MyType next = getNext();
        ctx.collectWithTimestamp(next, next.getEventTimestamp());

        if (next.hasWatermarkTime()) {
            ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
        }
    }
}

Timestamp Assigners / Watermark Generators

时间戳分配器作用在流上,并且生成一条具有时间戳的元素和水印的新流。如果原始流已经有时间戳和/或水印,时间戳分配器会覆盖它们。

时间戳分配器通常在数据源之后立即指定,但它并不是严格要求这样做的。例如,通用的模式是在解析(MapFunction) 和过滤(FilterFunction) 在时间戳分配器之前。在任何情况下,时间戳分配器都需要在基于事件时间的第一个操作(例如第一个窗口操作)之前指定。作为一种特殊情况,当使用Kafka作为流作业的数据源时,Flink允许在Source(或消费者)内部指定一个时间戳分配器/水印发射器。关于如何这样做的更多信息可以在Kafka连接器文档中找到。

注意:本节的其余部分介绍了一个程序员创建她自己的时间戳分配器/水印发射器必须实现的主要接口。若要查看使用Flink预定义的提取器,请参阅Pre-defined Timestamp Extractors / Watermark Emitters页面。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);

周期性水印

AssignerWithPeriodicWatermarks分配时间戳并且周期性的生成水印(可能取决于流元素,或者纯粹基于处理时间)。

通过ExecutionConfig.setAutoWatermarkInterval(…)定义水印生成的间隔(每n毫秒)。每次都会调用assigner 的getCurrentWatermark()方法,如果水印不为空,且大于之前的水印,则会发出一个新的水印。

下面是两个具有周期性水印的时间戳分配器的简单示例。

/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
public class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */
public class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxTimeLag = 5000; // 5 seconds

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current time minus the maximum time lag
        return new Watermark(System.currentTimeMillis() - maxTimeLag);
    }
}

With Punctuated Watermarks

当使用一个特定的事件指出一个新的水印需要生成,使用AssignerWithPunctuatedWatermarks。在这种情况下,Flink首先会调用extractTimestamp(…)方法来分配元素一个时间戳,然后立即在该元素上执行checkAndGetNextWatermark(…)方法。

checkAndGetNextWatermark(…)方法通过在extractTimestamp(…)方法中分配的时间戳,并可以决定是否要生成一个水印。每当checkAndGetNextWatermark(…)方法返回非空的水印,并且水印大于最新的水印时,就会发出新的水印。

public class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks<MyEvent> {

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
}

注意:在每一个事件上都有可能产生水印。然而,由于每个水印都导致了一些下游的计算,过多的水印会降低性能。

Timestamps per Kafka Partition

当使用Apache Kafka作为数据源时,每个Kafka分区可能都有一个简单的事件时间模式(递增的时间戳或无序有界)。然而,当从Kafka消费流时,多个分区经常被并行地消费,会交错的从分区获取事件并且破坏预定义的模式(这是Kafka的消费者客户端工作的固有特性)。

在这种情况下,你可以使用Flink的Kafka-partition-aware水印生成。使用这个特性,水印在Kafka的消费者每个Kafka分区内部生成,并且每个分区的水印以与在流上合并水印相同的方式合并。

例如,如果时间时间戳严格按照Kafka分区递增,使用递增的时间戳水印生成器生成每个分区的水印会生成完美的整体水印。

下图展示了如何使用每个Kafka分区水印生成,以及这种情况下水印如何在流数据流中传递。

FlinkKafkaConsumer09<MyType> kafkaSource = new FlinkKafkaConsumer09<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyType>() {

    @Override
    public long extractAscendingTimestamp(MyType element) {
        return element.eventTimestamp();
    }
});

DataStream<MyType> stream = env.addSource(kafkaSource);
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351

推荐阅读更多精彩内容