6 Spark Streaming 中Job的动态生成

  1. 本文内容以以Socket数据来源为例,通过WordCount计算来跟踪Job的生成
    代码如下:
    objectNetworkWordCount {
      defmain(args:Array[String]) {
        if (args.length< 2) {
          System.err.println("Usage:NetworkWordCount<hostname> <port>")
          System.exit(1)
        }
        val sparkConf= newSparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
        val ssc = newStreamingContext(sparkConf,Seconds(1))
        val lines= ssc.socketTextStream(args(0),args(1).toInt,StorageLevel.MEMORY_AND_DISK_SER)
        val words= lines.flatMap(_.split(""))
        val wordCounts= words.map(x => (x,1)).reduceByKey(_+ _)
        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
  1. 从ssc.start()开始看,在start方法中调用了scheduler的start()方法,这里的scheduler就是
    JobScheduler,我们看start的代码
def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started
    
    logDebug("Starting JobScheduler")
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    // 启动JobScheduler的事件循环器
    eventLoop.start()
    
    // attach rate controllers of input streams to receive batch completion updates
    for { inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)
    
    listenerBus.start(ssc.sparkContext)
    receiverTracker = new ReceiverTracker(ssc)
    inputInfoTracker = new InputInfoTracker(ssc)
    // 启动ReceiverTracker,数据的接收逻辑从这里开始
    receiverTracker.start()
    // 启动JobGenerator,job的生成从这里开始
    jobGenerator.start()
    logInfo("Started JobScheduler")
}

Spark Streaming由JobScheduler、ReceiverTracker、JobGenerator三大组件组成,其中ReceiverTracker、
JobGenerator包含在JobScheduler中。这里分别执行三大组件的start方法。

  1. 我们先看Job的生成,jobGenerator.start()方法。在JobGenerator的start方法中都做了什么,继续往下看。
    首先启动了一个EventLoop并来回调processEvent方法,那么什么时候会触发回调呢,来看一下EventLoop的内部结构
 private[spark] abstract class EventLoop\[E](name: String) extends Logging {

  //线程安全的阻塞队列
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque\[E]()

  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    //后台线程
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            //回调子类的onReceive方法,就是事件的逻辑代码
            onReceive(event)
          } catch {
            case NonFatal(e) => {
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
            }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    // 启动事件循环器
    eventThread.start()
  }

  def stop(): Unit = {
    // stopped.compareAndSet(false, true) 判断是否为false,同时赋值为true
    if (stopped.compareAndSet(false, true)) {
     eventThread.interrupt()
      var onStopCalled = false
      try {
        eventThread.join()
        // Call onStop after the event thread exits to make sure onReceive happens before onStop
        onStopCalled = true
        onStop()
      } catch {
        case ie: InterruptedException =>
          Thread.currentThread().interrupt()
          if (!onStopCalled) {
            // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since
            // it's already called.
            onStop()
          }
      }
    } else {
      // Keep quiet to allow calling `stop` multiple times.
    }
  }

  def post(event: E): Unit = {
    eventQueue.put(event)
  }

  def isActive: Boolean = eventThread.isAlive

  protected def onStart(): Unit = {}

  protected def onStop(): Unit = {}

  protected def onReceive(event: E): Unit

  protected def onError(e: Throwable): Unit

 }

在EventLoop内部其实是维护了一个队列,开辟了一条后台线程来回调实现类的onReceive方法。
那么是什么时候把事件放入EventLoop的队列中呢,就要找EventLoop的post方法了。在JobGenerator实例化的时
候创建了一个RecurringTimer,代码如下:

 private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  // 回调 eventLoop.post(GenerateJobs(new Time(longTime)))将GenerateJobs事件放入事件循环器
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

RecurringTimer就是一个定时器,看一下他的构造参数和内部代码,
* @param clock 时钟
* @param period 间歇时间
* @param callback 回调方法
* @param name 定时器的名称
很清楚的知道根据用户传入的时间间隔,周期性的回调callback方法。Callback就是前面看到的

longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

将GenerateJobs事件提交到EventLoop的队列中,此时RecurringTimer还没有执行。
回到JobGenerator中的start方法向下看,因为是第一次运行,所以调用了startFirstTime方法。
在startFirstTime方法中,有一行关键代码timer.start(startTime.milliseconds),终于看到了定时器的启动

  1. 从定时器的start方法开始往回看,周期性的回调eventLoop.post方法将GenerateJobs事件发送到EvenLoop的队列,然后回调rocessEvent方法,看generateJobs(time)。
    generateJobs代码如下
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      // 获取元数据信息
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      // 提交jobSet
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
进入graph.generateJobs(time) ,调用每一个outputStream的generateJob方法,generateJob代码如下
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      // jobRunc中包装了runJob的方法
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
} 

getOrCompute返回一个RDD,RDD的生成以后再说,定义了一个函数jobFunc,可以看到函数的作用是提交job,
把jobFunc封装到Job对象然后返回。

  1. 返回的是多个job,jobs生成成功后提交JobSet,代码如下
    jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    然后分别提交每一个job,把job包装到JobHandler(Runnable子类)交给线程池运行,执行JobHandler的run
    方法,调用job.run(),在Job的run方法中就一行,执行Try(func()),这个func()函数就是上面代码中
    的jobFunc,看到这里整个Job的生成与提交就连通了。

  2. 下面附上一张Job动态生成流程图

以上内容如有错误,欢迎指正

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,937评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,503评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,712评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,668评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,677评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,601评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,975评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,637评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,881评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,621评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,710评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,387评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,971评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,947评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,189评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,805评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,449评论 2 342

推荐阅读更多精彩内容