大数据学习之(二十一)structured streaming 基于事件时间窗口操作

[TOC]

简介

根据事件时间进行滑动窗口计算。

  • 必须要TimeStamp列

  • 默认每次输出都包含所有窗口对应的结果值

  • 处理迟到的事件 watermarking(容忍or丢弃)

  • 窗口滑动时间必须小于等于窗口时间。

  • 窗口划分规则

    根据窗口时间和滑动时间来判断输入的事件时间该条数据属于哪个窗口不好判断,所以这里预估多了一些窗口,并在输出时过滤有效的窗口。

  • 输出模式

    • outputMode("complete") 维护所有数据状态,无论迟到的数据有没有小于watermark。

demo源码

package com.mashibing.stscode.scalacode.windows

import java.sql.Timestamp
import java.text.SimpleDateFormat

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.collection.mutable

/**
  *  实时读取scoket数据,对数据按照事件时间进行划分窗口统计wordcount
  *   1641780000000 zhangsan,lisi,maliu,zhangsan
  *   1641780002000 zhangsan,lisi,wangwu
  *   1641780005000 lisi,maliu,lisi
  *   1641780010000 zhangsan,lisi
  *   1641780003000 wangwu,zhangsan
  */
object WindowOnEventTime {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("WindowOnEventTime")
      .master("local")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    spark.sparkContext.setLogLevel("Error")
    import spark.implicits._

    val df: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node1")
      .option("port", 9999)
      .load()

    //处理数据,将数据中的时间列转换成时间戳类型: 1641780000000 zhangsan,lisi,maliu,zhangsan
    val tsAndWordsDF : DataFrame = df.as[String].flatMap(line=>{
      val ts: String = line.split(" ")(0)
      val arr: mutable.ArraySeq[(Timestamp, String)] = line.split(" ")(1).split(",").map(word => {
        (new Timestamp(ts.toLong), word)
      })
      arr
    }).toDF("timestamp","word")

    //4.使用window 必须导入以下functions函数
    import org.apache.spark.sql.functions._
    //设置窗口
       //将数据按照窗口和单词分组,对每个窗口内数据进行统计,统计之后的DataFrame 多一个window Struct 类型的字段,包含窗口起始时间
    val transDF: DataFrame = tsAndWordsDF.groupBy(window($"timestamp", "10 seconds", "3 seconds"),
      $"word"
    ).count()

    transDF.printSchema()
 //5.获取设置窗口后的数据
    val result: DataFrame = transDF.map(row => {
      val startTime: Timestamp = row.getStruct(0).getTimestamp(0)
      val endTime: Timestamp = row.getStruct(0).getTimestamp(1)
      val word: String = row.getString(1)
      val count: Long = row.getLong(2)

      val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

      (sdf.format(startTime.getTime), sdf.format(endTime.getTime), word, count)
    }).toDF("start", "end", "word", "count")

    val query: StreamingQuery = result.orderBy("start","end").writeStream
      .format("console")
      .outputMode("complete")
      .start()

    query.awaitTermination()


  }

}
#node1
nc -l 9999
#依次输入如下数据
   参考时间              timestamp       words
2022-01-10 10:00:00    1641780000000 zhangsan,lisi,maliu,zhangsan
2022-01-10 10:00:02    1641780002000 zhangsan,lisi,wangwu
2022-01-10 10:00:05    1641780005000 lisi,maliu,lisi
2022-01-10 10:00:10    1641780010000 zhangsan,lisi
2022-01-10 10:00:03    1641780003000 wangwu,zhangsan

watermarking

WaterMarking机制主要解决了延迟数据是否聚合和减少内存聚合状态问题。

作用

  1. 系统可以删除过期的状态数据,用于释放内存。
  2. 晚到的事件数据是否丢弃。

计算公式

*Watermark时间(T) = 最后触发窗口内最大的事件时间(MaxTime) - 允许数据迟到的时间(LateTime)*

每个窗口都有start time和end time属性,当watermark T时间大于一个窗口的end time后,当前窗口的状态会被系统丢弃。

注意

  1. 引入watermark之后代码中outputMode只能设置update、append模式系统才会删除过期数据,设置complete不会删除过期数据。设置update更新模式时,如果watermark值没过时间窗口的end time之前,如果有迟到数据落入到该窗口,该窗口会重复触发。
  2. watermak在非流数据处理上没有任何作用。
  3. watermark 可以基于窗口,也可以基于 eventtime事件时间本身。

output mode

complete

  1. 某些操作必须在complete模式下使用,比如 order by,在ouput update模式下会报错。
  2. 所有状态保留,事件保留。

update

  1. 窗口存在重复触发的可能:迟到的数据落入该窗口
  2. 状态,事件 根据water marking 废弃

append

  1. watermark必须大于等于一个窗口结束时间,那么这个窗口数据才会被输出
  2. 状态,事件 根据water marking 废弃

窗口类型

滑动窗口

目前演示的,默认的

滚动窗口

固定时间大小,不重叠的,连续时间间隔的窗口,也就是步长等于窗口长度的滑动窗口。

会话窗口

  1. 大小是动态不固定的,
  2. 会话窗口从输入开始如果在间隔时间内收到后续输入,则窗口长度自动扩展,如果在指定的间隔时间内没有接收到数据,则会话窗口自动关闭。
  3. 会话窗口中如果groupBy中有对应的列,是根据列字段划分窗口。
    比如demo中根据groupby时的 word,每个word 一个窗口。
  4. 可以根据设置的列条件****动态设置间隔时间。

join

StructuredStreaming结构化流支持与静态Dataset/DataFrame进行join,也支持和流式的Dataset/DataFrame进行join。

流与静态数据join

package com.mashibing.stscode.scalacode.jointest

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  *  Structured Streaming 流与静态数据join 关联
  */
object StreamAndStaticJoin {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("StreamAndStaticJoin")
      .master("local")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    spark.sparkContext.setLogLevel("Error")
    import spark.implicits._

    //创建 批量的DataFrame
    val list = List[String](
      "{\"aid\":1,\"name\":\"zs\",\"age\":18}",
      "{\"aid\":2,\"name\":\"ls\",\"age\":19}",
      "{\"aid\":3,\"name\":\"ww\",\"age\":20}",
      "{\"aid\":4,\"name\":\"ml\",\"age\":21}"
    )
    val personInfo: DataFrame = spark.read.json(list.toDS())

    //创建流式数据
    /**
      * 1,zs,100
      * 2,ls,200
      * 3,ww,300
      * 5,tq,500
      */
    val scoreInfo: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node1")
      .option("port", 9999)
      .load()
      .as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (arr(0).toInt, arr(1), arr(2).toInt)
      }).toDF("bid", "name", "score")

    //将流数据与静态数据进行关联
    val result: DataFrame = scoreInfo.join(personInfo,scoreInfo.col("bid") === personInfo.col("aid"),"left_semi")

    result.printSchema()

    result.writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .start()
      .awaitTermination()

  }

}

输出如下,注意 join类型

-------------------------------------------
Batch: 1
-------------------------------------------
+---+----+-----+
|bid|name|score|
+---+----+-----+
|  1|  zs|  100|
|  2|  ls|  200|
|  3|  ww|  300|
+---+----+-----+

关联类型支持

https://spark.apache.org/docs/3.2.0/structured-streaming-programming-guide.html#types-of-time-windows

structured_stream_join_1.png

流-流Joins

一个流接收到的一条数据对于另外一条流有可能在任何时刻有对应数据与之匹配,所以我们对两条流数据进行缓存。

  1. 两条流的实时数据有可能有延迟数据,所以我们需要设置watermark机制自动处理迟到数据
  2. 对于缓存的数据也不能无限增大保存,所以需要设置Time Constraint 时间约束来删除缓存数据,使得过于旧的输入数据无法与将来的输入数据进行匹配。
package com.mashibing.stscode.scalacode.jointest

import java.sql.Timestamp

import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryListener}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  *  流和流join
  */
object StreamAndStreamJoin {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("StreamAndStaticJoin")
      .master("local")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    spark.sparkContext.setLogLevel("Error")
    import spark.implicits._

    //设置第一个流
    // xxx,1,zs,18
    val df1: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node5")
      .option("port", 9998)
      .load()
      .as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (new Timestamp(arr(0).toLong), arr(1).toInt, arr(2),arr(3).toInt)
      }).toDF("ats","aid", "aname", "age")
      .withWatermark("ats","3 seconds")

    //设置第二个流
    val df2: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node5")
      .option("port", 9999)
      .load()
      .as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (new Timestamp(arr(0).toLong), arr(1).toInt, arr(2),arr(3).toInt)
      }).toDF("bts","bid", "bname", "score")
      .withWatermark("bts","5 seconds")


    //两个流进行关联
    import org.apache.spark.sql.functions._
    val result = df1.join(df2,expr(
      """
        | aid = bid and
        | bts >= ats and
        | bts <= ats + interval 10 seconds
      """.stripMargin
    ),"leftOuter")


    val query1: StreamingQuery = df1.writeStream
      .format("console")
      .queryName("query1")
      .start()

    val query2: StreamingQuery = df2.writeStream
      .format("console")
      .queryName("query2")
      .start()

    val query3: StreamingQuery = result.writeStream
      .format("console")
      .queryName("query3")
      .start()

    spark.streams.addListener(new StreamingQueryListener() {
      override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
      }
      override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        println("Query terminated: " + queryTerminated.id)
      }
      override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {


        if("query1".equals(queryProgress.progress.name)){
          println("query1 watermark : "+queryProgress.progress.eventTime.get("watermark"))
        }else if("query2".equals(queryProgress.progress.name)){
          println("query2 watermark : "+queryProgress.progress.eventTime.get("watermark"))
        }else{
          println("query3 watermark : "+queryProgress.progress.eventTime.get("watermark"))
        }
      }
    })

   spark.streams.awaitAnyTermination()
  }

}

注意点

对于批数据DataFrame/Dataset的一些操作,目前在实时数据流StructuredStreaming有一些Operator不被支持和一些注意点,如下:
(所以目前还比不上flink)

  1. Streaming Dataset不支持多个流group by 聚合操作

  2. Streaming Dataset不支持limit和获取topN操作

  3. Stremaing Dataset不支持Distinct操作

  4. Streaming Dataset 如果聚合后不支持Deduplication 流去重

  5. Streaming Dataset 在聚合后并且是complete输出模式才支持order by 排序

  6. Stream和Static进行join操作时,Right Outer、Full Outer不被支持。

  7. Dataset count操作在Streaming Dataset中使用ds.groupBy(col).count()代替。

  8. Dataset foreach() 操作在Streaming Dataset中使用ds.writeStream.foreach(...)代替。

  9. Dataset show()操作在Stream Dataset中使用ds.writeStrea.format("console")代替。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,634评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,951评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,427评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,770评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,835评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,799评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,768评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,544评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,979评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,271评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,427评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,121评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,756评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,375评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,579评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,410评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,315评论 2 352

推荐阅读更多精彩内容