1 概述
1.1 基本概念
** A.ResourceManager **
ResourceManager 是一个全局的资源管理器,负责整个集群的资源管理和分配。它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Applications Manager,AppManager)。这里指进行基本介绍,部分模块单独进行深入。
- 调度器
该调度器是一个 "纯调度器",不再参与任何与具体应用程序逻辑相关的工作,而仅根据各个应用程序的资源需求进行分配,资源分配的单位用一个资源抽象概念 "Container" 来表示。Container 封装了内存和 CPU。此外,调度器是一个可插拔的组件,用户可根据自己的需求设计新的调度器,YARN 自身提供了 Fair Scheduler 和 Capacity Scheduler。
- 应用程序管理器
应用程序管理器负责管理整个系统中所有应用程序,包括应用程序的提交、与调度器协商资源以启动 ApplicationMaster、监控 ApplicationMaster 运行状态并在失败时重新启动它等。
** B.ApplicationMaster **
用户提交的每个 Application 都要包含一个 ApplicatioNMaster,主要功能包括:
- 向 RM 调度器申请资源(用 Container 表示)
- 将从 RM 分配的资源分配给 Applcation 内部的任务
- 与 NM 通信请求 启动/停止 任务
- 监控所有任务的运行状态,并在失败时重新为任务申请资源以重启任务
** C.NodeManager **
NM 是每个节点上的资源和任务管理器,一方面,它会定时地向 RM 汇报本节点上的资源使用情况和各个 Container 的运行状态;另一方面,它接收并处理来自 AM 的 Container 启动/停止 等各种命令。
** D.Container **
Container 是 YARN 中资源抽象,它封装了某个节点上的内存和 CPU,当 AM 向 RM 申请资源时,RM 为 AM 返回的资源便是用 Container 表示的。YARN 是使用轻量级资源隔离机制 Cgroups 进行资源隔离的。
1.2 YARN通信协议
在 YARN 中,任何两个需要相互通信的组件之间仅有一个RPC协议,而对于任何一个RPC协议,通信双方有一端是Client,另一端是Server,且Client总是主动连接Server。YARN 中有以下几个主要的RPC协议:
- 1.JobClient与RM之间的协议:ApplicationClientProtocol,JobClient 通过该 RPC 协议提交应用程序、查询应用程序状态等
- 2.Admin(管理员)与RM之间的协议:ResourceManagerAdministrationProtocol,Admin通过该RPC协议更新系统配置文件,比如节点黑白名单、用户队列、权限等
- 3.AM与RM之间的协议:ApplicationMasterProtocol,AM 通过该RPC协议向RM注册并撤销自己,并为各个人物申请资源
- 4.NM与RM之间的协议:ResourceTracker,NM通过该协议向RM注册,并定时发送心跳信息汇报当前节点的资源使用情况和Container运行情况,并接收来自AM的命令
- 5.AM与NM 之间的协议:ContainerManagermentProtocol,AM通过该RPC协议要求NM启动或者停止Container
Yarn工作流程如下:
说明:
1.Client 向 YARN 提交应用程序,其中包括 ApplicationMaster 程序及启动 ApplicationMaster 的命令
2.ResourceManager 为该 ApplicationMaster 分配第一个 Container,并与对应的 NodeManager 通信,要求它在这个 Container 中启动应用程序的 ApplicationMaster
3.ApplicationMaster 首先向 ResourceManager 注册
4.ApplicationMaster 为 Application 的任务申请并领取资源
5.领取到资源后,要求对应的 NodeManager 在 Container 中启动任务
6.NodeManager收到ApplicationMaster的请求后,为任务设置好运行环境(包括环境变量、JAR 包、二进制程序等),将任务启动脚本写到一个脚本中,并通过运行该脚本启动任务
7.各个任务通过 RPC 协议向ApplicationMaster汇报自己的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在失败时重启任务
8.应用程序完成后,ApplicationMaster向ResourceManager注销并关闭自己
2 关键代码流程
客户端与RM进行交换(Client<-->ResourceManager),首先需要创建一个YarnClient
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
yarnClient.start();
完整如下:
package com.hortonworks.simpleyarnapp;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
public class Client {
Configuration conf = new YarnConfiguration();
public void run(String[] args) throws Exception {
final String command = args[0];
final int n = Integer.valueOf(args[1]);
final Path jarPath = new Path(args[2]);
// Create yarnClient
YarnConfiguration conf = new YarnConfiguration();
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
yarnClient.start();
// Create application via yarnClient
YarnClientApplication app = yarnClient.createApplication();
// Set up the container launch context for the application master
ContainerLaunchContext amContainer =
Records.newRecord(ContainerLaunchContext.class);
amContainer.setCommands(
Collections.singletonList(
"$JAVA_HOME/bin/java" +
" -Xmx256M" +
" com.hortonworks.simpleyarnapp.ApplicationMaster" +
" " + command +
" " + String.valueOf(n) +
" 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
" 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
)
);
// Setup jar for ApplicationMaster
LocalResource appMasterJar = Records.newRecord(LocalResource.class);
setupAppMasterJar(jarPath, appMasterJar);
amContainer.setLocalResources(
Collections.singletonMap("simpleapp.jar", appMasterJar));
// Setup CLASSPATH for ApplicationMaster
Map<String, String> appMasterEnv = new HashMap<String, String>();
setupAppMasterEnv(appMasterEnv);
amContainer.setEnvironment(appMasterEnv);
// Set up resource type requirements for ApplicationMaster
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(256);
capability.setVirtualCores(1);
// Finally, set-up ApplicationSubmissionContext for the application
ApplicationSubmissionContext appContext =
app.getApplicationSubmissionContext();
appContext.setApplicationName("simple-yarn-app"); // application name
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);
appContext.setQueue("default"); // queue
// Submit application
ApplicationId appId = appContext.getApplicationId();
System.out.println("Submitting application " + appId);
yarnClient.submitApplication(appContext);
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
YarnApplicationState appState = appReport.getYarnApplicationState();
while (appState != YarnApplicationState.FINISHED &&
appState != YarnApplicationState.KILLED &&
appState != YarnApplicationState.FAILED) {
Thread.sleep(100);
appReport = yarnClient.getApplicationReport(appId);
appState = appReport.getYarnApplicationState();
}
System.out.println(
"Application " + appId + " finished with" +
" state " + appState +
" at " + appReport.getFinishTime());
}
private void setupAppMasterJar(Path jarPath, LocalResource appMasterJar) throws IOException {
FileStatus jarStat = FileSystem.get(conf).getFileStatus(jarPath);
appMasterJar.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
appMasterJar.setSize(jarStat.getLen());
appMasterJar.setTimestamp(jarStat.getModificationTime());
appMasterJar.setType(LocalResourceType.FILE);
appMasterJar.setVisibility(LocalResourceVisibility.PUBLIC);
}
private void setupAppMasterEnv(Map<String, String> appMasterEnv) {
for (String c : conf.getStrings(
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
Apps.addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(),
c.trim());
}
Apps.addToEnvironment(appMasterEnv,
Environment.CLASSPATH.name(),
Environment.PWD.$() + File.separator + "*");
}
public static void main(String[] args) throws Exception {
Client c = new Client();
c.run(args);
}
}
ApplicatioNMaster与ResourceManager的交换(ApplicationMater<-->ResourceManager),需要创建AMRMClientAsync/AMRMClient,对应处理事件为AMRMClientAsync.CallbackHandler/AMRMClient.CallbackHandler
AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
rmClient.init(conf);
rmClient.start();
ApplicationMater在启动任务执行时的container,需要直接与NodeManager交互(ApplicationMaster<-->NodeManager),需要创建NMClientAsync/NMClient对象,处理事件通过NMClientAsync.CallbackHandler/NMClient.CallbackHandler
完整源码如下:
package com.hortonworks.simpleyarnapp;
import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;
public class ApplicationMaster {
public static void main(String[] args) throws Exception {
final String command = args[0];
final int n = Integer.valueOf(args[1]);
// Initialize clients to ResourceManager and NodeManagers
Configuration conf = new YarnConfiguration();
AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
rmClient.init(conf);
rmClient.start();
NMClient nmClient = NMClient.createNMClient();
nmClient.init(conf);
nmClient.start();
// Register with ResourceManager
System.out.println("registerApplicationMaster 0");
rmClient.registerApplicationMaster("", 0, "");
System.out.println("registerApplicationMaster 1");
// Priority for worker containers - priorities are intra-application
Priority priority = Records.newRecord(Priority.class);
priority.setPriority(0);
// Resource requirements for worker containers
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(128);
capability.setVirtualCores(1);
// Make container requests to ResourceManager
for (int i = 0; i < n; ++i) {
ContainerRequest containerAsk = new ContainerRequest(capability, null, null, priority);
System.out.println("Making res-req " + i);
rmClient.addContainerRequest(containerAsk);
}
// Obtain allocated containers, launch and check for responses
int responseId = 0;
int completedContainers = 0;
while (completedContainers < n) {
AllocateResponse response = rmClient.allocate(responseId++);
for (Container container : response.getAllocatedContainers()) {
// Launch container by create ContainerLaunchContext
ContainerLaunchContext ctx =
Records.newRecord(ContainerLaunchContext.class);
ctx.setCommands(
Collections.singletonList(
//实际执行的内容
command +
" 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
" 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
));
System.out.println("Launching container " + container.getId());
nmClient.startContainer(container, ctx);
}
for (ContainerStatus status : response.getCompletedContainersStatuses()) {
++completedContainers;
System.out.println("Completed container " + status.getContainerId());
}
Thread.sleep(100);
}
// Un-register with ResourceManager
rmClient.unregisterApplicationMaster(
FinalApplicationStatus.SUCCEEDED, "", "");
}
}
3 Container实现
当NodeManager收到AM的启动Container事件时,执行ContainerLauncher,然后在其中执行具体的ContainerExecutor(DefaultContainerExecutor/LinuxContainerExecutor),针对Container中资源监控时传入了Executor对象获取相关PID。
public ContainersMonitorImpl(ContainerExecutor exec,
AsyncDispatcher dispatcher, Context context) {
super("containers-monitor");
this.containerExecutor = exec;
this.eventDispatcher = dispatcher;
this.context = context;
this.containersToBeAdded = new HashMap<ContainerId, ProcessTreeInfo>();
this.containersToBeRemoved = new ArrayList<ContainerId>();
this.monitoringThread = new MonitoringThread();
}
3.1 资源隔离
默认情况下,YARN采用了线程监控的方法判断任务是否超量使用内存,一旦发生超量,则直接将其杀死。由于Cgroups对内存的控制缺乏灵活性(即任务任何时刻不能超过内存上限,如果超过,则直接将其杀死或者报OOM),而Java进程在创建瞬间内存将翻倍,之后骤降到正常值,这种情况下,采用线程监控的方式更加灵活(当发现进程树内存瞬间翻倍超过设定值时,可认为是正常现象,不会将任务杀死)。
- 构造进程树
在YARN中每个Container可以认为是一个独立的进程,同时,Container也可能会创建子进程(可能会创建多个子进程,这些子进程可能也会创建子进程),因此Container进程的内存(物理内存、虚拟内存)使用量应该表示为:以Container进程为根的进程树中所有进程的内存(物理内存、虚拟内存)使用总量。
在Linux系统中的 /proc目录下,有大量以整数命名的目录,这些整数是某个正在运行的进程的PID,如下图所示:
而目录/proc/<PID>下面的那些文件分别表示着进程运行时的各方面信息
YARN只关心/proc/<PID>/stat文件。
打开一个/proc/<PID>/stat文件你会发现它仅有一行(多列)文本:
60717 (rpc.statd) S 1 60717 60717 0 -1 4202816 1262 0 0 0 3 0 0 0 20 0 1 0 412627997 75718656 12987 18446744073709551615 139780868575232 139780868643356 140737412651392 140737412650712 139780856985091 0 0 16846848 18947 18446744071580587289 0 0 17 6 0 0 0 0 0
里面记录了该进程相关的信息,可以通过正则表达式
^([\\d-]+)\\s\\(([^)]+)\\)\\s[^\\s]\\s([\\d-]+)\\s([\\d-]+)\\s+([\\d-]+)\\s([\\d-]+\\s){7}(\\d+)\\s(\\d+)\\s([\\d-]+\\s){7}(\\d+)\\s +(\\d+)(\\s[\\d-]+){15}
从中抽取进程的运行时信息,包括:进程名称、父进程PID、父进程用户组ID、Session ID在用户态运行的时间(单位:jiffies)、核心态运行的时间(单位:jiffies)、占用虚拟内存大小(单位:page)和占用物理内存大小(单位:page)等。
文件/proc/<PID>/stat中包含的内存大小单位为page,为了获取以字节为单位的内存信息,可通过执行以下Shell脚本获取每个page对应的内存量(单位:B):
[root@lark001 60717]# getconf PAGESIZE
4096
通过对以上信息的计算可以得到当前每个运行的Container使用的内存总量。在ContainersMonitorImpl内部维护着每个Container进程的PID,通过遍历/proc下各个进程的stat文件内容(父进程PID、占用虚拟内存大小和占用物理内存大小),YARN构建出每个Container的进程树,从而得出每个进程树的虚拟内存、物理内存使用总量。后期针对该信息会定时刷新。
- 判断内存
在获取到每一个Container之后已经可以获得各个Container进程树的内存(物理内存、虚拟内存)使用量,YARN并不会仅凭进程树的内存使用量(物理内存、虚拟内存)是否超过上限值就决定是否“杀死”一个Container,因为“子进程”的内存使用量是有“波动”的(Java中创建子进程采用了fork()+exec()的方案,子进程启动瞬间,它使用的内存量与父进程一致,从外面看来,一个进程使用内存量可能瞬间翻倍,然后又降下来),为了避免“误杀”的情况出现,Hadoop赋予每个进程“年龄”属性,并规定刚启动进程的年龄是1,MonitoringThread线程每更新一次,各个进程的年龄加一,在此基础上,选择被“杀死”的Container的标准如下:如果一个Contaier对应的进程树中所有进程(年龄大于0)总内存(物理内存或虚拟内存)使用量超过上限值的两倍;或者所有年龄大于1的进程总内存(物理内存或虚拟内存)使用量超过上限值,则认为该Container使用内存超量,则Container发送ContainerEventType.KILL_CONTAINER事件将其KILL掉。
- CPU资源隔离
默认情况下,NodeManager未启用任何对CPU资源的隔离机制,如果需要启用该机制需使用LinuxContainerExecutor,它能够以应用程序提交者的身份创建文件、运行Container和销毁Container。相比于DefaultContainerExecutor采用NodeManager启动者的身份执行这些操作,LinuxContainerExecutor具有更高的安全性。
LinuxContainerExecutor的核心设计思想是赋予NodeManager启动者以root权限,进而使其拥有足够的权限以任意用户身份执行一些操作,从而使得NodeManager执行者可以将Container使用的目录和文件的拥有者修改为应用程序的提交者,并以应用程序提交者的身份运行Container,防止所有Container以NodeManager执行者身份运行进而带来的各种风险。
上述机制的实现在YARN的NodeManager采用C语言实现了一个setuid功能的工具container-executor,其位于如下目录:
该脚本拥有root权限,可以完成任意操作,其可执行脚本位于:/opt/yarn/hadoop/bin/container-executor。YARN正是通过该脚本创建出Cgroups层级树以及完成Cgroups属性设置等操作。具体cgroups详细讲解参考引用部分文章。