1 start-slave.sh
# NOTE: This exact class name is matched downstream by SparkSubmit.
# Any changes need to be reflected there.
CLASS="org.apache.spark.deploy.worker.Worker"

# Start up the appropriate number of workers on this machine.
# quick local function to start a worker
function start_instance {
  WORKER_NUM=$1
  shift

  if [ "$SPARK_WORKER_PORT" = "" ]; then
    PORT_FLAG=
    PORT_NUM=
  else
    PORT_FLAG="--port"
    PORT_NUM=$(( $SPARK_WORKER_PORT + $WORKER_NUM - 1 ))
  fi
  WEBUI_PORT=$(( $SPARK_WORKER_WEBUI_PORT + $WORKER_NUM - 1 ))

  "${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS $WORKER_NUM \
    --webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@"
}

if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
  start_instance 1 "$@"
else
  for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
    start_instance $(( 1 + $i )) "$@"
  done
fi
2 Invoking the main Method
- Enter org.apache.spark.deploy.worker.Worker.scala
private[deploy] object Worker extends Logging {
val SYSTEM_NAME = "sparkWorker"
val ENDPOINT_NAME = "Worker"
private val SSL_NODE_LOCAL_CONFIG_PATTERN = """\-Dspark\.ssl\.useNodeLocalConf\=(.+)""".r
def main(argStrings: Array[String]) {
Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
exitOnUncaughtException = false))
Utils.initDaemon(log)
val conf = new SparkConf
val args = new WorkerArguments(argStrings, conf)
val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
args.memory, args.masters, args.workDir, conf = conf)
// With external shuffle service enabled, if we request to launch multiple workers on one host,
// we can only successfully launch the first worker and the rest fails, because with the port
// bound, we may launch no more than one external shuffle service on each host.
// When this happens, we should give explicit reason of failure instead of fail silently. For
// more detail see SPARK-20989.
val externalShuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
val sparkWorkerInstances = scala.sys.env.getOrElse("SPARK_WORKER_INSTANCES", "1").toInt
require(externalShuffleServiceEnabled == false || sparkWorkerInstances <= 1,
"Starting multiple workers on one host is failed because we may launch no more than one " +
"external shuffle service on each host, please set spark.shuffle.service.enabled to " +
"false or set SPARK_WORKER_INSTANCES to 1 to resolve the conflict.")
rpcEnv.awaitTermination()
}
}
Now let's look at the startRpcEnvAndEndpoint method.
def startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
cores: Int,
memory: Int,
masterUrls: Array[String],
workDir: String,
workerNumber: Option[Int] = None,
conf: SparkConf = new SparkConf): RpcEnv = {
// The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
rpcEnv
}
- Create a SecurityManager;
- Create an RpcEnv;
- Create the Worker (an RpcEndpoint);
- Register the Worker with the RpcEnv under the endpoint name "Worker".
The process mirrors the Master's startup; see "Spark Source Code: Starting the Master". A small sketch of how these names compose into an endpoint address follows below.
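Just to make the naming concrete, here is a small, self-contained sketch. EndpointNaming is an illustration, not Spark code; the URI format mirrors RpcEndpointAddress.toString as it appears in the Spark 2.x sources.
object EndpointNaming {
  // "sparkWorker" plus an optional worker number, as in startRpcEnvAndEndpoint above.
  def systemName(base: String, workerNumber: Option[Int]): String =
    base + workerNumber.map(_.toString).getOrElse("")

  // Assumed endpoint URI format, for illustration only.
  def endpointUri(name: String, host: String, port: Int): String =
    s"spark://$name@$host:$port"

  def main(args: Array[String]): Unit = {
    println(systemName("sparkWorker", None))         // "sparkWorker"  (the standalone Worker.main case)
    println(systemName("sparkWorker", Some(1)))      // "sparkWorker1" (the LocalSparkCluster case)
    println(endpointUri("Worker", "node1", 35000))   // "spark://Worker@node1:35000"
  }
}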
3 Creating the RpcEnv
NettyRpcEnvFactory is invoked to create the RpcEnv.
- Enter org.apache.spark.rpc.netty.NettyRpcEnvFactory.scala
def create(config: RpcEnvConfig): RpcEnv = {
val sparkConf = config.conf
// Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
// KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
val javaSerializerInstance =
new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
val nettyEnv =
new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
config.securityManager, config.numUsableCores)
if (!config.clientMode) {
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
nettyEnv.startServer(config.bindAddress, actualPort)
(nettyEnv, nettyEnv.address.port)
}
try {
Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
} catch {
case NonFatal(e) =>
nettyEnv.shutdown()
throw e
}
}
nettyEnv
}
- When a NettyRpcEnv is constructed, it internally creates a Dispatcher, a NettyStreamManager, a TransportContext, and so on;
- NettyRpcEnv.startServer uses that internal TransportContext to create a TransportServer;
- When the TransportContext creates the TransportServer, init is called to initialize it;
- Initializing the TransportServer sets up Netty's ServerBootstrap: a ChannelInitializer is installed as the ServerBootstrap's channel handler, and the ServerBootstrap is bound to an InetSocketAddress (sketched below).
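To make the last step concrete, here is a minimal, self-contained sketch of that Netty pattern. This is not Spark's TransportServer code (MiniTransportServer and its empty initializer are assumptions); in Spark, the ChannelInitializer delegates to TransportContext.initializePipeline, which installs the encoder, decoder and TransportChannelHandler.
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import java.net.InetSocketAddress

object MiniTransportServer {
  def bind(host: String, port: Int): Unit = {
    val bossGroup = new NioEventLoopGroup(1)
    val workerGroup = new NioEventLoopGroup()
    val bootstrap = new ServerBootstrap()
      .group(bossGroup, workerGroup)
      .channel(classOf[NioServerSocketChannel])
      .childHandler(new ChannelInitializer[SocketChannel] {
        // Spark installs its message encoder/decoder and TransportChannelHandler here;
        // this sketch installs nothing.
        override def initChannel(ch: SocketChannel): Unit = ()
      })
    // The "bind an InetSocketAddress" step described above.
    bootstrap.bind(new InetSocketAddress(host, port)).sync()
  }
}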
4 Creating and Registering the Worker (RpcEndpoint)
- Enter org.apache.spark.rpc.netty.Dispatcher.scala
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
val addr = RpcEndpointAddress(nettyEnv.address, name)
val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
synchronized {
if (stopped) {
throw new IllegalStateException("RpcEnv has been stopped")
}
if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
}
val data = endpoints.get(name)
endpointRefs.put(data.endpoint, data.ref)
receivers.offer(data) // for the OnStart message
}
endpointRef
}
- Create a NettyRpcEndpointRef;
- Create an EndpointData(name, endpoint, endpointRef) and add it to the Dispatcher's internal queue;
- Creating the EndpointData creates an Inbox, and the Inbox constructor enqueues an OnStart message into its internal queue.
5 Dequeuing and Processing OnStart
The OnStart message placed in the Inbox's internal queue is taken out by Dispatcher.MessageLoop, which calls Inbox.process() to handle it; this ultimately invokes endpoint.onStart() (a stripped-down sketch of this loop follows below).
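A stripped-down analogue of that mechanism (hypothetical types, not Spark's Inbox and Dispatcher) shows why onStart() is always the first callback an endpoint sees: OnStart is enqueued in the inbox's constructor, before anything else can be posted.
import java.util.concurrent.LinkedBlockingQueue

sealed trait Msg
case object OnStart extends Msg
final case class OneWay(content: Any) extends Msg

trait SimpleEndpoint {
  def onStart(): Unit
  def receive(content: Any): Unit
}

final class SimpleInbox(endpoint: SimpleEndpoint) {
  private val messages = new LinkedBlockingQueue[Msg]()
  messages.put(OnStart)   // enqueued at construction time, before any post() can happen

  def post(content: Any): Unit = messages.put(OneWay(content))

  // One iteration of what Dispatcher.MessageLoop plus Inbox.process do together.
  def processOne(): Unit = messages.take() match {
    case OnStart         => endpoint.onStart()
    case OneWay(content) => endpoint.receive(content)
  }
}

object InboxDemo {
  def main(args: Array[String]): Unit = {
    val inbox = new SimpleInbox(new SimpleEndpoint {
      def onStart(): Unit = println("onStart")
      def receive(content: Any): Unit = println(s"received $content")
    })
    inbox.post("hello")
    inbox.processOne()    // prints "onStart"
    inbox.processOne()    // prints "received hello"
  }
}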
- Enter org.apache.spark.deploy.worker.Worker.scala
override def onStart() {
assert(!registered)
logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
host, port, cores, Utils.megabytesToString(memory)))
logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
logInfo("Spark home: " + sparkHome)
createWorkDir()
startExternalShuffleService()
webUi = new WorkerWebUI(this, workDir, webUiPort)
webUi.bind()
workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
registerWithMaster()
metricsSystem.registerSource(workerSource)
metricsSystem.start()
// Attach the worker metrics servlet handler to the web ui after the metrics system is started.
metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}
- Create the Worker's working directory;
- Bind the WorkerWebUI;
- Register with the Master;
- Start the MetricsSystem.
- Enter org.apache.spark.deploy.worker.Worker.scala
private def registerWithMaster() {
// onDisconnected may be triggered multiple times, so don't attempt registration
// if there are outstanding registration attempts scheduled.
registrationRetryTimer match {
case None =>
registered = false
registerMasterFutures = tryRegisterAllMasters()
connectionAttemptCount = 0
registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
Option(self).foreach(_.send(ReregisterWithMaster))
}
},
INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
TimeUnit.SECONDS))
case Some(_) =>
logInfo("Not spawning another attempt to register with the master, since there is an" +
" attempt scheduled already.")
}
}
This registers the Worker with every Master.
5.1 Trying to Register with All Masters
- Enter org.apache.spark.deploy.worker.Worker.scala
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
masterRpcAddresses.map { masterAddress =>
registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = {
try {
logInfo("Connecting to master " + masterAddress + "...")
val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
sendRegisterMessageToMaster(masterEndpoint)
} catch {
case ie: InterruptedException => // Cancelled
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
}
}
})
}
}
- Iterate over masterRpcAddresses: Array[RpcAddress]; for each masterRpcAddress, submit a new task to the registerMasterThreadPool;
- Inside each task, use the previously created RpcEnv to look up the master's RpcEndpointRef from the masterRpcAddress and the endpoint name Master.ENDPOINT_NAME;
- Use that masterEndpoint ref to send a RegisterWorker message to the Master, registering this Worker with it (a sketch of this fan-out pattern follows the sendRegisterMessageToMaster excerpt below).
private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = {
masterEndpoint.send(RegisterWorker(
workerId,
host,
port,
self,
cores,
memory,
workerWebUiUrl,
masterEndpoint.address))
}
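Here is the fan-out pattern in isolation, as a sketch with assumed names; connectAndRegister stands in for setupEndpointRef followed by sendRegisterMessageToMaster.
import java.util.concurrent.{Executors, Future => JFuture}
import scala.util.control.NonFatal

object RegisterFanOut {
  private val pool = Executors.newFixedThreadPool(4)

  // One task per master address, each submitted to the shared thread pool.
  def registerAll(masterAddresses: Seq[String])(connectAndRegister: String => Unit): Seq[JFuture[_]] =
    masterAddresses.map { address =>
      pool.submit(new Runnable {
        override def run(): Unit =
          try connectAndRegister(address)
          catch {
            case _: InterruptedException =>   // cancelled
            case NonFatal(e) => println(s"Failed to connect to master $address: $e")
          }
      })
    }

  def main(args: Array[String]): Unit = {
    val futures = registerAll(Seq("master1:7077", "master2:7077")) { addr =>
      println(s"registering with $addr")
    }
    futures.foreach(_.get())
    pool.shutdown()
  }
}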
The message send/receive flow is generic; let's trace it.
5.2 The Generic Message-Sending Flow
- Enter org.apache.spark.rpc.netty.NettyRpcEndpointRef.scala
override def send(message: Any): Unit = {
require(message != null, "Message is null")
nettyEnv.send(new RequestMessage(nettyEnv.address, this, message))
}
- Enter org.apache.spark.rpc.netty.NettyRpcEnv.scala
private[netty] def send(message: RequestMessage): Unit = {
val remoteAddr = message.receiver.address
if (remoteAddr == address) {
// Message to a local RPC endpoint.
try {
dispatcher.postOneWayMessage(message)
} catch {
case e: RpcEnvStoppedException => logDebug(e.getMessage)
}
} else {
// Message to a remote RPC endpoint.
postToOutbox(message.receiver, OneWayOutboxMessage(message.serialize(this)))
}
}
Here remoteAddr != address, so the postToOutbox branch is taken and a OneWayOutboxMessage is sent.
- Enter org.apache.spark.rpc.netty.NettyRpcEnv.scala
private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {
if (receiver.client != null) {
message.sendWith(receiver.client)
} else {
require(receiver.address != null,
"Cannot send message to client endpoint with no listen address.")
val targetOutbox = {
val outbox = outboxes.get(receiver.address)
if (outbox == null) {
val newOutbox = new Outbox(this, receiver.address)
val oldOutbox = outboxes.putIfAbsent(receiver.address, newOutbox)
if (oldOutbox == null) {
newOutbox
} else {
oldOutbox
}
} else {
outbox
}
}
if (stopped.get) {
// It's possible that we put `targetOutbox` after stopping. So we need to clean it.
outboxes.remove(receiver.address)
targetOutbox.stop()
} else {
targetOutbox.send(message)
}
}
}
- Look up the Outbox for receiver.address in the ConcurrentHashMap[RpcAddress, Outbox]; if none exists, create a new one (this get-or-create idiom is isolated in the sketch below);
- Send the message via targetOutbox.send(message).
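The get-or-create idiom from postToOutbox, in isolation (Outbox here is just a placeholder class, not Spark's): putIfAbsent guarantees that two threads racing to create an Outbox for the same address end up sharing a single instance.
import java.util.concurrent.ConcurrentHashMap

final class Outbox(val address: String)

object OutboxRegistry {
  private val outboxes = new ConcurrentHashMap[String, Outbox]()

  def getOrCreate(address: String): Outbox = {
    val existing = outboxes.get(address)
    if (existing != null) existing
    else {
      val fresh = new Outbox(address)
      val raced = outboxes.putIfAbsent(address, fresh)   // null if we won the race
      if (raced == null) fresh else raced
    }
  }
}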
- Enter org.apache.spark.rpc.netty.Outbox.scala
/**
* Send a message. If there is no active connection, cache it and launch a new connection. If
* [[Outbox]] is stopped, the sender will be notified with a [[SparkException]].
*/
def send(message: OutboxMessage): Unit = {
val dropped = synchronized {
if (stopped) {
true
} else {
messages.add(message)
false
}
}
if (dropped) {
message.onFailure(new SparkException("Message is dropped because Outbox is stopped"))
} else {
drainOutbox()
}
}
- Add the message to the LinkedList[OutboxMessage] queue;
- drainOutbox then drains the queued messages.
- Enter org.apache.spark.rpc.netty.Outbox.scala
/**
* Drain the message queue. If there is other draining thread, just exit. If the connection has
* not been established, launch a task in the `nettyEnv.clientConnectionExecutor` to setup the
* connection.
*/
private def drainOutbox(): Unit = {
var message: OutboxMessage = null
synchronized {
if (stopped) {
return
}
if (connectFuture != null) {
// We are connecting to the remote address, so just exit
return
}
if (client == null) {
// There is no connect task but client is null, so we need to launch the connect task.
launchConnectTask()
return
}
if (draining) {
// There is some thread draining, so just exit
return
}
message = messages.poll()
if (message == null) {
return
}
draining = true
}
while (true) {
try {
val _client = synchronized { client }
if (_client != null) {
message.sendWith(_client)
} else {
assert(stopped == true)
}
} catch {
case NonFatal(e) =>
handleNetworkFailure(e)
return
}
synchronized {
if (stopped) {
return
}
message = messages.poll()
if (message == null) {
draining = false
return
}
}
}
}
- Call launchConnectTask to establish the connection and obtain a TransportClient;
- messages.poll takes each queued message off the queue;
- Send each message through the TransportClient (a compact version of this drain pattern is sketched below).
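A compact, self-contained version of that drain pattern (a hypothetical DrainQueue, not Spark's Outbox, and without the connection handling): the draining flag ensures that at most one thread delivers messages at a time, while other senders just enqueue and leave.
import java.util.LinkedList

final class DrainQueue[T >: Null <: AnyRef](deliver: T => Unit) {
  private val messages = new LinkedList[T]()
  private var draining = false

  def send(message: T): Unit = {
    synchronized { messages.add(message) }
    drain()
  }

  private def drain(): Unit = {
    var message: T = null
    synchronized {
      if (draining) return          // another thread is already delivering
      message = messages.poll()
      if (message == null) return   // nothing to do
      draining = true               // claim the drain loop
    }
    while (true) {
      deliver(message)
      synchronized {
        message = messages.poll()
        if (message == null) {
          draining = false          // release the claim before exiting
          return
        }
      }
    }
  }
}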
- Enter org.apache.spark.rpc.netty.OneWayOutboxMessage.scala
override def sendWith(client: TransportClient): Unit = {
client.send(content)
}
- Enter org.apache.spark.network.client.TransportClient.java
/**
* Sends an opaque message to the RpcHandler on the server-side. No reply is expected for the
* message, and no delivery guarantees are made.
*
* @param message The message to send.
*/
public void send(ByteBuffer message) {
channel.writeAndFlush(new OneWayMessage(new NioManagedBuffer(message)));
}
Netty's Channel.writeAndFlush is invoked to send the OneWayMessage.
5.3 The Generic Message-Handling Flow
Once sent, the message travels over Netty to the server-side TransportChannelHandler for processing.
When NettyRpcEnv.startServer() creates the TransportServer via the TransportContext, the channel pipeline is initialized and the TransportChannelHandler is added to it.
- Enter org.apache.spark.network.server.TransportChannelHandler.java
@Override
public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {
if (request instanceof RequestMessage) {
requestHandler.handle((RequestMessage) request);
} else if (request instanceof ResponseMessage) {
responseHandler.handle((ResponseMessage) request);
} else {
ctx.fireChannelRead(request);
}
}
The message type is checked and the matching handler is invoked.
- Enter org.apache.spark.network.server.TransportRequestHandler.java
@Override
public void handle(RequestMessage request) {
if (request instanceof ChunkFetchRequest) {
processFetchRequest((ChunkFetchRequest) request);
} else if (request instanceof RpcRequest) {
processRpcRequest((RpcRequest) request);
} else if (request instanceof OneWayMessage) {
processOneWayMessage((OneWayMessage) request);
} else if (request instanceof StreamRequest) {
processStreamRequest((StreamRequest) request);
} else if (request instanceof UploadStream) {
processStreamUpload((UploadStream) request);
} else {
throw new IllegalArgumentException("Unknown request type: " + request);
}
}
In our case the Worker sent a OneWayMessage, so that branch is taken.
- Enter org.apache.spark.network.server.TransportRequestHandler.java
private void processOneWayMessage(OneWayMessage req) {
try {
rpcHandler.receive(reverseClient, req.body().nioByteBuffer());
} catch (Exception e) {
logger.error("Error while invoking RpcHandler#receive() for one-way message.", e);
} finally {
req.body().release();
}
}
The RpcHandler is invoked to actually process the message.
- Enter org.apache.spark.rpc.netty.NettyRpcHandler.scala
override def receive(
client: TransportClient,
message: ByteBuffer): Unit = {
val messageToDispatch = internalReceive(client, message)
dispatcher.postOneWayMessage(messageToDispatch)
}
- Build the internal message used for local dispatch (internalReceive);
- Call Dispatcher.postOneWayMessage to post that internal message.
- Enter org.apache.spark.rpc.netty.Dispatcher.scala
/** Posts a one-way message. */
def postOneWayMessage(message: RequestMessage): Unit = {
postMessage(message.receiver.name, OneWayMessage(message.senderAddress, message.content),
(e) => throw e)
}
/**
* Posts a message to a specific endpoint.
*
* @param endpointName name of the endpoint.
* @param message the message to post
* @param callbackIfStopped callback function if the endpoint is stopped.
*/
private def postMessage(
endpointName: String,
message: InboxMessage,
callbackIfStopped: (Exception) => Unit): Unit = {
val error = synchronized {
val data = endpoints.get(endpointName)
if (stopped) {
Some(new RpcEnvStoppedException())
} else if (data == null) {
Some(new SparkException(s"Could not find $endpointName."))
} else {
data.inbox.post(message)
receivers.offer(data)
None
}
}
// We don't need to call `onStop` in the `synchronized` block
error.foreach(callbackIfStopped)
}
This is where pending messages are added to the Dispatcher's internal queue; Dispatcher.MessageLoop keeps taking messages off that queue and calls EndpointData.inbox.process to handle them.
For the details, see the Dispatcher message-handling walkthrough in "Spark Source Code: Starting the Master".
- Enter org.apache.spark.rpc.netty.Inbox.scala
/**
* Process stored messages.
*/
def process(dispatcher: Dispatcher): Unit = {
var message: InboxMessage = null
inbox.synchronized {
if (!enableConcurrent && numActiveThreads != 0) {
return
}
message = messages.poll()
if (message != null) {
numActiveThreads += 1
} else {
return
}
}
while (true) {
safelyCall(endpoint) {
message match {
case RpcMessage(_sender, content, context) =>
try {
endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
throw new SparkException(s"Unsupported message $message from ${_sender}")
})
} catch {
case e: Throwable =>
context.sendFailure(e)
// Throw the exception -- this exception will be caught by the safelyCall function.
// The endpoint's onError function will be called.
throw e
}
case OneWayMessage(_sender, content) =>
endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
throw new SparkException(s"Unsupported message $message from ${_sender}")
})
case OnStart =>
endpoint.onStart()
if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
inbox.synchronized {
if (!stopped) {
enableConcurrent = true
}
}
}
case OnStop =>
val activeThreads = inbox.synchronized { inbox.numActiveThreads }
assert(activeThreads == 1,
s"There should be only a single active thread but found $activeThreads threads.")
dispatcher.removeRpcEndpointRef(endpoint)
endpoint.onStop()
assert(isEmpty, "OnStop should be the last message")
case RemoteProcessConnected(remoteAddress) =>
endpoint.onConnected(remoteAddress)
case RemoteProcessDisconnected(remoteAddress) =>
endpoint.onDisconnected(remoteAddress)
case RemoteProcessConnectionError(cause, remoteAddress) =>
endpoint.onNetworkError(cause, remoteAddress)
}
}
inbox.synchronized {
// "enableConcurrent" will be set to false after `onStop` is called, so we should check it
// every time.
if (!enableConcurrent && numActiveThreads != 1) {
// If we are not the only one worker, exit
numActiveThreads -= 1
return
}
message = messages.poll()
if (message == null) {
numActiveThreads -= 1
return
}
}
}
}
The message matches the OneWayMessage case and enters that branch. The endpoint here is the Master, so Master.receive is invoked (a tiny illustration of this partial-function dispatch follows below).
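A tiny illustration of that dispatch style (ReceiveDemo is not Spark code): receive is a PartialFunction[Any, Unit], and Inbox.process calls applyOrElse so that an unhandled message type triggers the fallback instead of being silently dropped.
object ReceiveDemo {
  final case class Heartbeat(workerId: String)

  val receive: PartialFunction[Any, Unit] = {
    case Heartbeat(id) => println(s"heartbeat from $id")
  }

  def main(args: Array[String]): Unit = {
    // Matched case: the partial function handles it.
    receive.applyOrElse[Any, Unit](Heartbeat("worker-1"), msg => println(s"fallback for $msg"))
    // Unmatched case: the fallback runs, mirroring the "Unsupported message" path above.
    try {
      receive.applyOrElse[Any, Unit]("unexpected",
        msg => throw new IllegalArgumentException(s"Unsupported message $msg"))
    } catch {
      case e: IllegalArgumentException => println(e.getMessage)
    }
  }
}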
- Enter org.apache.spark.deploy.master.Master.scala
override def receive: PartialFunction[Any, Unit] = {
case ElectedLeader =>
val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
RecoveryState.ALIVE
} else {
RecoveryState.RECOVERING
}
logInfo("I have been elected leader! New state: " + state)
if (state == RecoveryState.RECOVERING) {
beginRecovery(storedApps, storedDrivers, storedWorkers)
recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CompleteRecovery)
}
}, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
}
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
workerRef.send(MasterInStandby)
} else if (idToWorker.contains(id)) {
workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
workerRef, workerWebUiUrl)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
schedule()
} else {
val workerAddress = worker.endpoint.address
logWarning("Worker registration failed. Attempted to re-register worker at same " +
"address: " + workerAddress)
workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
+ workerAddress))
}
}
case RegisterApplication(description, driver) =>
// TODO Prevent repeated registrations from some driver
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else {
logInfo("Registering app " + description.name)
val app = createApplication(description, driver)
registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
persistenceEngine.addApplication(app)
driver.send(RegisteredApplication(app.id, self))
schedule()
}
case Heartbeat(workerId, worker) =>
idToWorker.get(workerId) match {
case Some(workerInfo) =>
workerInfo.lastHeartbeat = System.currentTimeMillis()
case None =>
if (workers.map(_.id).contains(workerId)) {
logWarning(s"Got heartbeat from unregistered worker $workerId." +
" Asking it to re-register.")
worker.send(ReconnectWorker(masterUrl))
} else {
logWarning(s"Got heartbeat from unregistered worker $workerId." +
" This worker was never registered, so ignoring the heartbeat.")
}
}
// other cases omitted
}
The message matches RegisterWorker, so that branch runs and registerWorker is called.
private def registerWorker(worker: WorkerInfo): Boolean = {
// There may be one or more refs to dead workers on this same node (w/ different ID's),
// remove them.
workers.filter { w =>
(w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
}.foreach { w =>
workers -= w
}
val workerAddress = worker.endpoint.address
if (addressToWorker.contains(workerAddress)) {
val oldWorker = addressToWorker(workerAddress)
if (oldWorker.state == WorkerState.UNKNOWN) {
// A worker registering from UNKNOWN implies that the worker was restarted during recovery.
// The old worker must thus be dead, so we will remove it and accept the new worker.
removeWorker(oldWorker, "Worker replaced by a new worker with same address")
} else {
logInfo("Attempted to re-register worker at same address: " + workerAddress)
return false
}
}
workers += worker
idToWorker(worker.id) = worker
addressToWorker(workerAddress) = worker
true
}
- In the normal case, a WorkerInfo is created and the Master tries to register the Worker: any worker already registered at the same address with state DEAD (or UNKNOWN, which means it was restarted during recovery) is removed, and the new WorkerInfo is added to the Master's internal HashSet[WorkerInfo];
- A RegisteredWorker message (an implementation of RegisterWorkerResponse) is sent to the RpcEndpointRef of the registering Worker to tell it that registration succeeded;
- schedule() is called to assign available resources to waiting apps. It runs whenever a new app is submitted or the available resources change: a newly submitted app needs schedule() to check whether there are enough resources to launch it, and whenever resources change schedule() checks whether any waiting app can now be launched (a toy illustration follows below).
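To make the role of schedule() concrete, here is a toy illustration only; the real Master.schedule() also launches waiting drivers, spreads executors across workers and handles many more cases. The idea is simply: whenever the waiting apps or the free resources change, try to satisfy waiting apps in order.
object ToySchedule {
  final case class WaitingApp(name: String, coresWanted: Int)

  // Returns the apps still waiting and the cores left over.
  def schedule(waiting: List[WaitingApp], freeCores: Int): (List[WaitingApp], Int) =
    waiting match {
      case app :: rest if app.coresWanted <= freeCores =>
        println(s"launching ${app.name} with ${app.coresWanted} cores")
        schedule(rest, freeCores - app.coresWanted)
      case _ =>
        (waiting, freeCores)   // nothing (more) fits; wait until resources change
    }

  def main(args: Array[String]): Unit = {
    // Called when a new app arrives or when a new worker registers (more free cores).
    val (stillWaiting, left) = schedule(List(WaitingApp("a", 4), WaitingApp("b", 8)), 10)
    println(s"still waiting: $stillWaiting, free cores left: $left")
  }
}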
The message send/receive flow is the same as before.
5.4 The Response Sent Back to the Worker
- Back in org.apache.spark.deploy.worker.Worker.scala
override def receive: PartialFunction[Any, Unit] = synchronized {
case msg: RegisterWorkerResponse =>
handleRegisterResponse(msg)
case SendHeartbeat =>
if (connected) { sendToMaster(Heartbeat(workerId, self)) }
// other cases omitted
}
The incoming message is a RegisterWorkerResponse, so that branch runs and handleRegisterResponse is called.
private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
msg match {
case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress) =>
if (preferConfiguredMasterAddress) {
logInfo("Successfully registered with master " + masterAddress.toSparkURL)
} else {
logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
}
registered = true
changeMaster(masterRef, masterWebUiUrl, masterAddress)
forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(SendHeartbeat)
}
}, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
if (CLEANUP_ENABLED) {
logInfo(
s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(WorkDirCleanup)
}
}, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
}
val execs = executors.values.map { e =>
new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
}
masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))
case RegisterWorkerFailed(message) =>
if (!registered) {
logError("Worker registration failed: " + message)
System.exit(1)
}
case MasterInStandby =>
// Ignore. Master not yet ready.
}
}
Here a task is scheduled on forwordMessageScheduler that periodically calls self.send(SendHeartbeat) to send heartbeats (the scheduling pattern is sketched below).
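The scheduling pattern in isolation, as a sketch with assumed names (HeartbeatTicker and tell are not Spark's). In the Worker the interval is HEARTBEAT_MILLIS, which in this version is a quarter of the configured worker timeout, so several heartbeats fit inside one timeout window.
import java.util.concurrent.{Executors, TimeUnit}

object HeartbeatTicker {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // `tell` stands in for self.send(...); exceptions are swallowed so that one bad tick
  // does not cancel the periodic task (the role Utils.tryLogNonFatalError plays above).
  def start(tell: String => Unit, intervalMillis: Long): Unit =
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit =
        try tell("SendHeartbeat") catch {
          case e: Throwable => println(s"heartbeat tick failed: $e")
        }
    }, 0, intervalMillis, TimeUnit.MILLISECONDS)

  def main(args: Array[String]): Unit = {
    start(msg => println(s"self ! $msg"), intervalMillis = 1000L)
    Thread.sleep(3500)        // observe a few ticks
    scheduler.shutdown()
  }
}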
5.5 Heartbeats
When self.send(SendHeartbeat) fires, self is the Worker's own RpcEndpointRef, so the Worker sends a SendHeartbeat message to itself.
The send/receive flow is the same as before; the message ends up in Worker.receive, which takes the SendHeartbeat branch.
- Enter org.apache.spark.deploy.worker.Worker.scala
override def receive: PartialFunction[Any, Unit] = synchronized {
case msg: RegisterWorkerResponse =>
handleRegisterResponse(msg)
case SendHeartbeat =>
if (connected) { sendToMaster(Heartbeat(workerId, self)) }
// other cases omitted
}
The heartbeat is then sent to the Master via sendToMaster.
/**
* Send a message to the current master. If we have not yet registered successfully with any
* master, the message will be dropped.
*/
private def sendToMaster(message: Any): Unit = {
master match {
case Some(masterRef) => masterRef.send(message)
case None =>
logWarning(
s"Dropping $message because the connection to master has not yet been established")
}
}
The send/receive flow is again the same; the Heartbeat message is eventually delivered to the Master for processing.
- Back again in org.apache.spark.deploy.master.Master.scala
override def receive: PartialFunction[Any, Unit] = {
case Heartbeat(workerId, worker) =>
idToWorker.get(workerId) match {
case Some(workerInfo) =>
workerInfo.lastHeartbeat = System.currentTimeMillis()
case None =>
if (workers.map(_.id).contains(workerId)) {
logWarning(s"Got heartbeat from unregistered worker $workerId." +
" Asking it to re-register.")
worker.send(ReconnectWorker(masterUrl))
} else {
logWarning(s"Got heartbeat from unregistered worker $workerId." +
" This worker was never registered, so ignoring the heartbeat.")
}
}
// other cases omitted
}
Here the Master updates the registered WorkerInfo.lastHeartbeat to the current time.
5.6 Heartbeat Timeout Detection
So when are heartbeat timeouts actually checked?
When the Master starts, onStart() is invoked at the end of startup (see "Spark Source Code: Starting the Master"); Worker startup works the same way.
- Enter org.apache.spark.deploy.master.Master.scala
override def onStart(): Unit = {
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForWorkerTimeOut)
}
}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
// other cases omitted
}
A task is scheduled to call self.send(CheckForWorkerTimeOut) every WORKER_TIMEOUT_MS milliseconds.
As before, self is the endpoint's own RpcEndpointRef, in this case the Master, so the Master sends the message to itself and handles it in its own receive method.
- Enter org.apache.spark.deploy.master.Master.scala
override def receive: PartialFunction[Any, Unit] = {
case CheckForWorkerTimeOut =>
timeOutDeadWorkers()
// other cases omitted
}
/** Check for, and remove, any timed-out workers */
private def timeOutDeadWorkers() {
// Copy the workers into an array so we don't modify the hashset while iterating through it
val currentTime = System.currentTimeMillis()
val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
for (worker <- toRemove) {
if (worker.state != WorkerState.DEAD) {
logWarning("Removing %s because we got no heartbeat in %d seconds".format(
worker.id, WORKER_TIMEOUT_MS / 1000))
removeWorker(worker, s"Not receiving heartbeat for ${WORKER_TIMEOUT_MS / 1000} seconds")
} else {
if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
}
}
}
}
Workers whose heartbeats have timed out are identified and removed from the Master's registration state.
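As a concrete example, assuming the defaults in this version (spark.worker.timeout = 60 seconds, so WORKER_TIMEOUT_MS = 60000, and REAPER_ITERATIONS, read from spark.dead.worker.persistence, = 15): a worker that stops sending heartbeats is marked DEAD and removed from scheduling after roughly 60 seconds, and its DEAD entry is finally culled from the workers set once its lastHeartbeat is older than (15 + 1) * 60 s = 16 minutes.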