Flink Time 详解
概述:
对于流式数据处理,最大的特点是数据上具有时间的属性特征,Flink根据时间产生的位置不同,将时间区分为三种时间语义
分别为事件生成时间(Event Time)、事件接入时间(Ingestion Time)和事件处理时间(Processing Time)
1. Event Time: 事件产生的时间,它通常有事件中的时间戳描述
2. Ingestion Time: 事件进入Flink的时间
3. Processing Time : 事件被处理时当前系统的时间.
1. 时间语义Time
概述:
数据从终端产生,或者从系统中产生的过程中生成的时间为事件生成时间,当时间经过消息中间件传入到Flink系统中,在DataSource中接入
的时候会生成事件接入时间,当数据在Flink系统中通过各个算子实例执行转换操作的过程中,算子实例所在系统的时间为数据处理时间,
1. 设置时间语义:
概述:
在Flink中默认情况下使用的是Process Time时间语义,如果用户选择使用Event Time或者Ingestion Time 语义, 则需要在创建的
StreamExecutionEnvironment 中调用setStreamTimeCharacteristic()方法设定系统的时间概念,
如下代码使用TimeCharacteristic.EventTime作为系统的时间语义:
code:
//设置使用EventTime
env.setStreamTImeCharacteristic(TimeCharacteristic.EventTime)
// 使用IngestionTime
env.setStreamTImeCharacteristic(TimeCharacteristic.IngestionTime)
2. WaterMark水位线
概述:
在使用EventTIme处理Stream数据的时候会遇到数据乱序的问题,流处理从Event(事件)产生,流经Source,再到Operator,这中间需要一定
的时间,虽然大部分情况下,传输到Operator的数据都是按照事件产生的时间顺序来的,但是也不能排除由于网络延迟等原因而导致乱序的
产生,特别是使用Kafka的时候,多个分区之间的数据无法保证有序,因此,在进行Window计算的时候,不能无限期地等下去,必须要有个机制
来保证在特定的时间后,必须要触发Window进行计算,WaterMark是用于处理乱序事件的.
1. Watermark原理
概述:
在Filnk的窗口处理过程中,如果确定全部数据到达,就可以对Window的所有数据做窗口计算操作(汇总,分组),如果数据没有全部到达,则
继续等待该窗口中的数据全部到达才开始处理,这种情况下就需要用到水位线(WaterMarks)机制,它能够衡量数据处理进度(表达数据达到
的完整性),保证事件数据(全部)达到Flink系统,或者在乱序以及延迟到达时,也能够想预期一样计算出正确并且连续的结果,
当任何Event进入到FLink系统时,会根据当前最大事件时间产生Watermarks时间戳(t)
Flink是怎么计算Watermark的值呢?
概述:
Watermark = 进入Flink的最大的事件时间(MaxEventTime)-指定的延迟时间(t)
那么有Watermark的Window是怎么处罚窗口函数的呢?
概述:
如果有窗口的停止时间等于或者小于maxEventTIme -t(当时的Watermark),那么这个窗口被触发执行.
注意:
Watermark本质可以理解成一个延迟触发机制.
Watermark的使用存在三种情况:
1. 本来有序的Stream中的Watermark
概述:
如果数据元素的事件时间是有序的,Watermark时间戳会随着数据元素的事件时间按顺序生成,此时水位线的变化和
事件时间保持一致(因为既然是有序的时间,就需要设置延迟了,那么t就是0,所以watermark= maxTime-0 = maxTime),也
就是理想状态下的水位线,
当Watermark大于Windows结束时间就会触发对Windows的数据计算,以此类推
下一个Window也是一样
2. 乱序事件中的Watermark
概述:
现实情况下数据元素往往并不是按照其产生顺序接入到Flink系统中进行处理,而频繁出现乱序或迟到的情况,这种
情况就需要使用Watermark来应对.
3. 并行数据流中的Watermark
概述:
在多并行度的情况下,Watermark会有一个对齐机制,这个对齐机制会取所有Channel中最小的Watermark.
引入Wateramark和EventTime
1. 有序数据流中引入Watermark和EventTime
对于有序的数据,代码比较简洁,主要需要从源Event中抽取EventTime.
code:
// 读取Socket数据
//获取flink实时流处理的环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val data = env.socketTextStream("localhost", 9999)
.map { line =>
var arr = line.split(",")
StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
}
// 根据EventTime有序的数据流
data.assignAscendingTimestamps(_.callTime)
// StationLog对象中抽取EventTime就是callTime属性
2. 乱序数据流中引入Watermark和EventTime
对于乱序数据流,有两种常见的引入方法:周期性和间断性.
2.1 With Periodic(周期性的)Watermark
周期性地生成Watermark的生成,默认是100ms,每隔N毫秒自动向流里注入一个Watermark,时间间隔由streamEnv.getConfig.
setAutoWatermarkInterval()决定.最简单写入
// 如果EventTime是乱序的,需要考虑一个延迟时间t
// 当前代码设置的延迟时间为3s
data.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[StationLog](Time.seconds(3)) {
override def extractTimestamp(t: StationLog): Long = {
t.callTime// 设置EventTIme
}
})
2.2 With Punctuated(间断性的)Watermark
间断性的生成Watermark一般都是基于某些事件触发Watermark的生成和发送,比如:在我们的基站数据中,有一个基站的CallTime
总是没有按照顺序传入,其他基站的时间都是正常的,那我们需要对这个基站来专门生成Watermark
// 1. 只有satation_1的Event是无序的,所以只需要针对Station_1做处理
// 当前代码设置station_1基站的延迟处理时间为3s
data.assignTimestampsAndWatermarks(new MyCustomerPunctuatedWatermarks(3000L)) //自定义延迟
class MyCustomerPunctuatedWatermarks(delary: Long) extends
AssignerWithPunctuatedWatermarks[StationLog] {
var maxTime: long = 0
override def checkAndGetNextWatermark(t: StationLog, l: Long): Watermark = {
if (t.sid.equals("station_1")) {
///当基站ID为station_1才生成水位线
maxTime = math.max(maxTime, l)
new Watermark(maxTime)
} else {
return null //
}
}
override def extractTimestamp(t: StationLog, l: Long): Long = {
// 抽取EventTime的值
t.callTime
}
}
Watermark 案例
概述:
每隔5s中统计一下最近10s内每个基站中通话时间最长的一次通话发生的
呼叫时间、主叫号码、被叫号码,通话时长. 并且还得告诉我是那个时间范围(10s)
code:
package FlinkDemo.time
import java.text.SimpleDateFormat
import FlinkDemo.functions.FunctionClassTransformation.StationLog
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
/**
* 每隔5s中统计一下最近10s内每个基站中通话时间最长的一次通话发生的
* 呼叫时间、主叫号码、被叫号码,通话时长
* 并且还得告诉我是那个时间范围(10s)
*/
object Watermark_demo {
def main(args: Array[String]): Unit = {
//获取flink实时流处理的环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置数据流时间特征,默认为TimeCharacteristic.ProcessingTime,默认水位线更新200ms
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 设置并行度1
env.setParallelism(1)
// 导入隐式转换
import org.apache.flink.streaming.api.scala._
val data = env.socketTextStream("localhost", 9999)
// 分配事件时间提取器,
.assignTimestampsAndWatermarks(new TimestampExtractor1())
.map { line =>
var arr = line.split(",")
StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
}
// 引入Watermark
.keyBy(_.sid)
// 设置滑动窗口大小为10s,滚动时间为5s
.timeWindow(Time.seconds(10), Time.seconds(5))
// reduce 函数做增量聚合,MaxTimeAggregate能做到来一条数据处理一条,
// ReturnMaxTime 在窗口触发的时候调用.
// reduce( preAggregator: (T, T) => T,
// function: ProcessWindowFunction[T, R, K, W])
.reduce(new MaxTimeReduce, new ReturnMaxTime)
.print()
env.execute()
}
class MaxTimeReduce extends ReduceFunction[StationLog] {
// 每个基站中传入的数据
override def reduce(value1: StationLog, value2: StationLog): StationLog = {
if (value1.duration > value2.duration) value1 else value2
}
}
class ReturnMaxTime extends WindowFunction[StationLog, String, String, TimeWindow] {
// 获取时间范围
override def apply(key: String, window: TimeWindow, input: Iterable[StationLog], out: Collector[String]): Unit = {
val sb = new StringBuilder
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
sb.append(s"时间范围: Start: ${format.format(window.getStart)},${format.format(window.getEnd)}")
sb.append("\n")
sb.append(s"接入时间: ${format.format(input.iterator.next.callTime)}")
sb.append("\n")
sb.append("通话日志: ").append(input.iterator.next())
out.collect(sb.toString())
}
}
}
提取器:
package FlinkDemo.time
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
import java.text.SimpleDateFormat
class TimestampExtractor1 extends AssignerWithPeriodicWatermarks[String] with
Serializable {
var currentTimestamp: Long = 0L
//设置最大允许的乱序事件是5s
val maxDelayTime = 5000L
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
var watermark: Watermark = null
override def getCurrentWatermark: Watermark = {
watermark =
new Watermark(currentTimestamp - maxDelayTime)
//println("new watermark: " + format.format(watermark.getTimestamp) + " ")
watermark
}
override def extractTimestamp(t: String, l: Long): Long = {
val timeStamp = t.split(",")(4).toLong
currentTimestamp = Math.max(timeStamp, currentTimestamp)
println("timestamp: " + format.format(timeStamp) + "|" + format.format(currentTimestamp) + "|" + format.format(getCurrentWatermark.getTimestamp))
timeStamp
}
}
Watermark demo2
code:
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
/*
*想使用WaterMark,需要3个步骤
* 1. 对数据进行timestamp提取,即调用assignTimestampsAndWaterMarks函数
* 实例化BoundedOutOfOrdernessTimestampExtractor,重写extractTimestamp方法
* 2. 是指使用事件时间,因为WaterMark是基于事件时间
* 3. 定义时间窗口: 翻滚窗口(TumblingEventWindows)、滑动窗口(timeWindow)
* 任意要给没有实现都会报异常
*/
object EtDemo {
/**/
def main(args: Array[String]): Unit = {
//获取flink实时流处理的环境
val senv = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
// 设置数据流时间特征,默认为TimeCharacteristic.ProcessingTime,默认水位线更新200ms
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 导入隐式转换
//便于测试,并行度设置为1
senv.setParallelism(1)
// 2. 创建数据源
// 3. 绑定数据源
val text = senv.socketTextStream("localhost", 9999)
//senv.fromCollection(data)
// 添加事件时间提取器
.assignTimestampsAndWatermarks(new TimestampExtractor())
// 对自定的数据集进行窗口计数
val counts = text
.map { (m: String) => new Character(m.split(",")(1), 1) }
.keyBy(_.character)
//划分规则时按1分钟,内秒数划分的1-10,5-15,..30-40,35-45,40-50,45-55,50-60,55-60
.timeWindow(Time.seconds(10), Time.seconds(5))
.reduce(new ReduceDemo, new MyWFunction)
.print()
senv.execute("EventTime processing examkple")
}
case class Character(character: String, num: Int)
class ReduceDemo extends ReduceFunction[Character] {
// 计算传入的数据累加
override def reduce(value1: Character, value2: Character): Character = {
new Character(value1.character, value2.num + value1.num)
}
}
class MyWFunction extends WindowFunction[Character, String, String, TimeWindow] {
override def apply(key: String, window: TimeWindow, input: Iterable[Character], out: Collector[String]): Unit = {
val sb = new StringBuilder
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
sb.append(s"时间范围: Start: ${format.format(window.getStart)},${format.format(window.getEnd)} ")
sb.append(input.iterator.next().toString())
out.collect(sb.toString())
}
}
}
input:
1522827251000,a
1522827252000,a
1522827251000,b
1522827252000,b
1522827256000,a
1522827254000,a
1522827261000,a
1522827270000,a
result:
timestamp: 2018-04-04 15:34:11.000|2018-04-04 15:34:11.000|2018-04-04 15:34:06.000
timestamp: 2018-04-04 15:34:12.000|2018-04-04 15:34:12.000|2018-04-04 15:34:07.000
timestamp: 2018-04-04 15:34:11.000|2018-04-04 15:34:12.000|2018-04-04 15:34:07.000
timestamp: 2018-04-04 15:34:12.000|2018-04-04 15:34:12.000|2018-04-04 15:34:07.000
timestamp: 2018-04-04 15:34:16.000|2018-04-04 15:34:16.000|2018-04-04 15:34:11.000
timestamp: 2018-04-04 15:34:14.000|2018-04-04 15:34:16.000|2018-04-04 15:34:11.000
timestamp: 2018-04-04 15:34:21.000|2018-04-04 15:34:21.000|2018-04-04 15:34:16.000
时间范围: Start: 2018-04-04 15:34:05,2018-04-04 15:34:15 Character(a,3)
时间范围: Start: 2018-04-04 15:34:05,2018-04-04 15:34:15 Character(b,2)
timestamp: 2018-04-04 15:34:30.000|2018-04-04 15:34:30.000|2018-04-04 15:34:25.000
时间范围: Start: 2018-04-04 15:34:10,2018-04-04 15:34:20 Character(a,4)
时间范围: Start: 2018-04-04 15:34:10,2018-04-04 15:34:20 Character(b,2)
时间范围: Start: 2018-04-04 15:34:15,2018-04-04 15:34:25 Character(a,2)
Window的 allowedLateness
概述:
基于Event-Time的窗口处理流式数据,虽然提供了Watermark机制,却只能在一定程度上解决数据乱序的问题,
但在某些情况下数据可能延时会非常严重,即使通过Watermark机制也无法等到数据全部进入窗口再进行处理.
Flink中默认会将这些迟到的数据做丢弃处理,但是有些时候用户希望即使数据延迟达到的情况下,也能够正常
按照流程处理并输出结果,此时就需要使用Allowed Lateness机制来对迟到的数据进行额外的处理
情况:
通常情况下用户虽然希望对迟到的数据进行窗口计算,但并不想将结果混入正常的计算流程中,
例如: 用户大屏数据展示系统,即使正常的窗口中没有将迟到的数据进行统计,但为了保证页面数据显示的连续型,
后来接入到系统中迟到数据所统计出来的结果不希望显示在屏幕上,而是将延时数据和结果存储到数据库中,
便于后期对延时数据进行分析.
解决:
对于这种情况需要借助SideOutput来处理,通过使用sideOutputLateDate(OutputTag)来标记迟到数据计算的结果,
然后使用getSideOutput(lateOutputTag)从窗口结果中获取lateOutputTag标签对应的数据,之后转成独立的DataStream
数据集进行处理,创建late-date的OutputTag,再通过该标签从窗口结果中将迟到的数据筛选出来.
注意:
如果有Watermark同时也有Allowed Lateness,name窗口函数再次触发的条件是:
watermark < end-of-window +allowdLateness
案例:
import java.text.SimpleDateFormat
import FlinkDemo.functions.FunctionClassTransformation.StationLog
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object LateDataOnWindow {
/**
* 案例: 每隔5s统计最近10s,每个基站的呼叫数量
* 1. 每隔基站的数据会存在乱序
* 2. 大多数数据延迟2s到,但是有些数据迟到时间比较长
* 3. 迟到时间超过2s的数据不能丢弃,放入测流
*/
def main(args: Array[String]): Unit = {
//获取flink实时流处理的环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置数据流时间特征,默认为TimeCharacteristic.ProcessingTime,默认水位线更新200ms
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
// 导入隐式转换
import org.apache.flink.streaming.api.scala._
val data = env.socketTextStream("localhost", 9999)
// .assignTimestampsAndWatermarks(new TimestampExtractor1())
.map { line =>
var arr = line.split(",")
StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
}
// 引入Watermark
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[StationLog](Time.seconds(2)) {
//延迟2s
override def extractTimestamp(t: StationLog): Long = {
t.callTime
}
}
)
//分组,开窗处理
// 定义一个侧输出流的 标签
val lateTage = new OutputTag[StationLog]("late")
val mainStream = data.keyBy(_.sid)
.timeWindow(Time.seconds(10), Time.seconds(5))
// 注意: 只要符合 watermark < end-of-window + allowedLateness 之内达到
// 的数据,都会被再次出发窗口的计算
// 迟到之外的迟到数据会被放入侧输出流
.allowedLateness(Time.seconds(5)) // 允许数据迟到5s
// 迟到的数据, 输出另一个位置保存
.sideOutputLateData(lateTage)
.aggregate(new AggregateCount, new OutputResult)
// 迟到很久的数据可以另外再处理
mainStream.getSideOutput(lateTage).print("late")// 迟到很久的数据可以另外再处理
mainStream.print("main")
env.execute()
}
/**
* 累加聚合操作
*/
class AggregateCount extends AggregateFunction[StationLog, Long, Long] {
// 创建累加器
override def createAccumulator(): Long = 0
// 累加器加值
override def add(value: StationLog, accumulator: Long): Long = accumulator + 1
// 获取累加器结果
override def getResult(accumulator: Long): Long = accumulator
//合并Value
override def merge(a: Long, b: Long): Long = a + b
}
class OutputResult extends WindowFunction[Long, String, String, TimeWindow] {
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[String]): Unit = {
var sb = new StringBuilder
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
sb.append(s"时间范围: Start: ${format.format(window.getStart)},----${format.format(window.getEnd)}")
sb.append("\n")
sb.append("\n")
sb.append("当前基站是:").append(key)
.append(" 呼叫数量是: ").append(input.iterator.next())
out.collect(sb.toString())
}
}
}