ambari-agent启动流程
ambari-agent启动通过命令ambari-agent start
,命令实际执行nohup $PYTHON $AMBARI_AGENT_PY_SCRIPT
即python AmbariAgent.py
AMBARI_AGENT_PY_SCRIPT=/usr/lib/ambari-agent/lib/ambari_agent/AmbariAgent.py
AmbariAgent.py
通过subprocess子进程执行main.py
,该进程会会从配置文件中获取相关需要的配置信息,数据文件清理(err,auto,output*等文件),通过启动参数来重启或关闭agent,监听端口,获取server的地址,然后和server建立连接,一旦建立连接,调用run-thread方法开始agent和server的通信过程,在run_threads中启动了Controller线程。
Ambari-agent 通过Controller.py
的方法与Ambari-server 中的AgentResource(org.apache.ambari.server.agent.rest)
进行交互(获得集群配置变更,报告节点属性,以及节点上运行服务运行状态),并通过HTTP Response返回ambari-server投递过来的状态操作到操作队列ActionQueue。ActionQueue默认是使用并行模式执行command
Post请求 注册关于主机的信息
Post请求 心跳连接(更新节点的状态)
Get 请求 检索用于集群上的组件映射
基础参数释义
ExecuteCommand:对服务组件执行INSTALL/START/STOP等操作。
StatusCommand:对服务组件执行死活检查(由Server定期下发)。需要server确定
CancelCommand:取消其他已经下发的Task(当Stage中的某个Task失败时)。
RegistrationCommand:要求Agent向Server重新注册(当发现Server维护的心跳序号与Agent上报的不一致时)
心跳流程源码分析
heartbeatWithServer
方法属性释义
self.DEBUG_HEARTBEAT_RETRIES = 0 # debug心跳重试次数
self.DEBUG_SUCCESSFULL_HEARTBEATS = 0 # debug心跳成功次数
retry = False # 是否重试,心跳开始的时候默认False
certVerifFailed = False
state_interval = int(self.config.get('heartbeat', 'state_interval_seconds', '60')) # 日志记录间隔 默认60s
# last time when state was successfully sent to server
last_state_timestamp = 0.0 # 上一次记录日志的时间
# 为了确保心跳的正常运行,我们会记录一些日志,但是为了避免高频次的日志记录,引用了state_interval来间隔记录日志
heartbeat_running_msg_timestamp = 0.0 #
# 通过只在特定的时间间隔内进行日志记录来防止过度日志记录
getrecoverycommands_timestamp = 0.0 # 获取recoverycommands的时间戳
getrecoverycommands_interval = self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC #获取recoverycommands时间间隔默认为10s
heartbeat_interval = self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC # 心跳的默认时间间隔是10s
出于测试目的
DEBUG_HEARTBEAT_RETRIES = 0
DEBUG_SUCCESSFULL_HEARTBEATS = 0
DEBUG_STOP_HEARTBEATING = False
非测试环境下初始化心跳进入while循环
while not self.DEBUG_STOP_HEARTBEATING:
while循环体内部解析:
- 默认的日志级别为DEBUG,如果
当前时间 - 心跳日志输出时间 > 日志输出间隔
则将日志级别改成INFO,将心跳日志输出时间设为当前时间
while not self.DEBUG_STOP_HEARTBEATING:
current_time = time.time()
logging_level = logging.DEBUG
if current_time - heartbeat_running_msg_timestamp > state_interval:
# log more steps every minute or so
logging_level = logging.INFO
heartbeat_running_msg_timestamp = current_time
- 发送心跳前的数据准备
- 日志记录当前的responseId,注册系统时返回的或者默认的-1
- send_state 是否发送系统状态信息(通过shell命令获取主机信息) ,默认不发送
- 判断是否重试retry,如果为true,则DEBUG_HEARTBEAT_RETRIES +1 否则判断
当前时间 - 上一次发送系统状态信息的时间 > state_interval(10s)
如果为true,则send_state 置为true,意味着需要发送系统状态信息给ambari-server端 - 组织需要发送给ambari-server的json数据
- 根据当前设置的日志级别 进行日志记录.如果是INFO级别的则会记录下当前发出的json数据
try:
logger.log(logging_level, "Heartbeat (response id = %s) with server is running...", self.responseId)
send_state = False
if not retry:
if current_time - last_state_timestamp > state_interval:
send_state = True
logger.log(logging_level, "Building heartbeat message")
data = json.dumps(self.heartbeat.build(self.responseId, send_state, self.hasMappedComponents))
else:
self.DEBUG_HEARTBEAT_RETRIES += 1
if logger.isEnabledFor(logging.DEBUG):
logger.log(logging_level, "Sending Heartbeat (id = %s): %s", self.responseId, data)
else:
logger.log(logging_level, "Sending Heartbeat (id = %s)", self.responseId)
- 与ambari-server建立通信并解析response
3.1 发送请求至ambari-server,并接受server传过来的response- exitstatus==0则表示本次通信成功, 不为0则抛出异常
- 获取responseId,并记录日志
- 或者集群节点数,来及时调整心跳的频率,如果集群节点>0 则
集群节点//HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC(10s)
取整除 - 返回商的整数部分(向下取整)
# HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC = 10 # HEARTBEAT_IDLE_INTERVAL_DEFAULT_MIN_SEC = 1 if(0 < cluster_size and cluster_size < 9 ) cluster_size // HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC = 0 ==> 则心跳频率为1s if(cluster_size > 9 and cluster_size <=100) cluster_size // HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC 整除取返回商部分 if(cluster_size > 100 ) cluster_size // HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC =10===>则心跳频率为10s
- 日志记录新的心跳频率
response = self.sendRequest(self.heartbeatUrl, data)
exitStatus = 0
if 'exitstatus' in response.keys():
exitStatus = int(response['exitstatus'])
if exitStatus != 0:
raise Exception(response)
serverId = int(response['responseId'])
logger.log(logging_level, 'Heartbeat response received (id = %s)', serverId)
cluster_size = int(response['clusterSize']) if 'clusterSize' in response.keys() else -1
# TODO: this needs to be revised if hosts can be shared across multiple clusters
heartbeat_interval = self.get_heartbeat_interval(cluster_size) \
if cluster_size > 0 \
else self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
logger.log(logging_level, "Heartbeat interval is %s seconds", heartbeat_interval)
- 根据ambari-server 返回的值来调整部分设置
4.1 是否有映射的组件信息-----主要是心跳发送数据的时候如果无组件映射的话,会执行一些比较耗费性能的操作,根据response返回的信息来决定是否执行该操作
4.2 是否有处于等待的task,如果有的话,会暂停 自动恢复管理机(recovery_manager)的操作
4.3 是否存在注册命令,如果存在的话,则退出心跳,将设置isRegistered = False 以及repeatRegistration = True 并再次进行注册操作,意味着从头再来一次
if 'hasMappedComponents' in response.keys():
self.hasMappedComponents = response['hasMappedComponents'] is not False
if 'hasPendingTasks' in response.keys():
has_pending_tasks = bool(response['hasPendingTasks'])
self.recovery_manager.set_paused(has_pending_tasks)
if 'registrationCommand' in response.keys():
# check if the registration command is None. If none skip
if response['registrationCommand'] is not None:
logger.info("RegistrationCommand received - repeat agent registration")
self.isRegistered = False
self.repeatRegistration = True
return
- 根据指标来决定是否需要重启agent
5.1 获取当前进程使用的内存大小(单位KB)/1000 = MB
默认的软性指标为400MB 硬性指标为1000GB 可通过ambari-agent.ini配置- 当超过软性指标时且没有正在处理的任务时,进行agent重启
- 当大于等于硬性指标时,则强制进行agent重启
5.2 ambari-server返回的responseId与agent当前记录的responseId+1做比较,如果不相符则进入重启反之则更新最新的responseId,并更新上一次获取主机状态信息的时间last_state_timestamp=current_time
used_ram = get_used_ram() / 1000
# dealing with a possible memory leaks
if self.max_ram_soft and used_ram >= self.max_ram_soft and not self.actionQueue.tasks_in_progress_or_pending():
logger.error(
AGENT_RAM_OVERUSE_MESSAGE.format(used_ram=used_ram, config_name="memory_threshold_soft_mb",
max_ram=self.max_ram_soft))
self.restartAgent()
if self.max_ram_hard and used_ram >= self.max_ram_hard:
logger.error(
AGENT_RAM_OVERUSE_MESSAGE.format(used_ram=used_ram, config_name="memory_threshold_hard_mb",
max_ram=self.max_ram_hard))
self.restartAgent()
if serverId != self.responseId + 1:
logger.error(
"Error in responseId sequence - received responseId={0} from server while expecting {1} - restarting..."
.format(serverId, self.responseId + 1))
self.restartAgent()
else:
self.responseId = serverId
if send_state:
last_state_timestamp = current_time
- 通过心跳返回信息更新agent配置
# if the response contains configurations, update the in-memory and
# disk-based configuration cache (execution and alert commands have this)
logger.log(logging_level, "Updating configurations from heartbeat")
self.cluster_configuration.update_configurations_from_heartbeat(response)
7.根据ambari-server返回的不同command来分别执行相应的操作
- 因为需要取消的Commands可能会在其它类型Commands之前进行,这可能导致actionQueue执行操作的混乱,所以为了避免命令执行失败,所以会将actionQueue进行原子性操作,
- 先将cancelCommand进行移除(从actionQueue中remove假如它还没执行的话,或者直接kill假如它已经在执行中)
- executionCommands存在的话,则recovery_manager会根据执行的操作来动态调整预期状态方便之后进行recover,紧接着将executionCommands放入actionQueue
- statusCommands存在的话,recovery_manager会执行相应的操作,并放入statusCommandsExecutor通过线程去执行
- 通过时间的间隔查询来定期生成recovery_commands
# there's case when canceled task can be processed in
# Action Queue.execute before adding rescheduled task to queue
# this can cause command failure instead result suppression
# so canceling and putting rescheduled commands should be executed atomically
if 'cancelCommands' in response_keys or 'executionCommands' in response_keys:
logger.log(logging_level, "Adding cancel/execution commands")
with self.actionQueue.lock:
if 'cancelCommands' in response_keys:
self.cancelCommandInQueue(response['cancelCommands'])
if 'executionCommands' in response_keys:
execution_commands = response['executionCommands']
self.recovery_manager.process_execution_commands(execution_commands)
self.addToQueue(execution_commands)
if 'statusCommands' in response_keys:
# try storing execution command details and desired state
self.addToStatusQueue(response['statusCommands'])
if current_time - getrecoverycommands_timestamp > getrecoverycommands_interval:
getrecoverycommands_timestamp = current_time
if not self.actionQueue.tasks_in_progress_or_pending():
logger.log(logging_level, "Adding recovery commands")
recovery_commands = self.recovery_manager.get_recovery_commands()
for recovery_command in recovery_commands:
logger.info("Adding recovery command %s for component %s",
recovery_command['roleCommand'], recovery_command['role'])
self.addToQueue([recovery_command])
if 'alertDefinitionCommands' in response_keys:
logger.log(logging_level, "Updating alert definitions")
self.alert_scheduler_handler.update_definitions(response)
if 'alertExecutionCommands' in response_keys:
logger.log(logging_level, "Executing alert commands")
self.alert_scheduler_handler.execute_alert(response['alertExecutionCommands'])
- server下发restart命令的话则进行agent重启,如果response中存在recoveryConfig则进行配置更新
if "true" == response['restartAgent']:
logger.error("Received the restartAgent command")
self.restartAgent()
else:
logger.debug("No commands sent from %s", self.serverHostname)
if retry:
logger.info("Reconnected to %s", self.heartbeatUrl)
if "recoveryConfig" in response:
# update the list of components enabled for recovery
logger.log(logging_level, "Updating recovery config")
self.recovery_manager.update_configuration_from_registration(response)
retry = False
certVerifFailed = False
self.DEBUG_SUCCESSFULL_HEARTBEATS += 1
self.DEBUG_HEARTBEAT_RETRIES = 0
self.heartbeat_stop_callback.reset_heartbeat()
- heartbeat_stop_callback 这个方法使用了python的threading,每一次心跳结束后进入阻塞,thread.wait(timeout)后才会进入下一次心跳,特殊情况,一旦非statusCommand类型的命令执行完成,也会立即发送心跳,触发threading.event.set(),立即进入下一次心跳
相关博客
如果有相关问题的话,可以给我留言,欢迎一起探讨ambari-agent !!