Event Time / Processing Time / Ingestion Time
Flink在流程序中支持不同概念的时间。
Processing Time: Processing Time是指执行程序时对应的物理机额系统时间。
当一个流程序通过处理时间来运行时,所有基于时间的操作(如: 时间窗口)将使用各自操作所在的物理机的系统时间。例如:一个每小时处理的时间窗口将包括所有在系统指定的一个小时内到达指定操作的所有记录。
处理时间是最简单的时间概念,不需要流和物理机之间的协调,它有最好的性能和最低的延迟。然而,在分布式或者异步环境中,因为受到记录到达系统时间的影响,处理时间不能够决定系统内操作之间记录流的速度。
Event time: Event Time是每个独立事件在产生它的设备上发生的时间,这个时间通常在事件进入Flink之前就已经嵌入到事件中了,并且事件的时间戳是可以从记录中抽取出来的。一个按小时处理的事件时间窗口将包含携带事件时间在当前小时内的所有事件,而不考虑记录什么时候到达,以什么样的顺序到达。
事件时间可以通过备份或者持久化日志获取无序数据、延迟事件或者重试数据的正确结果。在事件时间中,时间进度依赖于数据而不是其他形式的时钟。事件时间程序必须要指定如何产生事件时间水印(Event Time Watermarks
),这是事件时间处理进度的信号机制,这个机制在下面描述。
事件时间处理常常会引起一个特定的延迟,因为它需要等待一个特定时间的延迟事件和无序事件。因此,事件时间程序也常常跟处理时间操作一起使用。
Ingestion time:摄入时间(Ingestion Time
)是事件进入Flink的时间,在源操作中每个记录都会获得源的当前时间作为时间戳,后续基于时间的操作(如: time window)
会依赖这个时间戳
摄入时间从概念上来讲是处在事件时间和处理时间之间,与处理时间相比,成本可能会高一点,但是会提供更加可预测的结果。因为摄入时间使用的是固定的时间戳(都是在源处指定的),记录中的不同窗口操作依赖同一个时间戳,而在处理时间中每个窗口操作可能将记录赋给不同的窗口(根据本地的系统时钟和传输时延)。
与事件时间相比,摄入时间程序不能处理任何无序事件或者延迟事件,但是程序无需指定如何产生水印。
在内部摄入时间和事件时间相似,但是是自动分配时间戳,自动产生水印。
设置时间特性(Setting a Time Characteristic)
Flink DataStream
程序的第一部分常常是设置时间特性,这个设置定义了数据流源的行为(例如: 它们是否分配时间戳) 以及window
操作需要用哪个时间概念,如:KeyedStream.timeWindow(Time.seconds(30))
。
下面的例子中展示了一个在小时级时间窗口中聚合事件的Flink程序,window的行为与时间特性相适应。
Java 代码:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));
stream
.keyBy( (event) -> event.getUser() )
.timeWindow(Time.hours(1))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
Scala 代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer09[MyEvent](topic, schema, props))
stream
.keyBy( _.getUser )
.timeWindow(Time.hours(1))
.reduce( (a, b) => a.add(b) )
.addSink(...)
注意:为了在事件时间中运行这个例子,程序需要使用已经定义了事件时间和自己能发出水印的源数据,或者程序中必须在源后面添加一个Timestamp Assigner
和Watermark Generator
。这些函数描述了如何获取事件的时间戳以及哪些程度的无序事件流需要展示。
下面部分描述了timestamp
和watermark
背后的基本原理,至于如何在Flink DataStream API*中使用timestamp
分配和watermark
生成,请参考:(https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html)。
事件时间和水印 (Event Time and Watermarks)
注意: Flink实现了很多 DataFlow Model中的技术,想要获得更多关于event time
和watermark
的介绍,请查看下面的文章:
Streaming 101 by Tyler Akidau
The Dataflow Model paper
一个支持事件时间的流处理器需要一种方式来衡量事件时间的进度,例如:一个小时级的窗口操作在事件时间达到小时的末尾之后需要被通知,以便操作能够在进程中关闭窗口。
Event Time可以独立于处理时间来进行,例如:程序中一个操作的当前事件时间可能会稍微晚于处理中的时间,而它们是以相同的速度进行的;另一方面,一个流程序可能处理一周的事件时间,通过快速转发缓存在Kafka topic
中的数据,仅用几秒的时间来处理。
Flink衡量event time 进度的机制是watermark,watermark流作为数据流的一部分,携带了一个时间戳t。一个Watermark(t)声明了数据流中的事件时间已经到t了,也就是说数据流中不应该再有时间戳t'<=t的元素了。
下图中展示了一个带时间戳的事件流,并且内联一个水印。在这个例子中,事件是有序的,也就是说水印仅是在流中做简单标记而已。
watermark对于无序流来说是非常重要的,无序数据流如下所述,事件并未按照他们的时间戳排序。通常,一个watermark是在数据流中一个点的一个声明,所有事件到一个特定的时间戳之后都必须到达。
watermark对于无序流来说是非常重要的,无序数据流如下所述,事件并未按照他们的时间戳排序。通常,一个watermark是在数据流中一个点的一个声明,所有事件到一个特定的时间戳之后都必须到达。一旦watermark到达一个操作,这个操作就可以将事件时间的时钟提前到watermark值的前面。
并行流中的水印(Watermarks in Parallel Streams)
watermark通过源函数直接产生或者在源函数之后产生,每个原函数的并发子任务通常独立地生成自己的watermark。这些水印在特定并行源中定义了的事件时间。
因为watermark流贯穿整个流程序,它们将它们抵达的操作的事件时间提前。无论一个操作何时提前它的事件时间,它都会为下游的后继操作生成一个新的watermark。
有些操作消费多个输入流,如:union
或者后面跟着keyBy(...)
或者partition(...)
函数的操作。这些操作当前的事件时间是输入流中最小的事件时间。由于输入流会更新他们的事件时间,因此操作也一样。
下图中展示了并行数据流中的事件和水印,以及追踪事件时间的操作。
延迟元素(Late Elements)
可能某些特定的元素会违背水印的条件,也就是说即使是Watermark(t)已经发生了,但是还会有许多时间戳t'<=t的事件发生。事实上,在真实的设置中,某些元素可以任意延迟,因此指定一个时间,在这个时间内所有在一个特定事件时间戳的事件都会发生是不可能的。此外,即使延迟是有界的,延迟太多的水印也是不可取的,因为会在事件时间窗口的评价中导致过多的延迟。
基于这个原因,流程序可以明确地指定一些延迟元素,延迟元素是指抵达的系统事件时间已经过了延迟元素时间戳的元素。请参考允许时延(https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#allowed-lateness )来获取更多关于如何在事件时间窗口中处理延迟元素的更多信息。