Spark Streaming运行架构分析

韩晶晶 严律 黄春超

简介

Spark StreamingSpark Core的扩展,是构建于Spark Core之上的实时流处理系统。相对于其他实时流处理系统,Spark Streaming最大的优势在于其位于Spark技术栈中,也即流处理引擎与数据处理引擎在同一个软件栈中。在Spark Streaming中,数据的采集是以逐条方式,而数据处理是按批进行的。因此,其系统吞吐量会比流行的纯实时流处理引擎Storm高2~5倍。

Spark Streaming对流数据处理的过成大致可以分为:启动流处理引擎、接收和存储流数据、处理流数据和输出处理结果等四个步骤。其运行架构图如下所示:

[图片上传失败...(image-f1cfaf-1542849231639)]

Step1 启动流处理引擎

StreamingContextSpark StreamingDriver端的上下文,是spark streaming程序的入口。在该对象的启 动过程中,会初始化其内部的组件,其中最为重要的是DStreamGraph以及JobScheduler组件的初始化。

class StreamingContext private[streaming] (
    _sc: SparkContext,
    _cp: Checkpoint,
    _batchDur: Duration
  ) extends Logging {
...
private[streaming] val conf = sc.conf

private[streaming] val env = sc.env

private[streaming] val graph: DStreamGraph = {
    if (isCheckpointPresent) {
      _cp.graph.setContext(this)
      _cp.graph.restoreCheckpointData()
      _cp.graph
    } else {
      require(_batchDur != null, "Batch duration for StreamingContext cannot be null")
      val newGraph = new DStreamGraph()
      newGraph.setBatchDuration(_batchDur)
      newGraph
    }
  }
...    
private[streaming] val scheduler = new JobScheduler(this)
...
}

Spark Streaming中作业的生成方式类似Spark核心,对DStream进行的各种操作让他们之间构建起依赖关系,DStreamGraph记录了DStream之间的依赖关系等信息。

JobSchedulerSpark StreamingJob总调度者。JobScheduler 有两个非常重要的成员:JobGeneratorReceiverTrackerJobGenerator维护一个定时器,定时为每个 batch 生成RDD DAG的实例;ReceiverTracker负责启动、管理各个 receiver及管理各个receiver 接收到的数据。

通过调用StreamingContext#start()方法启动流处理引擎。在StreamingContext#start()中,调用StreamingContext#validate()方法对DStreamGraphcheckpoint等做有效性检查,然后启动新的线程设置SparkContext,并启动JobScheduler

 def start(): Unit = synchronized {
...
     validate()
     ThreadUtils.runInNewThread("streaming-start") {
         sparkContext.setCallSite(startSite.get)
         sparkContext.clearJobGroup()
         sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL,                  "false")      
         savedProperties.set(SerializationUtils.clone(sparkContext
                .localProperties.get())) 
         scheduler.start()
     }
     state = StreamingContextState.ACTIVE
     StreamingContext.setActiveContext(this)
...
  }

Step2 接收与存储流数据

JobScheduler启动时,会创建一个新的 ReceiverTracker 实例 receiverTracker,并调用其start() 方法。在ReceiverTracker #start()中会初始化一个endpoint: ReceiverTrackerEndpoint对象,该对象用于接收和处理ReceiverTrackerreceivers之间 发送的消息。此外,在ReceiverTracker#start()中还会调用 launchReceivers 将各个receivers 分发到 executors 上。

def start(): Unit = synchronized {
    if (isTrackerStarted) {
      throw new SparkException("ReceiverTracker already started")
    }
    if (!receiverInputStreams.isEmpty) {
      endpoint = ssc.env.rpcEnv.setupEndpoint(
        "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
      if (!skipReceiverLaunch) launchReceivers()
      logInfo("ReceiverTracker started")
      trackerState = Started
    }
  }

ReceiverTracker#launchReceivers()会从DStreamGraph.inputStreams 中抽取出receivers,也即数据接收器。得到receivers后,给消息接收处理器 endpoint 发送 StartAllReceivers(receivers)消息。

  private def launchReceivers(): Unit = {
    val receivers = receiverInputStreams.map { nis =>
      val rcvr = nis.getReceiver()
      rcvr.setReceiverId(nis.id)
      rcvr
    }
    runDummySparkJob()
    logInfo("Starting " + receivers.length + " receivers")
    endpoint.send(StartAllReceivers(receivers))
  }

endpoint在接收到消息后,首先会判别消息的类型,对不同的消息执行不同的处理操作。当收到StartAllReceivers类型的消息时,首先会计算每一个receiver要发送的目的executors,其计算主要遵循两条原则:一是尽可能的使receiver分布均匀;二是如果receiver本身的preferredLocation不均匀,则以preferredLocation为准。然后遍历每一个receiver,根据计算出的executors调用startReceiver方法来启动receivers

case StartAllReceivers(receivers) =>
        val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
        for (receiver <- receivers) {
          val executors = scheduledLocations(receiver.streamId)
          updateReceiverScheduledExecutors(receiver.streamId, executors)
          receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
          startReceiver(receiver, executors)
        }

由于ReceiverInputDStream实例只有一个receiver,但receiver可能需要在多个worker上启动线程来接收数据,因此在startReceiver中需要将receiver及其对应的目的excutors转换成RDD

val receiverRDD: RDD[Receiver[_]] =
        if (scheduledLocations.isEmpty) {
          ssc.sc.makeRDD(Seq(receiver), 1)
        } else {
          val preferredLocations = scheduledLocations.map(_.toString).distinct
          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
        }

转换为RDD后,需要把receiver所进行的计算定义为startReceiverFunc函数,该函数以receiver实例为参数构造ReceiverSupervisorImpl实例supervisor,构造完毕后使用新线程启动该supervisor并阻塞该线程。

val supervisor = new ReceiverSupervisorImpl(
  receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()

最后,将receiverRDD以及要在receiverRDD上执行的函数作为Job提交,以真正在各个executors上启动ReceiverJob执行后将会持续的进行数据的接收。

val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
        receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())

Receiver源源不断的接收到实时流数据后,根据接收数据的大小进行判断,若数据量很小,则会聚集多条数据成一块,然后进行块存储;若数据量很大,则直接进行块存储。对于这些数据,Receiver会直接交由ReceiverSupervisor,由其进行数据的转储操作。配置参数spark.streaming.receiver.writeAheadLog.enable的值决定是否预写日志。根据参数值会产生不同类型的存储receivedBlockHandler对象。

private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    //先写 WAL,再存储到 executor 的内存或硬盘
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    //直接存到 executor 的内存或硬盘
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}

根据receivedBlockHandler进行块存储。将 block 存储之后,会获得 block 描述信息 blockInfo:ReceivedBlockInfo,这其中包含:streamId、数据位置、数据条数、数据 size 等信息。接着,封装以 block 作为参数的 AddBlock(blockInfo) 消息并发送给 ReceiverTracker 以通知其有新增 block 数据块。

//调用 receivedBlockHandler.storeBlock 方法存储 block,并得到一个 blockStoreResult
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
//使用blockStoreResult初始化一个ReceivedBlockInfo实例
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
//发送消息通知 ReceiverTracker 新增并存储了 block
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))

ReceiverTracker再把这些信息转发给ReceivedBlockTracker,由其负责管理收到数据块元信息。

private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    receivedBlockTracker.addBlock(receivedBlockInfo)
  }

step3 处理流数据

JobScheduler中有两个主要的成员,一个是上文提到的ReceiverTracker,另一个则是JobGenerator 。在JobScheduler启动时,会创建一个新的 JobGenerator 实例 jobGenerator,并调用其start() 方法。在 JobGenerator 的主构造函数中,会创建一个定时器:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

定时器中定义了批处理时间间隔ssc.graph.batchDuration.milliseconds。每当批处理时间到来时,会执行一次eventLoop.post(GenerateJobs(new Time(longTime)))方法来向 eventLoop 发送 GenerateJobs(new Time(longTime))消息,eventLoop收到消息后会基于当前batch内的数据进行Job的生成及提交执行。

private def generateJobs(time: Time) {
    // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
    // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
    Try {
    // allocate received blocks to batch
    jobScheduler.receiverTracker.allocateBlocksToBatch(time)
    // generate jobs using allocated block
    graph.generateJobs(time)
} match {
    case Success(jobs) =>
    val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
    jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
    jobScheduler.reportError("Error generating jobs for time " + time, e)
    PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
  }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
 }

由源码可知,eventLoop 在接收到 GenerateJobs(new Time(longTime))消息后首先调用了allocateBlocksToBatch()方法将已收到的blocks分配给batch。紧接着调用DStreamGraph类中的generateJobs()方法来生成基于该batchJob序列。然后将批处理时间time、作业序列Seq[Job]和本批次数据的源信息包装为JobSet,调用JobScheduler.submitJobSet(JobSet)提交给JobSchedulerJobScheduler将这些作业发送给Spark核心进行处理。

Step4 输出处理结果

由于数据的处理有Spark核心来完成,因此处理的结果会从Spark核心中直接输出至外部系统,如数据库或者文件系统等,同时输出的数据也可以直接被外部系统所使用。由于实时流数据的数据源源不断的流入,Spark会周而复始的进行数据的计算,相应也会持续输出处理结果。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,100评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,308评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,718评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,275评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,376评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,454评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,464评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,248评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,686评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,974评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,150评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,817评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,484评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,140评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,374评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,012评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,041评论 2 351

推荐阅读更多精彩内容