Flink Window(窗口) 详解
概述:
windows 计算是流式计算中非常常用的数据计算方式之一.通过按照固定时间或长度将数据流切分成不同窗口,
然后对数据进行相应的聚合运算,从而得到一定时间范围内的统计结果,
eg:
例如统计最近5min 内某基站的呼叫数,此时基站的数据在不断地产生,但是通过5min中的窗口将数据限定在固定
时间范围内,就可以对该范围内的有界数据执行聚合处理,得出最近5min的基站的呼叫数量.
1. Window 分类
1. Global Window 和 keyed Window
概述:
在运用窗口计算时,Flink根据上游数据集是否为KeyedStream类型,对应的Window也会有所不同.
Keyed Window :
上游数据集如果是KeyedStream类型,则调用DataStream API 的Window()方法,数据会根据Key在不同的Task实例
中并行分别计算,最后得出针对每个Key统计的结果.
Global Window:
如果是Non-Keyey类型,则调用WindowsAll()方法,所有的数据都会在窗口算子中汇到一个Task中计算,并得出全局
统计结果
eg:
// Global Window
data.windowAll(自定义的WindowAssigner)
//KeyedWindow
data.keyBy(_.sid)
.window(自定义的WindowAssigner)
2. Time Window 和Count Window
概述:
基于业务数据的方面考虑,Flink又支持两种类型的窗口,一种是基于时间的窗口叫Time Window,
还有一种基于输入数据量的窗口叫Count Window
3. Time Window(时间窗口)
概述:
根据不同的业务场景,Time Window也可以分为三种类型,分别是滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口叫Count Window
1. 滚动窗口(Tumbling Window)
概述:
滚动窗口是根据固定时间进行切分,且窗口和窗口之间的元素互不重叠,
这种类型的窗口最大特点是比较简单,只需要指定一个窗口长度(window size)
eg:
// 每个5s统计每个基站的日志数量
data.map((_.sid,1))
.keyBy(_._1)
.timeWindow(Time.seconds(5))
//window(TumblingEventTImeWindows.of(Time.seconds(5)))
.sum(1)//聚合
其中时间间隔可以是Time.milliseconds(x),Time.seconds(x)或Time.minutes(x)
2. 滑动窗口(Sliding Window)
概述:
滑动窗口也是一种比较常见的窗口类型,其特点是在滚动窗口基础上增加了窗口滑动时间(Slide TIme),且允许窗口
数据发生重叠,当Windows size固定之后,窗口并不像滚动窗口按照windows Size向前移动,而是根据设定的Slide Time
向前滑动.
窗口之间的数据重叠大小根据Windows Size和Slide Time决定,当SlideTime小于Windows size 便会发生窗口重叠
eg:
//每隔3s计算最近5s内,每个基站的日志数量
data.map((_,1))
.keyBy(_._1)
.timeWindow(Time.seconds(5),Time.seconds(3))
.sum(1)
3. 会话窗口(Session Window)
概述:
会话窗口主要是将某段时间内活跃度较高的数据聚合成一个窗口进行计算,窗口的触发的条件是Session Gap,是指在规定
的时间内如果没有数据活跃接入,则认为窗口结束,然后触发窗口计算结果,
注意:
需要注意的是如果数据一直不间断地进入窗口,也会导致窗口始终不触发的情况.
与滑动窗口不同的是,Session Windows不需要有固定Window size和slide tinme ,
需要定义SEssion gap,来规定不活跃数据的时间上限即可
eg:
// 3s内如果没有数据接入,则计算每个基站的日志数量
data.map((_.sid,1))
.keyBy(_._1)
.window(EventTimeSessionWindows.withGap(Time.seconds(3))))
.sum(1)
4. Count Window(数量窗口)
概述:
Count Window 也有滚动窗口、滑动窗口等.使用较少
Window 的 API
概述:
在实际案例中Keyed Window 使用最多,所以我们需要掌握Keyed Window的算子,在每个窗口算子中包含了
Windows Assigner、Windows Trigger(窗口触发器)、Evictor(数据剔除器)、Lateness(时延设定)、
Output (输出标签)以及Windows Function,其中Windows Assigner和Windows Functions是所有窗口算子
必须指定的属性,其余的属性都是根据实际情况选择指定.
code:
stream.keyBy(...)是Keyed类型数据集
.window(...)//指定窗口分配器类型
[.trigger(...)]//指定触发器类型(可选)
[.evictor(...)] // 指定evictor或者不指定(可选)
[.allowedLateness(...)] //指定是否延迟处理数据(可选)
[.sideOutputLateData(...)] // 指定Output lag(可选)
.reduce/aggregate/fold/apply() //指定窗口计算函数
[.getSideOutput(...)] //根据Tag输出数据(可选)
intro:
Windows Assigner : 指定窗口的类型,定义如何将数据流分配到一个或多个窗口
Windows Trigger : 指定窗口触发的时机,定义窗口满足什么样的条件触发计算
Evictor : 用于数据剔除
allowedLateness : 标记是否处理迟到数据,当迟到数据达到窗口是否触发计算
Output Tag: 标记输出标签,然后在通过getSideOutput将窗口中的数据根据标签输出
Windows Function: 定义窗口上数据处理的逻辑,例如对数据进行Sum操作
窗口聚合函数
概述:
如果定义了Window Assigner ,下一步就可以定义窗口内数据的计算逻辑,这也就是Window Function的定义,
Flink提供了四种类型的Window Function,分别为 ReduceFunction、AggregateFunction以及ProcessWindowFunction,
(sum和max)等.
前3种类型的Window Function按照计算原理的不同可以分为两大类
1. 一类是增量聚合函数: 对应有ReduceFunction、AggregateFunction;
2. 另一类是全量窗口函数,对应有ProcessWindowFunction(WindowFunction)
差异:
1. 增量聚合函数计算性能较高,占用内存空间少,主要因为基于中间状态的计算结果,窗口中只维护中间结果状态值,
不需要缓存原始数据.
2. 全量窗口函数使用的代价相对较高,性能比较弱,主要因为此时算子需要对所有属于该窗口的接入数据进行缓存,
然后等到窗口触发的时候,对所有的原始数据进行汇总计算.
1. ReduceFunction
概述
ReduceFunction 定义了对输入的两个相同类型的数据元素按照指定的计算方法进行聚合的逻辑,
然后输出类型相同的一个结果元素
code:
// 每隔5s统计每个基站的日志数量
data.map((_.sid,1))
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(TIme.seconds(5)))
.reduce((v1,v2)=>(v1._1,v1._2+v2._2))
2. AggregateFunction
概述:
和ReduceFunction 相似,AggregateFunction也是基于中间状态计算结果的增量计算函数,但AggregateFunctino在窗口
计算上更加通用,AggregateFunction接口相对ReduceFunction更加灵活.实现复杂度也相对较高.
AggregateFunction接口中定义了三个需要复写的方法,其中
add()定义数据添加的逻辑,
getResult()定义了根据accmulator计算结果的逻辑,
merge()方法定义合并accumulator的逻辑
code:
//每隔3s内计算最近5s内,每个基站的日志数量
val data = env.readTextFile("D:\\Workspace\\IdeaProjects\\F1Demo\\src\\FlinkDemo\\functions\\station.log")
.map { line =>
var arr = line.split(",")
StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
}
val result = data.map(stationLog => (stationLog.sid, 1))
.keyBy(_._1)
.timeWindow(Time.seconds(5), Time.seconds(3))
// new AggregateFunction[In,Acc,Out]
.aggregate(new AggregateFunction[(String, Int), (String, Long), (String, Long)] {
override def createAccumulator(): (String, Long) = ("", 0)
override def add(in: (String, Int), acc: (String, Long)): (String, Long) = {
(in._1, acc._2 + in._2)
}
override def getResult(acc: (String, Long)): (String, Long) = {
print(acc)
acc
}
override def merge(acc: (String, Long), acc1: (String, Long)): (String, Long) = {
(acc._1, acc1._2 + acc._2)
}
})
env.execute()
}
3. ProcessWindowFunction
概念:
前面提到的ReduceFunction和AggregateFunction都是基于中间状态实现增量计算的窗口函数,虽然已经满足绝大
多数场景,但在某些情况下,统计更复杂的指标可能需要依赖于窗口中所有的数据元素,或需要操作窗口中的状态数据
和窗口元数据,这时就需要使用到ProcessWindowsFunction,ProcessWindowsFunction能够更加灵活地支持基于窗口
全部数据元素的结果计算,例如对整个窗口数据排序取TopN,这样的需要就必须使用ProcessWindowFunction.
code:
val result = data.map(stationLog => ((stationLog.sid, 1)))
.keyBy(_._1)
//.timeWindow(Time.seconds(5))
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
// ProcessWindowFunction[In,Out,Key,Window]
.process(new ProcessWindowFunction[(String, Int), (String, Long), String, TimeWindow] {
//一个窗口结束的时候调用一次(一个分组执行一次)
override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Long)]): Unit = {
print("----")
//注意:整个窗口的数据保存到Iterable,里面有很多行数据。Iterable的size就是日志的总条数
out.collect((key, elements.size))
}
}).print()
env.execute()
案例:
1. AggregateFunction
import FlinkDemo.functions.FunctionClassTransformation.StationLog
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object WindowDemos {
// 导入Flink隐式转换
import org.apache.flink.streaming.api.scala._
def main(args: Array[String]): Unit = {
//获取flink实时流处理的环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val data = env.socketTextStream("localhost", 9999)
.map { line =>
var arr = line.split(",")
StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
}
val result = data.map(stationLog => (stationLog.sid, 1))
.keyBy(_._1)
// timeWindow(t1,t2) t1表示窗口大小,t2表示滑动窗口大小
.timeWindow(Time.seconds(10), Time.seconds(3))
// new AggregateFunction[In,Acc,Out]
.aggregate(new MyAggregateFunction, new MyWindowFunction)
.print()
env.execute()
}
/**
* AggregateFunction<IN, ACC, OUT>
* 1. In 表示输入参数类型
* 2. ACC 表示累加器类型
* 3. Out 表示输出值类型
* add => 表示来一条数据执行一次
* getResult => 表示在窗口结束的时候执行一次
*/
class MyAggregateFunction extends AggregateFunction[(String, Int), (String, Long), (String, Long)] {
// 初始化一个累加器,开始的时候为0
override def createAccumulator(): (String, Long) = ("", 0)
override def add(in: (String, Int), acc: (String, Long)): (String, Long) = {
(in._1, acc._2 + in._2)
}
override def getResult(acc: (String, Long)): (String, Long) = {
print(acc)
acc
}
//合并统计的值
override def merge(acc: (String, Long), acc1: (String, Long)): (String, Long) = {
(acc._1, acc1._2 + acc._2)
}
}
/**
* WindowFunction[IN, OUT, KEY, W <: Window]
* 1. In 表示输入参数类型
* 2. OUT 表示输出参数类型
* 3. key表示 key的类型
* 4. W 表示window类型时间窗口
* 输入数据来自于AggregateFunction,在窗口结束的时候先执行AggregateFunction对象的getResult,
* 然后在执行apply()
*/
class MyWindowFunction extends WindowFunction[(String, Long), (String, Long), String, TimeWindow] {
override def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[(String, Long)]): Unit = {
// 获取迭代器的第一个(迭代器中只有一个值)
out.collect((key, input.iterator.next()._2))
}
}
}