spark-源码-sparkContext DagScheduler

基于spark1.6

在sparkContext里会创建 DAGScheduler,DAGScheduler 初始化了一个事件阻塞队列(action的触发,一个action 会封装一个JobSubmitted 类型的事件,放入DAGScheduler事件队列,并启动一个守护线程,从阻塞队列里取出事件对象)

开启了一个守护线程

private val eventThread = new Thread(name) {

    setDaemon(true)// 说明是守护线程

override def run(): Unit = {

  while (!stopped.get) {

          val event = eventQueue.take()//从事件队列中取出事件

          onReceive(event)//抽象方法调用子类的实现,调用子DAGSchedulerEventProcessLoop重写onRecive的方法

          }

      }

  }

//如果是任务提交事件,它就会调用dagScheduler的handleJobSubmintted方法来提交任务

      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,  listener, properties)

  }

到此DAGScheduler的调度队列会一直挂起,不断轮询事件队列中的任务,为什么要开辟线程来执行消息的读、取?这样可以异步处理多Job。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容