spark 内核源码剖析九:Executor原理

CoarseGrainedExecutorBackend:worker中为Application启动的executor,实际上就是启动了一个CoarseGrainedExecutorBackend进程

org.apache.spark.executor.CoarseGrainedExecutorBackend#preStart
org.apache.spark.executor.CoarseGrainedExecutorBackend#receiveWithLogging

//actor初始化
  /**
   * Actor start-up hook: resolves the driver actor from `driverUrl` and sends it a
   * `RegisterExecutor` request describing this executor.
   */
  override def preStart(): Unit = {
    logInfo("Connecting to driver: " + driverUrl)
    // Look up the driver's actor.
    driver = context.actorSelection(driverUrl)
    // Build the RegisterExecutor message and send it to the driver
    // (step 1 in the diagram below).
    val registration = RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
    driver ! registration
    // Subscribe this actor to RemotingLifecycleEvent notifications on the system event stream.
    val lifecycleEvents = classOf[RemotingLifecycleEvent]
    context.system.eventStream.subscribe(self, lifecycleEvents)
  }

  /**
   * Message handler of this backend.
   *
   * Handles registration results from the driver, task launch and kill requests,
   * remoting disassociation events, and the shutdown command. Most of the actual
   * work is delegated to the `executor` handle created on successful registration.
   */
  override def receiveWithLogging = {
    // After the driver has registered this executor it replies with RegisteredExecutor.
    // The backend then creates the Executor object that serves as its execution
    // handle (step 2 in the diagram below).
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      val (hostname, _) = Utils.parseHostPort(hostPort)
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

    // Registration was rejected: log the reason and terminate the process.
    case RegisterExecutorFailed(message) =>
      logError("Slave registration failed: " + message)
      System.exit(1)

    // The task scheduler inside the driver sends LaunchTask to start a task
    // (step 3 in the diagram below).
    case LaunchTask(data) =>
      if (executor == null) {
        logError("Received LaunchTask command but executor was null")
        System.exit(1)
      } else {
        // Deserialize the task description.
        val ser = env.closureSerializer.newInstance()
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        // Start the task through the executor handle's launchTask()
        // (step 4 in the diagram below).
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }

    case KillTask(taskId, _, interruptThread) =>
      if (executor == null) {
        logError("Received KillTask command but executor was null")
        System.exit(1)
      } else {
        executor.killTask(taskId, interruptThread)
      }

    // Only a disassociation from the driver's own address is fatal; any other
    // remote address is logged and ignored.
    case x: DisassociatedEvent =>
      if (x.remoteAddress == driver.anchorPath.address) {
        logError(s"Driver $x disassociated! Shutting down.")
        System.exit(1)
      } else {
        logWarning(s"Received irrelevant DisassociatedEvent $x")
      }

    case StopExecutor =>
      logInfo("Driver commanded a shutdown")
      // `executor` is only assigned when RegisteredExecutor is handled. Guard against
      // a stop request that arrives earlier, so it cannot fail with a
      // NullPointerException and skip the actor/system shutdown below.
      if (executor != null) {
        executor.stop()
      }
      context.stop(self)
      context.system.shutdown()
  }

org.apache.spark.executor.Executor#launchTask

 /**
  * Starts a single task on this executor.
  *
  * Wraps the task in a `TaskRunner`, records the runner in `runningTasks` under its
  * task id, and submits it to `threadPool` for execution.
  *
  * @param context        backend handed to the created `TaskRunner`
  * @param taskId         id of the task; also the key used in `runningTasks`
  * @param attemptNumber  attempt number passed on to the `TaskRunner`
  * @param taskName       name of the task
  * @param serializedTask serialized task passed on to the `TaskRunner`
  */
 def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer): Unit = {
    // Every task gets its own TaskRunner.
    val runner = new TaskRunner(context, taskId, attemptNumber, taskName, serializedTask)
    // Keep the runner in the in-memory map of running tasks.
    runningTasks.put(taskId, runner)
    // Hand the runner to the thread pool for execution (step 5 in the diagram below).
    threadPool.execute(runner)
  }

image.png
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 我偶有几次做梦,梦中又漫步在月夜下的老哈河边。微风轻拂着我的脸颊,抚摸着我的头发。它掠过平静的河...
    老哈河阅读 404评论 0 0
  • 2018-05-01 赵俊利 应用力量教练部 1、大事难事,看担当;逆境顺境,看胸襟;是喜是怒,看涵养;有舍有得...
    远古鲶鱼阅读 371评论 0 1
  • 金晓钰,是五莲县实验小学六年级二班的一名学生,在班级中担任副班长一职,她活泼开朗,爱好广泛,喜欢书法、绘画、读书、...
    笑盈子阅读 1,279评论 4 11