基于spark1.6
在sparkContext里会创建 DAGScheduler,DAGScheduler 初始化了一个事件阻塞队列(action的触发,一个action 会封装一个JobSubmitted 类型的事件,放入DAGScheduler事件队列,并启动一个守护线程,从阻塞队列里取出事件对象)
开启了一个守护线程
private val eventThread = new Thread(name) {
setDaemon(true)// 说明是守护线程
override def run(): Unit = {
while (!stopped.get) {
val event = eventQueue.take()//从事件队列中取出事件
onReceive(event)//抽象方法调用子类的实现,调用子DAGSchedulerEventProcessLoop重写onRecive的方法
}
}
}
//如果是任务提交事件,它就会调用dagScheduler的handleJobSubmintted方法来提交任务
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties)
}
到此DAGScheduler的调度队列会一直挂起,不断轮询事件队列中的任务,为什么要开辟线程来执行消息的读、取?这样可以异步处理多Job。