spark提交流程-源码分析

org.apache.spark.deploy.SparkSubmit
-main

  -submit.doSubmit(args)
     - SparkSubmitArguments.parseArguments(args) 
       - - SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
        -  submit(args: SparkSubmitArguments, uninitLog: Boolean)
            -runMain(args: SparkSubmitArguments, uninitLog: Boolean)
            - new JavaMainApplication(mainClass)
              -app.start(childArgs.toArray, sparkConf)
// Following constants are visible for testing.
private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
  "org.apache.spark.deploy.yarn.YarnClusterApplication"

  // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
  if (isYarnCluster) {
    childMainClass = YARN_CLUSTER_SUBMIT_CLASS
   //mainClass 是SparkApplication类型,构建SparkApplication
   val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
    } else {
      new JavaMainApplication(mainClass)
    }

-YarnClusterApplication.start
//ClientArguments传递这spark submit的提交单数
- new Client(new ClientArguments(args), conf, null).run()
-Client.submitApplication()
private val yarnClient = YarnClient.createYarnClient
//用于与rm通信
protected ApplicationClientProtocol rmClient;

//获取appid
protected ApplicationClientProtocol rmClient;
//穿件容器上下文环境
val containerContext = createContainerLaunchContext(newAppResponse)
containerContext 上下文包含 运行ApplicationMaster的的classname 等启动参数

 val amClass =
      if (isClusterMode) {
        Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
      } else {
        Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
      }

//创建提交app的上下文环境
val appContext = createApplicationSubmissionContext(newApp, containerContext)

-yarnClient.submitApplication(appContext)
-rmClient.submitApplication(request);

搜索”org.apache.spark.deploy.yarn.ApplicationMaster“
-org.apache.spark.deploy.yarn.ApplicationMaster.main
-master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
ApplicationMaster成员
private val client = new YarnRMClient()
//用于am和rm通信
-amClient = AMRMClient.createAMRMClient()
-master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
-master.run()
如果是集群模式,运行Driver

 if (isClusterMode) {
        runDriver()
      } else {
        runExecutorLauncher()
      }
 private def runDriver(): Unit = {
   addAmIpFilter(None, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
   userClassThread = startUserApplication()
  ... ...
   try {
    //等待startUserApplication的中driver线程将用户代码sparkContext的创建完成,否则一直阻塞在这里
     val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
       Duration(totalWaitTime, TimeUnit.MILLISECONDS))
     //sc不为空
     if (sc != null) {
       val rpcEnv = sc.env.rpcEnv

       val userConf = sc.getConf
       val host = userConf.get(DRIVER_HOST_ADDRESS)
       val port = userConf.get(DRIVER_PORT)
      //注册AM,申请西苑
       registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)

       val driverRef = rpcEnv.setupEndpointRef(
         RpcAddress(host, port),
         YarnSchedulerBackend.ENDPOINT_NAME)
     //创建分配器,返回资源可用列表
       createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
     } else {
       // Sanity check; should never happen in normal operation, since sc should only be null
       // if the user app did not create a SparkContext.
       throw new IllegalStateException("User did not initialize spark context!")
     }
     //当资源准备就绪,调用resumeDriver方法,改变状态,让driver线程继续执行(用户代码逻辑)
     resumeDriver()
    userClassThread执行完之后,rundriver方法再继续执行
     userClassThread.join()
   } catch {
    ... ...
   } finally {
     resumeDriver()
   }
 }

userClassThread = startUserApplication()

 private def startUserApplication(): Thread = {
    logInfo("Starting the user application in a separate Thread")

 ... ...

    val mainMethod = userClassLoader.loadClass(args.userClass)
      .getMethod("main", classOf[Array[String]])

    val userThread = new Thread {
      override def run(): Unit = {
        try {
          if (!Modifier.isStatic(mainMethod.getModifiers)) {
            logError(s"Could not find static main method in object ${args.userClass}")
            finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
          } else {
            //静态方法,也就是用户的用户编写Job Object 的main方法,调用用户代码
            mainMethod.invoke(null, userArgs.toArray)
            finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
            logDebug("Done running user class")
          }
        } catch {
          ......
        } ... ...
      }
    }
    userThread.setContextClassLoader(userClassLoader)
   //设置drive线程
    userThread.setName("Driver")
//设置drive线程启动
    userThread.start()
    userThread
  }

创建SparkContext
val sc = new SparkContext(sparConf)
SparkContext进行sc初始化的时候,有一段代码如下

// Post init
   _taskScheduler.postStartHook()
private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {
...
  override def postStartHook(): Unit = {
//sparkContextInitialized干两个事儿
 //SparkContext 初始化后,唤醒runDriver 方法继续执行
  //暂停driver线程(user thread),为了让在runDriver函数中进行初始化
    ApplicationMaster.sparkContextInitialized(sc)

    super.postStartHook()
    logInfo("YarnClusterScheduler.postStartHook done")
  }

}
private def sparkContextInitialized(sc: SparkContext) = {
   sparkContextPromise.synchronized {
     //SparkContext 初始化后,唤醒runDriver 方法继续执行
     // Notify runDriver function that SparkContext is available
     sparkContextPromise.success(sc)
     //暂停driver线程(user thread),为了让在runDriver函数中进行初始化
     // Pause the user class thread in order to make proper initialization in runDriver function.
     sparkContextPromise.wait()
   }
 }

-TaskSchedulerImpl.postStartHook

TaskSchedulerImpl.waitBackendReady
private def waitBackendReady(): Unit = {
    if (backend.isReady) {
      return
    }
   //循环等待知道资源就绪,此时用户代码不会往下执行
   //那什么时候driver线程会继续执行?
//当rundrive方法调用resumeDriver,改变backend 状态,代表资源就绪
    while (!backend.isReady) {
      // Might take a while for backend to be ready if it is waiting on resources.
      if (sc.stopped.get) {
        // For example: the master removes the application for some reason
        throw new IllegalStateException("Spark context stopped while waiting for backend")
      }
      synchronized {
        this.wait(100)
      }
    }
  }

runDriver方法的registerAM
private val client = new YarnRMClient()
client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)

amClient = AMRMClient.createAMRMClient()
通过AMRMClient向RM注册
amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)

  private def createAllocator(
      driverRef: RpcEndpointRef,
      _sparkConf: SparkConf,
      rpcEnv: RpcEnv,
      appAttemptId: ApplicationAttemptId,
      distCacheConf: SparkConf): Unit = {
    ... ...

    val appId = appAttemptId.getApplicationId().toString()
    val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
    val localResources = prepareLocalResources(distCacheConf)
      ... ...
    //创建分配器
    allocator = client.createAllocator(
      yarnConf,
      _sparkConf,
      appAttemptId,
      driverUrl,
      driverRef,
      securityMgr,
      localResources)

   ...  ...
//获取可用资源列表
    allocator.allocateResources()
    val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATION_MASTER,
      sparkConf, securityMgr)
    val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
    ms.registerSource(new ApplicationMasterSource(prefix, allocator))
    // do not register static sources in this case as per SPARK-25277
    ms.start(false)
    metricsSystem = Some(ms)
    reporterThread = launchReporterThread()
  }

//处理可用于分配的容器
handleAllocatedContainers(allocatedContainers.asScala.toSeq)

 def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
    val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)

    // Match incoming requests by host
    val remainingAfterHostMatches = new ArrayBuffer[Container]
   //可分配容器 分类整理按主机名和机架(首选位置的应用)
    for (allocatedContainer <- allocatedContainers) {
      matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
        containersToUse, remainingAfterHostMatches)
    }
... ...
    val remainingAfterRackMatches = new ArrayBuffer[Container]
    if (remainingAfterHostMatches.nonEmpty) {
      var exception: Option[Throwable] = None
      val thread = new Thread("spark-rack-resolver") {
        override def run(): Unit = {
          try {
            for (allocatedContainer <- remainingAfterHostMatches) {
              val rack = resolver.resolve(allocatedContainer.getNodeId.getHost)
              matchContainerToRequest(allocatedContainer, rack, containersToUse,
                remainingAfterRackMatches)
            }
          } catch {
            case e: Throwable =>
         ... ...
    }
    //运行已分配容器进行
    runAllocatedContainers(containersToUse)

    logInfo("Received %d containers from YARN, launching executors on %d of them."
      .format(allocatedContainers.size, containersToUse.size))
  }

runAllocatedContainers 启动容器

 private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = synchronized {
    for (container <- containersToUse) {
      val rpId = getResourceProfileIdFromPriority(container.getPriority)
      executorIdCounter += 1
      val executorHostname = container.getNodeId.getHost
      val containerId = container.getId
      val executorId = executorIdCounter.toString
      val yarnResourceForRpId = rpIdToYarnResource.get(rpId)
      assert(container.getResource.getMemory >= yarnResourceForRpId.getMemory)
      logInfo(s"Launching container $containerId on host $executorHostname " +
        s"for executor with ID $executorId for ResourceProfile Id $rpId")
      ... ...
          TargetNum容器大于Running的容器数,说明还需要启动容器
      if (rpRunningExecs < getOrUpdateTargetNumExecutorsForRPId(rpId)) {
        getOrUpdateNumExecutorsStartingForRPId(rpId).incrementAndGet()
        if (launchContainers) {
启动线程池,启动容器
          launcherPool.execute(() => {
            try {
              new ExecutorRunnable(
                Some(container),
                conf,
                sparkConf,
                driverUrl,
                executorId,
                executorHostname,
                containerMem,
                containerCores,
                appAttemptId.getApplicationId.toString,
                securityMgr,
                localResources,
                rp.id
              ).run()
              updateInternalState()
            } catch {
              ... ...
            }
          })
        } else {
          // For test only
          updateInternalState()
        }
      ... ...
    }
  }

ExecutorRunnable.run

//
nmClient = NMClient.createNMClient()
    nmClient.init(conf)
    nmClient.start()
   //通过nmClient 通知指定nm启动contanter
    startContainer()

startContainer

def startContainer(): java.util.Map[String, ByteBuffer] = {
   ... ...
   //prepareCommand 是准备启动容器进程的脚本
  //启动org.apache.spark.executor.YarnCoarseGrainedExecutorBackend 进程(excutor 的通信后台)
    val commands = prepareCommand()

    ctx.setCommands(commands.asJava)
    ctx.setApplicationACLs(
      YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava)

     ...  ....

    // Send the start request to the ContainerManager
    try {
      //启动container,携带启动容器的上下文ctx
      nmClient.startContainer(container.get, ctx)
    } catch {
      ... ...
    }
  }

org.apache.spark.executor.YarnCoarseGrainedExecutorBackend

-CoarseGrainedExecutorBackend.run

def run(
     arguments: Arguments,
     backendCreateFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
       CoarseGrainedExecutorBackend): Unit = {

   ... ...
//可以找到driver 与driver通信
     val fetcher = RpcEnv.create(
       "driverPropsFetcher",
       arguments.bindAddress,
       arguments.hostname,
       -1,
       executorConf,
       new SecurityManager(executorConf),
       numUsableCores = 0,
       clientMode = true)

     var driver: RpcEndpointRef = null
     val nTries = 3
     for (i <- 0 until nTries if driver == null) {
       try {
         driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
       } catch {
         case e: Throwable => if (i == nTries - 1) {
           throw e
         }
       }
     }
     //创建Executor的evn环境
     val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
       arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
    //设置rpcEnv环境的通信终端
backendCreateFn 其实就是YarnCoarseGrainedExecutorBackend
     env.rpcEnv.setupEndpoint("Executor",
       backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
     arguments.workerUrl.foreach { url =>
       env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
     }
     env.rpcEnv.awaitTermination()
   }
 }

//消息循环器
var messageLoop: MessageLoop = null
      try {
        messageLoop = endpoint match {
          case e: IsolatedRpcEndpoint =>
             //匹配消息循环器类型
            new DedicatedMessageLoop(name, e, this)
          case _ =>
            sharedLoop.register(name, endpoint)
            sharedLoop
        }
        endpoints.put(name, messageLoop)
      } catch {
        case NonFatal(e) =>
          endpointRefs.remove(endpoint)
          throw e
      }

-DedicatedMessageLoop
//收件箱
private val inbox = new Inbox(name, endpoint)

-Inbox
protected val messages = new java.util.LinkedListInboxMessage
// OnStart should be the first message to process
//放入一个OnStart消息
inbox.synchronized {
messages.add(OnStart)
}

//RpcEndpoint的生命周期
 * {@code constructor -> onStart -> receive* -> onStop}

private[spark] trait RpcEndpoint {

CoarseGrainedExecutorBackend.onStart

override def onStart(): Unit = {
  ... ...

    logInfo("Connecting to driver: " + driverUrl)
    try {
      _resources = parseOrFindResources(resourcesFileOpt)
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      //获取driver
      driver = Some(ref)
     //像driver发送RegisterExecutor消息
      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
        extractAttributes, _resources, resourceProfile.id))
    }(ThreadUtils.sameThread).onComplete {
      case Success(_) =>
        self.send(RegisteredExecutor)
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
    }(ThreadUtils.sameThread)
  }

driver是一个线程,所有是SparkContext接收消息
_schedulerBackend是driver通信后台
private var _schedulerBackend: SchedulerBackend = _
//集群模式的SchedulerBackend
CoarseGrainedSchedulerBackend
//消息回复
CoarseGrainedSchedulerBackend.receiveAndReply

匹配RegisterExecutor消息
     case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
          attributes, resources, resourceProfileId) =>
总的核数 注册数量增加
    totalCoreCount.addAndGet(cores)
          totalRegisteredExecutors.addAndGet(1)
//最后回复一个true表示注册成功
   context.reply(true)

CoarseGrainedExecutorBackend.收到消息

case Success(_) =>
   //给自己发送一条消息,表示注册完毕
       self.send(RegisteredExecutor)

CoarseGrainedExecutorBackend.收到给自己发送的RegisteredExecutor消息

override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    try {
    //创建Executor计算对象,区别CoarseGrainedExecutorBackend(通信对象)
      executor = new (executorId, hostname, env, userClassPath, isLocal = false,
        resources = _resources)
   //给driver发送LaunchedExecutor 消息
      driver.get.send(LaunchedExecutor(executorId))

driver端的CoarseGrainedSchedulerBackend receive到
LaunchedExecutor消息

  case LaunchedExecutor(executorId) =>
      //增加核数
        executorDataMap.get(executorId).foreach { data =>
          data.freeCores = data.totalCores
        }
   //做一些操作 //tode
        makeOffers(executorId)
      case e =>
        logError(s"Received unexpected message. ${e}")
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容