Continuing from the previous post, let's carry our open questions forward and keep digging through the source.
Analyzing the submit process
For Python
The entry point for submitting a Python job is:
bin/pyspark
Opening it up, the line that actually does the work is:
exec "${SPARK_HOME}"/bin/spark-submit pyspark-shell-main --name "PySparkShell" "$@"
Follow into bin/spark-submit
and find the statement it executes:
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
Good. The class we see here is:
org.apache.spark.deploy.SparkSubmit
Find the main function of the org.apache.spark.deploy.SparkSubmit class:
/******
* file: /core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
*******/
def main(args: Array[String]): Unit = {
val appArgs = new SparkSubmitArguments(args)
if (appArgs.verbose) {
// scalastyle:off println
printStream.println(appArgs)
// scalastyle:on println
}
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}
Step into the submit function.
Inside submit, the call chain looks like this:
submit -> doRunMain -> runMain -> childMainClass.main()
/******
* file: /core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
*******/
private def submit(args: SparkSubmitArguments): Unit = {
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
def doRunMain(): Unit = {
// ... omitted ...
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
// ... omitted ...
}
//...
doRunMain()
//...
}
Here, childMainClass and mainClass are constructed by prepareSubmitEnvironment(args):
/******
* file: /core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
*******/
private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
: (Seq[String], Seq[String], Map[String, String], String) = {
...
// If we're running a python app, set the main class to our specific python runner
if (args.isPython && deployMode == CLIENT) {
if (args.primaryResource == PYSPARK_SHELL) {
args.mainClass = "org.apache.spark.api.python.PythonGatewayServer"
} else {
...
args.mainClass = "org.apache.spark.deploy.PythonRunner"
...
}
}
if (args.isStandaloneCluster) {
if (args.useRest) {
childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
childArgs += (args.primaryResource, args.mainClass)
}
...
}
Looking at prepareSubmitEnvironment(args): when we are in the isStandaloneCluster case, childMainClass ends up as
/******
* file: /core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
*******/
childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
args.mainClass = "org.apache.spark.deploy.PythonRunner"
// mainClass is the class that, after the REST call, takes care of loading the driver package we wrote and executing it. We will see it used on the server side.
So let's follow into "org.apache.spark.deploy.rest.RestSubmissionClient"
and find RestSubmissionClient's main function.
RestSubmissionClient wraps the user's class and arguments into a submitRequest and calls client.createSubmission(submitRequest), which POSTs the request to the master.
The URL is:
http://$masterUrl/$PROTOCOL_VERSION/submissions/create
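To make the shape of this call concrete, here is a minimal sketch of roughly what the client side does. RestSubmissionClient is Spark-internal, its signatures differ slightly between versions, and every value below (paths, arguments, properties) is made up for illustration:
// Illustrative sketch only, not the actual SparkSubmit code path
val client = new RestSubmissionClient("spark://master-host:6066")
val submitRequest = client.constructSubmitRequest(
  appResource = "file:/path/to/pi.py",                    // primary resource
  mainClass = "org.apache.spark.deploy.PythonRunner",     // set by prepareSubmitEnvironment above
  appArgs = Array("file:/path/to/pi.py", "", "10"),       // PythonRunner's args: python file, py-files, app args
  sparkProperties = Map(
    "spark.master" -> "spark://master-host:6066",
    "spark.app.name" -> "PythonPi"),
  environmentVariables = Map.empty)
val response = client.createSubmission(submitRequest)     // POSTs to .../v1/submissions/create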
At this point the client side is done.
Next, let's see how the server reacts.
Server side
In the previous chapter, the master started its REST server on port 6066 using the class:
org.apache.spark.deploy.rest.StandaloneRestServer
So let's enter the master through this class and see what it does after the submit.
Below we are inside the master's REST server.
/******
* file: /core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
*/
private[deploy] class StandaloneRestServer(
host: String,
requestedPort: Int,
masterConf: SparkConf,
masterEndpoint: RpcEndpointRef,
masterUrl: String)
extends RestSubmissionServer(host, requestedPort, masterConf)
StandaloneRestServer extends RestSubmissionServer.
Step into the parent class, RestSubmissionServer:
/******
* file: /core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
*/
....
protected lazy val contextToServlet = Map[String, RestServlet](
s"$baseContext/create/*" -> submitRequestServlet,
s"$baseContext/kill/*" -> killRequestServlet,
s"$baseContext/status/*" -> statusRequestServlet,
"/*" -> new ErrorServlet // default handler
)
...
// The start-up function.
private def doStart(startPort: Int): (Server, Int) = {
val server = new Server(new InetSocketAddress(host, startPort))
val threadPool = new QueuedThreadPool
threadPool.setDaemon(true)
server.setThreadPool(threadPool)
val mainHandler = new ServletContextHandler
mainHandler.setContextPath("/")
// Iterate over contextToServlet and bind each route prefix to the servlet that handles it
contextToServlet.foreach { case (prefix, servlet) =>
mainHandler.addServlet(new ServletHolder(servlet), prefix)
}
server.setHandler(mainHandler)
server.start()
val boundPort = server.getConnectors()(0).getLocalPort
(server, boundPort)
}
In the previous step, the client hit the /create path; here the servlet mapped to create is submitRequestServlet.
In submitRequestServlet we find the doPost method:
protected override def doPost(
requestServlet: HttpServletRequest,
responseServlet: HttpServletResponse): Unit = {
....
// handleSubmit is called here.
handleSubmit(requestMessageJson, requestMessage, responseServlet)
....
sendResponse(responseMessage, responseServlet)
Back in the subclass StandaloneRestServer, find the overridden handleSubmit:
/******
* file: /core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
*/
protected override def handleSubmit(
requestMessageJson: String,
requestMessage: SubmitRestProtocolMessage,
responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
requestMessage match {
case submitRequest: CreateSubmissionRequest =>
// Key point 1: build the driverDescription
val driverDescription = buildDriverDescription(submitRequest)
// Key point 2: RPC call to the master.
val response = masterEndpoint.askWithRetry[DeployMessages.SubmitDriverResponse](
DeployMessages.RequestSubmitDriver(driverDescription))
val submitResponse = new CreateSubmissionResponse
submitResponse.serverSparkVersion = sparkVersion
submitResponse.message = response.message
submitResponse.success = response.success
submitResponse.submissionId = response.driverId.orNull
val unknownFields = findUnknownFields(requestMessageJson, requestMessage)
if (unknownFields.nonEmpty) {
// If there are fields that the server does not know about, warn the client
submitResponse.unknownFields = unknownFields
}
submitResponse
// ... other cases omitted ...
}
}
}
Let's look at buildDriverDescription:
/******
* file: /core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
*/
private def buildDriverDescription(request: CreateSubmissionRequest): DriverDescription = {
// ... omitted ...
val mainClass = Option(request.mainClass).getOrElse {
throw new SubmitRestMissingFieldException("Main class is missing.")
}
// Initialize the driver's memory, cores, java options, jar path, classpath and other environment info
// ... omitted ...
// Key point: build the launch command
val command = new Command(
"org.apache.spark.deploy.worker.DriverWrapper",
Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs, // args
//... omitted ...
new DriverDescription(
appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, command)
}
Here the command has org.apache.spark.deploy.worker.DriverWrapper invoke mainClass.
mainClass is the parameter the client passed in with the REST request, namely "org.apache.spark.deploy.PythonRunner".
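For our PySpark submission, the assembled Command is therefore roughly equivalent to the sketch below (environment, classpath, library path and java options are left empty here, and the file name and arguments are hypothetical):
// Rough illustration of the resulting Command, not a literal copy of what Spark builds
new Command(
  "org.apache.spark.deploy.worker.DriverWrapper",
  Seq("{{WORKER_URL}}", "{{USER_JAR}}",            // placeholders substituted later by the worker
    "org.apache.spark.deploy.PythonRunner",        // mainClass passed in the REST request
    "/path/to/pi.py", "", "10"),                   // PythonRunner's args: python file, py-files, app args
  Map.empty, Seq.empty, Seq.empty, Seq.empty)      // environment, classpath, library path, java opts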
OK, back to handleSubmit:
val response = masterEndpoint.askWithRetry[DeployMessages.SubmitDriverResponse](
DeployMessages.RequestSubmitDriver(driverDescription))
This makes a remote RPC call to the master's RequestSubmitDriver handler.
Remember from the previous chapter that the master started NettyRpc when it came up?
Good, then let's follow into the master's NettyRpc.
Now we are inside the master's RPC server.
Step into the Master class and find the handler for RequestSubmitDriver:
/******
* file: /core/src/main/scala/org/apache/spark/deploy/master/Master.scala
*/
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RequestSubmitDriver(description) => {
if (state != RecoveryState.ALIVE) {
//...
} else {
val driver = createDriver(description)
persistenceEngine.addDriver(driver)
waitingDrivers += driver
drivers.add(driver)
schedule()
context.reply(SubmitDriverResponse(self, true, Some(driver.id),
s"Driver successfully submitted as ${driver.id}"))
}
}
In the previous chapter we analyzed the RegisterWorker case of this function; here we look at RequestSubmitDriver.
The main actions are creating a driver and then calling schedule() to dispatch the new driver to a suitable worker for execution.
So we have found where the driver comes from; that answers half of the question left over from the previous post.
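The receiveAndReply seen here is one half of the internal RPC endpoint contract that Master, Worker and the AppClient's ClientEndpoint all implement. Below is a simplified sketch of that contract; the real traits live in org.apache.spark.rpc and are private[spark], so treat this as an illustration rather than user-facing API:
import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}

class EchoEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
  // called once when rpcEnv.setupEndpoint(...) registers this endpoint
  override def onStart(): Unit = {}
  // handles one-way messages delivered with endpointRef.send(...)
  override def receive: PartialFunction[Any, Unit] = {
    case msg: String => println(s"fire-and-forget: $msg")
  }
  // handles request/response messages delivered with endpointRef.ask / askWithRetry
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case ("ping", n: Int) => context.reply(n + 1)
  }
}
// registration: rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))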
We already walked through schedule() in the previous post; let's go over it once more and follow the driver.
/******
* file: /core/src/main/scala/org/apache/spark/deploy/master/Master.scala
*/
private def schedule(): Unit = {
...
launchDriver(worker, driver)
...
}
After a worker is selected, launchDriver makes the remote call:
/******
* file: /core/src/main/scala/org/apache/spark/deploy/master/Master.scala
*/
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
logInfo("Launching driver " + driver.id + " on worker " + worker.id)
worker.addDriver(driver)
driver.worker = Some(worker)
worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
driver.state = DriverState.RUNNING
}
This sends a LaunchDriver message to the worker, carrying driver.id and driver.desc as parameters.
Now let's move into the worker:
/******
* file: /core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
*/
private[deploy] class Worker(
override val rpcEnv: RpcEnv,
webUiPort: Int,
cores: Int,
memory: Int,
masterRpcAddresses: Array[RpcAddress],
systemName: String,
endpointName: String,
workDirPath: String = null,
val conf: SparkConf,
val securityMgr: SecurityManager)
...
override def receive: PartialFunction[Any, Unit] = synchronized {
...
case LaunchDriver(driverId, driverDesc) => {
logInfo(s"Asked to launch driver $driverId")
val driver = new DriverRunner(
conf,
driverId,
workDir,
sparkHome,
driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
self,
workerUri,
securityMgr)
drivers(driverId) = driver
driver.start()
coresUsed += driverDesc.cores
memoryUsed += driverDesc.mem
}
A DriverRunner is created and started via start().
Let's take a look at start():
private[worker] def start() = {
new Thread("DriverRunner for " + driverId) {
override def run() {
try {
val driverDir = createWorkingDirectory()
// Download the user code
val localJarFilename = downloadUserJar(driverDir)
def substituteVariables(argument: String): String = argument match {
case "{{WORKER_URL}}" => workerUrl
case "{{USER_JAR}}" => localJarFilename
case other => other
}
// Build a process and run the user code
val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
launchDriver(builder, driverDir, driverDesc.supervise)
}
...
worker.send(DriverStateChanged(driverId, state, finalException))
}
}.start()
}
The main work here: create a thread that downloads the user code, then spawns a new process to execute it, with stdout and stderr redirected. Finally, the thread notifies the worker of the driver state change via an RPC call.
At this point the driver loading process is essentially complete.
User code loading
The driver process launched above uses a command in which org.apache.spark.deploy.worker.DriverWrapper invokes mainClass, i.e. "org.apache.spark.deploy.PythonRunner".
Let's take a look at DriverWrapper:
/******
* file: /core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
*/
object DriverWrapper {
def main(args: Array[String]) {
args.toList match {
/*
 * IMPORTANT: Spark 1.3 provides a stable application submission gateway that is both
 * backward and forward compatible across future Spark versions. Because this gateway
 * uses this class to launch the driver, the ordering and semantics of the arguments
 * here must also remain consistent across versions.
 */
case workerUrl :: userJar :: mainClass :: extraArgs =>
val conf = new SparkConf()
val rpcEnv = RpcEnv.create("Driver",
Utils.localHostName(), 0, conf, new SecurityManager(conf))
rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv, workerUrl))
val currentLoader = Thread.currentThread.getContextClassLoader
val userJarUrl = new File(userJar).toURI().toURL()
val loader =
if (sys.props.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
new ChildFirstURLClassLoader(Array(userJarUrl), currentLoader)
} else {
new MutableURLClassLoader(Array(userJarUrl), currentLoader)
}
Thread.currentThread.setContextClassLoader(loader)
// Delegate to supplied main class
val clazz = Utils.classForName(mainClass)
val mainMethod = clazz.getMethod("main", classOf[Array[String]])
mainMethod.invoke(null, extraArgs.toArray[String])
rpcEnv.shutdown()
case _ =>
// scalastyle:off println
System.err.println("Usage: DriverWrapper <workerUrl> <userJar> <driverMainClass> [options]")
// scalastyle:on println
System.exit(-1)
}
}
}
This mainly sets up the environment (an RPC env, a WorkerWatcher, and the class loader for the user jar), then calls the main function of mainClass via reflection.
Step into mainClass, org.apache.spark.deploy.PythonRunner:
/******
* file: /core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
*/
object PythonRunner {
def main(args: Array[String]) {
// Read arguments and configure environment variables
val pythonFile = args(0)
val pyFiles = args(1)
val otherArgs = args.slice(2, args.length)
val pythonExec =
sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", sys.env.getOrElse("PYSPARK_PYTHON", "python"))
val formattedPythonFile = formatPath(pythonFile)
val formattedPyFiles = formatPaths(pyFiles)
// Start the Py4J gateway server and wait for the Python process to connect
// Java system properties and such
val gatewayServer = new py4j.GatewayServer(null, 0)
val thread = new Thread(new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions {
gatewayServer.start()
}
})
thread.setName("py4j-gateway-init")
thread.setDaemon(true)
thread.start()
thread.join()
...
// Launch the process that runs the user code.
val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)
val env = builder.environment()
env.put("PYTHONPATH", pythonPath)
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
try {
val process = builder.start()
new RedirectThread(process.getInputStream, System.out, "redirect output").start()
val exitCode = process.waitFor()
if (exitCode != 0) {
throw new SparkUserAppException(exitCode)
}
} finally {
gatewayServer.shutdown()
}
}
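The core trick in PythonRunner is the Py4J gateway: the JVM listens on an ephemeral port, hands that port to the child Python process through PYSPARK_GATEWAY_PORT, and the Python process then drives the JVM through it. A stripped-down standalone illustration of the JVM side (this is not Spark code; it only assumes py4j is on the classpath):
import py4j.GatewayServer

object GatewayDemo {
  def main(args: Array[String]): Unit = {
    val gateway = new GatewayServer(null, 0)   // port 0: let py4j pick an ephemeral port
    gateway.start()
    // A Python child launched with PYSPARK_GATEWAY_PORT set to this port can connect back
    // and instantiate JVM objects; this is how pyspark reaches JavaSparkContext later on.
    println(s"gateway listening on port ${gateway.getListeningPort}")
  }
}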
User code start-up
We will use pi.py from the examples as our entry point.
/******
* file: examples/src/main/python/pi.py
*/
import sys
from random import random
from operator import add
from pyspark import SparkContext
if __name__ == "__main__":
"""
Usage: pi [partitions]
"""
sc = SparkContext(appName="PythonPi")
partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
n = 100000 * partitions
def f(_):
x = random() * 2 - 1
y = random() * 2 - 1
return 1 if x ** 2 + y ** 2 < 1 else 0
count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))
sc.stop()
Look at the first statement:
sc = SparkContext(appName="PythonPi")
Quite a lot happens inside this call.
Look at context.py, at the __init__ method of the SparkContext class.
It calls _do_init():
def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
conf, jsc, profiler_cls):
# ... various environment variable and parameter configuration ...
# Create the Java SparkContext through Py4J
self._jsc = jsc or self._initialize_context(self._conf._jconf)
_initialize_context in turn creates the Java-side JavaSparkContext through Py4J:
def _initialize_context(self, jconf):
"""
Initialize SparkContext in function to allow subclass specific initialization
"""
return self._jvm.JavaSparkContext(jconf)
Step into JavaSparkContext:
/******
* file: /core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
*/
class JavaSparkContext(val sc: SparkContext)
extends JavaSparkContextVarargsWorkaround with Closeable {
...
/**
 * @param conf a [[org.apache.spark.SparkConf]] object specifying Spark parameters
 */
def this(conf: SparkConf) = this(new SparkContext(conf))
which calls SparkContext:
/******
* file: /core/src/main/scala/org/apache/spark/SparkContext.scala
*/
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
...
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master) // key point
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this) // here, finally, is the DAGScheduler
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
_applicationId = _taskScheduler.applicationId()
_applicationAttemptId = taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
_ui.foreach(_.setAppId(_applicationId))
_env.blockManager.initialize(_applicationId)
...
}
First, take a look at createTaskScheduler:
/******
* file: /core/src/main/scala/org/apache/spark/SparkContext.scala
*/
private def createTaskScheduler(
sc: SparkContext,
master: String): (SchedulerBackend, TaskScheduler) = {
val MAX_LOCAL_TASK_FAILURES = 1
master match {
...
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend) // initialize the backend inside the scheduler
(backend, scheduler)
...
}
Here scheduler is a TaskSchedulerImpl
and backend is a SparkDeploySchedulerBackend.
def initialize(backend: SchedulerBackend) {
this.backend = backend // bind the backend
// temporarily set rootPool name to empty
rootPool = new Pool("", schedulingMode, 0, 0)
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
}
}
schedulableBuilder.buildPools() // build the scheduling pools
}
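The schedulingMode branch above is driven by the spark.scheduler.mode setting, which defaults to FIFO. From the user's side, FAIR scheduling is chosen through ordinary configuration, for example (the pools file path is hypothetical):
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("PythonPi")
  .set("spark.scheduler.mode", "FAIR")                                   // selects FairSchedulableBuilder
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")  // optional pool definitions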
Earlier, _taskScheduler.start() was called.
Let's see how it starts:
override def start() {
backend.start()// backend start
if (!isLocal && conf.getBoolean("spark.speculation", false)) {
logInfo("Starting speculative execution thread")
speculationScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
checkSpeculatableTasks()
}
}, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
}
}
Now let's see what the backend does:
/******
* file: /core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
*/
override def start() {
super.start()
launcherBackend.connect()
...
client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
waitForRegistration()
launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
Here a new AppClient is created, and it registers itself with the master via RPC.
Let's see how that works; follow into client.start():
def start() {
// Just launch an rpcEndpoint; it will call back into the listener.
endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
}
This registers the RPC endpoint "AppClient".
Look at ClientEndpoint's start-up hook, onStart(), which runs when the endpoint is registered:
/******
* file: /core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
*/
override def onStart(): Unit = {
registerWithMaster(1)
}
Follow into registerWithMaster(1):
private def registerWithMaster(nthRetry: Int) {
registerMasterFutures.set(tryRegisterAllMasters())
registrationRetryTimer.set(registrationRetryThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = {
if (registered.get) {
registerMasterFutures.get.foreach(_.cancel(true))
registerMasterThreadPool.shutdownNow()
} else if (nthRetry >= REGISTRATION_RETRIES) {
markDead("All masters are unresponsive! Giving up.")
} else {
registerMasterFutures.get.foreach(_.cancel(true))
registerWithMaster(nthRetry + 1)
}
}
}, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
}
This fires the registration, then periodically checks the registration state, retries if necessary, and gives up once the retry limit is hit.
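Stripped of the Spark types, the retry logic is essentially this pattern (a simplified sketch, not the AppClient code; the constants stand in for REGISTRATION_TIMEOUT_SECONDS and REGISTRATION_RETRIES):
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

val registered = new AtomicBoolean(false)
val checker = Executors.newSingleThreadScheduledExecutor()

def registerWithMaster(nthRetry: Int): Unit = {
  // ... fire the asynchronous registration attempts here ...
  checker.schedule(new Runnable {
    override def run(): Unit = {
      if (registered.get) ()                         // success: stop retrying
      else if (nthRetry >= 3) println("giving up")   // retry limit reached
      else registerWithMaster(nthRetry + 1)          // cancel outstanding attempts and retry
    }
  }, 20, TimeUnit.SECONDS)
}
registerWithMaster(1)
The actual attempts are fired by tryRegisterAllMasters, which submits a registration task for every configured master: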
/******
* file: /core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
*/
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
for (masterAddress <- masterRpcAddresses) yield {
registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = try {
if (registered.get) {
return
}
logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
val masterRef =
rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
masterRef.send(RegisterApplication(appDescription, self))
} catch {
case ie: InterruptedException => // Cancelled
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
}
})
}
}
This sends RegisterApplication to the master.
Now let's look inside Master.scala to see how that registration message is handled:
/******
* file: /core/src/main/scala/org/apache/spark/deploy/master/Master.scala
*/
case RegisterApplication(description, driver) => {
// TODO Prevent repeated registrations from some driver
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else {
logInfo("Registering app " + description.name)
val app = createApplication(description, driver)
registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
persistenceEngine.addApplication(app)
driver.send(RegisteredApplication(app.id, self))
schedule()
}
}
Take a look at registerApplication(app):
private def registerApplication(app: ApplicationInfo): Unit = {
val appAddress = app.driver.address
if (addressToApp.contains(appAddress)) {
logInfo("Attempted to re-register application at same address: " + appAddress)
return
}
applicationMetricsSystem.registerSource(app.appSource)
apps += app
idToApp(app.id) = app
endpointToApp(app.driver) = app
addressToApp(appAddress) = app
waitingApps += app
}
There it is: waitingApps += app.
At this point both the driver and the app from the previous chapter have been accounted for.
driver.send(RegisteredApplication(app.id, self))
This sends the RegisteredApplication message back to the driver.
Then schedule() runs once more, so that any apps nobody has picked up yet get assigned.
Let's see what the driver side does with RegisteredApplication:
/******
* file: /core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
*/
case RegisteredApplication(appId_, masterRef) =>
// FIXME How to handle the following cases?
// 1. A master receives multiple registrations and sends back multiple
// RegisteredApplications due to an unstable network.
// 2. Receive multiple RegisteredApplication from different masters because the master is
// changing.
appId.set(appId_)
registered.set(true)
master = Some(masterRef)
listener.connected(appId.get)
This updates the AppClient's state.
Back in the start function of SparkDeploySchedulerBackend.scala:
client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
When the client was created, SparkDeploySchedulerBackend passed itself (this) to AppClient as the listener.
AppClient calls listener.connected(appId.get) when handling RegisteredApplication, so the backend's connected method gets invoked:
override def connected(appId: String) {
logInfo("Connected to Spark cluster with app ID " + appId)
this.appId = appId
notifyContext()
launcherBackend.setAppId(appId)
}
Inside connected, notifyContext() calls registrationBarrier.release(), so the waitForRegistration() call in SparkDeploySchedulerBackend's start function stops blocking and start() finishes.
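The blocking handshake here is just a one-shot barrier built on a Semaphore; in essence (simplified from SparkDeploySchedulerBackend):
import java.util.concurrent.Semaphore

val registrationBarrier = new Semaphore(0)
def waitForRegistration(): Unit = registrationBarrier.acquire()  // start() blocks here
def notifyContext(): Unit = registrationBarrier.release()        // connected() releases it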
With that, application registration is complete.
To be continued...
In the next post I'll sum up with a small diagram, then look at how the user program is loaded and executed.