This is my first technical post. It draws on many articles by more experienced authors plus a few thoughts of my own; if anything is wrong, please point it out so we can all improve together.
Spark's underlying RPC communication: remember that everything here is event-driven!
The three main classes:
RpcEndpoint: a communication endpoint. The Master and the Workers in a Spark cluster, for example, are each an RpcEndpoint.
RpcEndpointRef: a reference to an RpcEndpoint. To communicate with an RpcEndpoint we must first obtain its reference.
RpcEnv: the RPC framework and environment. It has methods to start, stop and shut down RPC, and its setupEndpoint method registers an RpcEndpoint while storing the RpcEndpointRef/RpcEndpoint pair in a thread-safe ConcurrentHashMap.
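A minimal sketch of how the three pieces fit together (the EchoEndpoint below is hypothetical, and the org.apache.spark.rpc package is private[spark], so code like this only compiles inside the Spark source tree):
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

// Hypothetical endpoint that just echoes back whatever string it is asked
class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case msg: String => context.reply(s"echo: $msg")
  }
}

object EchoDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // Create the RPC environment, then register the endpoint; setupEndpoint returns its RpcEndpointRef
    val rpcEnv = RpcEnv.create("echo-system", "localhost", 7777, conf, new SecurityManager(conf))
    val echoRef = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))
    // All communication goes through the ref, not the endpoint itself
    println(echoRef.askSync[String]("hello")) // prints "echo: hello"
    rpcEnv.shutdown()
  }
}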
Spark's startup message communication
(I will not analyze the code in fine detail; I will only point out the more important methods and what they mainly accomplish.)
Main components:
- Master
- Worker
1. Master side:
def startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
(rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
The first thing Master's main method does is call the method above to build the communication environment: it creates the RpcEnv and registers itself with setupEndpoint, getting back its own RpcEndpointRef. That is the Master's startup.
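For context, Master.main looks roughly like this (paraphrased from the Spark source; details vary slightly by version):
def main(argStrings: Array[String]): Unit = {
  Utils.initDaemon(log)
  val conf = new SparkConf
  val args = new MasterArguments(argStrings, conf)
  // Build the RpcEnv and register the Master endpoint, then block until shutdown
  val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
  rpcEnv.awaitTermination()
}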
2. Worker side:
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
masterRpcAddresses.map { masterAddress =>
registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = {
try {
logInfo("Connecting to master " + masterAddress + "...")
//Use the Master's address to obtain its RpcEndpointRef, then send it the registration message
val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
//This is where the registration message is actually sent
sendRegisterMessageToMaster(masterEndpoint)
} catch {
case ie: InterruptedException => // Cancelled
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
}
}
})
}
}
On the Worker side, tryRegisterAllMasters() tries to register with every Master (there may be several when HA is configured).
We mentioned earlier that to talk to an RpcEndpoint you must first obtain its RpcEndpointRef, so
val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME) obtains the Master's RpcEndpointRef, and sendRegisterMessageToMaster(masterEndpoint) then sends the registration message. Spark's low-level communication relies heavily on pattern matching over message case classes.
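For reference, sendRegisterMessageToMaster essentially just wraps the Worker's state into a RegisterWorker case class and sends it through the Master's ref (paraphrased from the source; the exact field list varies by Spark version):
private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = {
  // self is this Worker's own RpcEndpointRef, so the Master knows where to reply
  masterEndpoint.send(RegisterWorker(
    workerId, host, port, self, cores, memory, workerWebUiUrl, masterEndpoint.address))
}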
From here on, if a method already carries detailed comments in the code I will not pull it out separately and will only mention the points worth noting. The numbers 1, 2, 3 ... mark the order in which the code executes; for example, the flow now moves to the Master side.
3. Master side
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
//Check the Master's state and whether this worker is already registered
if (state == RecoveryState.STANDBY) {
workerRef.send(MasterInStandby)
} else if (idToWorker.contains(id)) {
//Reply with a registration-failure message
workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
workerRef, workerWebUiUrl)
//registerWorker saves the worker's info into the Master's internal collections; it returns a Boolean so duplicate registrations can be rejected
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
//Reply to the worker with a registration-success message; self here is the Master's own RpcEndpointRef
workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
schedule()
} else {
val workerAddress = worker.endpoint.address
//Log a warning
logWarning("Worker registration failed. Attempted to re-register worker at same " +
"address: " + workerAddress)
//Send a failure message
workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
+ workerAddress))
}
}
This is the Master's handling of the registration message sent by the Worker. registerWorker(worker) adds the worker's information to the Master's internal lists, which are later used for cluster task scheduling.
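For reference, registerWorker looks roughly like this (a paraphrased sketch; signatures and details vary by Spark version). It drops dead entries for the same node, rejects a live duplicate at the same address, and otherwise records the worker in workers, idToWorker and addressToWorker:
private def registerWorker(worker: WorkerInfo): Boolean = {
  // There may be refs to dead workers on the same node (with different IDs); remove them
  workers.filter { w =>
    (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
  }.foreach { w => workers -= w }

  val workerAddress = worker.endpoint.address
  if (addressToWorker.contains(workerAddress)) {
    val oldWorker = addressToWorker(workerAddress)
    if (oldWorker.state == WorkerState.UNKNOWN) {
      // The worker was restarted during recovery; drop the stale entry and accept the new one
      removeWorker(oldWorker)
    } else {
      logInfo("Attempted to re-register worker at same address: " + workerAddress)
      return false
    }
  }

  workers += worker
  idToWorker(worker.id) = worker
  addressToWorker(workerAddress) = worker
  true
}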
4. Back on the Worker side
case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress) =>
if (preferConfiguredMasterAddress) {
logInfo("Successfully registered with master " + masterAddress.toSparkURL)
} else {
logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
}
//Mark this worker as registered
registered = true
//Update the Master info the Worker keeps internally
changeMaster(masterRef, masterWebUiUrl, masterAddress)
forwordMessageScheduler.scheduleAtFixedRate(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
//The Worker sends a message to itself, asking itself to send a heartbeat to the Master
self.send(SendHeartbeat)
}
}, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
//Schedule another task that cleans up old application directories
if (CLEANUP_ENABLED) {
logInfo(
s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(WorkDirCleanup)
}
}, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
}
//Collect the state of the current executors
val execs = executors.values.map { e =>
new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
}
//Report the Worker's latest state (executors and drivers) to the Master
masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))
The methods we are not focusing on here only get brief comments above, so I will not explain them further. What we care about is self.send(SendHeartbeat); let's follow it in!
case SendHeartbeat =>
if (connected) { sendToMaster(Heartbeat(workerId, self)) }
This is where the heartbeat is really sent to the Master. The "heartbeat mechanism" simply means that at a fixed (configurable) interval the Worker reports its latest state to the Master; the Master-side handling is shown below.
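sendToMaster itself just forwards the message through the Master's RpcEndpointRef if the Worker already knows one (paraphrased from the source):
private def sendToMaster(message: Any): Unit = {
  master match {
    case Some(masterRef) => masterRef.send(message)
    case None =>
      logWarning(s"Dropping $message because the connection to master has not yet been established")
  }
}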
5. Master side
case Heartbeat(workerId, worker) =>
idToWorker.get(workerId) match {
case Some(workerInfo) =>
//Update the last-heartbeat timestamp
workerInfo.lastHeartbeat = System.currentTimeMillis()
case None =>
//Below: log warnings and, if appropriate, ask the worker to re-register
if (workers.map(_.id).contains(workerId)) {
logWarning(s"Got heartbeat from unregistered worker $workerId." +
" Asking it to re-register.")
worker.send(ReconnectWorker(masterUrl))
} else {
logWarning(s"Got heartbeat from unregistered worker $workerId." +
" This worker was never registered, so ignoring the heartbeat.")
}
}
workerInfo.lastHeartbeat = System.currentTimeMillis() is the important line: using the workerId carried by the heartbeat, it updates lastHeartbeat in the WorkerInfo that was stored at registration time. How does the Master use this to detect failures? It has its own timeout-check mechanism: at a configured interval it scans the WorkerInfo list and compares each worker's last heartbeat time against the timeout; any worker that has not reported within that window is considered dead. I will not walk through the timeout-check code here; interested readers can look it up themselves. This concludes the (simplified) analysis of Spark's startup message communication, which borrows heavily from other authors' material; please correct me if anything is off.
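For reference, the timeout check is driven by a CheckForWorkerTimeOut message the Master schedules to itself, and the handler looks roughly like this (a paraphrased sketch; constant names and helper signatures vary by version):
private def timeOutDeadWorkers(): Unit = {
  // Copy into an array so we don't mutate the set while iterating over it
  val currentTime = System.currentTimeMillis()
  val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
  for (worker <- toRemove) {
    if (worker.state != WorkerState.DEAD) {
      logWarning("Removing %s because we got no heartbeat in %d seconds".format(
        worker.id, WORKER_TIMEOUT_MS / 1000))
      removeWorker(worker)
    } else if (worker.lastHeartbeat < currentTime - (REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS) {
      // The DEAD worker has been kept around (e.g. for the UI) long enough; drop it entirely
      workers -= worker
    }
  }
}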
Spark's runtime message communication
The main RpcEndpoints involved:
- Worker
- Executor
- Master
The two below are created inside SparkContext:
- DriverEndpoint: responsible for communicating with the executors. What actually gets registered is the endpoint named "CoarseGrainedScheduler" (and its RpcEndpointRef);
it is created inside CoarseGrainedSchedulerBackend, where ENDPOINT_NAME is "CoarseGrainedScheduler":
rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
- ClientEndpoint: responsible for communicating with the Master. What is actually created is StandaloneAppClient's endpoint (and its RpcEndpointRef), yet the corresponding variable on the Master side is named driver (I was puzzled by that naming too);
it is created inside StandaloneAppClient:
// Just launch an rpcEndpoint; it will call back into the listener.
endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
The naming in this part of Spark really is a bit confusing...
The simplified flow
When a user submits an application, SparkContext sends a registration message to the Master. The Master then tells the Workers to launch Executors for this app. Once an executor starts successfully, it communicates with SparkContext's DriverEndpoint and registers itself with SparkContext. Later, when an RDD action is triggered, the RDD DAG is built, DAGScheduler splits it into stages and generates TaskSets, and TaskScheduler sends the execution messages to the Executors.
First, be clear on one point: what does SparkContext do when we submit an app? (Here these components are only initialized; they do their real work once an action is triggered, so keep that in mind!)
- Create SparkEnv: used to create DriverEndpoint and ClientEndpoint.
- Create TaskScheduler:
- Create DAGScheduler:
1. SparkContext side:
I: Creating SparkEnv
This is where the RpcEnv gets created:
private[spark] def createSparkEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv = {
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
}
This code creates the SparkEnv (which holds private[spark] val rpcEnv: RpcEnv). With it we can create the two RpcEndpoints on the SparkContext side mentioned above.
II: Creating the TaskScheduler, the DAGScheduler and the backend scheduler
// Create and start the scheduler
//Returns a backend scheduler and a TaskScheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's constructor
// Start it
_taskScheduler.start()
SparkContext.createTaskScheduler(this, master, deployMode) is the entry point for creating the TaskScheduler; it returns a backend scheduler and a TaskScheduler.
_dagScheduler = new DAGScheduler(this) creates the DAGScheduler from the SparkContext.
Since our focus is communication, what we need to know is how DriverEndpoint and ClientEndpoint get created: both are created when the backend scheduler starts.
Stepping into SparkContext.createTaskScheduler(this, master, deployMode):
//Create the backend scheduler; when it starts it will create the DriverEndpoint and the ClientEndpoint
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
localCluster.stop()
}
(backend, scheduler)
Looking into the backend scheduler we reach StandaloneSchedulerBackend, which extends CoarseGrainedSchedulerBackend. Its start() method first calls the parent's start(), and inside CoarseGrainedSchedulerBackend.start() we find:
// TODO (prashant) send conf instead of properties
//Create the DriverEndpoint
driverEndpoint = createDriverEndpointRef(properties)
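createDriverEndpointRef is a thin wrapper: it builds the DriverEndpoint and registers it under ENDPOINT_NAME ("CoarseGrainedScheduler"), which is exactly the setupEndpoint call shown earlier (paraphrased; may differ slightly by version):
protected def createDriverEndpointRef(
    properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
  // Registering the endpoint returns the RpcEndpointRef the rest of the backend uses
  rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
}

protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
  new DriverEndpoint(rpcEnv, properties)
}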
Next, still inside StandaloneSchedulerBackend, the StandaloneAppClient is created:
/**
* Create the StandaloneAppClient and start it
*/
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
So the ClientEndpoint we get here is the standalone-mode one created inside StandaloneAppClient.
III: StandaloneAppClient communicating with the Master
- On the SparkContext side, inside the StandaloneAppClient class:
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
for (masterAddress <- masterRpcAddresses) yield {
//Submit the registration attempt to the Master on the thread pool
registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = try {
if (registered.get) {
return
}
logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
masterRef.send(RegisterApplication(appDescription, self))
} catch {
case ie: InterruptedException => // Cancelled
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
}
})
}
}
This is analogous to the Worker registering with the Master, so I will not go over it again.
- Master side
case RegisterApplication(description, driver) =>
// TODO Prevent repeated registrations from some driver
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else {
logInfo("Registering app " + description.name)
//Save the application info submitted by the client
val app = createApplication(description, driver)
registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
persistenceEngine.addApplication(app)
driver.send(RegisteredApplication(app.id, self))
/**
* This method is important: it kicks off resource scheduling!
*/
schedule()
}
After receiving the message, the Master saves the application info and then calls the very important schedule() method, which starts resource scheduling. Here we only look at one key method inside it, startExecutorsOnWorkers(), which tells the Workers to launch executors.
private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
for (app <- waitingApps if app.coresLeft > 0) {
//Number of cores requested per executor for this application
val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
// Filter out workers that don't have enough resources to launch an executor
//Find the workers with enough resources to run this app, then sort them
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= coresPerExecutor.getOrElse(1))
.sortBy(_.coresFree).reverse
//Decide how many cores to allocate on each usable worker
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
// Now that we've decided how many cores to allocate on each worker, let's allocate them
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
//Tell the worker to launch the executor(s)
allocateWorkerResourceToExecutors(
app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
}
}
}
allocateWorkerResourceToExecutors(app, assignedCores(pos), coresPerExecutor, usableWorkers(pos)) in turn calls launchExecutor(worker, exec) and switches the application's state on the Master to RUNNING, as the sketch below shows.
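A paraphrased sketch of allocateWorkerResourceToExecutors (simplified; details vary by version): it works out how many executors fit on this worker, then launches each one:
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // If cores-per-executor is set, split the assigned cores evenly across executors;
  // otherwise launch a single executor that takes all of them
  val numExecutors = coresPerExecutor.map(assignedCores / _).getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (_ <- 1 to numExecutors) {
    val exec = app.addExecutor(worker, coresToAssign)
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
  }
}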
The launchExecutor method:
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
//Send the launch request to the worker
worker.endpoint.send(LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
//Send the executor's info back to the driver
exec.application.driver.send(
ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
This does two things:
- Tell the Worker to launch the executor, sending along the executor's basic information
- Send that same basic information back to the driver side
What does the Worker do?
- Create the working directory
- Instantiate an ExecutorRunner
- Inside the ExecutorRunner, use the command (built earlier in the scheduler backend, sent to the Master via StandaloneAppClient and then forwarded to the Worker) to create CoarseGrainedExecutorBackend; this is the important one, because it is the container the executor runs in and is also responsible for communicating with the DriverEndpoint
- Send an ExecutorStateChanged message to the Master to report that creation is complete
Now let's look at the code:
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
if (masterUrl != activeMasterUrl) {
logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
} else {
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
// Create the executor's working directory
//Create the executor's working directory
val executorDir = new File(workDir, appId + "/" + execId)
if (!executorDir.mkdirs()) {
throw new IOException("Failed to create directory " + executorDir)
}
// Create local dirs for the executor. These are passed to the executor via the
// SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
// application finishes.
//These local dirs are deleted by the Worker when the application finishes
val appLocalDirs = appDirectories.getOrElse(appId, {
val localRootDirs = Utils.getOrCreateLocalRootDirs(conf)
val dirs = localRootDirs.flatMap { dir =>
try {
val appDir = Utils.createDirectory(dir, namePrefix = "executor")
Utils.chmod700(appDir)
Some(appDir.getAbsolutePath())
} catch {
case e: IOException =>
logWarning(s"${e.getMessage}. Ignoring this directory.")
None
}
}.toSeq
if (dirs.isEmpty) {
throw new IOException("No subfolder can be created in " +
s"${localRootDirs.mkString(",")}.")
}
dirs
})
appDirectories(appId) = appLocalDirs
//Instantiate the ExecutorRunner
val manager = new ExecutorRunner(
appId,
execId,
appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
cores_,
memory_,
self,
workerId,
host,
webUi.boundPort,
publicAddress,
sparkHome,
executorDir,
workerUri,
conf,
appLocalDirs, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
//Start it
manager.start()
coresUsed += cores_
memoryUsed += memory_
//Tell the Master that the executor has been created
sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
} catch {
case e: Exception =>
logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
if (executors.contains(appId + "/" + execId)) {
executors(appId + "/" + execId).kill()
executors -= appId + "/" + execId
}
sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
Some(e.toString), None))
}
}
manager.start() is the ExecutorRunner's start() method; internally it calls ExecutorRunner.fetchAndRunExecutor(), which is the method we really need to look at!
private def fetchAndRunExecutor() {
try {
// Launch the process
/** Each ProcessBuilder instance manages a set of process attributes.
Its start() method uses those attributes to create a new Process instance.
start() can be invoked repeatedly on the same instance to create new subprocesses with the same or related attributes.
**/
val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
memory, sparkHome.getAbsolutePath, substituteVariables)
val command = builder.command()
val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
logInfo(s"Launch command: $formattedCommand")
builder.directory(executorDir)
builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
// Add webUI log urls
val baseUrl =
if (conf.getBoolean("spark.ui.reverseProxy", false)) {
s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
} else {
s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
}
builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
/**
* This is the real launch: with the attribute set assembled by buildProcessBuilder above,
* we can now start the main class of CoarseGrainedExecutorBackend.
* Starting the builder creates the CoarseGrainedExecutorBackend instance, the container the executor runs in.
*/
process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
formattedCommand, "=" * 40)
// Capture the CoarseGrainedExecutorBackend process's output
val stdout = new File(executorDir, "stdout")
stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
val stderr = new File(executorDir, "stderr")
Files.write(header, stderr, StandardCharsets.UTF_8)
stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
// Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
// or with nonzero exit code
val exitCode = process.waitFor()
state = ExecutorState.EXITED
val message = "Command exited with code " + exitCode
//Report the exit state to the Worker
worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
}
You may be wondering at this point: didn't we say a CoarseGrainedExecutorBackend would be created in here? Why do we still not see it being created anywhere?
I was completely lost here at first and could not find where it was created, even though people told me it was already being invoked. It turns out everything hinges on the command. Recall that StandaloneAppClient is created inside the backend scheduler (StandaloneSchedulerBackend); the command is one of the properties built there:
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
Here we can see that the basic information about CoarseGrainedExecutorBackend is packaged into this Command. Back in the method above, val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf), ...) gives us a ProcessBuilder, the JDK class for spawning operating-system processes. Because the CoarseGrainedExecutorBackend information is packaged into the command, starting that process effectively launches a new CoarseGrainedExecutorBackend.
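In other words, the new JVM's main class is CoarseGrainedExecutorBackend. Its main/run method then builds an executor-side SparkEnv and registers the backend as an endpoint named "Executor" (a paraphrased sketch; argument names and order vary by Spark version):
// Tail of CoarseGrainedExecutorBackend.run, paraphrased: build the executor-side env and
// register this backend as an RpcEndpoint; registering is what triggers onStart() below
val env = SparkEnv.createExecutorEnv(
  driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
  env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
env.rpcEnv.awaitTermination()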
In CoarseGrainedExecutorBackend's startup method (onStart) it sends a registration message to the DriverEndpoint:
override def onStart() {
logInfo("Connecting to driver: " + driverUrl)
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
// This is a very fast action so we can use "ThreadUtils.sameThread"
driver = Some(ref)
//Communicate with the driver: send this executor's info over for registration
ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
}(ThreadUtils.sameThread).onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
// Always receive `true`. Just ignore it
case Failure(e) =>
exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
}(ThreadUtils.sameThread)
}
ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls)) sends the registration request to the SparkContext side (expecting a Boolean ack) through the DriverEndpoint's ref. Remember where the DriverEndpoint was created? Right, in CoarseGrainedSchedulerBackend!
So we go back to that class and look at the handler:
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
//Various sanity checks
if (executorDataMap.contains(executorId)) {
executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
context.reply(true)
} else if (scheduler.nodeBlacklist != null &&
scheduler.nodeBlacklist.contains(hostname)) {
// If the cluster manager gives us an executor on a blacklisted node (because it
// already started allocating those resources before we informed it of our blacklist,
// or if it ignored our blacklist), then we reject that executor immediately.
logInfo(s"Rejecting $executorId as it has been blacklisted.")
executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
context.reply(true)
} else {
// If the executor's rpc env is not listening for incoming connections, `hostPort`
// will be null, and the client connection should be used to contact the executor.
val executorAddress = if (executorRef.address != null) {
executorRef.address
} else {
context.senderAddress
}
logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
addressToExecutorId(executorAddress) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
val data = new ExecutorData(executorRef, executorRef.address, hostname,
cores, cores, logUrls)
// This must be synchronized because variables mutated
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)
if (currentExecutorIdCounter < executorId.toInt) {
currentExecutorIdCounter = executorId.toInt
}
if (numPendingExecutors > 0) {
numPendingExecutors -= 1
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
//Send a registration-success message back to the executor
executorRef.send(RegisteredExecutor)
// Note: some tests expect the reply to come after we put the executor in the map
context.reply(true)
//Post an executor-added event to the listener bus (similar in spirit to the Master's bookkeeping of workers)
listenerBus.post(
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
//Now start offering tasks!
makeOffers()
}
What does this handler do?
- Send a registration-success message back to the executor
- Post an executor-added event to the listener bus
- Call makeOffers(), which in turn calls launchTasks(); this is the crucial step that sends LaunchTask messages to the executors so the tasks get executed (see the sketch below)
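A paraphrased sketch of DriverEndpoint.makeOffers and launchTasks (simplified; details vary by version): build WorkerOffers from the registered executors, let the TaskScheduler pick tasks for them, then send each serialized task to its executor:
private def makeOffers(): Unit = {
  // Describe the free resources of every registered executor
  val workOffers = executorDataMap.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toIndexedSeq
  // Let TaskSchedulerImpl decide which tasks run where
  val taskDescs = scheduler.resourceOffers(workOffers)
  if (taskDescs.nonEmpty) {
    launchTasks(taskDescs)
  }
}

private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
  for (task <- tasks.flatten) {
    val serializedTask = TaskDescription.encode(task)
    val executorData = executorDataMap(task.executorId)
    executorData.freeCores -= scheduler.CPUS_PER_TASK
    // Tell the chosen executor to run the task
    executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
  }
}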
After the executor receives the registration-success message:
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
try {
//Instantiate the Executor object; in Spark this is what actually runs the tasks
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
} catch {
case NonFatal(e) =>
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
}
The main things done here:
- executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false): the Executor is what actually runs the task code
- While the Executor is initialized, startDriverHeartbeater() is run to send heartbeats to the driver, and the backend then waits for messages from the driver
private def startDriverHeartbeater(): Unit = {
val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
// Wait a random interval so the heartbeats don't end up in sync
val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
val heartbeatTask = new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
}
//Schedule the heartbeat task towards the driver
heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
}
Remember launchTasks() inside makeOffers() above? This is where the executor's launchTask() method runs: it creates a new TaskRunner(context, taskDescription) to process the task, and when the task finishes it sends a StatusUpdate message to CoarseGrainedExecutorBackend, which forwards the message to the DriverEndpoint. The code is as follows:
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
//Forward the executor's status update to the driver
val msg = StatusUpdate(executorId, taskId, state, data)
driver match {
case Some(driverRef) => driverRef.send(msg)
case None => logWarning(s"Drop $msg because has not yet connected to driver")
}
}
On the driver side the task's resources are reclaimed and new tasks are assigned to the executor; all of this happens through makeOffers again, as the sketch below shows.
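A paraphrased sketch of how DriverEndpoint handles StatusUpdate (simplified): hand the update to the TaskScheduler, and once the task has finished return its cores to the executor and call makeOffers again so new tasks can be assigned:
case StatusUpdate(executorId, taskId, state, data) =>
  scheduler.statusUpdate(taskId, state, data.value)
  if (TaskState.isFinished(state)) {
    executorDataMap.get(executorId) match {
      case Some(executorInfo) =>
        // Reclaim the cores this task was using, then try to schedule more work on this executor
        executorInfo.freeCores += scheduler.CPUS_PER_TASK
        makeOffers(executorId)
      case None =>
        logWarning(s"Ignored task status update ($taskId state $state) " +
          s"from unknown executor with ID $executorId")
    }
  }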
That wraps up this simple walkthrough of core's communication.
Source analyses of Spark memory management and more will follow.