贯通Spark Streaming JobScheduler内幕实现和深入思考

       JobScheduler的地位非常的重要,所有的关键都在JobScheduler,它的重要性就相当于是Spark Core当中的DAGScheduler,因此,我们要花重点在JobScheduler上面。

       我们在进行sparkstreaming开发的时候,会对Dstream进行各种transform和action级别的操作,这些操作就构成Dstream graph,也就是Dstream 之间的依赖关系,随着时间的流逝,Dstream graph会根据batchintaval时间间隔,产生RDD的DAG,然后进行job的执行。Dstream 的Dstream graph是逻辑级别的,RDD的DAG是物理执行级别的。DStream是空间维度的层面,空间维度加上时间构成时空维度。

       JobSchedule是将逻辑级别的job物理的运行在spark core上。JobGenerator是产生逻辑级别的job,使用JobSchedule将job在线程池中运行。JobSchedule是在StreamingContext中进行实例化的,并在StreamingContext的start方法中开辟一条新的线程启动的。

// Start the streaming scheduler in a new thread, so that thread local properties

// like call sites and job groups can be reset without affecting those of the

// current thread.ThreadUtils.runInNewThread("streaming-start") {

sparkContext.setCallSite(startSite.get)

sparkContext.clearJobGroup()

sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")

scheduler.start()

}

      1.大括号中的代码作为一个匿名函数在新的线程中执行。Sparkstreaming运行时至少需要两条线程,其中一条用于一直循环接收数据,现在所说的至少两条线程和上边开辟一条新线程运行scheduler.start()并没有关系。Sparkstreaming运行时至少需要两条线程是用于作业处理的,上边的代码开辟新的线程是在调度层面的中,不论Sparkstreaming程序运行时指定多少线程,这里都会开辟一条新线程,之间没有一点关系。

      2.每一条线程都有自己私有的属性,在这里给新的线程设置私有的属性,这些属性不会影响主线程中的。

sparkContext.setCallSite(startSite.get)

sparkContext.clearJobGroup()

sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")

      源码中代码的书写模式非常值得学习,以后看源码的时候就把它当做是一个普通的应用程序,从jvm的角度看,spark就是一个分布式的应用程序。不要对源码有代码崇拜情节,不然就没有掌控源码的信心。

JobSchedule在实例化的时候会实例化JobGenerator和线程池。

private valnumConcurrentJobs= ssc.conf.getInt("spark.streaming.concurrentJobs", 1)

private valjobExecutor=

ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

private valjobGenerator=newJobGenerator(this)

      线程池中默认是有一条线程,当然可以在spark配置文件中配置或者使用代码在sparkconf中修改默认的线程数,在一定程度上增加默认线程数可以提高执行job的效率,这也是一个性能调优的方法(尤其是在一个程序中有多个job时)。

      Java在企业生产环境下已经形成了生态系统,在spark开发中和数据库、hbase、radis、javaEE交互一般都采用java,所以开发大型spark项目大部分都是scala+java的方式进行开发。

      JobGenerator和线程池在JobSchedule在实例化的时候就已经实例化了,而eventloop和receiverTracker是在调用JobGenerator的start方法时才被实例化。defstart(): Unit = synchronized {

if(eventLoop!=null)return// scheduler has already been started

eventLoop=newEventLoop[JobSchedulerEvent]("JobScheduler") {

override protected defonReceive(event: JobSchedulerEvent): Unit = processEvent(event)override protected defonError(e: Throwable): Unit = reportError("Error in job scheduler", e)

}

eventLoop.start()

receiverTracker=newReceiverTracker(ssc)

receiverTracker.start()

jobGenerator.start()

}

        在eventloop的start方法中会回调onStart方法,一般在onStart方法中会执行一些准备性的代码,在JobSchedule中虽然并没有复写onStart方法,不过sparkStreaming框架在这里显然是为了代码的可扩展性考虑的,这是开发项目时需要学习的。

defstart(): Unit = {

if(stopped.get) {

throw newIllegalStateException(name + " has already been stopped")

}

// Call onStart before starting the event thread to make sure it happens before onReceive

onStart()

eventThread.start()

}

Dstream的action级别的操作转过来还是会调用foreachRDD这个方法,生动的说明在对Dstream操作的时候其实还是对RDD的操作。defprint(num: Int): Unit = ssc.withScope {

defforeachFunc: (RDD[T], Time) => Unit = {

(rdd: RDD[T], time: Time) => {

valfirstNum = rdd.take(num + 1)

// scalastyle:off println

println("-------------------------------------------")

println("Time: " + time)

println("-------------------------------------------")

firstNum.take(num).foreach(println)

if(firstNum.length > num)println("...")

println()

}

}

foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps =false)

}

上边代码中foreachFunc这个方法是对Dstream action级别的方法的进一步封装,增加了如下代码,在运行spark streaming程序时对这些输出很熟悉。

println("-------------------------------------------")

println("Time: " + time)

println("-------------------------------------------")

foreachRDD方法,转过来new ForEachDstream

Apply a function to each RDD in this DStream. This is an output operator, so

* 'this' DStream will be registered as an output stream and therefore materialized.

private defforeachRDD(

foreachFunc: (RDD[T], Time) => Unit,

displayInnerRDDOps: Boolean): Unit = {

newForEachDStream(this,

context.sparkContext.clean(foreachFunc,false), displayInnerRDDOps).register()

}

注释中说的:将这个函数作用于这个Dstream中的每一个RDD,这是一个输出操作,因此这个Dstream会被注册成outputstream,并进行物化。

ForEachDstream中很重要的一个函数generateJob。考虑时间维度和action级别,每个Duration都基于generateJob来生成作业。foreachFunc(rdd, time)//这个方法就是对Dstream最后的操作 ,newJob(time, jobFunc)只是在RDD的基础上,加上时间维度的封装而已。这里的Job只是一个普通的对象,代表了一个spark的计算,调用Job的run方法时,真正的作业就触发了。foreachFunc(rdd, time)中的rdd其实就是通过DstreamGraph中最后一个Dstream来决定的。

override defgenerateJob(time: Time): Option[Job] = {

parent.getOrCompute(time)match{

caseSome(rdd) =>

valjobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {

foreachFunc(rdd, time)

}

Some(newJob(time, jobFunc))

caseNone => None

}

}

Jon是通过ForEachDstream的generateJob来生成的,值得注意的是在Dstream的子类中,只有ForEachDstream重写了generateJob方法。

现在考虑一下ForEachDstream的generateJob方法是谁调用的?当然是JobGenerator。ForEachDstream的generateJob方法是静态的逻辑级别,他如果想要真正运行起来变成物理级别的这时候就需要JobGenerator。

现在就来看看JobGenerator的代码,JobGenerator中有一个定时器timer和消息循环体eventloop,timer会基于batchinteval,一直向eventloop中发送JenerateJobs的消息,进而导致processEvent方法->generateJobs方法的执行。

private valtimer=newRecurringTimer(clock,ssc.graph.batchDuration.milliseconds,

longTime =>eventLoop.post(GenerateJobs(newTime(longTime))), "JobGenerator")

eventLoop =newEventLoop[JobGeneratorEvent]("JobGenerator") {

override protected defonReceive(event: JobGeneratorEvent): Unit = processEvent(event)

override protected defonError(e: Throwable): Unit = {

jobScheduler.reportError("Error in job generator", e)

}

}

generateJobs方法的代码:

private defgenerateJobs(time: Time) {

SparkEnv.set(ssc.env)

Try{

jobScheduler.receiverTracker.allocateBlocksToBatch(time)

graph.generateJobs(time) // generate jobs using allocated block

graph.generateJobs(time)这个方法的代码:

defgenerateJobs(time: Time): Seq[Job] = {

logDebug("Generating jobs for time " + time)

valjobs =this.synchronized {

outputStreams.flatMap { outputStream =>

valjobOption = outputStream.generateJob(time)

jobOption.foreach(_.setCallSite(outputStream.creationSite))

jobOption

}

}

logDebug("Generated " + jobs.length + " jobs for time " + time)

jobs

}

其中的outputStream.generateJob(time)中的outputStream就是前面说ForEachDstream,generateJob(time)方法就是ForEachDstream中的generateJob(time)方法。

这是从时间维度调用空间维度的东西,所以时空结合就转变成物理的执行了。

再来看看JobGenerator的generateJobs方法:

Try{

jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch

graph.generateJobs(time) // generate jobs using allocated block

}match{

caseSuccess(jobs) =>

valstreamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)

jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))

caseFailure(e) =>

jobScheduler.reportError("Error generating jobs for time " + time, e)

}

eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater =false))

基于graph.generateJobs产生job后,会封装成JobSet并提交给JobScheduler,JobSet(time, jobs, streamIdToInputInfos),其中streamIdToInputInfos就是接收的数据的元数据。

JobSet代表了一个batch duration中的一批jobs。就是一个普通对象,包含了未提交的jobs,提交的时间,执行开始和结束时间等信息。

JobSet提交给JobScheduler后,会放入jobSets数据结构中,jobSets.put(jobSet.time, jobSet) ,所以JobScheduler就拥有了每个batch中的jobSet.在线程池中进行执行。

defsubmitJobSet(jobSet: JobSet) {

if(jobSet.jobs.isEmpty) {

logInfo("No jobs added for time " + jobSet.time)

}else{

listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))

jobSets.put(jobSet.time, jobSet)

jobSet.jobs.foreach(job =>jobExecutor.execute(newJobHandler(job)))

logInfo("Added jobs for time " + jobSet.time)

}

}

在把job放入线程池中时,采用JonHandler进行封装。JonHandler是一个Runable接口的实例。

其中主要的代码就是job.run(),前面说过job.run()调用的就是Dstream的action级别的方法。

在job.run()前后会发送JonStarted和JobCompleted的消息,JobScheduler接收到这两个消息只是记录一下时间,通知一下job要开始执行或者执行完成,并没有过多的操作。

_eventLoop.post(JobStarted(job,clock.getTimeMillis()))

PairRDDFunctions.disableOutputSpecValidation.withValue(true) {

job.run()

}

_eventLoop =eventLoop

if(_eventLoop !=null) {

_eventLoop.post(JobCompleted(job,clock.getTimeMillis()))

}

备注:

资料来源于:DT_大数据梦工厂(Spark发行版本定制)

更多私密内容,请关注微信公众号:DT_Spark

如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352

推荐阅读更多精彩内容