CoarseGrainedExecutorBackend:worker中为Application启动的executor,实际上启动了这个CoarseGrainedExecutorBackend进程
org.apache.spark.executor.CoarseGrainedExecutorBackend#preStart
org.apache.spark.executor.CoarseGrainedExecutorBackend#receiveWithLogging
//actor初始化
//Actor lifecycle hook, invoked once when this backend's actor starts up.
//It connects to the driver, asks it to register this executor, and
//subscribes to remoting lifecycle events so disconnects can be observed.
override def preStart() {
  logInfo(s"Connecting to driver: $driverUrl")
  //Resolve a remote handle (ActorSelection) to the driver's actor
  driver = context.actorSelection(driverUrl)
  //Fire-and-forget registration request to the driver (step 1 in the diagram below)
  driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
  //Listen for RemotingLifecycleEvent so receiveWithLogging can react to a
  //DisassociatedEvent when the link to the driver drops
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
//Actor message loop: handles the driver's registration replies and the
//task lifecycle commands (launch / kill / stop) sent by the driver side.
override def receiveWithLogging = {
  //Sent back by the driver after it successfully registers this executor.
  //The backend then creates an Executor object as its execution handle
  //(step 2 in the diagram below); most of this backend's functionality is
  //delegated to that Executor.
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    val (hostname, _) = Utils.parseHostPort(hostPort)
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

  //Registration was rejected by the driver; nothing useful can run here.
  case RegisterExecutorFailed(message) =>
    logError("Slave registration failed: " + message)
    System.exit(1)

  //The driver's TaskScheduler sends LaunchTask to start a task
  //(step 3 in the diagram below).
  case LaunchTask(data) =>
    if (executor == null) {
      //Task arrived before RegisteredExecutor — cannot run anything yet.
      logError("Received LaunchTask command but executor was null")
      System.exit(1)
    } else {
      //Deserialize the TaskDescription payload
      val ser = env.closureSerializer.newInstance()
      val taskDesc = ser.deserialize[TaskDescription](data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      //Hand the task to the internal Executor handle (step 4 in the diagram below)
      executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
        taskDesc.name, taskDesc.serializedTask)
    }

  case KillTask(taskId, _, interruptThread) =>
    if (executor == null) {
      logError("Received KillTask command but executor was null")
      System.exit(1)
    } else {
      executor.killTask(taskId, interruptThread)
    }

  //Remoting lifecycle: a remote actor system disconnected. Only a
  //disconnect from the driver is fatal.
  case x: DisassociatedEvent =>
    if (x.remoteAddress == driver.anchorPath.address) {
      logError(s"Driver $x disassociated! Shutting down.")
      System.exit(1)
    } else {
      logWarning(s"Received irrelevant DisassociatedEvent $x")
    }

  case StopExecutor =>
    logInfo("Driver commanded a shutdown")
    //FIX: guard against StopExecutor arriving before RegisteredExecutor,
    //in which case `executor` is still null and executor.stop() would throw
    //a NullPointerException (the LaunchTask/KillTask cases already check).
    if (executor != null) {
      executor.stop()
    }
    context.stop(self)
    context.system.shutdown()
}
org.apache.spark.executor.Executor#launchTask
/**
 * Starts execution of a single task on this executor.
 *
 * @param context        backend used to report task status back to the driver
 * @param taskId         unique id of the task
 * @param attemptNumber  attempt counter for this task
 * @param taskName       human-readable task name
 * @param serializedTask the task payload, still serialized
 */
def launchTask(
  context: ExecutorBackend,
  taskId: Long,
  attemptNumber: Int,
  taskName: String,
  serializedTask: ByteBuffer) {
  //Each task gets its own TaskRunner wrapper
  val taskRunner = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  //Track the runner in the in-memory map of running tasks
  runningTasks.put(taskId, taskRunner)
  //Submit the runner to the Java thread pool (step 5 in the diagram below)
  threadPool.execute(taskRunner)
}
image.png