Yarn资源隔离

1 概述

1.1 基本概念

** A.ResourceManager **

ResourceManager 是一个全局的资源管理器,负责整个集群的资源管理和分配。它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Applications Manager,AppManager)。这里指进行基本介绍,部分模块单独进行深入。

  • 调度器

该调度器是一个 "纯调度器",不再参与任何与具体应用程序逻辑相关的工作,而仅根据各个应用程序的资源需求进行分配,资源分配的单位用一个资源抽象概念 "Container" 来表示。Container 封装了内存和 CPU。此外,调度器是一个可插拔的组件,用户可根据自己的需求设计新的调度器,YARN 自身提供了 Fair Scheduler 和 Capacity Scheduler。

  • 应用程序管理器

应用程序管理器负责管理整个系统中所有应用程序,包括应用程序的提交、与调度器协商资源以启动 ApplicationMaster、监控 ApplicationMaster 运行状态并在失败时重新启动它等。

** B.ApplicationMaster **

用户提交的每个 Application 都要包含一个 ApplicatioNMaster,主要功能包括:

  • 向 RM 调度器申请资源(用 Container 表示)
  • 将从 RM 分配的资源分配给 Applcation 内部的任务
  • 与 NM 通信请求 启动/停止 任务
  • 监控所有任务的运行状态,并在失败时重新为任务申请资源以重启任务

** C.NodeManager **

NM 是每个节点上的资源和任务管理器,一方面,它会定时地向 RM 汇报本节点上的资源使用情况和各个 Container 的运行状态;另一方面,它接收并处理来自 AM 的 Container 启动/停止 等各种命令。

** D.Container **

Container 是 YARN 中资源抽象,它封装了某个节点上的内存和 CPU,当 AM 向 RM 申请资源时,RM 为 AM 返回的资源便是用 Container 表示的。YARN 是使用轻量级资源隔离机制 Cgroups 进行资源隔离的。

1.2 YARN通信协议

在 YARN 中,任何两个需要相互通信的组件之间仅有一个RPC协议,而对于任何一个RPC协议,通信双方有一端是Client,另一端是Server,且Client总是主动连接Server。YARN 中有以下几个主要的RPC协议:

  • 1.JobClient与RM之间的协议:ApplicationClientProtocol,JobClient 通过该 RPC 协议提交应用程序、查询应用程序状态等
  • 2.Admin(管理员)与RM之间的协议:ResourceManagerAdministrationProtocol,Admin通过该RPC协议更新系统配置文件,比如节点黑白名单、用户队列、权限等
  • 3.AM与RM之间的协议:ApplicationMasterProtocol,AM 通过该RPC协议向RM注册并撤销自己,并为各个人物申请资源
  • 4.NM与RM之间的协议:ResourceTracker,NM通过该协议向RM注册,并定时发送心跳信息汇报当前节点的资源使用情况和Container运行情况,并接收来自AM的命令
  • 5.AM与NM 之间的协议:ContainerManagermentProtocol,AM通过该RPC协议要求NM启动或者停止Container

Yarn工作流程如下:


yarn工作流程.png

说明:

1.Client 向 YARN 提交应用程序,其中包括 ApplicationMaster 程序及启动 ApplicationMaster 的命令
2.ResourceManager 为该 ApplicationMaster 分配第一个 Container,并与对应的 NodeManager 通信,要求它在这个 Container 中启动应用程序的 ApplicationMaster
3.ApplicationMaster 首先向 ResourceManager 注册
4.ApplicationMaster 为 Application 的任务申请并领取资源
5.领取到资源后,要求对应的 NodeManager 在 Container 中启动任务
6.NodeManager收到ApplicationMaster的请求后,为任务设置好运行环境(包括环境变量、JAR 包、二进制程序等),将任务启动脚本写到一个脚本中,并通过运行该脚本启动任务
7.各个任务通过 RPC 协议向ApplicationMaster汇报自己的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在失败时重启任务
8.应用程序完成后,ApplicationMaster向ResourceManager注销并关闭自己

2 关键代码流程

客户端与RM进行交换(Client<-->ResourceManager),首先需要创建一个YarnClient

YarnClient yarnClient = YarnClient.createYarnClient();
 yarnClient.init(conf);
 yarnClient.start();

完整如下:

package com.hortonworks.simpleyarnapp;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;


public class Client {

  Configuration conf = new YarnConfiguration();
  
  public void run(String[] args) throws Exception {
    final String command = args[0];
    final int n = Integer.valueOf(args[1]);
    final Path jarPath = new Path(args[2]);

    // Create yarnClient
    YarnConfiguration conf = new YarnConfiguration();
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(conf);
    yarnClient.start();
    
    // Create application via yarnClient
    YarnClientApplication app = yarnClient.createApplication();

    // Set up the container launch context for the application master
    ContainerLaunchContext amContainer = 
        Records.newRecord(ContainerLaunchContext.class);
    amContainer.setCommands(
        Collections.singletonList(
            "$JAVA_HOME/bin/java" +
            " -Xmx256M" +
            " com.hortonworks.simpleyarnapp.ApplicationMaster" +
            " " + command +
            " " + String.valueOf(n) +
            " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + 
            " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" 
            )
        );
    
    // Setup jar for ApplicationMaster
    LocalResource appMasterJar = Records.newRecord(LocalResource.class);
    setupAppMasterJar(jarPath, appMasterJar);
    amContainer.setLocalResources(
        Collections.singletonMap("simpleapp.jar", appMasterJar));

    // Setup CLASSPATH for ApplicationMaster
    Map<String, String> appMasterEnv = new HashMap<String, String>();
    setupAppMasterEnv(appMasterEnv);
    amContainer.setEnvironment(appMasterEnv);
    
    // Set up resource type requirements for ApplicationMaster
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(256);
    capability.setVirtualCores(1);

    // Finally, set-up ApplicationSubmissionContext for the application
    ApplicationSubmissionContext appContext = 
    app.getApplicationSubmissionContext();
    appContext.setApplicationName("simple-yarn-app"); // application name
    appContext.setAMContainerSpec(amContainer);
    appContext.setResource(capability);
    appContext.setQueue("default"); // queue 

    // Submit application
    ApplicationId appId = appContext.getApplicationId();
    System.out.println("Submitting application " + appId);
    yarnClient.submitApplication(appContext);
    
    ApplicationReport appReport = yarnClient.getApplicationReport(appId);
    YarnApplicationState appState = appReport.getYarnApplicationState();
    while (appState != YarnApplicationState.FINISHED && 
           appState != YarnApplicationState.KILLED && 
           appState != YarnApplicationState.FAILED) {
      Thread.sleep(100);
      appReport = yarnClient.getApplicationReport(appId);
      appState = appReport.getYarnApplicationState();
    }
    
    System.out.println(
        "Application " + appId + " finished with" +
            " state " + appState + 
            " at " + appReport.getFinishTime());

  }
  
  private void setupAppMasterJar(Path jarPath, LocalResource appMasterJar) throws IOException {
    FileStatus jarStat = FileSystem.get(conf).getFileStatus(jarPath);
    appMasterJar.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
    appMasterJar.setSize(jarStat.getLen());
    appMasterJar.setTimestamp(jarStat.getModificationTime());
    appMasterJar.setType(LocalResourceType.FILE);
    appMasterJar.setVisibility(LocalResourceVisibility.PUBLIC);
  }
  
  private void setupAppMasterEnv(Map<String, String> appMasterEnv) {
    for (String c : conf.getStrings(
        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
        YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
      Apps.addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(),
          c.trim());
    }
    Apps.addToEnvironment(appMasterEnv,
        Environment.CLASSPATH.name(),
        Environment.PWD.$() + File.separator + "*");
  }
  
  public static void main(String[] args) throws Exception {
    Client c = new Client();
    c.run(args);
  }
}

ApplicatioNMaster与ResourceManager的交换(ApplicationMater<-->ResourceManager),需要创建AMRMClientAsync/AMRMClient,对应处理事件为AMRMClientAsync.CallbackHandler/AMRMClient.CallbackHandler

AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
rmClient.init(conf);
rmClient.start();

ApplicationMater在启动任务执行时的container,需要直接与NodeManager交互(ApplicationMaster<-->NodeManager),需要创建NMClientAsync/NMClient对象,处理事件通过NMClientAsync.CallbackHandler/NMClient.CallbackHandler

完整源码如下:

package com.hortonworks.simpleyarnapp;

import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class ApplicationMaster {

  public static void main(String[] args) throws Exception {

    final String command = args[0];
    final int n = Integer.valueOf(args[1]);
    
    // Initialize clients to ResourceManager and NodeManagers
    Configuration conf = new YarnConfiguration();

    AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
    rmClient.init(conf);
    rmClient.start();

    NMClient nmClient = NMClient.createNMClient();
    nmClient.init(conf);
    nmClient.start();

    // Register with ResourceManager
    System.out.println("registerApplicationMaster 0");
    rmClient.registerApplicationMaster("", 0, "");
    System.out.println("registerApplicationMaster 1");
    
    // Priority for worker containers - priorities are intra-application
    Priority priority = Records.newRecord(Priority.class);
    priority.setPriority(0);

    // Resource requirements for worker containers
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(128);
    capability.setVirtualCores(1);

    // Make container requests to ResourceManager
    for (int i = 0; i < n; ++i) {
      ContainerRequest containerAsk = new ContainerRequest(capability, null, null, priority);
      System.out.println("Making res-req " + i);
      rmClient.addContainerRequest(containerAsk);
    }

    // Obtain allocated containers, launch and check for responses
    int responseId = 0;
    int completedContainers = 0;
    while (completedContainers < n) {
        AllocateResponse response = rmClient.allocate(responseId++);
        for (Container container : response.getAllocatedContainers()) {
            // Launch container by create ContainerLaunchContext
            ContainerLaunchContext ctx =
                    Records.newRecord(ContainerLaunchContext.class);
            ctx.setCommands(
                    Collections.singletonList(
                    //实际执行的内容
                            command +
                                    " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
                                    " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
                    ));
            System.out.println("Launching container " + container.getId());
            nmClient.startContainer(container, ctx);
        }
        for (ContainerStatus status : response.getCompletedContainersStatuses()) {
            ++completedContainers;
            System.out.println("Completed container " + status.getContainerId());
        }
        Thread.sleep(100);
    }

    // Un-register with ResourceManager
    rmClient.unregisterApplicationMaster(
        FinalApplicationStatus.SUCCEEDED, "", "");
  }
}

3 Container实现

当NodeManager收到AM的启动Container事件时,执行ContainerLauncher,然后在其中执行具体的ContainerExecutor(DefaultContainerExecutor/LinuxContainerExecutor),针对Container中资源监控时传入了Executor对象获取相关PID。

public ContainersMonitorImpl(ContainerExecutor exec,
      AsyncDispatcher dispatcher, Context context) {
    super("containers-monitor");

    this.containerExecutor = exec;
    this.eventDispatcher = dispatcher;
    this.context = context;

    this.containersToBeAdded = new HashMap<ContainerId, ProcessTreeInfo>();
    this.containersToBeRemoved = new ArrayList<ContainerId>();
    this.monitoringThread = new MonitoringThread();
  }

3.1 资源隔离

默认情况下,YARN采用了线程监控的方法判断任务是否超量使用内存,一旦发生超量,则直接将其杀死。由于Cgroups对内存的控制缺乏灵活性(即任务任何时刻不能超过内存上限,如果超过,则直接将其杀死或者报OOM),而Java进程在创建瞬间内存将翻倍,之后骤降到正常值,这种情况下,采用线程监控的方式更加灵活(当发现进程树内存瞬间翻倍超过设定值时,可认为是正常现象,不会将任务杀死)。

  • 构造进程树

在YARN中每个Container可以认为是一个独立的进程,同时,Container也可能会创建子进程(可能会创建多个子进程,这些子进程可能也会创建子进程),因此Container进程的内存(物理内存、虚拟内存)使用量应该表示为:以Container进程为根的进程树中所有进程的内存(物理内存、虚拟内存)使用总量。
在Linux系统中的 /proc目录下,有大量以整数命名的目录,这些整数是某个正在运行的进程的PID,如下图所示:


进程信息.png

而目录/proc/<PID>下面的那些文件分别表示着进程运行时的各方面信息

PID内容.png

YARN只关心/proc/<PID>/stat文件。

打开一个/proc/<PID>/stat文件你会发现它仅有一行(多列)文本:

60717 (rpc.statd) S 1 60717 60717 0 -1 4202816 1262 0 0 0 3 0 0 0 20 0 1 0 412627997 75718656 12987 18446744073709551615 139780868575232 139780868643356 140737412651392 140737412650712 139780856985091 0 0 16846848 18947 18446744071580587289 0 0 17 6 0 0 0 0 0

里面记录了该进程相关的信息,可以通过正则表达式

^([\\d-]+)\\s\\(([^)]+)\\)\\s[^\\s]\\s([\\d-]+)\\s([\\d-]+)\\s+([\\d-]+)\\s([\\d-]+\\s){7}(\\d+)\\s(\\d+)\\s([\\d-]+\\s){7}(\\d+)\\s +(\\d+)(\\s[\\d-]+){15}

从中抽取进程的运行时信息,包括:进程名称、父进程PID、父进程用户组ID、Session ID在用户态运行的时间(单位:jiffies)、核心态运行的时间(单位:jiffies)、占用虚拟内存大小(单位:page)和占用物理内存大小(单位:page)等。

文件/proc/<PID>/stat中包含的内存大小单位为page,为了获取以字节为单位的内存信息,可通过执行以下Shell脚本获取每个page对应的内存量(单位:B):

[root@lark001 60717]# getconf  PAGESIZE
4096

通过对以上信息的计算可以得到当前每个运行的Container使用的内存总量。在ContainersMonitorImpl内部维护着每个Container进程的PID,通过遍历/proc下各个进程的stat文件内容(父进程PID、占用虚拟内存大小和占用物理内存大小),YARN构建出每个Container的进程树,从而得出每个进程树的虚拟内存、物理内存使用总量。后期针对该信息会定时刷新。

  • 判断内存

在获取到每一个Container之后已经可以获得各个Container进程树的内存(物理内存、虚拟内存)使用量,YARN并不会仅凭进程树的内存使用量(物理内存、虚拟内存)是否超过上限值就决定是否“杀死”一个Container,因为“子进程”的内存使用量是有“波动”的(Java中创建子进程采用了fork()+exec()的方案,子进程启动瞬间,它使用的内存量与父进程一致,从外面看来,一个进程使用内存量可能瞬间翻倍,然后又降下来),为了避免“误杀”的情况出现,Hadoop赋予每个进程“年龄”属性,并规定刚启动进程的年龄是1,MonitoringThread线程每更新一次,各个进程的年龄加一,在此基础上,选择被“杀死”的Container的标准如下:如果一个Contaier对应的进程树中所有进程(年龄大于0)总内存(物理内存或虚拟内存)使用量超过上限值的两倍;或者所有年龄大于1的进程总内存(物理内存或虚拟内存)使用量超过上限值,则认为该Container使用内存超量,则Container发送ContainerEventType.KILL_CONTAINER事件将其KILL掉。

  • CPU资源隔离

默认情况下,NodeManager未启用任何对CPU资源的隔离机制,如果需要启用该机制需使用LinuxContainerExecutor,它能够以应用程序提交者的身份创建文件、运行Container和销毁Container。相比于DefaultContainerExecutor采用NodeManager启动者的身份执行这些操作,LinuxContainerExecutor具有更高的安全性。

LinuxContainerExecutor的核心设计思想是赋予NodeManager启动者以root权限,进而使其拥有足够的权限以任意用户身份执行一些操作,从而使得NodeManager执行者可以将Container使用的目录和文件的拥有者修改为应用程序的提交者,并以应用程序提交者的身份运行Container,防止所有Container以NodeManager执行者身份运行进而带来的各种风险。
上述机制的实现在YARN的NodeManager采用C语言实现了一个setuid功能的工具container-executor,其位于如下目录:

nm监控脚本.png

该脚本拥有root权限,可以完成任意操作,其可执行脚本位于:/opt/yarn/hadoop/bin/container-executor。YARN正是通过该脚本创建出Cgroups层级树以及完成Cgroups属性设置等操作。具体cgroups详细讲解参考引用部分文章。

References

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,386评论 6 479
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,939评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,851评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,953评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,971评论 5 369
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,784评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,126评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,765评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,148评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,744评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,858评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,479评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,080评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,053评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,278评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,245评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,590评论 2 343

推荐阅读更多精彩内容