Spark Streaming: MapWithStateDStream

MapWithStateDStream

MapWithStateDStream is the DStream returned by the mapWithState operator. On top of the usual DStream API it adds one method:

def stateSnapshots(): DStream[(KeyType, StateType)]
  • MapWithStateDStream is a sealed abstract class, so all of its implementations live in the same source file;
  • MapWithStateDStreamImpl is the only implementation of MapWithStateDStream; a minimal usage sketch follows this list.
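
A minimal usage sketch (a hypothetical per-key running count, not taken from any particular codebase) showing how mapWithState yields a MapWithStateDStream and how stateSnapshots() exposes the full key/state map:

import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.{DStream, MapWithStateDStream}

def runningCountExample(pairs: DStream[(String, Int)]): Unit = {
  // State = running count per key; the value emitted for each record is (key, newCount)
  val mappingFunc = (key: String, value: Option[Int], state: State[Int]) => {
    val newCount = state.getOption().getOrElse(0) + value.getOrElse(0)
    state.update(newCount)
    (key, newCount)
  }

  // mapWithState returns a MapWithStateDStream[K, V, StateType, MappedType]
  val mapped: MapWithStateDStream[String, Int, Int, (String, Int)] =
    pairs.mapWithState(StateSpec.function(mappingFunc))

  // stateSnapshots() is the extra method shown above: the full key -> state map after each batch
  val snapshots: DStream[(String, Int)] = mapped.stateSnapshots()
  snapshots.print()
}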

What the sealed keyword does:

A trait or class marked sealed can only be extended inside the source file in which it is defined.
Marking the type sealed tells the Scala compiler the complete set of subtypes, so when it checks a pattern match it knows every possible case and can warn at compile time if a case has been missed, reducing programming errors.
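
A small, self-contained illustration (unrelated to Spark) of what sealed buys you:

object SealedExample {
  // Because all subclasses of a sealed class must live in this same source file,
  // the compiler knows the complete set of cases and can check match exhaustiveness.
  sealed abstract class Shape
  case class Circle(radius: Double) extends Shape
  case class Square(side: Double) extends Shape

  def area(s: Shape): Double = s match {
    case Circle(r) => math.Pi * r * r
    case Square(a) => a * a // remove this case and the compiler warns: "match may not be exhaustive"
  }
}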

MapWithStateDStreamImpl

  • MapWithStateDStreamImpl is internal (private to Spark Streaming); its parent is a key-value DStream;
  • Its implementation delegates to the InternalMapWithStateDStream class;
  • Both slideDuration and dependencies are taken from the internalStream field (see the sketch after this list).
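
A simplified sketch of that structure (paraphrased from the Spark Streaming source; member names and signatures may differ slightly from the actual file):

private[streaming] class MapWithStateDStreamImpl[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
    dataStream: DStream[(K, V)],
    spec: StateSpecImpl[K, V, S, E])
  extends MapWithStateDStream[K, V, S, E](dataStream.context) {

  private val internalStream = new InternalMapWithStateDStream[K, V, S, E](dataStream, spec)

  // slideDuration and dependencies are simply delegated to internalStream
  override def slideDuration: Duration = internalStream.slideDuration
  override def dependencies: List[DStream[_]] = List(internalStream)

  // each batch's mapped values are pulled out of the internal records
  override def compute(validTime: Time): Option[RDD[E]] =
    internalStream.getOrCompute(validTime).map { _.flatMap { _.mappedData } }

  // stateSnapshots() flattens the per-partition state maps into (key, state) pairs
  override def stateSnapshots(): DStream[(K, S)] =
    internalStream.flatMap { _.stateMap.getAll().map { case (k, s, _) => (k, s) }.toTraversable }
}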

InternalMapWithStateDStream

  • InternalMapWithStateDStream is the class MapWithStateDStreamImpl is built on;
  • It extends DStream[MapWithStateRDDRecord[K, S, E]] and uses the MEMORY_ONLY storage level by default;
  • It uses the partitioner configured on the StateSpec, falling back to a HashPartitioner;
  • It forces checkpointing (override val mustCheckpoint = true); if checkpointDuration has not been set, it is derived from slideDuration. A simplified sketch follows this list.
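
The relevant pieces, again paraphrased and simplified from the Spark Streaming source (names may differ slightly from the actual file):

private[streaming] class InternalMapWithStateDStream[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
    parent: DStream[(K, V)], spec: StateSpecImpl[K, V, S, E])
  extends DStream[MapWithStateRDDRecord[K, S, E]](parent.context) {

  persist(StorageLevel.MEMORY_ONLY)  // default storage level

  // partitioner from the StateSpec, or a HashPartitioner with the default parallelism otherwise
  private val partitioner = spec.getPartitioner().getOrElse(
    new HashPartitioner(ssc.sparkContext.defaultParallelism))

  override val mustCheckpoint = true  // checkpointing cannot be disabled

  // if no checkpoint interval was set explicitly, derive one from the slide duration
  override def initialize(time: Time): Unit = {
    if (checkpointDuration == null) {
      checkpointDuration = slideDuration * 10  // DEFAULT_CHECKPOINT_DURATION_MULTIPLIER in the source
    }
    super.initialize(time)
  }
}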

InternalMapWithStateDStream.compute()

  /** Method that generates an RDD for the given time */
  // Generates the RDD for the given time; its main job is to turn the state operation into a MapWithStateRDD
  override def compute(validTime: Time): Option[RDD[MapWithStateRDDRecord[K, S, E]]] = {
    // Get the previous state or create a new empty state RDD
    val prevStateRDD = getOrCompute(validTime - slideDuration) match {
      case Some(rdd) =>
        if (rdd.partitioner != Some(partitioner)) {
          // If the RDD is not partitioned the right way, let us repartition it using the
          // partition index as the key. This is to ensure that state RDD is always partitioned
          // before creating another state RDD using it
          MapWithStateRDD.createFromRDD[K, V, S, E](
            rdd.flatMap { _.stateMap.getAll() }, partitioner, validTime)
        } else {
          rdd
        }
      case None =>
        MapWithStateRDD.createFromPairRDD[K, V, S, E](
          spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)),
          partitioner,
          validTime
        )
    }


    // Compute the new state RDD with previous state RDD and partitioned data RDD
    // Even if there is no data RDD, use an empty one to create a new state RDD
    val dataRDD = parent.getOrCompute(validTime).getOrElse {
      context.sparkContext.emptyRDD[(K, V)]
    }
    val partitionedDataRDD = dataRDD.partitionBy(partitioner)
    val timeoutThresholdTime = spec.getTimeoutInterval().map { interval =>
      (validTime - interval).milliseconds
    }
    Some(new MapWithStateRDD(
      prevStateRDD, partitionedDataRDD, mappingFunction, validTime, timeoutThresholdTime))
  }
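
The timeoutThresholdTime above is derived from the timeout configured on the StateSpec. A minimal sketch of how it is set from the user side (the mapping function is hypothetical, only its shape matters):

import org.apache.spark.streaming.{Minutes, State, StateSpec}

object TimeoutSpecExample {
  // Hypothetical mapping function just to make the snippet complete.
  val mappingFunc = (key: String, value: Option[Int], state: State[Long]) => {
    state.update(state.getOption().getOrElse(0L) + value.getOrElse(0))
    key
  }

  // With this spec, compute() above sees a timeout interval of 30 minutes:
  // for a batch at validTime, any key whose state was last updated before
  // (validTime - 30 minutes), in milliseconds, becomes a candidate for timeout handling.
  val specWithTimeout = StateSpec.function(mappingFunc).timeout(Minutes(30))
}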

Next, let's look at the MapWithStateRDD.createFromPairRDD method:

def createFromPairRDD[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
      pairRDD: RDD[(K, S)],
      partitioner: Partitioner,
      updateTime: Time): MapWithStateRDD[K, V, S, E] = {
    
    // Convert the pairRDD into per-partition MapWithStateRDDRecord instances
    val stateRDD = pairRDD.partitionBy(partitioner).mapPartitions ({ iterator =>
      val stateMap = StateMap.create[K, S](SparkEnv.get.conf)
      iterator.foreach { case (key, state) => stateMap.put(key, state, updateTime.milliseconds) }
      Iterator(MapWithStateRDDRecord(stateMap, Seq.empty[E]))
    }, preservesPartitioning = true)

    val emptyDataRDD = pairRDD.sparkContext.emptyRDD[(K, V)].partitionBy(partitioner)

    val noOpFunc = (time: Time, key: K, value: Option[V], state: State[S]) => None

    new MapWithStateRDD[K, V, S, E](
      stateRDD, emptyDataRDD, noOpFunc, updateTime, None)
  }
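
createFromPairRDD is how an initial state RDD supplied by the user becomes the very first state RDD; without one, the EmptyRDD branch in compute() is taken. A small sketch of providing the initial state (the keys and counts are made up):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{State, StateSpec}

def specWithInitialState(sc: SparkContext): StateSpec[String, Int, Int, (String, Int)] = {
  // These (key, state) pairs are what createFromPairRDD turns into the first state RDD
  val initial: RDD[(String, Int)] = sc.parallelize(Seq(("a", 10), ("b", 20)))

  val mappingFunc = (key: String, value: Option[Int], state: State[Int]) => {
    val sum = state.getOption().getOrElse(0) + value.getOrElse(0)
    state.update(sum)
    (key, sum)
  }

  StateSpec.function(mappingFunc).initialState(initial)
}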

MapWithStateRDD

  • It extends RDD; its dependencies are prevStateRDD and partitionedDataRDD:
  extends RDD[MapWithStateRDDRecord[K, S, E]](
    partitionedDataRDD.sparkContext,
    List(
      new OneToOneDependency[MapWithStateRDDRecord[K, S, E]](prevStateRDD),
      new OneToOneDependency(partitionedDataRDD))
  )

Its compute() logic:

 override def compute(
      partition: Partition, context: TaskContext): Iterator[MapWithStateRDDRecord[K, S, E]] = {

    val stateRDDPartition = partition.asInstanceOf[MapWithStateRDDPartition]
    val prevStateRDDIterator = prevStateRDD.iterator(
      stateRDDPartition.previousSessionRDDPartition, context)
    val dataIterator = partitionedDataRDD.iterator(
      stateRDDPartition.partitionedDataRDDPartition, context)

    val prevRecord = if (prevStateRDDIterator.hasNext) Some(prevStateRDDIterator.next()) else None
    val newRecord = MapWithStateRDDRecord.updateRecordWithData(
      prevRecord,
      dataIterator,
      mappingFunction,
      batchTime,
      timeoutThresholdTime,
      removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled
    )
    Iterator(newRecord)
  }

It relies mainly on the MapWithStateRDDRecord.updateRecordWithData method to build the single-record Iterator it returns: in the resulting record, stateMap holds each key's state and mappedData holds the values returned by the mapping function.

    // Create a new state map by cloning the previous one (if it exists) or by creating an empty one
    // This is the key -> state mapping that stores each key's state for this partition
    val newStateMap = prevRecord.map { _.stateMap.copy() }.getOrElse { new EmptyStateMap[K, S]() }
    
    // Collects the values returned by mappingFunction()
    val mappedData = new ArrayBuffer[E]
    
    // Reusable wrapper implementation of State handed to the mapping function
    val wrappedState = new StateImpl[S]()

    // Call the mapping function on each record in the data iterator, and accordingly
    // update the states touched, and collect the data returned by the mapping function
    dataIterator.foreach { case (key, value) =>
      wrappedState.wrap(newStateMap.get(key))
      val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
      if (wrappedState.isRemoved) {
        newStateMap.remove(key)
      } else if (wrappedState.isUpdated
          || (wrappedState.exists && timeoutThresholdTime.isDefined)) {
        newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
      }
      mappedData ++= returned
    }

    // Get the timed out state records, call the mapping function on each and collect the
    // data returned
    // Users can configure timeout handling; here every timed-out key is visited and its timeout logic is triggered
    if (removeTimedoutData && timeoutThresholdTime.isDefined) {
      newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
        wrappedState.wrapTimingOutState(state)
        val returned = mappingFunction(batchTime, key, None, wrappedState)
        mappedData ++= returned
        newStateMap.remove(key)
      }
    }

    MapWithStateRDDRecord(newStateMap, mappedData)
  }
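
For reference, the record returned here is just a small container pairing the state map with the batch's mapped values (simplified from the Spark source):

private[streaming] case class MapWithStateRDDRecord[K, S, E](
    var stateMap: StateMap[K, S], // key -> (state, last update time) for this partition
    var mappedData: Seq[E])       // values returned by the mapping function in this batch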

StateMap

/** Internal interface for defining the map that keeps track of sessions. */
private[streaming] abstract class StateMap[K, S] extends Serializable {

  /** Get the state for a key if it exists */
  def get(key: K): Option[S]

  /** Get all the keys and states whose updated time is older than the given threshold time */
  def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)]

  /** Get all the keys and states in this map. */
  def getAll(): Iterator[(K, S, Long)]

  /** Add or update state */
  def put(key: K, state: S, updatedTime: Long): Unit

  /** Remove a key */
  def remove(key: K): Unit

  /**
   * Shallow copy `this` map to create a new state map.
   * Updates to the new map should not mutate `this` map.
   */
  def copy(): StateMap[K, S]

  def toDebugString(): String = toString()
}
  • Lives at org.apache.spark.streaming.util.StateMap;
  • It is the class that stores Spark Streaming's state information;
  • Two implementations are provided: EmptyStateMap and OpenHashMapBasedStateMap;
  • OpenHashMap is a HashMap that supports nullable keys; it is more than 5x faster than the JDK default HashMap, but users need to be careful when handling 0.0/0/0L/non-existent values. A toy illustration of the StateMap contract follows this list.
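
Since StateMap is private[streaming], user code never implements it directly; the toy version below (backed by a plain mutable HashMap, for illustration only) just makes the contract above concrete:

import scala.collection.mutable

// Toy stand-in that mirrors the StateMap contract, not the real package-private class.
// Each entry keeps the state plus its last update time.
class NaiveStateMap[K, S] {
  private val data = mutable.HashMap.empty[K, (S, Long)]

  def get(key: K): Option[S] = data.get(key).map(_._1)

  // Keys whose last update is older than the threshold (candidates for timeout)
  def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)] =
    data.iterator.collect { case (k, (s, t)) if t < threshUpdatedTime => (k, s, t) }

  def getAll(): Iterator[(K, S, Long)] =
    data.iterator.map { case (k, (s, t)) => (k, s, t) }

  def put(key: K, state: S, updatedTime: Long): Unit = data.put(key, (state, updatedTime))

  def remove(key: K): Unit = data.remove(key)

  // copy() must return a map that can be mutated without affecting `this`;
  // cloning everything is the simplest (if costly) way to honor that contract.
  def copy(): NaiveStateMap[K, S] = {
    val m = new NaiveStateMap[K, S]
    m.data ++= data
    m
  }
}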

Demo

object SparkStatefulRunner {
  /**
    * Aggregates User Sessions using Stateful Streaming transformations.
    *
    * Usage: SparkStatefulRunner <hostname> <port>
    * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
    */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: SparkRunner <hostname> <port>")
      System.exit(1)
    }

    val sparkConfig = loadConfigOrThrow[SparkConfiguration]("spark")

    val sparkContext = new SparkContext(sparkConfig.sparkMasterUrl, "Spark Stateful Streaming")
    val ssc = new StreamingContext(sparkContext, Milliseconds(4000))
    ssc.checkpoint(sparkConfig.checkpointDirectory)

    val stateSpec =
      StateSpec
        .function(updateUserEvents _)
        .timeout(Minutes(sparkConfig.timeoutInMinutes))

    ssc
      .socketTextStream(args(0), args(1).toInt)
      .map(deserializeUserEvent)
      .filter(_ != UserEvent.empty)
      .mapWithState(stateSpec)
      .foreachRDD { rdd =>
        if (!rdd.isEmpty()) {
          rdd.foreach(maybeUserSession => maybeUserSession.foreach {
            userSession =>
              // Store user session here
              println(userSession)
          })
        }
      }

    ssc.start()
    ssc.awaitTermination()
  }

  def deserializeUserEvent(json: String): (Int, UserEvent) = {
    json.decodeEither[UserEvent] match {
      case \/-(userEvent) =>
        (userEvent.id, userEvent)
      case -\/(error) =>
        println(s"Failed to parse user event: $error")
        (UserEvent.empty.id, UserEvent.empty)
    }
  }

  def updateUserEvents(key: Int,
                       value: Option[UserEvent],
                       state: State[UserSession]): Option[UserSession] = {
    def updateUserSessions(newEvent: UserEvent): Option[UserSession] = {
      val existingEvents: Seq[UserEvent] =
        state
          .getOption()
          .map(_.userEvents)
          .getOrElse(Seq[UserEvent]())

      val updatedUserSessions = UserSession(newEvent +: existingEvents)

      updatedUserSessions.userEvents.find(_.isLast) match {
        case Some(_) =>
          state.remove()
          Some(updatedUserSessions)
        case None =>
          state.update(updatedUserSessions)
          None
      }
    }

    value match {
      case Some(newEvent) => updateUserSessions(newEvent)
      case _ if state.isTimingOut() => state.getOption()
    }
  }
}
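
The demo above references UserEvent and UserSession without defining them. A plausible sketch is below; only the members actually used in the demo (id, isLast, empty, userEvents) are grounded in the code, the rest is assumed:

// Sketch of the demo's domain types; field names other than id/isLast/userEvents are assumptions.
case class UserEvent(id: Int, data: String, isLast: Boolean)

object UserEvent {
  // Sentinel used by the demo when a message fails to deserialize
  val empty = UserEvent(-1, "", isLast = false)
}

case class UserSession(userEvents: Seq[UserEvent])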
