Flink Basics - Time Semantics & Watermark

Time Semantics in Flink

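  • Event Time: the time at which the event was created
  • Ingestion Time: the time at which the record enters Flink
  • Processing Time: the local system time of the machine running the operator, so it depends on that machine
    For example, when a row is created in a business database, that moment is its Event Time; when the row is shipped through Kafka and reaches the Flink cluster, that is its Ingestion Time; when an operator actually processes it, that is its Processing Time.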
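
The three timestamps can be made concrete with a small plain-Java sketch. It does not use Flink at all: the class `TimeSemanticsSketch`, the record type `Stamped`, and the methods `ingest` and `process` are hypothetical names introduced only for illustration, and the clock values are passed in by hand so the run is deterministic.

```java
public class TimeSemanticsSketch {
    /** Hypothetical record that remembers when each stage touched it (epoch millis). */
    static final class Stamped {
        final String payload;
        final long eventTimeMs;        // set by the producer when the event is created
        long ingestionTimeMs = -1L;    // set when the record enters the pipeline
        long processingTimeMs = -1L;   // set when an operator handles the record

        Stamped(String payload, long eventTimeMs) {
            this.payload = payload;
            this.eventTimeMs = eventTimeMs;
        }
    }

    /** Stamps ingestion time using the clock of the machine where the record enters. */
    static Stamped ingest(Stamped record, long sourceClockMs) {
        record.ingestionTimeMs = sourceClockMs;
        return record;
    }

    /** Stamps processing time using the clock of the machine running the operator. */
    static Stamped process(Stamped record, long operatorClockMs) {
        record.processingTimeMs = operatorClockMs;
        return record;
    }

    public static void main(String[] args) {
        Stamped record = new Stamped("order-1", 1_000L); // created in the business database
        ingest(record, 1_250L);                          // arrives via the message queue
        process(record, 1_900L);                         // handled by an operator
        System.out.println(record.eventTimeMs + " <= "
                + record.ingestionTimeMs + " <= " + record.processingTimeMs);
    }
}
```

Only the event time travels with the data; the other two depend on when and where the record happens to be handled.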

Setting Event Time

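In stream processing, records can arrive out of order, and whether a record is out of order is judged by its event time. In Flink versions before 1.12 the default is processing time, so we switch to event time manually (from Flink 1.12 onward event time is already the default and this setter is deprecated):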

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Processing time is the default (before Flink 1.12); switch to event-time semantics here
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

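This single line changes the program's default time semantics. With event time, however, we also have to tell the program which field holds the event timestamp; that is done together with a watermark, as described later.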
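
To illustrate why the timestamp field matters, here is a minimal plain-Java sketch (no Flink dependency). The names `EventTimeFieldSketch`, `Event`, and `countOutOfOrder` are hypothetical. The caller supplies a function that picks the event-time field, and the sketch counts the records that arrive after a record with a larger event timestamp.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;

public class EventTimeFieldSketch {
    /** Hypothetical record: a key plus an event timestamp in epoch milliseconds. */
    static final class Event {
        final String key;
        final long timestampMs;

        Event(String key, long timestampMs) {
            this.key = key;
            this.timestampMs = timestampMs;
        }
    }

    /**
     * Counts records whose event timestamp is smaller than the largest
     * timestamp seen so far, i.e. records that arrived out of order.
     */
    static <T> int countOutOfOrder(List<T> arrivals, ToLongFunction<T> timestampOf) {
        long maxSeen = Long.MIN_VALUE;
        int outOfOrder = 0;
        for (T element : arrivals) {
            long ts = timestampOf.applyAsLong(element);
            if (ts < maxSeen) {
                outOfOrder++;
            } else {
                maxSeen = ts;
            }
        }
        return outOfOrder;
    }

    public static void main(String[] args) {
        List<Event> arrivals = Arrays.asList(
                new Event("a", 1_000L),
                new Event("b", 3_000L),
                new Event("c", 2_000L),   // arrives after "b" but happened before it
                new Event("d", 4_000L));
        // The lambda plays the role of the timestamp extractor.
        System.out.println(countOutOfOrder(arrivals, e -> e.timestampMs)); // prints 1
    }
}
```

Without the extractor the program has no way to know that "c" is older than "b"; arrival order alone says nothing about event time.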

Using a Watermark on a DataStream

package com.lxs.flink.realtime.watemark;

import com.lxs.utils.KafkaUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * User: lixinsong
 * Date: 2021/1/19
 * Description:
 */

public class WatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Long>> dataStream = env.addSource(KafkaUtils.initKafka("lxs")).map(new MapFunction<String, Tuple2<String, Long >>() {
            @Override
            public Tuple2<String, Long> map(String s) throws Exception {
                String[] arr = s.split(",");
                return Tuple2.of(arr[0], Long.parseLong(arr[1]));
            }
        });

        // Ascending (in-order) data: assign timestamps and watermarks; no delay needs to be configured
//        DataStream<Tuple2<String, Long>> watermark1 = dataStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
//            @Override
//            public long extractAscendingTimestamp(Tuple2<String, Long> element) {
//                return element.f1;
//            }
//        });

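        // Out-of-order data: assign timestamps and watermarks. BoundedOutOfOrdernessTimestampExtractor is the
        // timestamp extractor for bounded out-of-orderness; the delay (the maximum out-of-orderness) is 5 seconds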
        DataStream<Tuple2<String, Long>> watermark2 = dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element) {
                // Extract the timestamp; it must be in epoch milliseconds
                return element.f1;
            }
        });
        watermark2.print("watermark test");
        env.execute("watermark test");

    }
}

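A watermark can be assigned directly on a DataStream, which lets you choose the event-time field yourself. On its own, though, assigning a watermark does little for handling late data; its effect only shows once it is combined with windows.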
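
The interaction with windows can be illustrated by a small plain-Java simulation. It is a simplified sketch, not Flink's implementation: `WatermarkWindowSketch` and `run` are hypothetical names, the watermark is advanced on every record rather than periodically, timestamps are assumed to be non-negative epoch milliseconds, and late records are simply dropped. The watermark is taken as the largest timestamp seen minus the allowed delay, and a tumbling window fires once the watermark reaches the window's last timestamp (end - 1).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WatermarkWindowSketch {
    /**
     * Feeds timestamps (in arrival order) into tumbling event-time windows and
     * returns a log of fired windows and of records dropped as late.
     */
    static List<String> run(long[] timestampsMs, long windowSizeMs, long maxDelayMs) {
        List<String> log = new ArrayList<>();
        TreeMap<Long, Integer> openWindows = new TreeMap<>(); // window start -> record count
        long maxTimestamp = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;

        for (long ts : timestampsMs) {
            long start = ts - (ts % windowSizeMs);
            long lastTimestampOfWindow = start + windowSizeMs - 1;
            if (lastTimestampOfWindow <= watermark) {
                log.add("late ts=" + ts);                  // its window has already fired
            } else {
                openWindows.merge(start, 1, Integer::sum); // still on time: count it
            }

            // Advance the watermark; it never moves backwards.
            maxTimestamp = Math.max(maxTimestamp, ts);
            watermark = Math.max(watermark, maxTimestamp - maxDelayMs);

            // Fire every open window that the watermark has now passed.
            while (!openWindows.isEmpty()
                    && openWindows.firstKey() + windowSizeMs - 1 <= watermark) {
                Map.Entry<Long, Integer> fired = openWindows.pollFirstEntry();
                log.add("window [" + fired.getKey() + "," + (fired.getKey() + windowSizeMs)
                        + ") count=" + fired.getValue());
            }
        }
        return log;
    }

    public static void main(String[] args) {
        long[] arrivals = {1_000L, 7_000L, 4_000L, 12_000L, 15_000L, 3_000L, 21_000L};
        // 10-second windows, 5-second delay.
        for (String line : run(arrivals, 10_000L, 5_000L)) {
            System.out.println(line);
        }
        // prints:
        // window [0,10000) count=3
        // late ts=3000
    }
}
```

The record with timestamp 4000 is out of order but still counted, because the window stays open until the watermark reaches 9999. The record with timestamp 3000 arrives after that point, so the window never sees it.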

Types of Watermark

Periodic generation (AssignerWithPeriodicWatermarks)

Watermarks are generated periodically by, for example, the AscendingTimestampExtractor mentioned above. The source is as follows:

// Class declaration only; the body is omitted here
public abstract class AscendingTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> { /* ... */ }

public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {

    /**
     * Returns the current watermark. This method is periodically called by the
     * system to retrieve the current watermark. The method may return {@code null} to
     * indicate that no new Watermark is available.
     *
     * <p>The returned watermark will be emitted only if it is non-null and its timestamp
     * is larger than that of the previously emitted watermark (to preserve the contract of
     * ascending watermarks). If the current watermark is still
     * identical to the previous one, no progress in event time has happened since
     * the previous call to this method. If a null value is returned, or the timestamp
     * of the returned watermark is smaller than that of the last emitted one, then no
     * new watermark will be generated.
     *
     * <p>The interval in which this method is called and Watermarks are generated
     * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
     *
     * @see org.apache.flink.streaming.api.watermark.Watermark
     * @see ExecutionConfig#getAutoWatermarkInterval()
     *
     * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
     */
    @Nullable
    Watermark getCurrentWatermark();
}

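Periodic generation means that a watermark is generated at a fixed interval; as the Javadoc above notes, the interval comes from ExecutionConfig#getAutoWatermarkInterval().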
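
The periodic contract can be sketched in plain Java, without Flink. In this hypothetical `PeriodicWatermarkSketch`, `onEvent` only records the largest timestamp, and `onTimerTick` stands in for the system's periodic call to `getCurrentWatermark()`. Returning `null` when nothing has advanced mirrors the Javadoc's rule that only a strictly larger watermark is emitted. The fixed delay is an assumption borrowed from the bounded-out-of-orderness example.

```java
public class PeriodicWatermarkSketch {
    private final long maxDelayMs;
    private long maxTimestampMs;
    private long lastEmittedMs = Long.MIN_VALUE;

    PeriodicWatermarkSketch(long maxDelayMs) {
        this.maxDelayMs = maxDelayMs;
        // Start so that (maxTimestampMs - maxDelayMs) does not underflow.
        this.maxTimestampMs = Long.MIN_VALUE + maxDelayMs;
    }

    /** Called for every record: only remembers the largest event timestamp. */
    void onEvent(long eventTimestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
    }

    /**
     * Called once per interval: returns the next watermark, or null when
     * event time has not advanced since the previous call.
     */
    Long onTimerTick() {
        long candidate = maxTimestampMs - maxDelayMs;
        if (candidate <= lastEmittedMs) {
            return null;
        }
        lastEmittedMs = candidate;
        return candidate;
    }

    public static void main(String[] args) {
        PeriodicWatermarkSketch generator = new PeriodicWatermarkSketch(5_000L);
        System.out.println(generator.onTimerTick()); // null: no records yet
        generator.onEvent(10_000L);
        generator.onEvent(8_000L);                   // out of order, does not lower the maximum
        System.out.println(generator.onTimerTick()); // 5000
        System.out.println(generator.onTimerTick()); // null: nothing new since the last tick
        generator.onEvent(12_000L);
        System.out.println(generator.onTimerTick()); // 7000
    }
}
```

However many records arrive between two ticks, at most one watermark is produced per interval.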

Per-record generation: a watermark can be emitted as each record arrives (AssignerWithPunctuatedWatermarks)

The source is as follows:

public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {

    /**
     * Asks this implementation if it wants to emit a watermark. This method is called right after
     * the {@link #extractTimestamp(Object, long)} method.
     *
     * <p>The returned watermark will be emitted only if it is non-null and its timestamp
     * is larger than that of the previously emitted watermark (to preserve the contract of
     * ascending watermarks). If a null value is returned, or the timestamp of the returned
     * watermark is smaller than that of the last emitted one, then no new watermark will
     * be generated.
     *
     * <p>For an example how to use this method, see the documentation of
     * {@link AssignerWithPunctuatedWatermarks this class}.
     *
     * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
     */
    @Nullable
    Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}
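
A minimal plain-Java sketch of the punctuated style follows (no Flink dependency). The class `PunctuatedWatermarkSketch`, the `"END"` marker, and the helper `emitted` are assumptions made for illustration. `checkAndGetNextWatermark` is asked after every record and returns a watermark only for marker records; `emitted` then applies the rule from the Javadoc above that a watermark is forwarded only if it is larger than the previous one.

```java
import java.util.ArrayList;
import java.util.List;

public class PunctuatedWatermarkSketch {
    /** Hypothetical marker value: records equal to it carry a watermark. */
    static final String MARKER = "END";

    /** Decides per record whether a watermark should be emitted (null means none). */
    static Long checkAndGetNextWatermark(String lastElement, long extractedTimestampMs) {
        return MARKER.equals(lastElement) ? Long.valueOf(extractedTimestampMs) : null;
    }

    /** Returns the watermarks that would actually be emitted, in order. */
    static List<Long> emitted(String[] elements, long[] timestampsMs) {
        List<Long> watermarks = new ArrayList<>();
        long lastEmitted = Long.MIN_VALUE;
        for (int i = 0; i < elements.length; i++) {
            Long candidate = checkAndGetNextWatermark(elements[i], timestampsMs[i]);
            // Emit only non-null watermarks that move event time forward.
            if (candidate != null && candidate > lastEmitted) {
                lastEmitted = candidate;
                watermarks.add(candidate);
            }
        }
        return watermarks;
    }

    public static void main(String[] args) {
        String[] elements = {"a", "END", "b", "END", "END"};
        long[] timestamps = {1_000L, 2_000L, 1_500L, 1_800L, 3_000L};
        System.out.println(emitted(elements, timestamps)); // prints [2000, 3000]
    }
}
```

The marker at 1800 yields no watermark because 2000 has already been emitted. Compared with the periodic style, the decision is driven by the data itself rather than by a timer.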