Time Semantics in Flink

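- Event Time: the time at which the event was created
- Ingestion Time: the time at which the record enters Flink
- Processing Time: the local system time of the machine executing the operator, so it depends on that machine's clock
For example, when a record is generated in the business database, its creation time is the Event Time; when the record travels through Kafka and enters the Flink cluster, that moment is the Ingestion Time; and when an operator actually processes the record, that moment is the Processing Time.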
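Setting Event Time
Records can arrive out of order, and whether they are out of order is judged by event time. In Flink versions before 1.12 the default is processing time, so event time has to be enabled explicitly (from 1.12 on, event time is the default and setStreamTimeCharacteristic is deprecated):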
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Processing time is the default; switch to event-time semantics here
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
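This single line changes the program's default time semantics. To use event time, however, the program must also be told which field carries the event timestamp; that is done together with watermarks, as described next.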
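To make "out of order by event time" concrete, here is a minimal plain-Java sketch (not Flink code). The class and method names are hypothetical; the sketch only assumes that each record carries an event timestamp in milliseconds, and it flags a record as out of order when its timestamp is smaller than the largest one already seen.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration, not a Flink API (names are hypothetical).
final class OutOfOrderCheck {
    /** Returns the positions (in arrival order) of records that arrived out of order. */
    static List<Integer> outOfOrderIndices(long[] eventTimesInArrivalOrder) {
        List<Integer> result = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        for (int i = 0; i < eventTimesInArrivalOrder.length; i++) {
            long ts = eventTimesInArrivalOrder[i];
            if (ts < maxSeen) {
                // An earlier arrival already had a larger event time
                result.add(i);
            } else {
                maxSeen = ts;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] arrivals = {1000L, 3000L, 2000L, 5000L, 4000L};
        System.out.println(outOfOrderIndices(arrivals)); // prints [2, 4]
    }
}
```

Processing time cannot reveal this kind of disorder, because it only reflects when each record happened to reach the operator.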
Using watermarks on a DataStream
package com.lxs.flink.realtime.watemark;
import com.lxs.utils.KafkaUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* User: lixinsong
* Date: 2021/1/19
* Description:
*/
public class WatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parse each Kafka message of the form "key,timestampMillis" into a Tuple2
        DataStream<Tuple2<String, Long>> dataStream = env.addSource(KafkaUtils.initKafka("lxs")).map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String s) throws Exception {
                String[] arr = s.split(",");
                return Tuple2.of(arr[0], Long.parseLong(arr[1]));
            }
        });
        // Ascending data: assign timestamps and watermarks; no wait time needs to be configured
        // DataStream<Tuple2<String, Long>> watermark1 = dataStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
        //     @Override
        //     public long extractAscendingTimestamp(Tuple2<String, Long> element) {
        //         return element.f1;
        //     }
        // });
        // Out-of-order data: assign timestamps and watermarks. BoundedOutOfOrdernessTimestampExtractor is the
        // timestamp extractor for bounded out-of-orderness; the delay (also the maximum out-of-orderness) is set to 5 seconds
        DataStream<Tuple2<String, Long>> watermark2 = dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element) {
                // Extract the timestamp; it must be in epoch milliseconds
                return element.f1;
            }
        });
        watermark2.print("watermark test");
        env.execute("watermark test");
    }
}
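A watermark assigner can be applied directly to a DataStream, which lets you choose the event-time field yourself. On its own, though, assigning watermarks does little for handling late data; the benefit only shows when watermarks are combined with windows.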
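The interplay between a bounded-delay watermark and a window can be sketched in plain Java. This is a simulation, not Flink code, and every name in it is hypothetical. It assumes tumbling windows of a fixed size, a watermark that trails the largest event time seen so far by the configured delay, and a window [start, end) that fires once the watermark reaches end - 1.

```java
// Plain-Java simulation, not a Flink API (names are hypothetical).
final class WatermarkWindowSketch {
    private final long maxOutOfOrdernessMs;
    private final long windowSizeMs;
    private long maxTimestamp;

    WatermarkWindowSketch(long maxOutOfOrdernessMs, long windowSizeMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.windowSizeMs = windowSizeMs;
        // Start low, but high enough that subtracting the delay cannot overflow
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Records an element's event time and returns the watermark afterwards. */
    long onEvent(long eventTimeMs) {
        maxTimestamp = Math.max(maxTimestamp, eventTimeMs);
        return currentWatermark();
    }

    long currentWatermark() {
        return maxTimestamp - maxOutOfOrdernessMs;
    }

    /** True if the tumbling window containing eventTimeMs has already fired. */
    boolean isLate(long eventTimeMs) {
        long windowStart = eventTimeMs - Math.floorMod(eventTimeMs, windowSizeMs);
        long windowEnd = windowStart + windowSizeMs;
        return currentWatermark() >= windowEnd - 1;
    }

    public static void main(String[] args) {
        WatermarkWindowSketch s = new WatermarkWindowSketch(5_000L, 10_000L);
        s.onEvent(1_000L);
        System.out.println(s.onEvent(12_000L)); // 7000: window [0, 10000) is still open
        System.out.println(s.isLate(8_000L));   // false: the out-of-order record is still accepted
        s.onEvent(8_000L);
        System.out.println(s.onEvent(15_000L)); // 10000: window [0, 10000) fires
        System.out.println(s.isLate(9_000L));   // true: its window has already fired
    }
}
```

With a 5-second delay, the record stamped 8000 that arrives after 12000 still lands in its window, which is the effect that only becomes visible once watermarks and windows are used together.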
Watermark types
Periodic generation (AssignerWithPeriodicWatermarks)
Periodic watermark assigners include the AscendingTimestampExtractor used above. The source is as follows:
public abstract class AscendingTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T>
public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {
    /**
     * Returns the current watermark. This method is periodically called by the
     * system to retrieve the current watermark. The method may return {@code null} to
     * indicate that no new Watermark is available.
     *
     * <p>The returned watermark will be emitted only if it is non-null and its timestamp
     * is larger than that of the previously emitted watermark (to preserve the contract of
     * ascending watermarks). If the current watermark is still
     * identical to the previous one, no progress in event time has happened since
     * the previous call to this method. If a null value is returned, or the timestamp
     * of the returned watermark is smaller than that of the last emitted one, then no
     * new watermark will be generated.
     *
     * <p>The interval in which this method is called and Watermarks are generated
     * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
     *
     * @see org.apache.flink.streaming.api.watermark.Watermark
     * @see ExecutionConfig#getAutoWatermarkInterval()
     *
     * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
     */
    @Nullable
    Watermark getCurrentWatermark();
}
Periodic generation means that a watermark is produced at a fixed interval rather than once per record.
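The periodic contract can be sketched in plain Java as follows. This is a simplified stand-in with hypothetical names, not the Flink interface: each record only updates internal state, and a separate call (made by a timer in a real system) decides whether a new watermark is emitted. Returning null stands for "no new watermark".

```java
// Plain-Java illustration of periodic watermark generation (names are hypothetical).
final class PeriodicWatermarkSketch {
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEmitted = Long.MIN_VALUE;

    /** Called for every record: only remembers the largest event time seen. */
    void onEvent(long eventTimeMs) {
        maxTimestamp = Math.max(maxTimestamp, eventTimeMs);
    }

    /** Called at a fixed interval: returns a new watermark, or null if event time has not advanced. */
    Long onPeriodicEmit() {
        if (maxTimestamp > lastEmitted) {
            lastEmitted = maxTimestamp;
            return lastEmitted;
        }
        return null;
    }

    public static void main(String[] args) {
        PeriodicWatermarkSketch p = new PeriodicWatermarkSketch();
        p.onEvent(1_000L);
        p.onEvent(3_000L);
        p.onEvent(2_000L);
        System.out.println(p.onPeriodicEmit()); // 3000: one watermark for three records
        System.out.println(p.onPeriodicEmit()); // null: nothing new since the last call
        p.onEvent(4_000L);
        System.out.println(p.onPeriodicEmit()); // 4000
    }
}
```

Because emission is decoupled from record arrival, a busy stream produces far fewer watermarks than records.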
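Per-record generation (AssignerWithPunctuatedWatermarks)
Here the assigner is asked after every record whether it wants to emit a watermark. The source is as follows: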
public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {
    /**
     * Asks this implementation if it wants to emit a watermark. This method is called right after
     * the {@link #extractTimestamp(Object, long)} method.
     *
     * <p>The returned watermark will be emitted only if it is non-null and its timestamp
     * is larger than that of the previously emitted watermark (to preserve the contract of
     * ascending watermarks). If a null value is returned, or the timestamp of the returned
     * watermark is smaller than that of the last emitted one, then no new watermark will
     * be generated.
     *
     * <p>For an example how to use this method, see the documentation of
     * {@link AssignerWithPunctuatedWatermarks this class}.
     *
     * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
     */
    @Nullable
    Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}
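The per-record contract can be sketched the same way in plain Java. The names are hypothetical and the class is a simplified stand-in, not the Flink interface: it is consulted once per record, and it folds in the rule from the Javadoc that a watermark is only emitted when it is larger than the previous one (in Flink that filtering is done by the system, not by the assigner).

```java
// Plain-Java illustration of punctuated (per-record) watermark generation (names are hypothetical).
final class PunctuatedWatermarkSketch {
    private long lastEmitted = Long.MIN_VALUE;

    /**
     * Called once per record, right after its timestamp has been extracted.
     * Returns the watermark to emit, or null if this record does not advance event time.
     */
    Long checkAndGetNextWatermark(String element, long extractedTimestamp) {
        if (extractedTimestamp > lastEmitted) {
            lastEmitted = extractedTimestamp;
            return extractedTimestamp;
        }
        return null;
    }

    public static void main(String[] args) {
        PunctuatedWatermarkSketch p = new PunctuatedWatermarkSketch();
        System.out.println(p.checkAndGetNextWatermark("a", 1_000L)); // 1000
        System.out.println(p.checkAndGetNextWatermark("b", 3_000L)); // 3000
        System.out.println(p.checkAndGetNextWatermark("c", 2_000L)); // null: would move backwards
        System.out.println(p.checkAndGetNextWatermark("d", 4_000L)); // 4000
    }
}
```

Compared with the periodic style, this reacts to every record immediately, at the cost of potentially emitting one watermark per record.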