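Preface
To be honest, YARN was never an area I knew particularly well; I simply used it day to day. After digging into it, though, I found it quite interesting. As a resource management framework, YARN has two basic jobs: resource scheduling and resource isolation. This article looks at how it implements memory isolation.
The figure below is what prompted this article: last month I ventured an answer to a small question in a Flink community chat group.
It also matches what I wrote in an earlier article:
(Flink) Task slots isolate resources between the different Tasks in a TaskManager, but the isolation is only logical and covers only memory. That is, at the scheduling level each task slot is assumed to be "entitled" to 1/N of taskmanager.heap.size. CPU is not included.
YARN Containers are likewise isolated only by memory by default, and the logic shows up directly in the YARN source code. CPU isolation is configured indirectly through advanced parameters related to Linux cgroups, which is a topic for another day.
YARN's memory isolation essentially comes down to monitoring memory usage. Before reading the code, we first need to look at some details of process trees.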
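/proc and the Process Tree
The basic unit of resource management in YARN is the Container, and a Container is essentially a Java process. Processes commonly create child processes via fork, and those children may do the same. The memory usage of a single Container should therefore be the sum of the memory usage of every process in the process tree rooted at the Container process.
Taking Linux as an example, look at the /proc directory, shown in the figure below.
/proc is the mount point of procfs, the virtual file system that exposes kernel information on Linux. It contains many directories named after PIDs, each holding information about the corresponding process. The file we care about is /proc/[PID]/stat, shown in the figure below.
This file holds runtime statistics for the process, such as the command name, parent PID, process group ID, time spent in kernel and user mode, and the amount of physical (resident) and virtual memory in use. So by reading every /proc/[PID]/stat file we can build the process tree and compute a Container's memory footprint.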
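To make the stat layout concrete, here is a minimal sketch of pulling the fields mentioned above out of a single /proc/[PID]/stat line. The class name StatLine and the sample line are made up for illustration, and the field positions follow the proc(5) man page (ppid is field 4, pgrp 5, session 6, utime 14, stime 15, vsize 23, rss 24). This is not how YARN itself parses the file; it is only meant to show what the data looks like.

```java
/** Hypothetical holder for a few /proc/[PID]/stat fields; not a YARN class. */
final class StatLine {
    final String pid;
    final String name;
    final String ppid;
    final int pgrpId;
    final int sessionId;
    final long utime;      // jiffies in user mode
    final long stime;      // jiffies in kernel mode
    final long vsizeBytes; // virtual memory size in bytes
    final long rssPages;   // resident set size in pages

    private StatLine(String pid, String name, String ppid, int pgrpId,
                     int sessionId, long utime, long stime,
                     long vsizeBytes, long rssPages) {
        this.pid = pid;
        this.name = name;
        this.ppid = ppid;
        this.pgrpId = pgrpId;
        this.sessionId = sessionId;
        this.utime = utime;
        this.stime = stime;
        this.vsizeBytes = vsizeBytes;
        this.rssPages = rssPages;
    }

    static StatLine parse(String line) {
        // The command name is wrapped in parentheses and may itself contain
        // spaces or ')', so split around the first '(' and the last ')'.
        int open = line.indexOf('(');
        int close = line.lastIndexOf(')');
        if (open < 0 || close < open) {
            throw new IllegalArgumentException("not a stat line: " + line);
        }
        String pid = line.substring(0, open).trim();
        String name = line.substring(open + 1, close);
        // rest[0] is field 3 (state), so field N lives at rest[N - 3].
        String[] rest = line.substring(close + 1).trim().split("\\s+");
        if (rest.length < 22) {
            throw new IllegalArgumentException("too few fields: " + line);
        }
        return new StatLine(pid, name,
            rest[1],                    // field 4: ppid
            Integer.parseInt(rest[2]),  // field 5: pgrp
            Integer.parseInt(rest[3]),  // field 6: session
            Long.parseLong(rest[11]),   // field 14: utime
            Long.parseLong(rest[12]),   // field 15: stime
            Long.parseLong(rest[20]),   // field 23: vsize
            Long.parseLong(rest[21]));  // field 24: rss
    }

    public static void main(String[] args) {
        // Invented sample line, shaped like real /proc/[PID]/stat output.
        StatLine s = parse("4321 (java) S 4300 4321 4321 0 -1 4202496 1500 0 2 0 "
            + "120 35 0 0 20 0 45 0 123456 3221225472 65536 18446744073709551615");
        System.out.println(s.pid + " <- " + s.ppid + ", vsize=" + s.vsizeBytes
            + " bytes, rss=" + s.rssPages + " pages");
    }
}
```

Note that rss is reported in pages, so it has to be multiplied by the system page size to get bytes, which is why the ProcessInfo class shown later stores it as rssmemPage.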
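In YARN the base class for process trees is the abstract class ResourceCalculatorProcessTree, which has two implementations: ProcfsBasedProcessTree for Linux and WindowsBasedProcessTree for Windows.
The ProcfsBasedProcessTree Implementation
Let's take a quick look at how ProcfsBasedProcessTree works. First comes getProcessList(), which walks the /proc/[PID] directories and collects all processes.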
static private Pattern numberPattern = Pattern.compile("[1-9][0-9]*");
private List<String> getProcessList() {
  String[] processDirs = (new File(procfsDir)).list();
  List<String> processList = new ArrayList<String>();
  for (String dir : processDirs) {
    Matcher m = numberPattern.matcher(dir);
    if (!m.matches()) continue;
    try {
      if ((new File(procfsDir, dir)).isDirectory()) {
        processList.add(dir);
      }
    } catch (SecurityException s) {
      // skip this process
    }
  }
  return processList;
}
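The main logic lives in updateProcessTree(), which refreshes the process tree. Because YARN Containers are constantly being allocated and destroyed, the tree has to be kept up to date. The source is fairly long: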
@Override
public void updateProcessTree() {
  if (!pid.equals(deadPid)) {
    // Get the list of processes
    List<String> processList = getProcessList();
    Map<String, ProcessInfo> allProcessInfo = new HashMap<String, ProcessInfo>();
    // cache the processTree to get the age for processes
    Map<String, ProcessInfo> oldProcs =
        new HashMap<String, ProcessInfo>(processTree);
    processTree.clear();
    ProcessInfo me = null;
    for (String proc : processList) {
      // Get information for each process
      ProcessInfo pInfo = new ProcessInfo(proc);
      if (constructProcessInfo(pInfo, procfsDir) != null) {
        allProcessInfo.put(proc, pInfo);
        if (proc.equals(this.pid)) {
          me = pInfo; // cache 'me'
          processTree.put(proc, pInfo);
        }
      }
    }
    if (me == null) {
      return;
    }
    // Add each process to its parent.
    for (Map.Entry<String, ProcessInfo> entry : allProcessInfo.entrySet()) {
      String pID = entry.getKey();
      if (!pID.equals("1")) {
        ProcessInfo pInfo = entry.getValue();
        ProcessInfo parentPInfo = allProcessInfo.get(pInfo.getPpid());
        if (parentPInfo != null) {
          parentPInfo.addChild(pInfo);
        }
      }
    }
    // now start constructing the process-tree
    LinkedList<ProcessInfo> pInfoQueue = new LinkedList<ProcessInfo>();
    pInfoQueue.addAll(me.getChildren());
    while (!pInfoQueue.isEmpty()) {
      ProcessInfo pInfo = pInfoQueue.remove();
      if (!processTree.containsKey(pInfo.getPid())) {
        processTree.put(pInfo.getPid(), pInfo);
      }
      pInfoQueue.addAll(pInfo.getChildren());
    }
    // update age values and compute the number of jiffies since last update
    for (Map.Entry<String, ProcessInfo> procs : processTree.entrySet()) {
      ProcessInfo oldInfo = oldProcs.get(procs.getKey());
      if (procs.getValue() != null) {
        procs.getValue().updateJiffy(oldInfo);
        if (oldInfo != null) {
          procs.getValue().updateAge(oldInfo);
        }
      }
    }
    if (LOG.isDebugEnabled()) {
      // Log.debug the ProcfsBasedProcessTree
      LOG.debug(this.toString());
    }
    if (smapsEnabled) {
      // omitted
    }
  }
}
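The method proceeds as follows:
- Call getProcessList() to obtain all processes.
- Cache the old process tree so that the age of processes that already existed can be incremented. Why an age at all? More on that later.
- Iterate over the process list, calling constructProcessInfo() to extract the data from each stat file and wrap it in a ProcessInfo instance. If the PID of the current Container process ("me") turns up during the iteration, add it to the process tree right away, since the Container process is necessarily a node in the tree.
- Iterate over the ProcessInfo instances built so far, use the parsed parent process ID (PPID) to establish parent-child relationships, and add each child's ProcessInfo to its parent's ProcessInfo.
- Then actually populate the process tree: initialize a queue with the ProcessInfo of the children of the current Container process (me), then repeatedly remove the head of the queue, add it to the process tree, and enqueue its children, until the queue is empty.
- Call updateJiffy() and updateAge() to update each process's CPU time and age. Note that a process's age is updated (simply incremented by 1) only if the process was present in the old process tree.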
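The parent-linking and queue-based steps above can be sketched independently of YARN. The following is a minimal illustration that works on a plain pid-to-ppid map instead of ProcessInfo objects; the class name ProcessTreeSketch and the method subtree() are hypothetical, and ages and jiffies are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the tree-building steps; not the YARN implementation. */
final class ProcessTreeSketch {

    /** Returns the PIDs of the subtree rooted at rootPid, given pid -> ppid. */
    static Set<String> subtree(String rootPid, Map<String, String> ppidOf) {
        if (!ppidOf.containsKey(rootPid)) {
            // Mirrors the "me == null" early return: the root has exited.
            return Collections.emptySet();
        }
        // Step 1: invert pid -> ppid into parent -> children.
        Map<String, List<String>> children = new HashMap<>();
        for (Map.Entry<String, String> e : ppidOf.entrySet()) {
            children.computeIfAbsent(e.getValue(), k -> new ArrayList<>())
                    .add(e.getKey());
        }
        // Step 2: breadth-first walk starting from the root process.
        Set<String> tree = new LinkedHashSet<>();
        Deque<String> queue = new ArrayDeque<>();
        queue.add(rootPid);
        while (!queue.isEmpty()) {
            String pid = queue.remove();
            if (tree.add(pid)) {
                queue.addAll(children.getOrDefault(pid, Collections.emptyList()));
            }
        }
        return tree;
    }

    public static void main(String[] args) {
        Map<String, String> ppidOf = new HashMap<>();
        ppidOf.put("1", "0");
        ppidOf.put("100", "1");   // the Container process
        ppidOf.put("101", "100"); // its children
        ppidOf.put("102", "100");
        ppidOf.put("103", "101"); // a grandchild
        ppidOf.put("200", "1");   // unrelated process
        System.out.println(subtree("100", ppidOf));
    }
}
```

Processes that are not descendants of the root (PID 200 in the example) never enter the queue, so they do not count towards the Container's usage.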
Some of the fields of the static nested class ProcessInfo are shown below.
private static class ProcessInfo {
  private String pid; // process-id
  private String name; // command name
  private Integer pgrpId; // process group-id
  private String ppid; // parent process-id
  private Integer sessionId; // session-id
  private Long vmem; // virtual memory usage
  private Long rssmemPage; // rss memory usage in # of pages
  private Long utime = 0L; // # of jiffies in user mode
  private final BigInteger MAX_LONG = BigInteger.valueOf(Long.MAX_VALUE);
  private BigInteger stime = new BigInteger("0"); // # of jiffies in kernel mode
  // how many times has this process been seen alive
  private int age;
  // # of jiffies used since last update:
  private Long dtime = 0L;
  // dtime = (utime + stime) - (utimeOld + stimeOld)
  // We need this to compute the cumulative CPU time
  // because the subprocess may finish earlier than root process
  private List<ProcessInfo> children = new ArrayList<ProcessInfo>(); // list of children
  public ProcessInfo(String pid) {
    this.pid = pid;
    // seeing this the first time.
    this.age = 1;
  }
  // ......
}
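With this in place, computing memory usage only requires summing the corresponding field over every ProcessInfo in the process tree. Take getCumulativeVmem(), which totals virtual memory, as an example.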
@Override
public long getCumulativeVmem(int olderThanAge) {
  long total = 0;
  for (ProcessInfo p : processTree.values()) {
    if ((p != null) && (p.getAge() > olderThanAge)) {
      total += p.getVmem();
    }
  }
  return total;
}
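Memory Monitoring: The ContainersMonitorImpl Implementation
This class is the only implementation of the ContainersMonitor interface. It maintains three main data structures.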
final List<ContainerId> containersToBeRemoved;
final Map<ContainerId, ProcessTreeInfo> containersToBeAdded;
Map<ContainerId, ProcessTreeInfo> trackingContainers =
    new HashMap<ContainerId, ProcessTreeInfo>();
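- containersToBeRemoved: the list of Containers that have finished or failed and no longer need monitoring;
- containersToBeAdded: a map from newly allocated Containers to their process tree information;
- trackingContainers: a map from the Containers currently being monitored to their process tree information.
Almost all of ContainersMonitorImpl's work happens in a single MonitoringThread. Its source, which is somewhat long, follows.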
private class MonitoringThread extends Thread {
  public MonitoringThread() {
    super("Container Monitor");
  }
  @Override
  public void run() {
    while (true) {
      // Print the processTrees for debugging.
      if (LOG.isDebugEnabled()) {
        // log output omitted
      }
      // Add new containers
      synchronized (containersToBeAdded) {
        for (Entry<ContainerId, ProcessTreeInfo> entry : containersToBeAdded
            .entrySet()) {
          ContainerId containerId = entry.getKey();
          ProcessTreeInfo processTreeInfo = entry.getValue();
          LOG.info("Starting resource-monitoring for " + containerId);
          trackingContainers.put(containerId, processTreeInfo);
        }
        containersToBeAdded.clear();
      }
      // Remove finished containers
      synchronized (containersToBeRemoved) {
        for (ContainerId containerId : containersToBeRemoved) {
          trackingContainers.remove(containerId);
          LOG.info("Stopping resource-monitoring for " + containerId);
        }
        containersToBeRemoved.clear();
      }
      // Now do the monitoring for the trackingContainers
      // Check memory usage and kill any overflowing containers
      long vmemStillInUsage = 0;
      long pmemStillInUsage = 0;
      for (Iterator<Map.Entry<ContainerId, ProcessTreeInfo>> it =
          trackingContainers.entrySet().iterator(); it.hasNext();) {
        Map.Entry<ContainerId, ProcessTreeInfo> entry = it.next();
        ContainerId containerId = entry.getKey();
        ProcessTreeInfo ptInfo = entry.getValue();
        try {
          String pId = ptInfo.getPID();
          // Initialize any uninitialized processTrees
          if (pId == null) {
            // get pid from ContainerId
            pId = containerExecutor.getProcessId(ptInfo.getContainerId());
            if (pId != null) {
              // pId will be null, either if the container is not spawned yet
              // or if the container's pid is removed from ContainerExecutor
              LOG.debug("Tracking ProcessTree " + pId
                  + " for the first time");
              ResourceCalculatorProcessTree pt =
                  ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(pId, processTreeClass, conf);
              ptInfo.setPid(pId);
              ptInfo.setProcessTree(pt);
            }
          }
          // End of initializing any uninitialized processTrees
          if (pId == null) {
            continue; // processTree cannot be tracked
          }
          LOG.debug("Constructing ProcessTree for : PID = " + pId
              + " ContainerId = " + containerId);
          ResourceCalculatorProcessTree pTree = ptInfo.getProcessTree();
          pTree.updateProcessTree(); // update process-tree
          long currentVmemUsage = pTree.getCumulativeVmem();
          long currentPmemUsage = pTree.getCumulativeRssmem();
          // as processes begin with an age 1, we want to see if there
          // are processes more than 1 iteration old.
          long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1);
          long curRssMemUsageOfAgedProcesses = pTree.getCumulativeRssmem(1);
          long vmemLimit = ptInfo.getVmemLimit();
          long pmemLimit = ptInfo.getPmemLimit();
          LOG.info(String.format(
              "Memory usage of ProcessTree %s for container-id %s: ",
              pId, containerId.toString()) +
              formatUsageString(currentVmemUsage, vmemLimit, currentPmemUsage, pmemLimit));
          boolean isMemoryOverLimit = false;
          String msg = "";
          int containerExitStatus = ContainerExitStatus.INVALID;
          if (isVmemCheckEnabled()
              && isProcessTreeOverLimit(containerId.toString(),
                  currentVmemUsage, curMemUsageOfAgedProcesses, vmemLimit)) {
            // Container (the root process) is still alive and overflowing
            // memory.
            // Dump the process-tree and then clean it up.
            msg = formatErrorMessage("virtual",
                currentVmemUsage, vmemLimit,
                currentPmemUsage, pmemLimit,
                pId, containerId, pTree);
            isMemoryOverLimit = true;
            containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_VMEM;
          } else if (isPmemCheckEnabled()
              && isProcessTreeOverLimit(containerId.toString(),
                  currentPmemUsage, curRssMemUsageOfAgedProcesses,
                  pmemLimit)) {
            // Container (the root process) is still alive and overflowing
            // memory.
            // Dump the process-tree and then clean it up.
            msg = formatErrorMessage("physical",
                currentVmemUsage, vmemLimit,
                currentPmemUsage, pmemLimit,
                pId, containerId, pTree);
            isMemoryOverLimit = true;
            containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_PMEM;
          }
          if (isMemoryOverLimit) {
            // Virtual or physical memory over limit. Fail the container and
            // remove
            // the corresponding process tree
            LOG.warn(msg);
            // warn if not a leader
            if (!pTree.checkPidPgrpidForMatch()) {
              LOG.error("Killed container process with PID " + pId
                  + " but it is not a process group leader.");
            }
            // kill the container
            eventDispatcher.getEventHandler().handle(
                new ContainerKillEvent(containerId,
                    containerExitStatus, msg));
            it.remove();
            LOG.info("Removed ProcessTree with root " + pId);
          } else {
            // Accounting the total memory in usage for all containers that
            // are still
            // alive and within limits.
            vmemStillInUsage += currentVmemUsage;
            pmemStillInUsage += currentPmemUsage;
          }
        } catch (Exception e) {
          // Log the exception and proceed to the next container.
          LOG.warn("Uncaught exception in ContainerMemoryManager "
              + "while managing memory of " + containerId, e);
        }
      }
      try {
        Thread.sleep(monitoringInterval);
      } catch (InterruptedException e) {
        LOG.warn(ContainersMonitorImpl.class.getName()
            + " is interrupted. Exiting.");
        break;
      }
    }
  }
}
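Long as it is, the code is fairly easy to follow. The thread runs one monitoring pass every monitoringInterval milliseconds; the interval is set by the parameter yarn.nodemanager.container-monitor.interval-ms and defaults to 3000 ms. The overall flow is:
- Add new Containers to trackingContainers and remove finished Containers from it. Both steps are done under a lock.
- Iterate over all tracked Containers. If a Container's process tree has not been initialized yet, obtain its PID from the Container ID and call getResourceCalculatorProcessTree() to initialize it (which in practice just constructs a ProcfsBasedProcessTree).
- Call updateProcessTree(), described above, to refresh the process tree, then read the cumulative physical and virtual memory usage from it.
- Depending on whether virtual and physical memory checks are enabled, call isProcessTreeOverLimit() to decide whether usage exceeds the limit. If it does, dispatch a ContainerKillEvent to kill the Container.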
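The shape of one monitoring pass can be boiled down to a few lines. The sketch below is a deliberately simplified stand-in, not the YARN code: MonitorSketch, startMonitoring(), stopMonitoring() and tick() are hypothetical names, Containers are plain strings, memory usage comes from a caller-supplied function instead of a process tree, and the over-limit test is a single comparison rather than isProcessTreeOverLimit().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;

/** Hypothetical, simplified model of one monitoring pass. */
final class MonitorSketch {
    // containerId -> memory limit in bytes, staged until the next pass
    private final Map<String, Long> toBeAdded = new HashMap<>();
    private final List<String> toBeRemoved = new ArrayList<>();
    // Only touched by the monitoring pass itself, so it needs no lock.
    private final Map<String, Long> tracking = new HashMap<>();

    void startMonitoring(String containerId, long limitBytes) {
        synchronized (toBeAdded) {
            toBeAdded.put(containerId, limitBytes);
        }
    }

    void stopMonitoring(String containerId) {
        synchronized (toBeRemoved) {
            toBeRemoved.add(containerId);
        }
    }

    int trackedCount() {
        return tracking.size();
    }

    /** Runs one pass and returns the containers that would be killed. */
    List<String> tick(ToLongFunction<String> usageOf) {
        synchronized (toBeAdded) {
            tracking.putAll(toBeAdded);
            toBeAdded.clear();
        }
        synchronized (toBeRemoved) {
            tracking.keySet().removeAll(toBeRemoved);
            toBeRemoved.clear();
        }
        List<String> killed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = tracking.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (usageOf.applyAsLong(e.getKey()) > e.getValue()) {
                killed.add(e.getKey());
                it.remove(); // stop tracking a container once it is killed
            }
        }
        return killed;
    }

    public static void main(String[] args) {
        MonitorSketch monitor = new MonitorSketch();
        monitor.startMonitoring("c1", 100L);
        monitor.startMonitoring("c2", 100L);
        System.out.println(monitor.tick(id -> id.equals("c2") ? 150L : 50L));
    }
}
```

Staging additions and removals in separate collections means other threads only contend on those two small locks, while the map being iterated belongs to the monitoring thread alone.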
Finally, look at the implementation of isProcessTreeOverLimit(), which helps explain why ProcessInfo needs an age.
boolean isProcessTreeOverLimit(String containerId,
    long currentMemUsage,
    long curMemUsageOfAgedProcesses,
    long vmemLimit) {
  boolean isOverLimit = false;
  if (currentMemUsage > (2 * vmemLimit)) {
    LOG.warn("Process tree for container: " + containerId
        + " running over twice " + "the configured limit. Limit=" + vmemLimit
        + ", current usage = " + currentMemUsage);
    isOverLimit = true;
  } else if (curMemUsageOfAgedProcesses > vmemLimit) {
    LOG.warn("Process tree for container: " + containerId
        + " has processes older than 1 "
        + "iteration running over the configured limit. Limit=" + vmemLimit
        + ", current usage = " + curMemUsageOfAgedProcesses);
    isOverLimit = true;
  }
  return isOverLimit;
}
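As the code shows, every process starts with an age of 1. The tree is considered over the limit when the total memory usage of all processes in it exceeds twice the threshold, or when the total usage of the processes whose age is greater than 1 exceeds the threshold. When Java creates a child process it ultimately goes through fork() and execvp(), and in the instant right after the fork the child still appears to use as much memory as its parent. The doubled threshold is there so that a Container with a normal, freshly forked child is not killed by mistake. Once a process's age is greater than 1 it has survived at least one monitoring pass and is no longer a freshly forked child, so the normal threshold is all that is needed.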
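A small worked example may help here. The sketch below reimplements the two rules on a toy model so the fork scenario can be run by hand; AgedLimitSketch and Proc are hypothetical names, and the numbers are invented. It assumes, as in the code above, that the "current" usage counts every process while the "aged" usage counts only processes older than one iteration.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical toy model of the age-aware limit check. */
final class AgedLimitSketch {

    static final class Proc {
        final long memBytes;
        final int age;

        Proc(long memBytes, int age) {
            this.memBytes = memBytes;
            this.age = age;
        }
    }

    /** Sums the memory of processes whose age is greater than olderThanAge. */
    static long cumulative(List<Proc> tree, int olderThanAge) {
        long total = 0;
        for (Proc p : tree) {
            if (p.age > olderThanAge) {
                total += p.memBytes;
            }
        }
        return total;
    }

    static boolean isOverLimit(List<Proc> tree, long limit) {
        long current = cumulative(tree, 0); // every process, ages start at 1
        long aged = cumulative(tree, 1);    // only processes seen more than once
        return current > 2 * limit || aged > limit;
    }

    public static void main(String[] args) {
        long limit = 1000;
        // Pass 1: an 800-unit parent has just forked; the child looks like 800 too.
        List<Proc> justForked = Arrays.asList(new Proc(800, 5), new Proc(800, 1));
        // Pass 2: the child is still that large one iteration later.
        List<Proc> stillLarge = Arrays.asList(new Proc(800, 6), new Proc(800, 2));
        System.out.println("just forked over limit? " + isOverLimit(justForked, limit));
        System.out.println("still large over limit? " + isOverLimit(stillLarge, limit));
    }
}
```

In the first pass the total is 1600, which is above the limit but below twice the limit, and the aged total is only 800, so the Container survives. If the child has not shrunk by the next pass, the aged total becomes 1600 and the Container is judged over the limit.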
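YARN Memory Limit Parameters
There are four main parameters:
- yarn.nodemanager.pmem-check-enabled: whether to monitor physical memory usage;
- yarn.nodemanager.vmem-check-enabled: whether to monitor virtual memory usage. Both of these default to true;
- yarn.nodemanager.vmem-pmem-ratio: the ratio of virtual memory to physical memory that Containers on a NodeManager may use, 2.1 by default. In other words, for every 1 MB of physical memory, up to 2.1 MB of virtual memory may be used;
- yarn.nodemanager.resource.memory-mb: the maximum amount of physical memory, in MB, that each NodeManager can hand out to Containers.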
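To see what the 2.1 ratio means in bytes, here is a small arithmetic sketch. It assumes the virtual memory limit of a Container is simply its physical memory limit multiplied by yarn.nodemanager.vmem-pmem-ratio, which is how the description above reads; VmemLimitSketch and vmemLimitBytes() are hypothetical names, not YARN APIs.

```java
/** Hypothetical helper showing the ratio arithmetic; not a YARN class. */
final class VmemLimitSketch {

    /**
     * Assumed relationship: vmem limit = pmem limit * vmem-pmem ratio,
     * with the physical limit given in MB and the result in bytes.
     */
    static long vmemLimitBytes(long containerMemoryMb, double vmemPmemRatio) {
        long pmemLimitBytes = containerMemoryMb * 1024L * 1024L;
        return (long) (vmemPmemRatio * pmemLimitBytes);
    }

    public static void main(String[] args) {
        long limit = vmemLimitBytes(2048, 2.1);
        System.out.println("2048 MB container, ratio 2.1: vmem limit = "
            + limit + " bytes (about " + (limit >> 20) + " MB)");
    }
}
```

So a Container granted 2048 MB of physical memory would be allowed roughly 4300 MB of virtual memory before the virtual memory check trips.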
The End
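There have been a lot of changes in my life and work lately, so it has been a long while since my last post. Things should gradually get back to normal in October, and there is still a Spark series waiting to be continued.
By the way, after using the butterfly keyboard for a while, it actually feels better than a scissor-switch keyboard. Interesting...
Good night.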