深入了解YARN如何做内存资源隔离

前言

实话实说,YARN之前并不是我特别擅长的领域,只是平时常用而已,但深入了解一番之后,发现还是蛮有意思的。YARN作为资源管理框架,资源调度和资源隔离就是它的两大基本任务,本文谈谈它如何实现内存资源的隔离。

下图就是写本文的起因,上个月在Flink社区群里班门弄斧回答了一个小问题。

好孩几要打码。蓝色方块是我嗯

这也符合笔者在前面的一篇文章中提到过的:

(Flink)任务槽可以实现TaskManager中不同Task的资源隔离,不过是逻辑隔离,并且只隔离内存,亦即在调度层面认为每个任务槽“应该”得到taskmanager.heap.size的N分之一大小的内存。CPU资源不算在内。

YARN Container之间同样默认只做内存隔离,具体逻辑在YARN的源码中有直接的体现。CPU隔离则是间接通过与Linux cgroup相关的高级参数来配置,这个就是后话了。

YARN的内存隔离本质上是监控内存的使用量。在真正读代码之前,先得来看进程树的相关细节。

/proc及进程树

YARN管理资源的基本单位是Container,而Container本质上就是Java进程。进程通常都会用fork的方式创建子进程,子进程也如是。因此单个Container的内存使用量应该是以Container进程为根的进程树中所有进程的内存使用量之和

以Linux系统为例,观察/proc目录,如下图。

/proc目录是Linux系统中内核信息虚拟文件系统(procfs)的挂载点。其中有许多以PID命名的目录,这些目录中则包含有进程对应的信息。我们所关注的是/proc/[PID]/stat这个文件,如下图所示。

该文件中包含进程的一些运行时统计数据,比如进程名、父进程PID、用户组ID、在内核态与用户态下运行的时间、使用的物理内存和虚拟内存大小等。由此可见,通过遍历所有/proc/[PID]/stat文件,我们就可以构造出进程树以及求出Container的内存占用了。

YARN中的进程树基类为抽象类ResourceCalculatorProcessTree,并且有两种实现:Linux系统下的ProcfsBasedProcessTree、Windows系统下的WindowsBasedProcessTree。

ProcfsBasedProcessTree的实现

大致看一下ProcfsBasedProcessTree的具体实现。首先是getProcessList()方法,它负责遍历各个/proc/[PID]文件夹并获取所有进程。

  static private Pattern numberPattern = Pattern.compile("[1-9][0-9]*");

  private List<String> getProcessList() {
    String[] processDirs = (new File(procfsDir)).list();
    List<String> processList = new ArrayList<String>();

    for (String dir : processDirs) {
      Matcher m = numberPattern.matcher(dir);
      if (!m.matches()) continue;
      try {
        if ((new File(procfsDir, dir)).isDirectory()) {
          processList.add(dir);
        }
      } catch (SecurityException s) {
        // skip this process
      }
    }
    return processList;
  }

主要的处理逻辑都位于updateProcessTree()方法中,它负责更新进程树。因为YARN Container总是在频繁地分配与销毁,因此必须及时更新。源码较长,如下所示。

  @Override
  public void updateProcessTree() {
    if (!pid.equals(deadPid)) {
      // Get the list of processes
      List<String> processList = getProcessList();
      Map<String, ProcessInfo> allProcessInfo = new HashMap<String, ProcessInfo>();

      // cache the processTree to get the age for processes
      Map<String, ProcessInfo> oldProcs =
              new HashMap<String, ProcessInfo>(processTree);
      processTree.clear();

      ProcessInfo me = null;
      for (String proc : processList) {
        // Get information for each process
        ProcessInfo pInfo = new ProcessInfo(proc);
        if (constructProcessInfo(pInfo, procfsDir) != null) {
          allProcessInfo.put(proc, pInfo);
          if (proc.equals(this.pid)) {
            me = pInfo; // cache 'me'
            processTree.put(proc, pInfo);
          }
        }
      }

      if (me == null) {
        return;
      }

      // Add each process to its parent.
      for (Map.Entry<String, ProcessInfo> entry : allProcessInfo.entrySet()) {
        String pID = entry.getKey();
        if (!pID.equals("1")) {
          ProcessInfo pInfo = entry.getValue();
          ProcessInfo parentPInfo = allProcessInfo.get(pInfo.getPpid());
          if (parentPInfo != null) {
            parentPInfo.addChild(pInfo);
          }
        }
      }

      // now start constructing the process-tree
      LinkedList<ProcessInfo> pInfoQueue = new LinkedList<ProcessInfo>();
      pInfoQueue.addAll(me.getChildren());
      while (!pInfoQueue.isEmpty()) {
        ProcessInfo pInfo = pInfoQueue.remove();
        if (!processTree.containsKey(pInfo.getPid())) {
          processTree.put(pInfo.getPid(), pInfo);
        }
        pInfoQueue.addAll(pInfo.getChildren());
      }

      // update age values and compute the number of jiffies since last update
      for (Map.Entry<String, ProcessInfo> procs : processTree.entrySet()) {
        ProcessInfo oldInfo = oldProcs.get(procs.getKey());
        if (procs.getValue() != null) {
          procs.getValue().updateJiffy(oldInfo);
          if (oldInfo != null) {
            procs.getValue().updateAge(oldInfo);
          }
        }
      }

      if (LOG.isDebugEnabled()) {
        // Log.debug the ProcfsBasedProcessTree
        LOG.debug(this.toString());
      }
      if (smapsEnabled) {
        // 略
      }
    }
  }

该方法的执行流程如下:

  1. 调用getProcessList()方法获取到所有进程。
  2. 将旧的进程树缓存下来,用来增加原本存在的进程的年龄。为什么要有年龄?稍后再说。
  3. 遍历进程列表,调用constructProcessInfo()方法抽取stat文件中的数据,封装成ProcessInfo实例。如果在遍历过程中发现了当前Container进程(即“我”/me)的PID,就将它先行加入进程树,因为Container进程必然是进程树中的一个节点。
  4. 遍历已经构造出来的ProcessInfo,通过解析出来的父进程ID(即PPID)来确定进程的父子关系,并将子进程的ProcessInfo加入父进程的ProcessInfo中。
  5. 接下来就真正地填充进程树:将一个队列初始化填充为当前Container进程(me)的子进程ProcessInfo,然后从头到尾将这些ProcessInfo以及它们子进程对应的ProcessInfo都加入队列,并不断地将头元素加入进程树。
  6. 分别调用updateJiffy()与updateAge()方法,更新每个进程的运行时间以及年龄。特别地,只有进程在旧的进程树中存在时,才会更新其年龄(直接+1)。

ProcessInfo静态类的一些属性如下所示。

  private static class ProcessInfo {
    private String pid; // process-id
    private String name; // command name
    private Integer pgrpId; // process group-id
    private String ppid; // parent process-id
    private Integer sessionId; // session-id
    private Long vmem; // virtual memory usage
    private Long rssmemPage; // rss memory usage in # of pages
    private Long utime = 0L; // # of jiffies in user mode
    private final BigInteger MAX_LONG = BigInteger.valueOf(Long.MAX_VALUE);
    private BigInteger stime = new BigInteger("0"); // # of jiffies in kernel mode
    // how many times has this process been seen alive
    private int age;

    // # of jiffies used since last update:
    private Long dtime = 0L;
    // dtime = (utime + stime) - (utimeOld + stimeOld)
    // We need this to compute the cumulative CPU time
    // because the subprocess may finish earlier than root process

    private List<ProcessInfo> children = new ArrayList<ProcessInfo>(); // list of children

    public ProcessInfo(String pid) {
      this.pid = pid;
      // seeing this the first time.
      this.age = 1;
    }
    // ......
  }

这样在统计内存占用时,只需要将进程树中所有ProcessInfo的对应域简单相加。举个例子,统计虚拟内存的方法getCumulativeVmem()。

  @Override
  public long getCumulativeVmem(int olderThanAge) {
    long total = 0;
    for (ProcessInfo p : processTree.values()) {
      if ((p != null) && (p.getAge() > olderThanAge)) {
        total += p.getVmem();
      }
    }
    return total;
  }

内存监控ContainersMonitorImpl的实现

该类是ContainersMonitor接口的唯一实现类,其中维护了3个主要的数据结构。

  final List<ContainerId> containersToBeRemoved;
  final Map<ContainerId, ProcessTreeInfo> containersToBeAdded;
  Map<ContainerId, ProcessTreeInfo> trackingContainers =
      new HashMap<ContainerId, ProcessTreeInfo>();
  • containersToBeRemoved:已经完成或失败,不再需要监控的Container列表;
  • containersToBeAdded:新分配的Container及其对应的进程信息映射;
  • trackingContainers:目前正在监控的Container及其对应的进程信息映射。

ContainersMonitorImpl的主要功能几乎全部在一个MonitoringThread线程中实现。其源码如下,略长。

  private class MonitoringThread extends Thread {
    public MonitoringThread() {
      super("Container Monitor");
    }

    @Override
    public void run() {
      while (true) {
        // Print the processTrees for debugging.
        if (LOG.isDebugEnabled()) {
          // 打印日志
        }

        // Add new containers
        synchronized (containersToBeAdded) {
          for (Entry<ContainerId, ProcessTreeInfo> entry : containersToBeAdded
              .entrySet()) {
            ContainerId containerId = entry.getKey();
            ProcessTreeInfo processTreeInfo = entry.getValue();
            LOG.info("Starting resource-monitoring for " + containerId);
            trackingContainers.put(containerId, processTreeInfo);
          }
          containersToBeAdded.clear();
        }

        // Remove finished containers
        synchronized (containersToBeRemoved) {
          for (ContainerId containerId : containersToBeRemoved) {
            trackingContainers.remove(containerId);
            LOG.info("Stopping resource-monitoring for " + containerId);
          }
          containersToBeRemoved.clear();
        }

        // Now do the monitoring for the trackingContainers
        // Check memory usage and kill any overflowing containers
        long vmemStillInUsage = 0;
        long pmemStillInUsage = 0;
        for (Iterator<Map.Entry<ContainerId, ProcessTreeInfo>> it =
            trackingContainers.entrySet().iterator(); it.hasNext();) {

          Map.Entry<ContainerId, ProcessTreeInfo> entry = it.next();
          ContainerId containerId = entry.getKey();
          ProcessTreeInfo ptInfo = entry.getValue();
          try {
            String pId = ptInfo.getPID();

            // Initialize any uninitialized processTrees
            if (pId == null) {
              // get pid from ContainerId
              pId = containerExecutor.getProcessId(ptInfo.getContainerId());
              if (pId != null) {
                // pId will be null, either if the container is not spawned yet
                // or if the container's pid is removed from ContainerExecutor
                LOG.debug("Tracking ProcessTree " + pId
                    + " for the first time");

                ResourceCalculatorProcessTree pt =
                    ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(pId, processTreeClass, conf);
                ptInfo.setPid(pId);
                ptInfo.setProcessTree(pt);
              }
            }
            // End of initializing any uninitialized processTrees

            if (pId == null) {
              continue; // processTree cannot be tracked
            }

            LOG.debug("Constructing ProcessTree for : PID = " + pId
                + " ContainerId = " + containerId);
            ResourceCalculatorProcessTree pTree = ptInfo.getProcessTree();
            pTree.updateProcessTree();    // update process-tree
            long currentVmemUsage = pTree.getCumulativeVmem();
            long currentPmemUsage = pTree.getCumulativeRssmem();
            // as processes begin with an age 1, we want to see if there
            // are processes more than 1 iteration old.
            long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1);
            long curRssMemUsageOfAgedProcesses = pTree.getCumulativeRssmem(1);
            long vmemLimit = ptInfo.getVmemLimit();
            long pmemLimit = ptInfo.getPmemLimit();
            LOG.info(String.format(
                "Memory usage of ProcessTree %s for container-id %s: ",
                     pId, containerId.toString()) +
                formatUsageString(currentVmemUsage, vmemLimit, currentPmemUsage, pmemLimit));

            boolean isMemoryOverLimit = false;
            String msg = "";
            int containerExitStatus = ContainerExitStatus.INVALID;
            if (isVmemCheckEnabled()
                && isProcessTreeOverLimit(containerId.toString(),
                    currentVmemUsage, curMemUsageOfAgedProcesses, vmemLimit)) {
              // Container (the root process) is still alive and overflowing
              // memory.
              // Dump the process-tree and then clean it up.
              msg = formatErrorMessage("virtual",
                  currentVmemUsage, vmemLimit,
                  currentPmemUsage, pmemLimit,
                  pId, containerId, pTree);
              isMemoryOverLimit = true;
              containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_VMEM;
            } else if (isPmemCheckEnabled()
                && isProcessTreeOverLimit(containerId.toString(),
                    currentPmemUsage, curRssMemUsageOfAgedProcesses,
                    pmemLimit)) {
              // Container (the root process) is still alive and overflowing
              // memory.
              // Dump the process-tree and then clean it up.
              msg = formatErrorMessage("physical",
                  currentVmemUsage, vmemLimit,
                  currentPmemUsage, pmemLimit,
                  pId, containerId, pTree);
              isMemoryOverLimit = true;
              containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_PMEM;
            }

            if (isMemoryOverLimit) {
              // Virtual or physical memory over limit. Fail the container and
              // remove
              // the corresponding process tree
              LOG.warn(msg);
              // warn if not a leader
              if (!pTree.checkPidPgrpidForMatch()) {
                LOG.error("Killed container process with PID " + pId
                    + " but it is not a process group leader.");
              }
              // kill the container
              eventDispatcher.getEventHandler().handle(
                  new ContainerKillEvent(containerId,
                      containerExitStatus, msg));
              it.remove();
              LOG.info("Removed ProcessTree with root " + pId);
            } else {
              // Accounting the total memory in usage for all containers that
              // are still
              // alive and within limits.
              vmemStillInUsage += currentVmemUsage;
              pmemStillInUsage += currentPmemUsage;
            }
          } catch (Exception e) {
            // Log the exception and proceed to the next container.
            LOG.warn("Uncaught exception in ContainerMemoryManager "
                + "while managing memory of " + containerId, e);
          }
        }

        try {
          Thread.sleep(monitoringInterval);
        } catch (InterruptedException e) {
          LOG.warn(ContainersMonitorImpl.class.getName()
              + " is interrupted. Exiting.");
          break;
        }
      }
    }

虽然长,但还是比较容易理解的。该线程会以monitoringInterval的时间间隔周期性进行监控,由参数yarn.nodemanager.container-monitor.interval-ms来配置,默认值是3000ms。整体流程如下:

  1. 将新的Container加入trackingContainers,将完成的Container从trackingContainers中移除。这两步都需要加锁。
  2. 遍历正在跟踪的所有Container。如果进程树尚未初始化,就通过Container ID取得对应的PID,再调用getResourceCalculatorProcessTree()初始化之(其实就是构造一个ProcfsBasedProcessTree)。
  3. 调用前述updateProcessTree()方法更新进程树,然后从进程树取得累计的物理内存和虚拟内存占用量。
  4. 调用isProcessTreeOverLimit()方法,根据是否开启物理内存和虚拟内存的监控,判断内存占用是否超过限制。如是,发送ContainerKillEvent事件,杀掉该Container。

最后查看isProcessTreeOverLimit()方法的具体实现,它有助于我们理解为什么ProcessInfo需要有年龄标记。

  boolean isProcessTreeOverLimit(String containerId,
                                  long currentMemUsage,
                                  long curMemUsageOfAgedProcesses,
                                  long vmemLimit) {
    boolean isOverLimit = false;

    if (currentMemUsage > (2 * vmemLimit)) {
      LOG.warn("Process tree for container: " + containerId
          + " running over twice " + "the configured limit. Limit=" + vmemLimit
          + ", current usage = " + currentMemUsage);
      isOverLimit = true;
    } else if (curMemUsageOfAgedProcesses > vmemLimit) {
      LOG.warn("Process tree for container: " + containerId
          + " has processes older than 1 "
          + "iteration running over the configured limit. Limit=" + vmemLimit
          + ", current usage = " + curMemUsageOfAgedProcesses);
      isOverLimit = true;
    }

    return isOverLimit;
  }

由代码可以看出,进程的初始年龄都为1。当进程树中的所有进程内存占用量大于阈值的两倍,或者进程树中所有年龄大于1的进程内存占用量大于阈值,就认为已经超过限制。我们知道,Java创建子进程时最终是用了系统调用fork()与execvp(),而fork出来的子进程在启动的一瞬间,仍然继承父进程的内存使用量,所以用阈值的两倍是为了防止错误地杀掉正常的子进程。当进程年龄大于1之后,说明它们不是fork出来的子进程,所以对它们的判断只需要用正常的阈值就可以了。

YARN的内存限制参数说明

主要参数有4个:

  • yarn.nodemanager.pmem-check-enabled:是否启用物理内存占用量监控;
  • yarn.nodemanager.vmem-check-enabled:是否启用虚拟内存占用量监控。这两项的默认值都是true;
  • yarn.nodemanager.vmem-pmem-ratio:NodeManager上虚拟内存与物理内存的使用量之比,默认为2.1。也就是说,每使用1M物理内存,最多可以使用2.1M虚拟内存;
  • yarn.nodemanager.resource.memory-mb:每个NodeManager最多可以使用多少物理内存。

The End

最近生活和工作方面变动比较大,因此拖了很久没有更。预计10月就会逐渐恢复正常,还有个Spark连载搁着呢。

顺便,蝶式键盘用的久了之后手感甚至比剪刀脚键盘更好,有点意思……

晚安晚安。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容