This article is based on Spark 2.11.
1. Introduction
Spark Streaming (1) mentioned that JobScheduler uses JobGenerator to build an RDD DAG from the DStream DAG at each batch interval and submit the resulting jobs. This article covers JobScheduler in detail.
2. JobScheduler
JobScheduler is started when StreamingContext#start is called; the startup sequence is:
StreamingContext#start
  -> JobScheduler#start
    -> ReceiverTracker#start
    -> JobGenerator#start
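For reference, a minimal driver (standard Spark Streaming API; the host and port are placeholders) shows where this sequence is triggered from: ssc.start() kicks off JobScheduler, and Seconds(5) is the batchDuration that JobGenerator's timer will use later.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StartDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("StartDemo")
    // Seconds(5) becomes ssc.graph.batchDuration
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.socketTextStream("localhost", 9999).print()
    // start() runs the sequence above:
    // JobScheduler#start -> ReceiverTracker#start / JobGenerator#start
    ssc.start()
    ssc.awaitTermination()
  }
}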
JobScheduler has the following members:
private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
private val jobGenerator = new JobGenerator(this)

// These two are created only when scheduler starts.
// eventLoop not being null means the scheduler has been started and not stopped
var receiverTracker: ReceiverTracker = null
// A tracker to track all the input stream information as well as processed record number
var inputInfoTracker: InputInfoTracker = null
- jobSets: a map from batch generation time to the jobs for that batch. JobGenerator asks DStreamGraph to generate one job per output DStream it holds, then hands the time and the generated jobs back to JobScheduler, which stores them in jobSets. JobGenerator does not submit the jobs; JobScheduler does (a simplified sketch follows this list).
- numConcurrentJobs: caps how many jobs can run at the same time.
- jobExecutor: a thread pool whose size is numConcurrentJobs. Jobs are submitted and awaited inside this pool; because waiting for a job's result blocks, each thread can run only one job at a time.
- jobGenerator: the JobGenerator that JobScheduler delegates job generation to.
- receiverTracker: started by JobScheduler; receives the batch/block metadata reported by Receivers.
3. JobGenerator generates jobs
As noted above, JobScheduler delegates job generation to JobGenerator.
These are JobGenerator's core members:
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

private var eventLoop: EventLoop[JobGeneratorEvent] = null

// last batch whose completion, checkpointing and metadata cleanup has been completed
private var lastProcessedBatch: Time = null
- timer: a recurring timer. Its interval is batchDuration, the value passed in when the StreamingContext was created; every batchDuration it posts a job-generation message to eventLoop (see the sketch after this list).
- eventLoop: runs continuously, receiving messages and handling them. The message types include:
  - GenerateJobs: generate jobs using DStreamGraph
  - DoCheckpoint: submit a new checkpoint job
  - ClearMetadata and ClearCheckpointData: clean up metadata and checkpoint data after a batch completes
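RecurringTimer and EventLoop are Spark-internal classes; the following is a simplified stand-in (all names here are hypothetical, not Spark's) showing the mechanism: a timer thread posts GenerateJobs every batchDuration, and a single event-loop thread drains a queue and dispatches each message.

import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

sealed trait GeneratorEvent
case class GenerateJobs(time: Long) extends GeneratorEvent
case class DoCheckpoint(time: Long) extends GeneratorEvent

object EventLoopSketch {
  private val queue = new LinkedBlockingQueue[GeneratorEvent]

  def post(event: GeneratorEvent): Unit = queue.put(event)

  def main(args: Array[String]): Unit = {
    val batchDurationMs = 1000L

    // stand-in for RecurringTimer: post GenerateJobs every batchDuration
    val timer = Executors.newSingleThreadScheduledExecutor()
    timer.scheduleAtFixedRate(new Runnable {
      def run(): Unit = post(GenerateJobs(System.currentTimeMillis()))
    }, 0L, batchDurationMs, TimeUnit.MILLISECONDS)

    // stand-in for EventLoop: a single thread drains the queue and dispatches
    val loop = new Thread(new Runnable {
      def run(): Unit = while (true) {
        queue.take() match {
          case GenerateJobs(t) => println(s"generate jobs for batch $t")
          case DoCheckpoint(t) => println(s"checkpoint for batch $t")
        }
      }
    })
    loop.setDaemon(true)
    loop.start()

    Thread.sleep(3500) // let a few batches fire
    timer.shutdownNow()
  }
}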
Generating jobs
Every batchDuration the timer posts a GenerateJobs event to eventLoop. In eventLoop's event main loop the event is handled through the following calls:

EventLoop#processEvent
  -> JobGenerator#generateJobs
Below is JobGenerator#generateJobs:
private def generateJobs(time: Time) {
  // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
  // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      // hand the jobs back to JobScheduler, where they wait to be scheduled
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
      PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
- receiverTracker.allocateBlocksToBatch(time): binds the blocks reported so far to the batch for this time; when DStreams later produce RDDs for this batch, the data comes from looking up exactly these blocks under the same time key (a minimal model follows this list)
- graph.generateJobs(time): generates the jobs using the allocated blocks
- jobScheduler.submitJobSet: hands the jobs back to JobScheduler, where they wait to be scheduled
- eventLoop.post: posts DoCheckpoint so a checkpoint job is created
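To make the first step concrete, here is a minimal, hypothetical model of time-keyed block allocation (not ReceiverTracker's actual implementation): blocks reported by receivers accumulate until a batch time arrives, at which point everything accumulated so far is bound to that time and can later be looked up by the same key.

import scala.collection.mutable

// hypothetical, minimal model of time-keyed block allocation
class BlockAllocator[B] {
  private val unallocated = mutable.Queue.empty[B]
  private val allocatedByBatch = mutable.Map.empty[Long, List[B]]

  // receivers report blocks as data arrives
  def addBlock(block: B): Unit = synchronized { unallocated.enqueue(block) }

  // at batch time, bind everything reported so far to that batch
  def allocateBlocksToBatch(time: Long): Unit = synchronized {
    allocatedByBatch(time) = unallocated.dequeueAll(_ => true).toList
  }

  // getOrCompute(time)-style lookup: the RDD for this batch reads these blocks
  def blocksForBatch(time: Long): List[B] = synchronized {
    allocatedByBatch.getOrElse(time, Nil)
  }
}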
The second step, graph.generateJobs, creates jobs via the following call sequence:

DStreamGraph#generateJobs
  -> DStream#generateJob
// DStream#generateJob
private[streaming] def generateJob(time: Time): Option[Job] = {
  // materialize this DStream into an RDD for the given batch time
  getOrCompute(time) match {
    case Some(rdd) =>
      // this closure submits a Spark job over the RDD; it executes when
      // JobScheduler later schedules the Job on the jobExecutor thread pool
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
A Job is created from the time and the jobFunc closure; jobFunc executes only when the job is scheduled.
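Note that this is DStream's default generateJob, which runs an empty function over the RDD; output DStreams such as ForEachDStream override it so that jobFunc runs the user's output function instead. For example, with the standard foreachRDD API (host and port are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object OutputOpDemo {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setMaster("local[2]").setAppName("OutputOpDemo"), Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)
    // foreachRDD registers a ForEachDStream; each batch, its generateJob
    // packs this user function into the Job's jobFunc for JobScheduler to run
    lines.foreachRDD { (rdd, time) =>
      println(s"batch $time contains ${rdd.count()} records")
    }
    ssc.start()
    ssc.awaitTermination()
  }
}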
4. JobScheduler schedules jobs
Section 3 showed JobGenerator generating jobs and handing them back to JobScheduler, and Section 2 noted that JobScheduler runs jobs on the jobExecutor thread pool.
Here is JobScheduler's submitJobSet method:
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
In the line jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))), each job handed over by JobGenerator is wrapped in a JobHandler and then executed on the jobExecutor thread pool.
JobHandler implements the Runnable interface so that it can run in the pool; its run method is as follows:
def run() {
  val oldProps = ssc.sparkContext.getLocalProperties
  try {
    ssc.sparkContext.setLocalProperties(SerializationUtils.clone(ssc.savedProperties.get()))
    val formattedTime = UIUtils.formatBatchTime(
      job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
    val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
    val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

    ssc.sc.setJobDescription(
      s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
    ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
    ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
    // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
    // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")

    // We need to assign `eventLoop` to a temp variable. Otherwise, because
    // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
    // it's possible that when `post` is called, `eventLoop` happens to be null.
    var _eventLoop = eventLoop
    if (_eventLoop != null) {
      _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
      // Disable checks for existing output directories in jobs launched by the streaming
      // scheduler, since we may need to write output to an existing directory during checkpoint
      // recovery; see SPARK-4835 for more details.
      SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
        job.run()
      }
      _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
      }
    } else {
      // JobScheduler has been stopped.
    }
  } finally {
    ssc.sparkContext.setLocalProperties(oldProps)
  }
}
It calls Job#run, and run executes jobFunc, which performs the actual job submission.
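A simplified model of what Job holds and what run() does (the class name here is hypothetical; Spark's actual Job also tracks ids and exposes the outcome to listeners):

import scala.util.Try

// simplified model: a Job is a batch time plus the jobFunc closure;
// run() executes the closure on a jobExecutor thread and records the outcome
class StreamingJob(val time: Long, func: () => Unit) {
  private var _result: Try[Unit] = _
  def run(): Unit = { _result = Try(func()) }
  def result: Try[Unit] = _result
}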
Controlling job parallelism

The JobScheduler member numConcurrentJobs controls how many streaming jobs can run at the same time. It is read from the spark.streaming.concurrentJobs configuration and defaults to 1. numConcurrentJobs sets the number of threads in the jobExecutor pool, which in turn caps the number of concurrently running JobHandlers (and each JobHandler wraps exactly one job).
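For illustration, raising the setting looks like this (the configuration key is real; whether a value above 1 is safe depends on whether your output operations tolerate batches overlapping):

import org.apache.spark.SparkConf

object ConcurrencyConf {
  // spark.streaming.concurrentJobs sizes the jobExecutor thread pool (default 1):
  // with 1, batches run strictly one at a time; with 2, the next batch's jobs
  // may start while a slow batch is still running, so output operations can
  // overlap or complete out of order across batches
  val conf = new SparkConf()
    .setAppName("ConcurrentJobsDemo")
    .set("spark.streaming.concurrentJobs", "2")
}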