一篇文章带你深入理解FlinkSQL中的窗口

前言

         时间语义,要配合窗口操作才能发挥作用。最主要的用途,当然就是开窗口、根据时间段做计算了。下面我们就来看看 Table API 和 SQL 中,怎么利用时间字段做窗口操作。在 Table API 和 SQL 中,主要有两种窗口:Group Windows 和 Over Windows(时间语义的文章推荐

一、分组窗口(Group Windows)

         分组窗口(Group Windows)会根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数。         Table API 中的 Group Windows 都是使用.window(w:GroupWindow)子句定义的,并且必须由 as 子句指定一个别名。为了按窗口对表进行分组,窗口的别名必须在 group by 子句中,像常规的分组字段一样引用。例子:

val table = input
.window([w: GroupWindow] as 'w)
.groupBy('w, 'a)
.select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count)

         Table API 提供了一组具有特定语义的预定义 Window 类,这些类会被转换为底层DataStream 或 DataSet 的窗口操作。         

        Table API 支持的窗口定义,和我们熟悉的一样,主要也是三种:滚动(Tumbling)滑动(Sliding会话(Session)

1.1  滚动窗口

       滚动窗口(Tumbling windows)要用 Tumble 类来定义,另外还有三个方法:

  • over:定义窗口长度
  • on:用来分组(按时间间隔)或者排序(按行数)的时间字段
  • as:别名,必须出现在后面的 groupBy 中

实现案例

  1. 需求        设置滚动窗口为10秒钟统计id出现的次数。
  2. 数据准备
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
  1. 代码实现
package windows

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table, Tumble}
import org.apache.flink.types.Row

/**
 * @Package Windows
 * @File :FlinkSQLTumBlingTie.java
 * @author 大数据老哥
 * @date 2020/12/25 21:58
 * @version V1.0
 *          设置滚动窗口 */
object FlinkSQLTumBlingTie {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // 读取数据
    val inputPath = "./data/sensor.txt"
    val inputStream = env.readTextFile(inputPath)
   

    // 先转换成样例类类型(简单转换操作)
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
      })

    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)
    // 注册表
    tableEnv.createTemporaryView("sensor", sensorTable)
    // table 实现
    val resultTable = sensorTable
      .window(Tumble over 10.seconds on 'ts as 'tw) // 每10秒统计一次,滚动时间窗口
      .groupBy('id, 'tw)
      .select('id, 'id.count, 'tw.end)
    //sql 实现
    val sqlTable = tableEnv.sqlQuery(
      """
        |select
        |id,
        |count(id) ,
        |tumble_end(ts,interval '10' second)
        |from sensor
        |group by
        |id,
        |tumble(ts,interval '10' second)        |""".stripMargin)

    /***
     * .window(Tumble over 10.minutes on 'rowtime as 'w) (事件时间字段 rowtime)
     * .window(Tumble over 10.minutes on 'proctime as 'w)(处理时间字段 proctime)
     * .window(Tumble over 10.minutes on 'proctime as 'w) (类似于计数窗口,按处理时间排序,10 行一组)     */
    resultTable.toAppendStream[Row].print("talbe")
    sqlTable.toRetractStream[Row].print("sqlTable")
    
    env.execute("FlinkSQLTumBlingTie")
  }

  case class SensorReading(id: String, timestamp: Long, temperature: Double)

}

1.2 滑动窗口

滑动窗口(Sliding windows)要用 Slide 类来定义,另外还有四个方法:

  • over:定义窗口长度
  • every:定义滑动步长
  • on:用来分组(按时间间隔)或者排序(按行数)的时间字段
  • as:别名,必须出现在后面的 groupBy 中

实现案例

  1. 需求描述           设置窗口大小为10秒钟设置滑动距离为5秒钟,统计id的出现的次数。
  2. 数据准备
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
  1. 实现代码
package windows

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings, Slide, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import windows.FlinkSQLTumBlingTie.SensorReading

/**
 * @Package windows
 * @File :FlinkSQLSlideTime.java
 * @author 大数据老哥
 * @date 2020/12/27 22:19
 * @version V1.0
 *          滑动窗口 */
object FlinkSQLSlideTime {
  def main(args: Array[String]): Unit = {
    //构建运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // 设置分区为1 方便后面测试
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //事件时间

    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    // 创建表env
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // 读取数据
    val inputPath = "./data/sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    // 先转换成样例类类型(简单转换操作)
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
      })

    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)
    // 注册表
    tableEnv.createTemporaryView("sensor", sensorTable)
    // table API 实现
    val tableApi = sensorTable.window(Slide over 10.seconds every 5.seconds on 'ts as 'w)
      .groupBy('w, 'id)
      .select('id, 'id.count, 'w.end)
    val tableSql = tableEnv.sqlQuery(
      """
        |select
        |id,
        |count(id),
        |HOP_END(ts,INTERVAL '10' SECOND, INTERVAL '5' SECOND )as w
        |from sensor
        |group by
        |HOP(ts,INTERVAL '10' SECOND, INTERVAL '5' SECOND),id        |""".stripMargin)

    tableApi.toAppendStream[Row].print("tableApi")
    tableSql.toAppendStream[Row].print("tableSql")
    /**
.window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w) (事件时间字段 rowtime)
.window(Slide over 10.minutes every 5.minutes on 'proctime as 'w) (处理时间字段 proctime) 
.window(Slide over 10.rows every 5.rows on 'proctime as 'w) (类似于计数窗口,按处理时间排序,10 行一组)   **/
    env.execute("FlinkSQLSlideTime")
  }
}

1.3 会话窗口

       会话窗口(Session windows)要用 Session 类来定义,另外还有三个方法:

  • withGap:会话时间间隔
  • on:用来分组(按时间间隔)或者排序(按行数)的时间字段
  • as:别名,必须出现在后面的 groupBy 中实现案例
  1. 需求描述          设置一个session 为10秒钟 统计id的个数
  2. 准备数据
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
  1. 编写代码
package windows

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings, Session, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import windows.FlinkSQLTumBlingTie.SensorReading

/**
 * @Package windows
 * @File :FlinkSqlSessionTime.java
 * @author 大数据老哥
 * @date 2020/12/27 22:52
 * @version V1.0 */
object FlinkSqlSessionTime {
  def main(args: Array[String]): Unit = {
    //构建运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // 设置分区为1 方便后面测试
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //事件时间

    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    // 创建表env
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // 读取数据
    val inputPath = "./data/sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    // 先转换成样例类类型(简单转换操作)
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
      })

    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)
    // 注册表
    tableEnv.createTemporaryView("sensor", sensorTable)

    val tableApi = sensorTable.
      window(Session withGap 10.seconds on 'ts as 'w)
      .groupBy('id, 'w)
      .select('id, 'id.count, 'w.end)
    val tableSQL = tableEnv.sqlQuery(
      """
        |SELECT
        |id,
        |COUNT(id),
        |SESSION_END(ts, INTERVAL '10' SECOND) AS w
        |FROM sensor
        |GROUP BY
        |id,
        |SESSION(ts, INTERVAL '10' SECOND)        |""".stripMargin)
    tableApi.toAppendStream[Row].print("tableApi")
    tableSQL.toAppendStream[Row].print("tableSQL")

    /**
     * .window(Session withGap 10.minutes on 'rowtime as 'w) 事件时间字段 rowtime)
     * .window(Session withGap 10.minutes on 'proctime as 'w) 处理时间字段 proctime)     */
    env.execute("FlinkSqlSessionTime")
  }
}

二、 Over Windows

         Over window 聚合是标准 SQL 中已有的(Over 子句),可以在查询的 SELECT 子句中定义。Over window 聚合,会针对每个输入行,计算相邻行范围内的聚合。Over windows使用.window(w:overwindows*)子句定义,并在 select()方法中通过别名来引用。例子:

val table = input
.window([w: OverWindow] as 'w)
.select('a, 'b.sum over 'w, 'c.min over 'w)

         Table API 提供了 Over 类,来配置 Over 窗口的属性。可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义 Over windows。         

        无界的 over window 是使用常量指定的。也就是说,时间间隔要指定 UNBOUNDED_RANGE,或者行计数间隔要指定 UNBOUNDED_ROW。而有界的 over window 是用间隔的大小指定的。

2.1 无界的 over window

// 无界的事件时间 over window (时间字段 "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
//无界的处理时间 over window (时间字段"proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)
// 无界的事件时间 Row-count over window (时间字段 "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)
//无界的处理时间 Row-count over window (时间字段 "rowtime")
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)

2.2 有界的 over window

// 有界的事件时间 over window (时间字段 "rowtime",之前 1 分钟)
.window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)
// 有界的处理时间 over window (时间字段 "rowtime",之前 1 分钟)
.window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w)
// 有界的事件时间 Row-count over window (时间字段 "rowtime",之前 10 行)
.window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)
// 有界的处理时间 Row-count over window (时间字段 "rowtime",之前 10 行)
.window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)

2.3 代码练习

         我们可以综合学习过的内容,用一段完整的代码实现一个具体的需求。例如,统计每个sensor每条数据,与之前两行数据的平均温度。

数据准备

sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9

代码分析:

package windows

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings, Over, Tumble}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

/**
* @Package windows
* @File :FlinkSqlTumBlingOverTime.java
* @author 大数据老哥
* @date 2020/12/28 21:45
* @version V1.0*/
object FlinkSqlTumBlingOverTime {
 def main(args: Array[String]): Unit = {
   // 构建运行环境
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   env.setParallelism(1) // 设置并行度为1方便后面进行测试
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 设置事件时间

   val settings = EnvironmentSettings.newInstance()
     .useBlinkPlanner()
     .inStreamingMode()
     .build()
   //构建table Env
   val tableEnv = StreamTableEnvironment.create(env, settings)

   // 读取数据
   val inputPath = "./data/sensor.txt"
   val inputStream = env.readTextFile(inputPath)
   // 先转换成样例类类型(简单转换操作)
   // 解析数据 封装成样例类
   val dataStream = inputStream
     .map(data => {
       val arr = data.split(",")
       SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
     })
     .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
       override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
     })
   // 将数据注册成一张临时表
   val dataTable = tableEnv.fromDataStream(dataStream,'id, 'temperature, 'timestamp.rowtime as 'ts)
   tableEnv.createTemporaryView("sensor",dataTable)
   var tableRes= dataTable.window( Over partitionBy 'id orderBy  'ts preceding 2.rows as 'ow)
    .select('id,'ts,'id.count over 'ow, 'temperature.avg over 'ow)

  var tableSql= tableEnv.sqlQuery(
     """
       |select
       |id,
       |ts,
       |count(id) over ow,
       |avg(temperature) over ow
       |from sensor
       |window ow as(
       | partition by id
       | order by ts
       | rows between 2 preceding and current row
       |)       |""".stripMargin)

   tableRes.toAppendStream[Row].print("tableRes")
   tableSql.toAppendStream[Row].print("tableSql")
   env.execute("FlinkSqlTumBlingOverTime")
 }
 case class SensorReading(id: String, timestamp: Long, temperature: Double)

}

总结

         好了到这里FlinkSql中窗口使用到这里就结束啦,喜欢的可以给了三连。其中FlinkSql中的窗口的用法还是比较多得,所有还是要多加练习。老话说的好,师傅领进门,修行在个人。有什么不明白的可以在评论区留言,也可以加我微信就进行一起讨论。我是大数据老哥,我们下期见~~~。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352

推荐阅读更多精彩内容