Flink Window窗口详解

Flink Window(窗口) 详解

概述:
    windows 计算是流式计算中非常常用的数据计算方式之一.通过按照固定时间或长度将数据流切分成不同窗口,
    然后对数据进行相应的聚合运算,从而得到一定时间范围内的统计结果,
eg:
    例如统计最近5min 内某基站的呼叫数,此时基站的数据在不断地产生,但是通过5min中的窗口将数据限定在固定
    时间范围内,就可以对该范围内的有界数据执行聚合处理,得出最近5min的基站的呼叫数量.

1. Window 分类

1. Global Window 和 keyed Window
概述:
    在运用窗口计算时,Flink根据上游数据集是否为KeyedStream类型,对应的Window也会有所不同.
Keyed Window :
    上游数据集如果是KeyedStream类型,则调用DataStream API 的Window()方法,数据会根据Key在不同的Task实例
    中并行分别计算,最后得出针对每个Key统计的结果.
Global Window:
    如果是Non-Keyey类型,则调用WindowsAll()方法,所有的数据都会在窗口算子中汇到一个Task中计算,并得出全局
    统计结果
eg:
    // Global Window
    data.windowAll(自定义的WindowAssigner)
    //KeyedWindow
    data.keyBy(_.sid)
    .window(自定义的WindowAssigner)
2. Time Window 和Count Window
概述:
    基于业务数据的方面考虑,Flink又支持两种类型的窗口,一种是基于时间的窗口叫Time Window,
    还有一种基于输入数据量的窗口叫Count Window
3. Time Window(时间窗口)
概述:
    根据不同的业务场景,Time Window也可以分为三种类型,分别是滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口叫Count Window
1. 滚动窗口(Tumbling Window)
概述:
    滚动窗口是根据固定时间进行切分,且窗口和窗口之间的元素互不重叠,
    这种类型的窗口最大特点是比较简单,只需要指定一个窗口长度(window size)
eg:
    // 每个5s统计每个基站的日志数量
    data.map((_.sid,1))
    .keyBy(_._1)
    .timeWindow(Time.seconds(5))
    //window(TumblingEventTImeWindows.of(Time.seconds(5)))
    .sum(1)//聚合
    其中时间间隔可以是Time.milliseconds(x),Time.seconds(x)或Time.minutes(x)
2. 滑动窗口(Sliding Window)
概述:
    滑动窗口也是一种比较常见的窗口类型,其特点是在滚动窗口基础上增加了窗口滑动时间(Slide TIme),且允许窗口
    数据发生重叠,当Windows size固定之后,窗口并不像滚动窗口按照windows Size向前移动,而是根据设定的Slide Time
    向前滑动.
    窗口之间的数据重叠大小根据Windows Size和Slide Time决定,当SlideTime小于Windows size 便会发生窗口重叠
eg: 
    //每隔3s计算最近5s内,每个基站的日志数量
    data.map((_,1))
    .keyBy(_._1)
    .timeWindow(Time.seconds(5),Time.seconds(3))
    .sum(1)
3. 会话窗口(Session Window)
概述:
    会话窗口主要是将某段时间内活跃度较高的数据聚合成一个窗口进行计算,窗口的触发的条件是Session Gap,是指在规定
    的时间内如果没有数据活跃接入,则认为窗口结束,然后触发窗口计算结果,
注意:
    需要注意的是如果数据一直不间断地进入窗口,也会导致窗口始终不触发的情况.
    与滑动窗口不同的是,Session Windows不需要有固定Window size和slide tinme ,
    需要定义SEssion gap,来规定不活跃数据的时间上限即可
eg:
    // 3s内如果没有数据接入,则计算每个基站的日志数量
    data.map((_.sid,1))
    .keyBy(_._1)
    .window(EventTimeSessionWindows.withGap(Time.seconds(3))))
    .sum(1)
4. Count Window(数量窗口)
概述:
    Count Window 也有滚动窗口、滑动窗口等.使用较少

Window 的 API

概述:
    在实际案例中Keyed Window 使用最多,所以我们需要掌握Keyed Window的算子,在每个窗口算子中包含了 
    Windows Assigner、Windows Trigger(窗口触发器)、Evictor(数据剔除器)、Lateness(时延设定)、
    Output (输出标签)以及Windows Function,其中Windows Assigner和Windows Functions是所有窗口算子
    必须指定的属性,其余的属性都是根据实际情况选择指定.
code:
    stream.keyBy(...)是Keyed类型数据集
    .window(...)//指定窗口分配器类型
    [.trigger(...)]//指定触发器类型(可选)
    [.evictor(...)] // 指定evictor或者不指定(可选)
    [.allowedLateness(...)] //指定是否延迟处理数据(可选)
    [.sideOutputLateData(...)] // 指定Output lag(可选)
    .reduce/aggregate/fold/apply() //指定窗口计算函数
    [.getSideOutput(...)] //根据Tag输出数据(可选)
intro:
    Windows Assigner : 指定窗口的类型,定义如何将数据流分配到一个或多个窗口
    Windows Trigger : 指定窗口触发的时机,定义窗口满足什么样的条件触发计算
    Evictor : 用于数据剔除
    allowedLateness : 标记是否处理迟到数据,当迟到数据达到窗口是否触发计算
    Output Tag: 标记输出标签,然后在通过getSideOutput将窗口中的数据根据标签输出
    Windows Function: 定义窗口上数据处理的逻辑,例如对数据进行Sum操作

窗口聚合函数

概述:
    如果定义了Window Assigner ,下一步就可以定义窗口内数据的计算逻辑,这也就是Window Function的定义,
    Flink提供了四种类型的Window Function,分别为 ReduceFunction、AggregateFunction以及ProcessWindowFunction,
    (sum和max)等.
    前3种类型的Window Function按照计算原理的不同可以分为两大类
1. 一类是增量聚合函数: 对应有ReduceFunction、AggregateFunction;
2. 另一类是全量窗口函数,对应有ProcessWindowFunction(WindowFunction)
差异:
    1. 增量聚合函数计算性能较高,占用内存空间少,主要因为基于中间状态的计算结果,窗口中只维护中间结果状态值,
    不需要缓存原始数据.
    2. 全量窗口函数使用的代价相对较高,性能比较弱,主要因为此时算子需要对所有属于该窗口的接入数据进行缓存,
    然后等到窗口触发的时候,对所有的原始数据进行汇总计算.
1. ReduceFunction
概述
    ReduceFunction 定义了对输入的两个相同类型的数据元素按照指定的计算方法进行聚合的逻辑,
    然后输出类型相同的一个结果元素
code:
    // 每隔5s统计每个基站的日志数量
    data.map((_.sid,1))
    .keyBy(_._1)
    .window(TumblingEventTimeWindows.of(TIme.seconds(5)))
    .reduce((v1,v2)=>(v1._1,v1._2+v2._2))
2. AggregateFunction
概述:
    和ReduceFunction 相似,AggregateFunction也是基于中间状态计算结果的增量计算函数,但AggregateFunctino在窗口
    计算上更加通用,AggregateFunction接口相对ReduceFunction更加灵活.实现复杂度也相对较高. 
    AggregateFunction接口中定义了三个需要复写的方法,其中
    add()定义数据添加的逻辑,
    getResult()定义了根据accmulator计算结果的逻辑,
    merge()方法定义合并accumulator的逻辑
code:
    //每隔3s内计算最近5s内,每个基站的日志数量
        val data = env.readTextFile("D:\\Workspace\\IdeaProjects\\F1Demo\\src\\FlinkDemo\\functions\\station.log")
          .map { line =>
            var arr = line.split(",")
            StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
          }
        val result = data.map(stationLog => (stationLog.sid, 1))
          .keyBy(_._1)
          .timeWindow(Time.seconds(5), Time.seconds(3))
          // new AggregateFunction[In,Acc,Out]
          .aggregate(new AggregateFunction[(String, Int), (String, Long), (String, Long)] {
          override def createAccumulator(): (String, Long) = ("", 0)
    
          override def add(in: (String, Int), acc: (String, Long)): (String, Long) = {
            (in._1, acc._2 + in._2)
          }
    
          override def getResult(acc: (String, Long)): (String, Long) = {
            print(acc)
            acc
          }
    
          override def merge(acc: (String, Long), acc1: (String, Long)): (String, Long) = {
            (acc._1, acc1._2 + acc._2)
          }
        })
    
        env.execute()
      }
3. ProcessWindowFunction
概念:
    前面提到的ReduceFunction和AggregateFunction都是基于中间状态实现增量计算的窗口函数,虽然已经满足绝大
    多数场景,但在某些情况下,统计更复杂的指标可能需要依赖于窗口中所有的数据元素,或需要操作窗口中的状态数据
    和窗口元数据,这时就需要使用到ProcessWindowsFunction,ProcessWindowsFunction能够更加灵活地支持基于窗口
    全部数据元素的结果计算,例如对整个窗口数据排序取TopN,这样的需要就必须使用ProcessWindowFunction.
code:
    val result = data.map(stationLog => ((stationLog.sid, 1)))
  .keyBy(_._1)
  //.timeWindow(Time.seconds(5))
  .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  // ProcessWindowFunction[In,Out,Key,Window]
  .process(new ProcessWindowFunction[(String, Int), (String, Long), String, TimeWindow] {
  //一个窗口结束的时候调用一次(一个分组执行一次)
  override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Long)]): Unit = {
    print("----")
    //注意:整个窗口的数据保存到Iterable,里面有很多行数据。Iterable的size就是日志的总条数
    out.collect((key, elements.size))
  }
}).print()
env.execute()

案例:

1. AggregateFunction
    import FlinkDemo.functions.FunctionClassTransformation.StationLog
    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    object WindowDemos {
    
      // 导入Flink隐式转换
      import org.apache.flink.streaming.api.scala._
    
      def main(args: Array[String]): Unit = {
        //获取flink实时流处理的环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        val data = env.socketTextStream("localhost", 9999)
          .map { line =>
            var arr = line.split(",")
            StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
          }
        val result = data.map(stationLog => (stationLog.sid, 1))
          .keyBy(_._1)
          // timeWindow(t1,t2) t1表示窗口大小,t2表示滑动窗口大小
          .timeWindow(Time.seconds(10), Time.seconds(3))
          // new AggregateFunction[In,Acc,Out]
          .aggregate(new MyAggregateFunction, new MyWindowFunction)
          .print()
    
        env.execute()
      }
    
      /**
        * AggregateFunction<IN, ACC, OUT>
        *   1. In 表示输入参数类型
        *   2. ACC 表示累加器类型
        *   3. Out 表示输出值类型
        * add => 表示来一条数据执行一次
        * getResult => 表示在窗口结束的时候执行一次
        */
      class MyAggregateFunction extends AggregateFunction[(String, Int), (String, Long), (String, Long)] {
        // 初始化一个累加器,开始的时候为0
        override def createAccumulator(): (String, Long) = ("", 0)
    
        override def add(in: (String, Int), acc: (String, Long)): (String, Long) = {
          (in._1, acc._2 + in._2)
        }
    
        override def getResult(acc: (String, Long)): (String, Long) = {
          print(acc)
          acc
        }
        //合并统计的值
        override def merge(acc: (String, Long), acc1: (String, Long)): (String, Long) = {
          (acc._1, acc1._2 + acc._2)
        }
      }
    
      /**
        * WindowFunction[IN, OUT, KEY, W <: Window]
        * 1. In 表示输入参数类型
        * 2. OUT 表示输出参数类型
        * 3. key表示 key的类型
        * 4. W 表示window类型时间窗口
        * 输入数据来自于AggregateFunction,在窗口结束的时候先执行AggregateFunction对象的getResult,
        * 然后在执行apply()
        */
      class MyWindowFunction extends WindowFunction[(String, Long), (String, Long), String, TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[(String, Long)]): Unit = {
          // 获取迭代器的第一个(迭代器中只有一个值)
          out.collect((key, input.iterator.next()._2))
        }
      }
    
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352

推荐阅读更多精彩内容

  • 原文连接 https://ci.apache.org/projects/flink/flink-docs-rele...
    Alex90阅读 3,456评论 0 5
  • 说明:本文为《Flink大数据项目实战》学习笔记,想通过视频系统学习Flink这个最火爆的大数据计算框架的同学,推...
    大数据研习社阅读 2,130评论 0 0
  • 摘要 Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,...
    尼小摩阅读 5,090评论 0 13
  • 概述 Apache Flink 是一个为生产环境而生的流处理器,具有易于使用的 API,可以用于定义高级流分析程序...
    木戎阅读 4,557评论 0 2
  • 在这条逐梦的路上太久,经历得越多,伤害也越多,我们总会在麻木中不经意地丢掉梦想,偏离了离梦想最近的那条轨道,往往就...
    上官冒冒阅读 200评论 0 0