Hadoop YARN RM Memory Leak Analysis

1. Problem Description

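After YARN was upgraded to 2.9.1, the ResourceManager (RM) failed over every so often and suffered extremely long GC pauses. Monitoring showed that RM heap usage kept growing slowly.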


2. Problem Analysis

Analysis of the RM heap showed that 95.63% of it was occupied by 1,603 RMNodeImpl instances.


Each RMNodeImpl contains a HashSet holding more than 130,000 elements, which makes each RMNodeImpl about 14 MB.
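These figures can be multiplied out as a rough sanity check. The sketch below assumes "14 MB" means 14 MiB, takes "130,000+" as exactly 130,000, and pretends all 1,603 instances are that size; HeapEstimate is an illustrative name only.

```java
import java.util.Locale;

// Back-of-the-envelope check of the heap-dump figures quoted above.
// Assumptions: "14 MB" is 14 MiB, "130,000+" is taken as exactly 130,000,
// and every RMNodeImpl is assumed to be that size.
public class HeapEstimate {
    static final long MIB = 1024L * 1024L;

    // Combined retained size of all RMNodeImpl instances, in MiB.
    static long totalMiB(int nodes, long perNodeMiB) {
        return nodes * perNodeMiB;
    }

    // Average retained bytes per completedContainers entry (integer division).
    static long bytesPerEntry(long perNodeMiB, long entries) {
        return perNodeMiB * MIB / entries;
    }

    public static void main(String[] args) {
        long total = totalMiB(1603, 14);
        System.out.println(String.format(Locale.ROOT,
            "total: %d MiB (%.1f GiB)", total, total / 1024.0));
        System.out.println("per entry: ~" + bytesPerEntry(14, 130_000) + " bytes");
    }
}
```

Under those assumptions the program prints a total of 22442 MiB (about 21.9 GiB) and roughly 112 bytes retained per entry; the real per-entry cost depends on the JVM and on how ContainerId is represented.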


This HashSet is completedContainers.


Why were so many completed containers never released?

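When an NM sends a heartbeat, RMNodeImpl adds every container that is no longer in the launched state (that is, completed), as well as lost containers, to completedContainers. When the heartbeat response is built, the containers in containersToBeRemovedFromNM are removed from completedContainers, and the NM is told to remove those same containers as well.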

// RMNodeImpl.java
public void setAndUpdateNodeHeartbeatResponse(
    NodeHeartbeatResponse response) {
  this.writeLock.lock();
  try {
    response.addAllContainersToCleanup(
        new ArrayList<ContainerId>(this.containersToClean));
    response.addAllApplicationsToCleanup(this.finishedApplications);
    // Send the containers in containersToBeRemovedFromNM to the NM so it removes them
    response.addContainersToBeRemovedFromNM(
        new ArrayList<ContainerId>(this.containersToBeRemovedFromNM));
    response.addAllContainersToSignal(this.containersToSignal);
    // Remove the containers in containersToBeRemovedFromNM from completedContainers
    this.completedContainers.removeAll(this.containersToBeRemovedFromNM);
    this.containersToClean.clear();
    this.finishedApplications.clear();
    this.containersToSignal.clear();
    this.containersToBeRemovedFromNM.clear();
    // ... (omitted)
  } finally {
    this.writeLock.unlock();
  }
}

The containers in containersToBeRemovedFromNM come from RMNodeFinishedContainersPulledByAMEvent events, which are sent by RMAppAttempt.

// RMNodeImpl.java
public static class AddContainersToBeRemovedFromNMTransition implements
    SingleArcTransition<RMNodeImpl, RMNodeEvent> {

  @Override
  public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
    rmNode.containersToBeRemovedFromNM.addAll(((
        RMNodeFinishedContainersPulledByAMEvent) event).getContainers());
  }
}
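The interplay between completedContainers and containersToBeRemovedFromNM can be modeled in a few lines. This is a simplified sketch, not Hadoop code: strings stand in for ContainerId, and ToyRMNode and its method names are hypothetical. The methods only mirror the NM status handling, AddContainersToBeRemovedFromNMTransition, and setAndUpdateNodeHeartbeatResponse described above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified model (not Hadoop code) of the RMNodeImpl bookkeeping.
// Strings stand in for ContainerId; method names are hypothetical.
public class ToyRMNode {
    final Set<String> completedContainers = new HashSet<>();
    final Set<String> containersToBeRemovedFromNM = new HashSet<>();

    // NM heartbeat: completed containers reported by the NM are recorded.
    void onNodeStatus(List<String> completedOnNM) {
        completedContainers.addAll(completedOnNM);
    }

    // Like AddContainersToBeRemovedFromNMTransition: the AM has pulled these.
    void onFinishedContainersPulledByAM(List<String> containerIds) {
        containersToBeRemovedFromNM.addAll(containerIds);
    }

    // Like setAndUpdateNodeHeartbeatResponse: tell the NM what to drop,
    // drop the same containers locally, then reset the pending set.
    List<String> buildHeartbeatResponse() {
        List<String> toRemove = new ArrayList<>(containersToBeRemovedFromNM);
        completedContainers.removeAll(containersToBeRemovedFromNM);
        containersToBeRemovedFromNM.clear();
        return toRemove;
    }

    public static void main(String[] args) {
        ToyRMNode node = new ToyRMNode();
        node.onNodeStatus(List.of("c1", "c2"));
        node.onFinishedContainersPulledByAM(List.of("c1"));
        System.out.println(node.buildHeartbeatResponse()); // [c1]
        System.out.println(node.completedContainers);      // [c2]
    }
}
```

The key property is that an entry leaves completedContainers only after an AM-driven RMNodeFinishedContainersPulledByAMEvent names it; here c2 stays in the set no matter how many NM heartbeats follow.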

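When RMAppAttempt receives a CONTAINER_FINISHED event, it checks whether the container is the AM container. If it is not, the container is added to justFinishedContainers, a map from NodeId to a list of ContainerStatus. Each time the AM sends a heartbeat (an allocate call), RMAppAttempt.pullJustFinishedContainers() is invoked; this method is central to cleaning up completed containers. It first calls sendFinishedContainersToNM() to send the containers in finishedContainersSentToAM to their NMs; these are exactly the containers that RMNodeImpl removes from completedContainers. It then moves the containers in justFinishedContainers into finishedContainersSentToAM. The point of this two-step hand-off is that a container is released only after the AM has acknowledged receiving it, rather than as soon as the NM reports it. On the next heartbeat, the containers in finishedContainersSentToAM are sent to the NM.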

// RMAppAttempt.java
public List<ContainerStatus> pullJustFinishedContainers() {
  this.writeLock.lock();

  try {
    List<ContainerStatus> returnList = new ArrayList<>();

    // A new allocate means the AM received the previously sent
    // finishedContainers. We can ack this to NM now
    sendFinishedContainersToNM(); // ack the containers already sent to the AM to their NMs

    // Mark every containerStatus as being sent to AM though we may return
    // only the ones that belong to the current attempt
    boolean keepContainersAcrossAppAttempts = this.submissionContext
        .getKeepContainersAcrossApplicationAttempts();
    // Move the containers in justFinishedContainers to finishedContainersSentToAM,
    // then clear justFinishedContainers
    for (Map.Entry<NodeId, List<ContainerStatus>> entry :
        justFinishedContainers.entrySet()) {
      NodeId nodeId = entry.getKey();
      List<ContainerStatus> finishedContainers = entry.getValue();
      if (finishedContainers.isEmpty()) {
        continue;
      }

      if (keepContainersAcrossAppAttempts) {
        returnList.addAll(finishedContainers);
      } else {
        // Filter out containers from previous attempt
        for (ContainerStatus containerStatus : finishedContainers) {
          if (containerStatus.getContainerId().getApplicationAttemptId()
              .equals(this.getAppAttemptId())) {
            returnList.add(containerStatus);
          }
        }
      }

      finishedContainersSentToAM.putIfAbsent(nodeId,
          new ArrayList<ContainerStatus>());
      finishedContainersSentToAM.get(nodeId).addAll(finishedContainers);
    }
    justFinishedContainers.clear();

    return returnList;
  } finally {
    this.writeLock.unlock();
  }
}
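Because of this two-step hand-off, a finished container needs two AM heartbeats before its NM is told to drop it. The sketch below models only that; it ignores the per-attempt filtering and keepContainersAcrossApplicationAttempts, uses strings for NodeId and ContainerId, and the class ToyAttempt and its acked map (standing in for the events sent to RMNodeImpl) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model (not Hadoop code) of the two-phase hand-off in RMAppAttemptImpl.
// Strings stand in for NodeId/ContainerId; "acked" records what would be sent to
// each node as an RMNodeFinishedContainersPulledByAMEvent.
public class ToyAttempt {
    final Map<String, List<String>> justFinishedContainers = new HashMap<>();
    final Map<String, List<String>> finishedContainersSentToAM = new HashMap<>();
    final Map<String, List<String>> acked = new HashMap<>();

    // A non-AM container finished (CONTAINER_FINISHED event).
    void containerFinished(String nodeId, String containerId) {
        justFinishedContainers
            .computeIfAbsent(nodeId, k -> new ArrayList<>()).add(containerId);
    }

    // Like sendFinishedContainersToNM(): ack everything the AM already received.
    void sendFinishedContainersToNM() {
        finishedContainersSentToAM.forEach((nodeId, ids) ->
            acked.computeIfAbsent(nodeId, k -> new ArrayList<>()).addAll(ids));
        finishedContainersSentToAM.clear();
    }

    // Like pullJustFinishedContainers(), called on every AM allocate heartbeat.
    List<String> pullJustFinishedContainers() {
        sendFinishedContainersToNM(); // previous batch: the AM has now seen it
        List<String> returnList = new ArrayList<>();
        justFinishedContainers.forEach((nodeId, ids) -> {
            returnList.addAll(ids);
            finishedContainersSentToAM
                .computeIfAbsent(nodeId, k -> new ArrayList<>()).addAll(ids);
        });
        justFinishedContainers.clear(); // current batch: wait for the next heartbeat
        return returnList;
    }

    public static void main(String[] args) {
        ToyAttempt attempt = new ToyAttempt();
        attempt.containerFinished("nm1", "c1");
        System.out.println(attempt.pullJustFinishedContainers()); // [c1]
        System.out.println(attempt.acked);                        // {}
        System.out.println(attempt.pullJustFinishedContainers()); // []
        System.out.println(attempt.acked);                        // {nm1=[c1]}
    }
}
```

The first pull hands c1 to the AM but acks nothing; only the second pull acks c1 to nm1. If the AM stops heartbeating, whatever is still in justFinishedContainers never moves on.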

sendFinishedContainersToNM() takes the nodeId and containerIds from finishedContainersSentToAM, builds one RMNodeFinishedContainersPulledByAMEvent per node, and then clears finishedContainersSentToAM.

// RMAppAttempt.java
private void sendFinishedContainersToNM() {
  for (NodeId nodeId : finishedContainersSentToAM.keySet()) {

    // Clear and get current values
    List<ContainerStatus> currentSentContainers =
        finishedContainersSentToAM.put(nodeId,
            new ArrayList<ContainerStatus>());
    List<ContainerId> containerIdList =
        new ArrayList<>(currentSentContainers.size());
    for (ContainerStatus containerStatus : currentSentContainers) {
      containerIdList.add(containerStatus.getContainerId());
    }
    eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId,
        containerIdList));
  }
  this.finishedContainersSentToAM.clear();
}

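If the finished container is the AM container, RMAppAttempt.amContainerFinished is called to clean it up. It first takes the NodeId and ContainerStatus from the RMAppAttemptContainerFinishedEvent. If keepContainersAcrossApplicationAttempts is false, it adds the container to finishedContainersSentToAM and calls sendFinishedContainersToNM(); otherwise it calls appAttempt.sendFinishedAMContainerToNM to release only the AM container.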

// RMAppAttempt.java
private static void amContainerFinished(RMAppAttemptImpl appAttempt,
    RMAppAttemptContainerFinishedEvent containerFinishedEvent) {

  NodeId nodeId = containerFinishedEvent.getNodeId(); // get the NodeId

  ContainerStatus containerStatus =
      containerFinishedEvent.getContainerStatus(); // get the ContainerStatus
  if (containerStatus != null) {
    int exitStatus = containerStatus.getExitStatus();
    if (shouldCountTowardsNodeBlacklisting(exitStatus)) {
      appAttempt.addAMNodeToBlackList(nodeId);
    }
  } else {
    LOG.warn("No ContainerStatus in containerFinishedEvent");
  }

  if (!appAttempt.getSubmissionContext()
      .getKeepContainersAcrossApplicationAttempts()) {
    appAttempt.finishedContainersSentToAM.putIfAbsent(nodeId,
        new ArrayList<ContainerStatus>());
    // Add the AM container to finishedContainersSentToAM
    appAttempt.finishedContainersSentToAM.get(nodeId).add(containerStatus);
    appAttempt.sendFinishedContainersToNM(); // clean up the container
  } else {
    appAttempt.sendFinishedAMContainerToNM(nodeId,
        containerStatus.getContainerId());
  }
}

The basic flow is as follows (the red lines are the steps tied to AM heartbeats):

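Consider this scenario: a normal container has just finished and been added to justFinishedContainers. It has to wait for an AM heartbeat to be moved into finishedContainersSentToAM, and it can only be released on a later AM heartbeat or when the AM container is cleaned up. If the AM finishes and unregisters at this moment, it sends no more heartbeats, so the containers in justFinishedContainers never get a chance to move to finishedContainersSentToAM, and cleaning up the AM container does not release them either. As a result, these containers are never removed from RMNodeImpl's completedContainers and stay in RM memory indefinitely.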
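The scenario can be replayed with a compact model. This is a sketch under simplifying assumptions, not Hadoop code: strings stand in for NodeId and ContainerId, a per-node map stands in for RMNodeImpl.completedContainers, and the hypothetical sendToNM step collapses the RMNodeFinishedContainersPulledByAMEvent and the following NM heartbeat into one call. amContainerFinished follows the original method with keepContainersAcrossApplicationAttempts set to false.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model (not Hadoop code) of the leak scenario.
public class LeakScenario {
    final Map<String, List<String>> justFinished = new HashMap<>();
    final Map<String, List<String>> sentToAM = new HashMap<>();
    // Per-node stand-in for RMNodeImpl.completedContainers.
    final Map<String, Set<String>> completed = new HashMap<>();

    // A normal container finishes: the NM reports it, the attempt records it.
    void containerFinished(String node, String id) {
        completed.computeIfAbsent(node, k -> new HashSet<>()).add(id);
        justFinished.computeIfAbsent(node, k -> new ArrayList<>()).add(id);
    }

    // Event to RMNodeImpl plus the next NM heartbeat, collapsed into one step.
    void sendToNM() {
        sentToAM.forEach((node, ids) -> {
            Set<String> set = completed.get(node);
            if (set != null) set.removeAll(ids);
        });
        sentToAM.clear();
    }

    // AM heartbeat (pullJustFinishedContainers).
    void amHeartbeat() {
        sendToNM();
        justFinished.forEach((node, ids) ->
            sentToAM.computeIfAbsent(node, k -> new ArrayList<>()).addAll(ids));
        justFinished.clear();
    }

    // Original amContainerFinished, keepContainersAcrossApplicationAttempts=false.
    void amContainerFinished(String node, String amId) {
        completed.computeIfAbsent(node, k -> new HashSet<>()).add(amId);
        sentToAM.computeIfAbsent(node, k -> new ArrayList<>()).add(amId);
        sendToNM();
    }

    public static void main(String[] args) {
        LeakScenario s = new LeakScenario();
        s.containerFinished("nm1", "c1");
        s.amHeartbeat();                  // c1 moves to sentToAM
        s.containerFinished("nm1", "c2"); // finishes after the last AM heartbeat
        // The AM unregisters (no more heartbeats), then the AM container exits.
        s.amContainerFinished("nm1", "am");
        System.out.println(s.completed);  // {nm1=[c2]}: c2 is never released
    }
}
```

c1 is released when the AM container finishes because it had already moved to finishedContainersSentToAM, but c2 finished after the last AM heartbeat and remains in both justFinished and completed.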

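There is a second problem: if the RMAppAttempt is already in a final state (FINISHED, KILLED, or FAILED) when it receives a CONTAINER_FINISHED event, it only adds the container to justFinishedContainers and does not clean up the AM container. This also prevents the AM container from being released.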

// RMAppAttempt.java
private static final class ContainerFinishedAtFinalStateTransition
    extends BaseTransition {
  @Override
  public void
      transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
    RMAppAttemptContainerFinishedEvent containerFinishedEvent =
        (RMAppAttemptContainerFinishedEvent) event;
    // Normal container. Add it in completed containers list
    // Even the AM container is only added to justFinishedContainers here
    addJustFinishedContainer(appAttempt, containerFinishedEvent);
  }
}
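The second problem can be modeled the same way. This sketch is not Hadoop code and simplifies the state machine: it assumes that outside a final state the AM container always takes the amContainerFinished path and is acked immediately, while in a final state every container only goes to justFinished. FinalStateScenario and its fields are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model (not Hadoop code) of CONTAINER_FINISHED handling for the
// AM container, depending on whether the attempt is already in a final state.
public class FinalStateScenario {
    boolean finalState; // true = FINISHED, KILLED or FAILED
    final Map<String, List<String>> justFinished = new HashMap<>();
    final Map<String, List<String>> sentToAM = new HashMap<>();
    // Per-node stand-in for RMNodeImpl.completedContainers.
    final Map<String, Set<String>> completed = new HashMap<>();

    void onContainerFinished(String node, String id, boolean isAMContainer) {
        completed.computeIfAbsent(node, k -> new HashSet<>()).add(id); // NM reported it
        if (isAMContainer && !finalState) {
            // amContainerFinished path: acked to the NM right away.
            sentToAM.computeIfAbsent(node, k -> new ArrayList<>()).add(id);
            sendToNM();
        } else {
            // ContainerFinishedAtFinalStateTransition: only addJustFinishedContainer.
            justFinished.computeIfAbsent(node, k -> new ArrayList<>()).add(id);
        }
    }

    // Event to RMNodeImpl plus the next NM heartbeat, collapsed into one step.
    void sendToNM() {
        sentToAM.forEach((node, ids) -> {
            Set<String> set = completed.get(node);
            if (set != null) set.removeAll(ids);
        });
        sentToAM.clear();
    }

    public static void main(String[] args) {
        FinalStateScenario attempt = new FinalStateScenario();
        attempt.finalState = true;
        attempt.onContainerFinished("nm2", "am", true);
        System.out.println(attempt.completed); // {nm2=[am]}
    }
}
```

With finalState set, the AM container's ID lands in justFinished and stays in completed; with finalState left false, the same event leaves the node's set empty.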

3. Optimization

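The root cause is that normal containers added to justFinishedContainers are stranded there when the AM unregisters before they can be moved to finishedContainersSentToAM, so they stay in memory. In addition, an RMAppAttempt in a final state does not clean up the AM container when it receives a CONTAINER_FINISHED event. Together, these make RMNodeImpl occupy far too much memory.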

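The fix: when an RMAppAttempt in a final state receives a CONTAINER_FINISHED event, it cleans up the AM container. In addition, when cleaning up the AM container, it checks whether justFinishedContainers is empty; if not, it moves the NodeIds and ContainerStatuses in justFinishedContainers into finishedContainersSentToAM so that they are sent to the NM.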

// RMAppAttempt.java
private static final class ContainerFinishedAtFinalStateTransition
    extends BaseTransition {
  @Override
  public void
      transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
    RMAppAttemptContainerFinishedEvent containerFinishedEvent =
        (RMAppAttemptContainerFinishedEvent) event;
    // Normal container. Add it in completed containers list
    addJustFinishedContainer(appAttempt, containerFinishedEvent);
    amContainerFinished(appAttempt, containerFinishedEvent); // clean up the AM container
  }
}

// RMAppAttempt.java
private static void amContainerFinished(RMAppAttemptImpl appAttempt,
    RMAppAttemptContainerFinishedEvent containerFinishedEvent) {

  NodeId nodeId = containerFinishedEvent.getNodeId();

  ContainerStatus containerStatus =
      containerFinishedEvent.getContainerStatus();
  if (containerStatus != null) {
    int exitStatus = containerStatus.getExitStatus();
    if (shouldCountTowardsNodeBlacklisting(exitStatus)) {
      appAttempt.addAMNodeToBlackList(nodeId);
    }
  } else {
    LOG.warn("No ContainerStatus in containerFinishedEvent");
  }

  if (!appAttempt.getSubmissionContext()
      .getKeepContainersAcrossApplicationAttempts()) {
    appAttempt.finishedContainersSentToAM.putIfAbsent(nodeId,
        new ArrayList<ContainerStatus>());
    appAttempt.finishedContainersSentToAM.get(nodeId).add(containerStatus);
    // Move the containers in justFinishedContainers to finishedContainersSentToAM
    if (!appAttempt.justFinishedContainers.isEmpty()) {
      for (Map.Entry<NodeId, List<ContainerStatus>> justFinishedContainer
          : appAttempt.justFinishedContainers.entrySet()) {
        List<ContainerStatus> justFinishedContainerStatus =
            justFinishedContainer.getValue();
        NodeId justFinishedNodeId = justFinishedContainer.getKey();
        appAttempt.finishedContainersSentToAM.putIfAbsent(justFinishedNodeId,
            new ArrayList<ContainerStatus>());
        appAttempt.finishedContainersSentToAM.get(justFinishedNodeId)
            .addAll(justFinishedContainerStatus);
      }
    }
    appAttempt.sendFinishedContainersToNM();
  } else {
    appAttempt.sendFinishedAMContainerToNM(nodeId,
        containerStatus.getContainerId());
  }
}
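The patched flow can be replayed with a compact model of the scenario described earlier (a container finishing after the AM's last heartbeat). This is a simplified sketch with hypothetical names and strings standing in for NodeId and ContainerId, not the patch itself. One difference is deliberate: the sketch clears justFinished after draining it, whereas the patched amContainerFinished above copies the entries into finishedContainersSentToAM without clearing justFinishedContainers, so in that version the attempt keeps referencing those container statuses and would send them again on any later call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model (not Hadoop code) of the patched amContainerFinished.
// sendToNM collapses the RMNodeFinishedContainersPulledByAMEvent plus the
// next NM heartbeat into one step.
public class PatchedScenario {
    final Map<String, List<String>> justFinished = new HashMap<>();
    final Map<String, List<String>> sentToAM = new HashMap<>();
    // Per-node stand-in for RMNodeImpl.completedContainers.
    final Map<String, Set<String>> completed = new HashMap<>();

    void containerFinished(String node, String id) {
        completed.computeIfAbsent(node, k -> new HashSet<>()).add(id);
        justFinished.computeIfAbsent(node, k -> new ArrayList<>()).add(id);
    }

    void sendToNM() {
        sentToAM.forEach((node, ids) -> {
            Set<String> set = completed.get(node);
            if (set != null) set.removeAll(ids);
        });
        sentToAM.clear();
    }

    // AM heartbeat (pullJustFinishedContainers).
    void amHeartbeat() {
        sendToNM();
        justFinished.forEach((node, ids) ->
            sentToAM.computeIfAbsent(node, k -> new ArrayList<>()).addAll(ids));
        justFinished.clear();
    }

    // Patched path: also drain justFinished before acking to the NMs.
    void amContainerFinished(String node, String amId) {
        completed.computeIfAbsent(node, k -> new HashSet<>()).add(amId);
        sentToAM.computeIfAbsent(node, k -> new ArrayList<>()).add(amId);
        justFinished.forEach((n, ids) ->
            sentToAM.computeIfAbsent(n, k -> new ArrayList<>()).addAll(ids));
        justFinished.clear(); // the sketch clears; the patch as shown does not
        sendToNM();
    }

    public static void main(String[] args) {
        PatchedScenario p = new PatchedScenario();
        p.containerFinished("nm1", "c1");
        p.amHeartbeat();
        p.containerFinished("nm1", "c2"); // finishes after the last AM heartbeat
        p.amContainerFinished("nm1", "am");
        System.out.println(p.completed);  // {nm1=[]}
    }
}
```

Here c2 is drained along with the AM container, and the node's set ends up empty.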

4. Results

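The log lines marked "---after completedContainers contains---" show the containers remaining in completedContainers after the containers in containersToBeRemovedFromNM have been removed.

Before the fix: after several NM heartbeats, the RMNodeImpl instances for datanode1 and datanode2 still held containers from finished tasks.

After the fix: completedContainers contains no containers.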

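© Copyright belongs to the author. For reprints or content cooperation, please contact the author.
Platform statement: The content of this article (including any images or videos) was uploaded and published by the author and represents only the author's own views. Jianshu is an information publishing platform and only provides information storage services.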