Spark Streaming(2) - JobScheduler、JobGenerator

本文基于Spark 2.11

1. 前言

Spark Streaming(1)中提到JobScheduler使用JobGenerator可以每隔一段时间根据DStream DAG创建出RDD DAG,并提交job,本文主要介绍JobScheduler的细节。

2. JobScheduler

JobScheduler在StreamingContext调用start时启动,启动序列如下:

StreamingContext#start
       ->JobScheduler#start
             -> ReceiverTracker#start
                   ->JobGenerator#start

JobScheduler有如下成员:

 private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]
  private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
  private val jobExecutor =
    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
  private val jobGenerator = new JobGenerator(this)

  // These two are created only when scheduler starts.
  // eventLoop not being null means the scheduler has been started and not stopped
  var receiverTracker: ReceiverTracker = null
  // A tracker to track all the input stream information as well as processed record number
  1. jobSets。
    job生成时间到jobs的映射,JobGenerator调用DStreamGraph为持有的每一个DStream DAG生成一个job返回给JobGenerator,JobGenerator将时间以及生成的jobs反馈给Jobscheudler,保存在jobSets里。JobGenerator并没有提交job,job是由JobScheudler提交的。
  2. numConcurrentJobs
    控制同时能运行的job数量。
  3. jobExecutor
    线程池,由numConccurrentJobs控制线程数量,jobExecutor里提交job并等待结果。由于等待结果是一个阻塞操作,所以一个线程同时只能提交一个job
  4. jobGenerator
    JobScheduler委托用来生成job
  5. receiverTracker,JobScheduler启动,接收Receiver上报的数据batch信息。

3. JobGenerator生成job

上面说到JobScheduler委托JobGenerator生成job,
下面是JobGenerator的核心成员:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

  
  private var eventLoop: EventLoop[JobGeneratorEvent] = null

  // last batch whose completion,checkpointing and metadata cleanup has been completed
  1. timer
    定时器,JobGenerator定时生成job,时间间隔batchDuration就是创建StreamingContext是传入的,这个timer每隔timeDuration时间网eventLoop中发送一条生成job的消息。
  2. eventLoop
    一直运行,接收消息,做出处理。接受的消息类型有:
  • GenerateJobs, 使用DSteamGraph生成job
  • DoCheckpoint,提交新的job去做checkpoint
  • ClearCheckpointData,DoCheckpoint都是在job完成后清楚信息的

生成job
timer定时器每隔batchDuration往eventLoop发送GenerateJob事件生成job,下面是eventLoop时间主循环中处理GenerateJob事件调用如下:

eventLoop#processEvent
   --> jobGenerator#generateJobs

下面是JobGenerator的generateJobs

 private def generateJobs(time: Time) {
    // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
    // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        //将jobs反馈给JobScheudler,等待调度
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
        PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }
  1. receiverTracker.allocateBlocksToBatch(time)根据当前时间time,从已经汇报的数据中生成数据块,后续根据DStream生成RDD的数据就是根据time检索到本次生成的数据块
  2. graph.generateJobs生成jobs
  3. jobScheduler.submitJobSet,反馈给Jobscheudler等待人物调度
  4. eventLoop.post,创建job做checkpoint

第二步创建中创建job有如下调用序列:

DStreamGraph#generateJobs
    ->DStream#generateJob

//DStream#generateJob
private[streaming] def generateJob(time: Time): Option[Job] = {
    // 将DStream转换成RDD
    getOrCompute(time) match {
      case Some(rdd) =>
        // 此处创建了函数,函数里基于当前RDD提交了job
        // JobScheduler在jobExecutor线程池中调度job时,该函数会执行
        val jobFunc = () => {
          val emptyFunc = { (iterator: Iterator[T]) => {} }
          context.sparkContext.runJob(rdd, emptyFunc)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }

使用time,以及一个jobFunc的函数创建Job,jobFunc在调度时执行。

4. JobScheduler调度job

3中面提到JobGenerator生成jobs并将生成的job反馈给JobScheduler,2中说到到JobScheduler使用jobExecutor调度job

下面是JobScheduler的submitJobSet方法:

def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
      logInfo("Added jobs for time " + jobSet.time)
    }
  }

上面代码中jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))对JobGenerator传递过来的每一个job包装成JobHandler,然后在jobExecutor线程池中调度执行。

JobHandler实现了Runnable接口,是的能在线程池中运行,它的run方法如下:

 def run() {
      val oldProps = ssc.sparkContext.getLocalProperties
      try {
        ssc.sparkContext.setLocalProperties(SerializationUtils.clone(ssc.savedProperties.get()))
        val formattedTime = UIUtils.formatBatchTime(
          job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
        val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
        val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

        ssc.sc.setJobDescription(
          s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
        ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
        ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
        // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
        // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
        ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")

        // We need to assign `eventLoop` to a temp variable. Otherwise, because
        // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
        // it's possible that when `post` is called, `eventLoop` happens to null.
        var _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details.
          SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
            job.run()
          }
          _eventLoop = eventLoop
          if (_eventLoop != null) {
            _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
          }
        } else {
          // JobScheduler has been stopped.
        }
      } finally {
        ssc.sparkContext.setLocalProperties(oldProps)
      }
    }

调用Job#run方法,run方法中执行jobFunc完成job的提交。

job 并行度的控制
JobScheduler的成员numConcurrentJobs控制同时能有多少stream job在运行,numConcurrentJobs通过spark.streaming.concurrentJobs配置项获取,默认为1. numCOncurrentJobs控制jobExecutor线程池中线程的数量从而实现控制同时运行的JobHandler数量(而一个JobHandler封装一个job)。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,937评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,503评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,712评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,668评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,677评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,601评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,975评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,637评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,881评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,621评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,710评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,387评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,971评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,947评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,189评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,805评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,449评论 2 342

推荐阅读更多精彩内容