I. Problem Description
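After YARN was upgraded to 2.9.1, the RM failed over (active/standby switch) every so often and suffered extremely long GC pauses. Monitoring showed that RM memory usage kept creeping up.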
II. Problem Analysis
Analyzing the RM heap showed that 95.63% of it was occupied by 1603 RMNodeImpl instances.
Each RMNodeImpl holds a HashSet with more than 130,000 elements, which makes each instance about 14 MB.
That HashSet is completedContainers.
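As a sanity check on these figures, the snippet below does plain arithmetic on the numbers above (it is not a measurement). It assumes "14 MB" means 14 MiB and that almost all of it is retained by completedContainers; the class and method names are made up for illustration.

```java
// Back-of-envelope arithmetic on the figures from the heap analysis.
// Assumption: "14 MB" is read as 14 MiB (14 * 1024 * 1024 bytes).
public class RmHeapEstimate {
    static final double BYTES_PER_MIB = 1024.0 * 1024.0;

    // Average bytes retained per completedContainers entry.
    static double bytesPerEntry(double mibPerNode, long entries) {
        return mibPerNode * BYTES_PER_MIB / entries;
    }

    // Total retained by all RMNodeImpl instances, in GiB.
    static double totalGiB(double mibPerNode, int nodes) {
        return mibPerNode * nodes / 1024.0;
    }

    public static void main(String[] args) {
        System.out.printf("~%.0f bytes per entry%n", bytesPerEntry(14, 130_000));
        System.out.printf("~%.1f GiB across %d nodes%n", totalGiB(14, 1603), 1603);
    }
}
```

This works out to roughly 113 bytes per entry and about 22 GiB in total, pinned by sets that are meant to be drained once AMs acknowledge their finished containers.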
Why were so many completed containers never released?
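When an NM sends a heartbeat, RMNodeImpl adds every container reported as no longer running (that is, finished), as well as lost containers, to completedContainers. When building the heartbeat response, it removes the containers in containersToBeRemovedFromNM from completedContainers and also tells the NM to remove those containers.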
// RMNodeImpl.java
public void setAndUpdateNodeHeartbeatResponse(
    NodeHeartbeatResponse response) {
  this.writeLock.lock();
  try {
    response.addAllContainersToCleanup(
        new ArrayList<ContainerId>(this.containersToClean));
    response.addAllApplicationsToCleanup(this.finishedApplications);
    // Send the containers in containersToBeRemovedFromNM to the NM so it removes them
    response.addContainersToBeRemovedFromNM(
        new ArrayList<ContainerId>(this.containersToBeRemovedFromNM));
    response.addAllContainersToSignal(this.containersToSignal);
    // Remove the containers in containersToBeRemovedFromNM from completedContainers
    this.completedContainers.removeAll(this.containersToBeRemovedFromNM);
    this.containersToClean.clear();
    this.finishedApplications.clear();
    this.containersToSignal.clear();
    this.containersToBeRemovedFromNM.clear();
    // ... (omitted)
  } finally {
    this.writeLock.unlock();
  }
}
The containers in containersToBeRemovedFromNM come from RMNodeFinishedContainersPulledByAMEvent events, which are sent by RMAppAttempt.
// RMNodeImpl.java
public static class AddContainersToBeRemovedFromNMTransition implements
    SingleArcTransition<RMNodeImpl, RMNodeEvent> {
  @Override
  public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
    rmNode.containersToBeRemovedFromNM.addAll(((
        RMNodeFinishedContainersPulledByAMEvent) event).getContainers());
  }
}
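To make the RMNodeImpl side concrete, here is a minimal, hypothetical model of this bookkeeping. It uses String IDs instead of ContainerId and omits locking and the state machine; RmNodeModel and its method names are invented for illustration. It shows that a completed container leaves completedContainers only after an RMNodeFinishedContainersPulledByAMEvent names it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the RMNodeImpl bookkeeping described above. Container IDs are
// Strings; the real code uses ContainerId, locks, and the YARN state machine.
public class RmNodeModel {
    final Set<String> completedContainers = new HashSet<>();
    final Set<String> containersToBeRemovedFromNM = new HashSet<>();

    // NM heartbeat: containers reported as no longer running are recorded as completed.
    void onNodeHeartbeat(List<String> finishedContainerIds) {
        completedContainers.addAll(finishedContainerIds);
    }

    // Like AddContainersToBeRemovedFromNMTransition: the attempt reports IDs the AM has seen.
    void onFinishedContainersPulledByAM(List<String> containerIds) {
        containersToBeRemovedFromNM.addAll(containerIds);
    }

    // Like setAndUpdateNodeHeartbeatResponse: tell the NM what to drop, then forget it here.
    List<String> buildHeartbeatResponse() {
        List<String> toRemoveOnNM = new ArrayList<>(containersToBeRemovedFromNM);
        completedContainers.removeAll(containersToBeRemovedFromNM);
        containersToBeRemovedFromNM.clear();
        return toRemoveOnNM;
    }

    public static void main(String[] args) {
        RmNodeModel node = new RmNodeModel();
        node.onNodeHeartbeat(Arrays.asList("c1", "c2"));
        node.onFinishedContainersPulledByAM(Arrays.asList("c1")); // only c1 is acknowledged
        System.out.println("removed on NM: " + node.buildHeartbeatResponse()); // [c1]
        System.out.println("still held:    " + node.completedContainers);       // [c2]
    }
}
```

In this model c2 is never acknowledged, so it stays in completedContainers no matter how many NM heartbeats follow, which is the kind of growth observed on the RM.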
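When RMAppAttempt receives a CONTAINER_FINISHED event, it checks whether the container is the AM container. If it is not, the container is added to justFinishedContainers, a map from NodeId to the list of finished container statuses on that node. Each AM heartbeat calls RMAppAttempt.pullJustFinishedContainers(), which is the key method for cleaning up completed containers. It first calls sendFinishedContainersToNM() to send the containers in finishedContainersSentToAM to the NMs; these are exactly the containers that RMNodeImpl then removes from completedContainers. It then moves the containers in justFinishedContainers into finishedContainersSentToAM. The point of this two-step hand-off is that removal is driven by the AM's acknowledgement rather than by the NM: on the next AM heartbeat, the containers in finishedContainersSentToAM are sent to the NMs.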
// RMAppAttemptImpl.java
public List<ContainerStatus> pullJustFinishedContainers() {
  this.writeLock.lock();
  try {
    List<ContainerStatus> returnList = new ArrayList<>();
    // A new allocate means the AM received the previously sent
    // finishedContainers. We can ack this to NM now
    sendFinishedContainersToNM(); // ack containers already sent to the AM to the NMs
    // Mark every containerStatus as being sent to AM though we may return
    // only the ones that belong to the current attempt
    boolean keepContainersAcrossAppAttempts = this.submissionContext
        .getKeepContainersAcrossApplicationAttempts();
    // Move the containers in justFinishedContainers into finishedContainersSentToAM,
    // then clear justFinishedContainers
    for (Map.Entry<NodeId, List<ContainerStatus>> entry :
        justFinishedContainers.entrySet()) {
      NodeId nodeId = entry.getKey();
      List<ContainerStatus> finishedContainers = entry.getValue();
      if (finishedContainers.isEmpty()) {
        continue;
      }
      if (keepContainersAcrossAppAttempts) {
        returnList.addAll(finishedContainers);
      } else {
        // Filter out containers from previous attempt
        for (ContainerStatus containerStatus : finishedContainers) {
          if (containerStatus.getContainerId().getApplicationAttemptId()
              .equals(this.getAppAttemptId())) {
            returnList.add(containerStatus);
          }
        }
      }
      finishedContainersSentToAM.putIfAbsent(nodeId,
          new ArrayList<ContainerStatus>());
      finishedContainersSentToAM.get(nodeId).addAll(finishedContainers);
    }
    justFinishedContainers.clear();
    return returnList;
  } finally {
    this.writeLock.unlock();
  }
}
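Note that the keepContainersAcrossApplicationAttempts check only changes what is returned to the AM; every status is still moved to finishedContainersSentToAM. The following minimal sketch isolates that filter. Status is a made-up stand-in for ContainerStatus, and the attempt IDs are plain Strings, both assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the filter inside pullJustFinishedContainers(). Status is a made-up
// stand-in for ContainerStatus that keeps only the container ID and its attempt ID.
public class PullFilterSketch {
    static final class Status {
        final String containerId;
        final String attemptId;

        Status(String containerId, String attemptId) {
            this.containerId = containerId;
            this.attemptId = attemptId;
        }
    }

    // IDs the AM is told about. Regardless of the result, the real method moves every
    // status into finishedContainersSentToAM, so all of them are later acked to the NM.
    static List<String> visibleToAm(List<Status> finished, String currentAttemptId,
                                    boolean keepContainersAcrossAttempts) {
        List<String> result = new ArrayList<>();
        for (Status s : finished) {
            if (keepContainersAcrossAttempts || s.attemptId.equals(currentAttemptId)) {
                result.add(s.containerId);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Status> finished = new ArrayList<>();
        finished.add(new Status("c_old", "attempt_1"));
        finished.add(new Status("c_new", "attempt_2"));
        System.out.println(visibleToAm(finished, "attempt_2", false)); // [c_new]
        System.out.println(visibleToAm(finished, "attempt_2", true));  // [c_old, c_new]
    }
}
```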
sendFinishedContainersToNM() takes each NodeId and its ContainerIds from finishedContainersSentToAM, builds one RMNodeFinishedContainersPulledByAMEvent per node, and then clears finishedContainersSentToAM.
// RMAppAttemptImpl.java
private void sendFinishedContainersToNM() {
  for (NodeId nodeId : finishedContainersSentToAM.keySet()) {
    // Clear and get current values
    List<ContainerStatus> currentSentContainers =
        finishedContainersSentToAM.put(nodeId,
            new ArrayList<ContainerStatus>());
    List<ContainerId> containerIdList =
        new ArrayList<>(currentSentContainers.size());
    for (ContainerStatus containerStatus : currentSentContainers) {
      containerIdList.add(containerStatus.getContainerId());
    }
    eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId,
        containerIdList));
  }
  this.finishedContainersSentToAM.clear();
}
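The two methods together form a two-stage hand-off, so a normal container needs two AM heartbeats before it is acknowledged to its NM. The hypothetical model below replays that with String IDs; AttemptHandoffModel and onAmHeartbeat are invented names, and the returned map (node to container IDs) stands in for the per-node events.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the two-stage hand-off in RMAppAttemptImpl. IDs are Strings; the
// returned map (node -> container IDs) stands in for the RMNodeFinishedContainersPulledByAMEvents.
public class AttemptHandoffModel {
    final Map<String, List<String>> justFinishedContainers = new HashMap<>();
    final Map<String, List<String>> finishedContainersSentToAM = new HashMap<>();

    // CONTAINER_FINISHED for a normal (non-AM) container.
    void containerFinished(String nodeId, String containerId) {
        justFinishedContainers.computeIfAbsent(nodeId, k -> new ArrayList<>()).add(containerId);
    }

    // Like sendFinishedContainersToNM(): acknowledge everything already reported to the AM.
    Map<String, List<String>> sendFinishedContainersToNM() {
        Map<String, List<String>> events = new TreeMap<>();
        finishedContainersSentToAM.forEach((node, ids) -> events.put(node, new ArrayList<>(ids)));
        finishedContainersSentToAM.clear();
        return events;
    }

    // Like pullJustFinishedContainers(), run on every AM allocate heartbeat.
    Map<String, List<String>> onAmHeartbeat() {
        Map<String, List<String>> events = sendFinishedContainersToNM(); // ack previous batch
        justFinishedContainers.forEach((node, ids) ->
            finishedContainersSentToAM.computeIfAbsent(node, k -> new ArrayList<>()).addAll(ids));
        justFinishedContainers.clear();
        return events;
    }

    public static void main(String[] args) {
        AttemptHandoffModel attempt = new AttemptHandoffModel();
        attempt.containerFinished("node1", "c1");
        System.out.println(attempt.onAmHeartbeat()); // {}            c1 only reported to the AM
        System.out.println(attempt.onAmHeartbeat()); // {node1=[c1]}  c1 now acknowledged to the NM
    }
}
```

If the second heartbeat never arrives, c1 is never acknowledged, which is the gap examined in the scenario further down.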
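If the finished container is the AM container, RMAppAttemptImpl.amContainerFinished() cleans it up. It first takes the NodeId and ContainerStatus from the RMAppAttemptContainerFinishedEvent. If keepContainersAcrossApplicationAttempts is disabled, it adds the AM container to finishedContainersSentToAM and calls sendFinishedContainersToNM(), which flushes everything in that map to the NMs; otherwise it calls sendFinishedAMContainerToNM() to acknowledge only the AM container.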
// RMAppAttemptImpl.java
private static void amContainerFinished(RMAppAttemptImpl appAttempt,
    RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
  NodeId nodeId = containerFinishedEvent.getNodeId(); // get the NodeId
  ContainerStatus containerStatus =
      containerFinishedEvent.getContainerStatus(); // get the ContainerStatus
  if (containerStatus != null) {
    int exitStatus = containerStatus.getExitStatus();
    if (shouldCountTowardsNodeBlacklisting(exitStatus)) {
      appAttempt.addAMNodeToBlackList(nodeId);
    }
  } else {
    LOG.warn("No ContainerStatus in containerFinishedEvent");
  }
  if (!appAttempt.getSubmissionContext()
      .getKeepContainersAcrossApplicationAttempts()) {
    appAttempt.finishedContainersSentToAM.putIfAbsent(nodeId,
        new ArrayList<ContainerStatus>());
    // Add the AM container to finishedContainersSentToAM
    appAttempt.finishedContainersSentToAM.get(nodeId).add(containerStatus);
    appAttempt.sendFinishedContainersToNM(); // flush finishedContainersSentToAM to the NMs
  } else {
    appAttempt.sendFinishedAMContainerToNM(nodeId,
        containerStatus.getContainerId());
  }
}
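The two branches differ in how much they flush. The hypothetical model below (String IDs, invented class name, returned map standing in for the per-node events) shows that with keep-containers disabled every pending entry in finishedContainersSentToAM goes out together with the AM container, while with it enabled only the AM container is acknowledged.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the two branches in amContainerFinished(). IDs are Strings and the
// returned map stands in for the per-node RMNodeFinishedContainersPulledByAMEvents.
public class AmContainerFinishedModel {
    final Map<String, List<String>> finishedContainersSentToAM = new HashMap<>();

    Map<String, List<String>> amContainerFinished(boolean keepContainersAcrossAttempts,
                                                  String nodeId, String amContainerId) {
        Map<String, List<String>> toNm = new TreeMap<>();
        if (!keepContainersAcrossAttempts) {
            // Queue the AM container, then flush the whole map (sendFinishedContainersToNM()).
            finishedContainersSentToAM.computeIfAbsent(nodeId, k -> new ArrayList<>())
                .add(amContainerId);
            finishedContainersSentToAM.forEach((node, ids) -> toNm.put(node, new ArrayList<>(ids)));
            finishedContainersSentToAM.clear();
        } else {
            // Only the AM container is acknowledged (sendFinishedAMContainerToNM()).
            toNm.put(nodeId, new ArrayList<>(Collections.singletonList(amContainerId)));
        }
        return toNm;
    }

    public static void main(String[] args) {
        AmContainerFinishedModel a = new AmContainerFinishedModel();
        a.finishedContainersSentToAM.put("node2", new ArrayList<>(Collections.singletonList("c5")));
        System.out.println(a.amContainerFinished(false, "node1", "am1")); // {node1=[am1], node2=[c5]}

        AmContainerFinishedModel b = new AmContainerFinishedModel();
        b.finishedContainersSentToAM.put("node2", new ArrayList<>(Collections.singletonList("c5")));
        System.out.println(b.amContainerFinished(true, "node1", "am1"));  // {node1=[am1]}
    }
}
```

In the second case c5 is left in finishedContainersSentToAM. Neither branch looks at justFinishedContainers.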
The basic flow is shown below (the red lines are the steps related to the AM heartbeat):
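Consider this scenario: a normal container has just finished and been added to justFinishedContainers. It must wait for an AM heartbeat to be moved into finishedContainersSentToAM, and it can be cleared only on the following AM heartbeat or when the AM container is cleaned up. If the AM finishes and unregisters at this point, it sends no more heartbeats, so the containers in justFinishedContainers never get moved into finishedContainersSentToAM, and cleaning up the AM container does not touch them either. As a result, these containers are never removed from RMNodeImpl's completedContainers and stay in RM memory indefinitely.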
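A hypothetical replay of this scenario with keep-containers disabled, using simplified String-based maps (LeakScenario and its methods are invented for illustration): c1 is stranded in justFinishedContainers when the AM unregisters, and the AM container's cleanup only flushes finishedContainersSentToAM.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical replay of the leak scenario. IDs are Strings; the returned map stands in
// for the per-node RMNodeFinishedContainersPulledByAMEvents.
public class LeakScenario {
    final Map<String, List<String>> justFinishedContainers = new HashMap<>();
    final Map<String, List<String>> finishedContainersSentToAM = new HashMap<>();

    // Like sendFinishedContainersToNM(): flush only finishedContainersSentToAM.
    Map<String, List<String>> flushSentToAM() {
        Map<String, List<String>> toNm = new TreeMap<>();
        finishedContainersSentToAM.forEach((node, ids) -> toNm.put(node, new ArrayList<>(ids)));
        finishedContainersSentToAM.clear();
        return toNm;
    }

    // AM container finishes (keep-containers disabled): mirrors amContainerFinished().
    Map<String, List<String>> amContainerFinished(String nodeId, String amContainerId) {
        finishedContainersSentToAM.computeIfAbsent(nodeId, k -> new ArrayList<>()).add(amContainerId);
        return flushSentToAM(); // justFinishedContainers is never consulted here
    }

    public static void main(String[] args) {
        LeakScenario attempt = new LeakScenario();
        // c1 finished on node2 after the AM's last heartbeat; then the AM unregistered.
        attempt.justFinishedContainers.computeIfAbsent("node2", k -> new ArrayList<>()).add("c1");
        System.out.println("sent to NM:  " + attempt.amContainerFinished("node1", "am1")); // {node1=[am1]}
        System.out.println("left behind: " + attempt.justFinishedContainers);              // {node2=[c1]}
    }
}
```

Because c1 is never named in any event, node2's RMNodeImpl keeps it in completedContainers for good.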
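There is a second problem: if the RMAppAttempt is already in a final state (FINISHED, KILLED, or FAILED) when it receives a CONTAINER_FINISHED event, it only adds the container to justFinishedContainers and never performs AM container cleanup, so the AM container is not released either.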
// RMAppAttemptImpl.java
private static final class ContainerFinishedAtFinalStateTransition
    extends BaseTransition {
  @Override
  public void
      transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
    RMAppAttemptContainerFinishedEvent containerFinishedEvent =
        (RMAppAttemptContainerFinishedEvent) event;
    // Normal container. Add it in completed containers list
    // Only adds the container (even the AM container) to justFinishedContainers
    addJustFinishedContainer(appAttempt, containerFinishedEvent);
  }
}
III. Optimization
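The root cause is that normal containers added to justFinishedContainers never get moved into finishedContainersSentToAM because the AM unregisters first, so they stay in memory. In addition, an RMAppAttempt in a final state does not clean up the AM container when it receives CONTAINER_FINISHED. Together these make RMNodeImpl occupy far too much memory.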
Fix: when an RMAppAttempt in a final state receives a CONTAINER_FINISHED event, run AM container cleanup; and during AM container cleanup, if justFinishedContainers is not empty, move its NodeIds and container statuses into finishedContainersSentToAM so that they are sent to the NMs.
// RMAppAttemptImpl.java
private static final class ContainerFinishedAtFinalStateTransition
    extends BaseTransition {
  @Override
  public void
      transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
    RMAppAttemptContainerFinishedEvent containerFinishedEvent =
        (RMAppAttemptContainerFinishedEvent) event;
    // Normal container. Add it in completed containers list
    addJustFinishedContainer(appAttempt, containerFinishedEvent);
    // Clean up the AM container
    amContainerFinished(appAttempt, containerFinishedEvent);
  }
}
// RMAppAttemptImpl.java
private static void amContainerFinished(RMAppAttemptImpl appAttempt,
    RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
  NodeId nodeId = containerFinishedEvent.getNodeId();
  ContainerStatus containerStatus =
      containerFinishedEvent.getContainerStatus();
  if (containerStatus != null) {
    int exitStatus = containerStatus.getExitStatus();
    if (shouldCountTowardsNodeBlacklisting(exitStatus)) {
      appAttempt.addAMNodeToBlackList(nodeId);
    }
  } else {
    LOG.warn("No ContainerStatus in containerFinishedEvent");
  }
  if (!appAttempt.getSubmissionContext()
      .getKeepContainersAcrossApplicationAttempts()) {
    appAttempt.finishedContainersSentToAM.putIfAbsent(nodeId,
        new ArrayList<ContainerStatus>());
    appAttempt.finishedContainersSentToAM.get(nodeId).add(containerStatus);
    // Move the containers in justFinishedContainers into finishedContainersSentToAM
    if (!appAttempt.justFinishedContainers.isEmpty()) {
      for (Map.Entry<NodeId, List<ContainerStatus>> justFinishedContainer
          : appAttempt.justFinishedContainers.entrySet()) {
        List<ContainerStatus> justFinishedContainerStatus =
            justFinishedContainer.getValue();
        NodeId justFinishedNodeId = justFinishedContainer.getKey();
        appAttempt.finishedContainersSentToAM.putIfAbsent(justFinishedNodeId,
            new ArrayList<ContainerStatus>());
        appAttempt.finishedContainersSentToAM.get(justFinishedNodeId)
            .addAll(justFinishedContainerStatus);
      }
    }
    appAttempt.sendFinishedContainersToNM();
  } else {
    appAttempt.sendFinishedAMContainerToNM(nodeId,
        containerStatus.getContainerId());
  }
}
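To compare the final-state transition before and after the change, here is a hypothetical model with keep-containers disabled. FinalStateFixModel and its methods are invented; per-node values are sets because in the patched code the finishing container is added twice (directly and via justFinishedContainers), which is harmless on the RM side since containersToBeRemovedFromNM is a HashSet. Like the patched code, the model does not clear justFinishedContainers after moving its entries.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical model (keep-containers disabled) of ContainerFinishedAtFinalStateTransition
// before and after the change. IDs are Strings; the returned map stands in for the
// per-node RMNodeFinishedContainersPulledByAMEvents.
public class FinalStateFixModel {
    final Map<String, Set<String>> justFinishedContainers = new HashMap<>();
    final Map<String, Set<String>> finishedContainersSentToAM = new HashMap<>();
    final boolean patched;

    FinalStateFixModel(boolean patched) {
        this.patched = patched;
    }

    private static void append(Map<String, Set<String>> m, String node, Collection<String> ids) {
        m.computeIfAbsent(node, k -> new TreeSet<>()).addAll(ids);
    }

    Map<String, Set<String>> containerFinishedAtFinalState(String nodeId, String containerId) {
        append(justFinishedContainers, nodeId, Collections.singleton(containerId));
        Map<String, Set<String>> toNm = new TreeMap<>();
        if (!patched) {
            return toNm; // original transition: nothing reaches the NM
        }
        // Patched amContainerFinished(): queue this container, move justFinishedContainers
        // over, then flush everything as sendFinishedContainersToNM() does.
        append(finishedContainersSentToAM, nodeId, Collections.singleton(containerId));
        justFinishedContainers.forEach((node, ids) -> append(finishedContainersSentToAM, node, ids));
        finishedContainersSentToAM.forEach((node, ids) -> toNm.put(node, new TreeSet<>(ids)));
        finishedContainersSentToAM.clear();
        return toNm;
    }

    public static void main(String[] args) {
        for (boolean patched : new boolean[] {false, true}) {
            FinalStateFixModel attempt = new FinalStateFixModel(patched);
            // c1 was stranded in justFinishedContainers before the AM unregistered.
            append(attempt.justFinishedContainers, "node2", Collections.singleton("c1"));
            Map<String, Set<String>> sent = attempt.containerFinishedAtFinalState("node1", "am1");
            System.out.println((patched ? "patched:  " : "original: ") + "sent to NM " + sent);
        }
        // original: sent to NM {}
        // patched:  sent to NM {node1=[am1], node2=[c1]}
    }
}
```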
IV. Results
The log line "---after completedContainers contains---" prints the containers left in completedContainers after those in containersToBeRemovedFromNM have been removed.
Before the fix: after many NM heartbeats, the RMNodeImpl objects for datanode1 and datanode2 still held containers from finished tasks.
After the fix: completedContainers contains no containers.