第五篇|Spark-Streaming编程指南(2)

第四篇|Spark-Streaming编程指南(1)对Spark Streaming执行机制、Transformations与Output Operations、Spark Streaming数据源(Sources)、Spark Streaming 数据汇(Sinks)进行了讨论。本文将延续上篇内容,主要包括以下内容:

  • 有状态的计算
  • 基于时间的窗口操作
  • 持久化
  • 检查点Checkpoint
  • 使用DataFrames & SQL处理流数据

有状态的计算

updateStateByKey

上一篇文章中介绍了常见的无状态的转换操作,比如在WordCount的例子中,输出的结果只与当前batch interval的数据有关,不会依赖于上一个batch interval的计算结果。spark Streaming也提供了有状态的操作: updateStateByKey,该算子会维护一个状态,同时进行信息更新 。该操作会读取上一个batch interval的计算结果,然后将其结果作用到当前的batch interval数据统计中。其源码如下:

def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S]
    ): DStream[(K, S)] = ssc.withScope {
    updateStateByKey(updateFunc, defaultPartitioner())
  }

该算子只能在key–value对的DStream上使用,需要接收一个状态更新函数 updateFunc作为参数。使用案例如下:

object StateWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName(StateWordCount.getClass.getSimpleName)
    val ssc = new StreamingContext(conf, Seconds(5))
    // 必须开启checkpoint,否则会报错
    ssc.checkpoint("file:///e:/checkpoint")
    val lines = ssc.socketTextStream("localhost", 9999)

    // 状态更新函数
    def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {

      var oldvalue = stateValue.getOrElse(0) // 获取状态值
      // 遍历当前数据,并更新状态
      for (newValue <- newValues) {
        oldvalue += newValue
      }
      // 返回最新的状态
      Option(oldvalue)
    }

    val count = lines.flatMap(_.split(" "))
      .map(w => (w, 1))
      .updateStateByKey(updateFunc)
    count.print()
    ssc.start()
    ssc.awaitTermination()
  }

}

尖叫提示:上面的代码必须要开启checkpoint,否则会报错:

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()

updateStateByKey缺点

运行上面的代码会发现一个现象:即便没有数据源输入,Spark也会为新的batch interval更新状态,即如果没有数据源输入,则会不断地输出之前的计算状态结果。

updateStateByKey可以在指定的批次间隔内返回之前的全部历史数据,包括新增的,改变的和没有改变的。由于updateStateByKey在使用的时候一定要做checkpoint,当数据量过大的时候,checkpoint会占据庞大的数据量,会影响性能,效率不高。

mapwithState

mapwithState是Spark提供的另外一个有状态的算子,该操作克服了updateStateByKey的缺点,从Spark 1.5开始引入。源码如下:

def mapWithState[StateType: ClassTag, MappedType: ClassTag](
      spec: StateSpec[K, V, StateType, MappedType]
    ): MapWithStateDStream[K, V, StateType, MappedType] = {
    new MapWithStateDStreamImpl[K, V, StateType, MappedType](
      self,
      spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
    )
  }

mapWithState只返回发生变化的key的值,对于没有发生变化的Key,则不返回。这样做可以只关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key 的数据。这样的话,即使数据量很大,checkpint也不会updateBykey那样,占用太多的存储,效率比较高(生产环境中建议使用)。

object StatefulNetworkWordCount {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf()
      .setAppName("StatefulNetworkWordCount")
      .setMaster("local[2]")

    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("file:///e:/checkpoint")

    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    /**
      * word:当前key的值
      * one:当前key对应的value值
      * state:状态值
      */
    val mappingFunc = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      println(s">>> batchTime = $batchTime")
      println(s">>> word      = $word")
      println(s">>> one     = $one")
      println(s">>> state     = $state")
      val output = (word, sum)
      state.update(sum) //更新当前key的状态值
      Some(output) //返回结果
    }
    // 通过StateSpec.function构建StateSpec
    val spec = StateSpec.function(mappingFunc)
    val stateDstream = wordDstream.mapWithState(spec)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

基于时间的窗口操作

Spark Streaming提供了两种类型的窗口操作,分别是滚动窗口和滑动窗口。具体分析如下:

滚动窗口(Tumbling Windows)

滚动窗口的示意图如下:滚动窗口只需要传入一个固定的时间间隔,滚动窗口是不存在重叠的。


源码如下:

/**
   * @param windowDuration:窗口的长度; 必须是batch interval的整数倍.
   */
  def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)

滑动窗口(Sliding Windows)

滑动窗口的示意图如下:滑动窗口只需要传入两个参数,一个为窗口的长度,一个是滑动时间间隔。可以看出:滑动窗口是存在重叠的。


源码如下:

/**
   * @param windowDuration 窗口长度;必须是batching interval的整数倍
   *                       
   * @param slideDuration  滑动间隔;必须是batching interval的整数倍
   */
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {
    new WindowedDStream(this, windowDuration, slideDuration)
  }

窗口操作

  • window(windowLength, slideInterval)

    • 解释

      基于源DStream产生的窗口化的批数据,计算得到一个新的Dstream

    • 源码

        def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
        def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {
          new WindowedDStream(this, windowDuration, slideDuration)
        }
      
  • countByWindow(windowLength, slideInterval)

    • 解释

    返回一个滑动窗口的元素个数

    • 源码

      /**
         * @param windowDuration window长度,必须是batch interval的倍数 
         * @param slideDuration  滑动的时间间隔,必须是batch interval的倍数
         * 底层调用的是reduceByWindow
         */
        def countByWindow(
            windowDuration: Duration,
            slideDuration: Duration): DStream[Long] = ssc.withScope {
          this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
        }
      
  • reduceByWindow(func, windowLength, slideInterval)

    • 解释

    返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数func必须满足结合律,从而可以支持并行计算

    • 源码

        def reduceByWindow(
            reduceFunc: (T, T) => T,
            windowDuration: Duration,
            slideDuration: Duration
          ): DStream[T] = ssc.withScope {
          this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
        }
      
  • reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

    • 解释

    应用到一个(K,V)键值对组成的DStream上时,会返回一个由(K,V)键值对组成的新的DStream。每一个key的值均由给定的reduce函数(func函数)进行聚合计算。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。可以通过numTasks参数的设置来指定不同的任务数

    • 源码

        def reduceByKeyAndWindow(
            reduceFunc: (V, V) => V,
            windowDuration: Duration,
            slideDuration: Duration
          ): DStream[(K, V)] = ssc.withScope {
          reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
        }
      
  • reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

    • 解释

    更加高效的reduceByKeyAndWindow,每个窗口的reduce值,是基于先前窗口的reduce值进行增量计算得到的;它会对进入滑动窗口的新数据进行reduce操作,并对离开窗口的老数据进行逆向reduce操作。但是,只能用于可逆reduce函数,即那些reduce函数都有一个对应的逆向reduce函数(以InvFunc参数传入)注意:必须开启 checkpointing

    • 源码

      def reduceByKeyAndWindow(
            reduceFunc: (V, V) => V,
            invReduceFunc: (V, V) => V,
            windowDuration: Duration,
            slideDuration: Duration,
            partitioner: Partitioner,
            filterFunc: ((K, V)) => Boolean
          ): DStream[(K, V)] = ssc.withScope {
      
          val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
          val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
          val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
          new ReducedWindowedDStream[K, V](
            self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
            windowDuration, slideDuration, partitioner
          )
        }
      
  • countByValueAndWindow(windowLength, slideInterval, [numTasks])
    • 解释

      当应用到一个(K,V)键值对组成的DStream上,返回一个由(K,V)键值对组成的新的DStream。每个key的对应的value值都是它们在滑动窗口中出现的频率

    • 源码

      def countByValueAndWindow(
            windowDuration: Duration,
            slideDuration: Duration,
            numPartitions: Int = ssc.sc.defaultParallelism)
            (implicit ord: Ordering[T] = null)
            : DStream[(T, Long)] = ssc.withScope {
          this.map((_, 1L)).reduceByKeyAndWindow(
            (x: Long, y: Long) => x + y,
            (x: Long, y: Long) => x - y,
            windowDuration,
            slideDuration,
            numPartitions,
            (x: (T, Long)) => x._2 != 0L
          )
        }
      

使用案例

val lines = ssc.socketTextStream("localhost", 9999)

    val count = lines.flatMap(_.split(" "))
      .map(w => (w, 1))
      .reduceByKeyAndWindow((w1: Int, w2: Int) => w1 + w2, Seconds(30), Seconds(10))
      .print()
//滚动窗口

/*    lines.window(Seconds(20))
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()*/

持久化

持久化是提升Spark应用性能的一种方式,在第二篇|Spark core编程指南一文中讲解了RDD持久化的使用方式。其实,DStream也是支持持久化的,同样是使用persist()与cache()方法,持久化通常在有状态的算子中使用,比如窗口操作,默认情况下,虽然没有显性地调用持久化方法,但是底层已经帮用户做了持久化操作,通过下面的源码可以看出。

private[streaming]
class WindowedDStream[T: ClassTag](
    parent: DStream[T],
    _windowDuration: Duration,
    _slideDuration: Duration)
  extends DStream[T](parent.ssc) {
  // 省略代码...
  // Persist parent level by default, as those RDDs are going to be obviously reused.
  parent.persist(StorageLevel.MEMORY_ONLY_SER)
}

注意:与RDD的持久化不同,DStream的默认持久性级别将数据序列化在内存中,通过下面的源码可以看出:

/** 给定一个持计划级别 */
  def persist(level: StorageLevel): DStream[T] = {
    if (this.isInitialized) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of a DStream after streaming context has started")
    }
    this.storageLevel = level
    this
  }

  /** 默认的持久化级别为(MEMORY_ONLY_SER) */
  def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)
  def cache(): DStream[T] = persist()

从上面的源码可以看出persist()与cache()的主要区别是:

  • cache()方法底层调用的是persist()方法
  • persist()方法有两个重载的方法
    • 无参数的persist(),默认是内存
    • perisist(level: StorageLevel),可以选择与RDD持久化相同的持久化级别

检查点Checkpoint

简介

流应用程序通常是24/7运行的,因此必须对与应用程序逻辑无关的故障(例如系统故障,JVM崩溃等)具有弹性的容错能力。为此,Spark Streaming需要将足够的信息checkpoint到容错存储系统(比如HDFS),以便可以从故障中恢复。检查点包括两种类型:

  • 元数据检查点

    元数据检查点可以保证从Driver程序失败中恢复。即如果运行drive的节点失败时,可以查看最近的checkpoin数据获取最新的状态。典型的应用程序元数据包括:

    • 配置 :用于创建流应用程序的配置。
    • DStream操作 :定义流应用程序的DStream操作。
    • 未完成的batch :当前运行batch对应的job在队列中排队,还没有计算到该batch的数据。
  • 数据检查点

    将生成的RDD保存到可靠的存储中。在某些有状态转换中,需要合并多个批次中的数据,所以需要开启检查点。在此类转换中,生成的RDD依赖于先前批次的RDD,这导致依赖链的长度随时间不断增加。为了避免恢复时间无限制的增加(与依赖链成比例),有状态转换的中间RDD定期 checkpoint到可靠的存储(例如HDFS),以切断依赖链,功能类似于持久化,只需要从当前的状态恢复,而不需要重新计算整个lineage。

总而言之,从Driver程序故障中恢复时,主要需要元数据检查点。而如果使用有状态转换,则需要数据或RDD检查点。

什么时候启用检查点

必须为具有以下类型的应用程序启用检查点:

  • 使用了有状态转换转换操作

    如果在应用程序中使用updateStateByKeyreduceByKeyAndWindow,则必须提供检查点目录以允许定期进行RDD检查点。

  • 从运行应用程序的Driver程序故障中恢复

    元数据检查点用于恢复进度信息。

注意,没有前述状态转换的简单流应用程序可以在不启用检查点的情况下运行。在这种情况下,从驱动程序故障中恢复也将是部分的(某些丢失但未处理的数据可能会丢失)。这通常是可以接受的,并且许多都以这种方式运行Spark Streaming应用程序。预计将来会改善对非Hadoop环境的支持。

如何配置检查点

可以通过具有容错的、可靠的文件系统(例如HDFS,S3等)中设置目录来启用检查点,将检查点信息保存到该目录中。开启检查点,需要开启下面的两个配置:

  • streamingContext.checkpoint(<dir>):配置检查点的目录,比如HDFS路径
  • dstream.checkpoint(<duration>):检查点的频率

其中配置检查点的时间间隔是可选的。如果不设置,会根据DStream的类型选择一个默认值。对于MapWithStateDStream,默认的检查点间隔是batch interval的10倍。对于其他的DStream,默认的检查点间隔是10S,或者是batch interval的间隔时间。需要注意的是:checkpoint的频率必须是 batch interval的整数倍,否则会报错

此外,如果要使应用程序从Driver程序故障中恢复,则需要使用下面的方式创建StreamingContext:

def createStreamingContext (conf: SparkConf,checkpointPath: String):
StreamingContext = {
val ssc = new StreamingContext( <ConfInfo> )
// .... other code ...
ssc.checkPoint(checkpointDirectory)
ssc
}
#创建一个新的StreamingContext或者从最近的checkpoint获取
val context = StreamingContext.getOrCreate(checkpointDirectory,
createStreamingContext _)
#启动
context.start()
context.awaitTermination()
  • 程序首次启动时,它将创建一个新的StreamingContext,然后调用start()。
  • 失败后重新启动程序时,它将根据检查点目录中的检查点数据重新创建StreamingContext。

注意:

RDD的检查点需要将数据保存到可靠存储上,由此带来一些成本开销。这可能会导致RDD获得检查点的那些批次的处理时间增加。因此,需要设置一个合理的检查点的间隔。在batch interval较小时(例如1秒),每个batch interval都进行检查点可能会大大降低吞吐量。相反,检查点时间间隔太长会导致 lineage和任务规模增加,这可能会产生不利影响。对于需要RDD检查点的有状态转换,默认间隔为batch interval的倍数,至少应为10秒。可以使用 dstream.checkpoint(checkpointInterval)进行配置。通常,DStream的5-10个batch interval的检查点间隔是一个较好的选择。

检查点和持久化之间的区别

  • 持久化

    • 当我们将RDD保持在DISK_ONLY存储级别时,RDD将存储在一个位置,该RDD的后续使用将不会重新计算lineage。
    • 在调用persist()之后,Spark会记住RDD的lineage,即使它没有调用它。
    • 作业运行完成后,将清除缓存并销毁文件。
  • 检查点

    • 检查点将RDD存储在HDFS中,将会删除lineage血缘关系。
    • 在完成作业运行后,与持计划不同,不会删除检查点文件。
    • 当checkpoint一个RDD时,将导致双重计算。即该操作在完成实际的计算工作之前,首先会调用持久化方法,然后再将其写入检查点目录。

使用DataFrames & SQL处理流数据

在Spark Streaming应用中,可以轻松地对流数据使用DataFrames和SQL操作。使用案例如下:

object SqlStreaming {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(SqlStreaming.getClass.getSimpleName)
      .setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))

    words.foreachRDD { rdd =>
      // 调用SparkSession单例方法,如果已经创建了,则直接返回
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._

      val wordsDataFrame = rdd.toDF("word")
      wordsDataFrame.show()

      wordsDataFrame.createOrReplaceTempView("words")

      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      wordCountsDataFrame.show()

    }


    ssc.start()
    ssc.awaitTermination()
  }
}
/** SparkSession单例 */
object SparkSessionSingleton {

  @transient private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}

总结

本文是Spark Streaming编程指南的第二篇分享,主要包括有状态的计算、基于时间的窗口操作、检查点等内容。下一篇将分享Spark MLLib机器学习

关注公众号大数据技术与数仓,及时了解最新动态

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,718评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,683评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,207评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,755评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,862评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,050评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,136评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,882评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,330评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,651评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,789评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,477评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,135评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,864评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,099评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,598评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,697评论 2 351