42. Spark Kernel Source Code Deep Dive: Executor Internals and Source Code Analysis

Flow diagram

[Figure: Executor原理剖析.png]

Source code

The executor that a Worker launches for an Application is, in fact, this CoarseGrainedExecutorBackend process.
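As a rough, self-contained illustration of that fork (the real Worker assembles the command line from the ApplicationDescription inside ExecutorRunner), here is a minimal sketch that launches a JVM for a given main class. The arguments shown are illustrative placeholders, not Spark's actual command line:

import java.io.File

object LaunchBackendSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder values; the real Worker derives these from the ApplicationDescription
    val javaBin = new File(new File(System.getProperty("java.home"), "bin"), "java").getAbsolutePath
    val command = java.util.Arrays.asList(
      javaBin, "-cp", System.getProperty("java.class.path"),
      "org.apache.spark.executor.CoarseGrainedExecutorBackend",  // main class to run
      "--driver-url", "akka.tcp://sparkDriver@host:7077/user/CoarseGrainedScheduler")

    // Fork the executor JVM; inheritIO() makes its stdout/stderr visible in ours
    val process = new ProcessBuilder(command).inheritIO().start()
    println("Launched executor backend process: " + process)
  }
}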

The Executor registration mechanism

/**
   * Invoked when the actor is initialized
   */
  override def preStart() {
    logInfo("Connecting to driver: " + driverUrl)
    // Get a reference (ActorSelection) to the driver actor
    driver = context.actorSelection(driverUrl)
    // Send a RegisterExecutor message to the driver; the driver actor is an inner class of CoarseGrainedSchedulerBackend
    // Once the driver has registered this executor successfully, it sends back a RegisteredExecutor message
    driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  }
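To see the actorSelection-plus-tell handshake in isolation, here is a minimal, runnable sketch using the classic Akka actor API that Spark 1.x was built on. DriverStub, BackendStub, and the two messages are simplified stand-ins, not Spark's real classes:

import akka.actor.{Actor, ActorSystem, Props}

// Simplified stand-ins for Spark's registration messages
case class RegisterExecutor(executorId: String, cores: Int)
case object RegisteredExecutor

// Driver side: registers the executor and replies to the sender
class DriverStub extends Actor {
  def receive = {
    case RegisterExecutor(id, cores) =>
      println(s"driver: registered executor $id with $cores cores")
      sender() ! RegisteredExecutor
  }
}

// Executor side: mirrors CoarseGrainedExecutorBackend's preStart() handshake
class BackendStub(driverUrl: String) extends Actor {
  override def preStart(): Unit = {
    val driver = context.actorSelection(driverUrl) // look up the driver actor by URL
    driver ! RegisterExecutor("executor-1", cores = 4)
  }
  def receive = {
    case RegisteredExecutor => println("backend: successfully registered with driver")
  }
}

object HandshakeDemo extends App {
  val system = ActorSystem("demo")
  system.actorOf(Props[DriverStub], "driver")
  system.actorOf(Props(new BackendStub("akka://demo/user/driver")), "backend")
  Thread.sleep(500)
  system.terminate()
}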

Handling driver ! RegisterExecutor() on the driver side

class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive {
    override protected def log = CoarseGrainedSchedulerBackend.this.log
    private val addressToExecutorId = new HashMap[Address, String]

    override def preStart() {
      // Listen for remote client disconnection events, since they don't go through Akka's watch()
      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

      // Periodically revive offers to allow delay scheduling to work
      val reviveInterval = conf.getLong("spark.scheduler.revive.interval", 1000)
      import context.dispatcher
      context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
    }

    def receiveWithLogging = {
      case RegisterExecutor(executorId, hostPort, cores, logUrls) =>
        Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
        if (executorDataMap.contains(executorId)) {
          sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
        } else {
          logInfo("Registered executor: " + sender + " with ID " + executorId)
          sender ! RegisteredExecutor

          addressToExecutorId(sender.path.address) = executorId
          totalCoreCount.addAndGet(cores)
          totalRegisteredExecutors.addAndGet(1)
          val (host, _) = Utils.parseHostPort(hostPort)
          val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls)
          // This must be synchronized because variables mutated
          // in this block are read when requesting executors
          CoarseGrainedSchedulerBackend.this.synchronized {
            executorDataMap.put(executorId, data)
            if (numPendingExecutors > 0) {
              numPendingExecutors -= 1
              logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
            }
          }
          listenerBus.post(
            SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
          makeOffers()
        }
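The interesting pattern above is the bookkeeping: a duplicate-ID check, atomic counters, and a map that is mutated inside a synchronized block because other threads read it when requesting executors. A minimal sketch of that pattern, with ExecutorInfo as a simplified stand-in for Spark's ExecutorData:

import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.HashMap

// Hypothetical, simplified stand-in for Spark's ExecutorData
case class ExecutorInfo(host: String, totalCores: Int, freeCores: Int)

object RegistrationBookkeeping {
  private val executorDataMap = new HashMap[String, ExecutorInfo]
  private val totalCoreCount = new AtomicInteger(0)
  private val totalRegisteredExecutors = new AtomicInteger(0)

  // Mirrors the duplicate-ID check and synchronized mutation in DriverActor
  def register(executorId: String, host: String, cores: Int): Boolean = synchronized {
    if (executorDataMap.contains(executorId)) {
      false // duplicate executor ID: the real code replies RegisterExecutorFailed
    } else {
      executorDataMap.put(executorId, ExecutorInfo(host, cores, cores))
      totalCoreCount.addAndGet(cores)
      totalRegisteredExecutors.incrementAndGet()
      true // the real code replies RegisteredExecutor and calls makeOffers()
    }
  }

  def main(args: Array[String]): Unit = {
    println(register("executor-1", "node-1", 4)) // true
    println(register("executor-1", "node-1", 4)) // false: duplicate ID
    println(s"total cores: ${totalCoreCount.get()}")
  }
}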

Receiving the RegisteredExecutor message

override def receiveWithLogging = {
    // After the driver registers the executor successfully, it sends back a RegisteredExecutor message
    // At this point, CoarseGrainedExecutorBackend creates an Executor object as its execution handle
    // Most of the backend's functionality is actually implemented through this Executor
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      val (hostname, _) = Utils.parseHostPort(hostPort)
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
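Note that executor stays null until this message arrives, which is exactly why the LaunchTask case below has to guard against a null handle. A compact sketch of this guarded-initialization pattern, using hypothetical types:

// Hypothetical, simplified stand-in for the backend's execution handle
class ExecutorHandle(id: String, hostname: String) {
  def launchTask(taskId: Long): Unit = println(s"executor $id on $hostname running task $taskId")
}

class BackendState {
  // null until the driver confirms registration, as in the real backend
  @volatile private var executor: ExecutorHandle = _

  def onRegistered(id: String, hostname: String): Unit =
    executor = new ExecutorHandle(id, hostname)

  def onLaunchTask(taskId: Long): Unit =
    if (executor == null) sys.error("Received LaunchTask but executor was null") // real code exits the JVM
    else executor.launchTask(taskId)
}

object GuardDemo extends App {
  val state = new BackendState
  state.onRegistered("executor-1", "node-1")
  state.onLaunchTask(42L)
}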

The task launch mechanism

    // Launch a task
    case LaunchTask(data) =>
      if (executor == null) {
        logError("Received LaunchTask command but executor was null")
        System.exit(1)
      } else {
        // Deserialize the task
        val ser = env.closureSerializer.newInstance()
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        // Use the internal execution handle: Executor's launchTask() method launches the task
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }
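The deserialize call simply turns the ByteBuffer the driver sent into a TaskDescription again. As a concrete approximation (Spark's closureSerializer is pluggable; plain Java serialization is used here, and TaskDesc is a simplified stand-in):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Hypothetical, simplified stand-in for Spark's TaskDescription
case class TaskDesc(taskId: Long, attemptNumber: Int, name: String) extends Serializable

object SerDeDemo {
  def serialize(t: TaskDesc): ByteBuffer = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(t); out.close()
    ByteBuffer.wrap(bytes.toByteArray)
  }

  def deserialize(buf: ByteBuffer): TaskDesc = {
    val arr = new Array[Byte](buf.remaining()); buf.get(arr)
    val in = new ObjectInputStream(new ByteArrayInputStream(arr))
    in.readObject().asInstanceOf[TaskDesc]
  }

  def main(args: Array[String]): Unit = {
    val desc = deserialize(serialize(TaskDesc(1L, 0, "result task")))
    println(s"Got assigned task ${desc.taskId}") // mirrors the log line in LaunchTask
  }
}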

executor.launchTask()

  def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer) {
    // A TaskRunner is created for every task
    // TaskRunner implements Java's Runnable interface
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
    // Put the TaskRunner into the in-memory cache of running tasks
    runningTasks.put(taskId, tr)
    // The Executor holds a Java thread pool internally; the task, wrapped in a Runnable
    // (the TaskRunner), is submitted straight to the pool for execution
    // The pool handles queueing automatically: if none of its threads are currently free,
    // submitted runnables wait their turn
    threadPool.execute(tr)
  }
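That queueing behavior comes straight from java.util.concurrent; Spark builds threadPool as a cached daemon thread pool. A minimal sketch of the whole pattern, with TaskRunnerSketch as a simplified stand-in for TaskRunner:

import java.util.concurrent.{ConcurrentHashMap, Executors}

// Hypothetical, simplified TaskRunner: a Runnable wrapping one task
class TaskRunnerSketch(taskId: Long, body: () => Unit) extends Runnable {
  override def run(): Unit = {
    println(s"task $taskId starting on ${Thread.currentThread().getName}")
    body()
  }
}

object LaunchTaskDemo {
  private val threadPool = Executors.newCachedThreadPool()
  private val runningTasks = new ConcurrentHashMap[Long, TaskRunnerSketch]

  def launchTask(taskId: Long)(body: => Unit): Unit = {
    val tr = new TaskRunnerSketch(taskId, () => body)
    runningTasks.put(taskId, tr) // cache the runner so it can be killed/queried later
    threadPool.execute(tr)      // the pool runs or queues the runner as threads free up
  }

  def main(args: Array[String]): Unit = {
    (1L to 3L).foreach(id => launchTask(id) { Thread.sleep(100) })
    threadPool.shutdown()
  }
}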