前言
上文提到yarn类似一个分布式操作系统,那么我们就可以自定义写一些应用在这个操作系统上运行
当然也不能太过随意写,我们要运行在操作系统上就必然要遵守操作系统本身的规矩
Yarn
Yarn体系中,用户的主程序被称作ApplicationMaster
,当然我们可以在ApplicationMaster中继续向RM申请资源来执行子程序,比如MapReduce中的MapTask和ReduceTask都属于子程序。
这就好比我们平时写java,在main方法主线程中可以创建子线程跑一些逻辑
- linux/windows中,我们创建java子线程不需要关心这个线程任务到底由哪个cpu完成,任务交给操作系统来调度
- 同理yarn中,
ApplicationMaster
申请创建出来的子程序,我们不用考虑程序运行在哪台机器上,任务交给yarn来调度
Hello World
接下来我们就尝试写一个简单应用(输出Hello World),运行在yarn中,我们先不考虑使用子程序,直接在ApplicationMaster
中输出Hello World
ApplicationMaster
写一个Hello World应用再简单不过了:
public class MyAppMaster {
public static void main(String[] args) {
System.out.println("HELLO WORLD");
}
}
但还是那句话,在yarn上运行就要遵守人家的规矩,而yarn规定:
ApplicationMaster程序运行前需要向RM注册,运行结束后需要取消注册
也就是说程序不是你想跑就能跑,你得告诉人家资源管理器一声,否则人家队伍怎么带?
注册的相关逻辑如果真自己写还挺复杂,但好在hadoop为我们提供了客户端工具,我们引入依赖就方便了
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.3</version>
</dependency>
最终ApplicationMaster
代码如下(就是增加了注册到RM和取消注册)
public class MyAppMaster {
/**
* AppMaster 程序入口
* @param args 执行参数
*/
public static void main(String[] args) {
MyAppMaster master = new MyAppMaster();
master.run();
}
/**
* AppMaster 运行
*/
public void run() {
try {
// 开启am-rm client,建立rm-am的通道,用于注册AM
AMRMClientAsync amRmClient = AMRMClientAsync.createAMRMClientAsync(1000, null);
amRmClient.init(new Configuration());
amRmClient.start();
String hostName = NetUtils.getHostname();
// 注册至RM
amRmClient.registerApplicationMaster(hostName, -1, null);
// 运行程序
doRun();
// 解除注册
amRmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "SUCCESS", null);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 实际运行程序,就一个输出
*/
private void doRun() {
System.out.println("HELLO WORLD");
}
}
到此我们的应用程序就写完了,并且遵守了yarn的规矩
YarnClient
应用程序写完了,怎么把程序部署到yarn上运行呐?
yarn又有规定了:
想让你的程序在我的平台上跑,需要你在RM上创建应用,并指定好应用名称、运行环境、程序(jar包)位置、启动命令、所需资源等
当然这些数据的提交是有一定格式的,就像我们前端对接后端api,肯定是有一个json格式
索性我们不要考虑这复杂的格式,因为hadoop-yarn-client
依赖同样帮我们封装好了,就好似有了sdk,写写代码就可以和RM对接了,而这个负责对接RM上传应用程序和启动参数的代码,一般我们叫它:YarnClient
我们开始写代码实现这个YarnClient
1.配置
首先我们要与RM沟通创建应用,首先要搞清楚RM在哪才能和它交互,所以先配置一下RM的IP地址
Configuration conf = new Configuration();
// 设置rm所在的ip地址
conf.set("yarn.resourcemanager.hostname", "192.168.10.101");
其中192.168.10.101就是你运行RM的机器IP地址
2.申请应用
有了地址,就可以申请应用,这一步直接使用hadoop-yarn-client
依赖的工具即可
// 创建客户端
YarnClient yarnClient = YarnClient.createYarnClient();
// 初始配置
yarnClient.init(conf);
// 开启(建立连接)
yarnClient.start();
// 向RM发送请求创建应用
YarnClientApplication application = yarnClient.createApplication();
// 准备应用提交上下文(RM要求你提交的信息格式)
ApplicationSubmissionContext applicationSubmissionContext = application.getApplicationSubmissionContext();
// 获取分配的应用id
ApplicationId appId = applicationSubmissionContext.getApplicationId();
log.info("appId: {}", appId);
其中ApplicationId
就是RM给我们分配的应用ID,ApplicationSubmissionContext
就是我们要提交的应用相关信息的载体
所以接下来就是给applicationSubmissionContext填充应用名称、运行环境、程序(jar包)位置、启动命令、所需资源等信息再次提交给RM
3.设置应用名称
应用名称就起个"Hello World"
// 设置应用名称
applicationSubmissionContext.setApplicationName("Hello World");
4.设置程序(jar包)位置
这一步最重要,你得告诉RM你得程序在哪,一般都存在HDFS上,因为我懒着去上传,写了一个本地传送到HDFS的方法
// 即上一步写的AppMaster jar包的本地位置
String jarPath = "E:\\projects\\hadoop2\\MyYarnApp\\target\\my-yarn-app.jar";
String jarName = "my-yarn-app.jar";
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>() {{
put(jarName, addLocalToHdfs(jarPath, jarName));
}};
其中addLocalToHdfs
就是上传到HDFS,并获取HDFS路径
private LocalResource addLocalToHdfs(String jarPath, String jarName) throws IOException, URISyntaxException, InterruptedException {
//获取文件系统
Configuration configuration = new Configuration();
//NameNode的ip和端口
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.100:8020"), configuration, "root");
// 目标路径
String dst =
"hello/" + jarName;
Path dstPath =
new Path(fs.getHomeDirectory(), dst);
// 上传
fs.copyFromLocalFile(new Path(jarPath), dstPath);
FileStatus scFileStatus = fs.getFileStatus(dstPath);
// 关闭
fs.close();
LocalResource scRsrc = LocalResource.newInstance(
URL.fromURI(dstPath.toUri()),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
scFileStatus.getLen(), scFileStatus.getModificationTime());
return scRsrc;
}
这一步需要引入hdfs-client
依赖
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>3.1.3</version>
</dependency>
5.设置程序环境
这一步同样比较重要,我们需要设置程序运行的环境,jdk、yarn包什么的,设置了CLASSPATH
Map<String, String> env = new HashMap<>();
// 任务的运行依赖jar包的准备
StringBuilder classPathEnv = new StringBuilder(ApplicationConstants.Environment.CLASSPATH.$$())
.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
// yarn依赖包
for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
classPathEnv.append(c.trim());
}
env.put("CLASSPATH", classPathEnv.toString());
6.设置启动脚本
这一步一样至关重要,我们要告诉RM我们的程序怎么启动,因为Yarn不光支持java包这一种程序,所以我们要写java的启动命令,可以通过-Xms -Xmx等设置启动jvm参数
List<String> commands = new ArrayList<String>() {{
add(ApplicationConstants.Environment.JAVA_HOME.$$() + "/bin/java -Xmx300m me.pq.yarn.MyAppMaster");
}};
7.配置Container启动上下文
资源、环境、启动命令等就组成了一个Container(AM的Container)启动的所需参数,把它们打包为container启动上下文,通过setAMContainerSpec
设置到要提交的参数中
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
localResources, env, commands, null, null, null);
// 准备am Container的运行环境
applicationSubmissionContext.setAMContainerSpec(amContainer);
8.设置am程序所需硬件资源
准备好了所有启动程序的信息,下一步就是告诉RM你这个AppMaster需要多少硬件资源,这样RM才能给你找合适的节点运行你的程序,通过setResource
设置到要提交的参数中
int memory = 1024;
int vCores = 2;
applicationSubmissionContext.setResource(Resource.newInstance(memory, vCores));
9.提交作业
完事具备,提交给RM你的程序就会被跑起来了
yarnClient.submitApplication(applicationSubmissionContext);
完整代码
YarnClient完整代码如下
package me.pq.yarn;
/**
* @Author pq217
* @Date 2022/11/18 17:47
* @Description
*/
public class MyYarnClient {
private static Logger log = LoggerFactory.getLogger(MyYarnClient.class);
public static void main(String[] args) {
MyYarnClient client = new MyYarnClient();
try {
client.run();
} catch (Exception e) {
log.error("client run exception , please check log file.", e);
}
}
/**
* 客户端运行
* @throws IOException
* @throws YarnException
* @throws URISyntaxException
* @throws InterruptedException
*/
public void run() throws IOException, YarnException, URISyntaxException, InterruptedException {
/**=====1.配置=====**/
Configuration conf = new Configuration();
// 设置rm所在的ip地址
conf.set("yarn.resourcemanager.hostname", "192.168.10.101");
/**=====2.申请app=====**/
// 创建YarnClient和ResourceManager进行交互
YarnClient yarnClient = YarnClient.createYarnClient();
// 初始配置
yarnClient.init(conf);
// 开启(建立连接)
yarnClient.start();
// 向RM发送请求创建应用
YarnClientApplication application = yarnClient.createApplication();
// 准备应用提交上下文(RM要求你提交的信息格式)
ApplicationSubmissionContext applicationSubmissionContext = application.getApplicationSubmissionContext();
// 获取分配的应用id
ApplicationId appId = applicationSubmissionContext.getApplicationId();
log.info("appId: {}", appId);
/**=====3.设置应用名称=====**/
// 设置应用名称
applicationSubmissionContext.setApplicationName("Hello World");
/**=====4.准备程序(jar包)=====**/
String jarPath = "E:\\projects\\hadoop2\\MyYarnApp\\target\\my-yarn-app.jar";
String jarName = "my-yarn-app.jar";
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>() {{
put(jarName, addLocalToHdfs(jarPath, jarName));
}};
/**=====5.准备程序环境=====**/
Map<String, String> env = new HashMap<>();
// 任务的运行依赖jar包的准备
StringBuilder classPathEnv = new StringBuilder(ApplicationConstants.Environment.CLASSPATH.$$())
.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
// yarn依赖包
for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
classPathEnv.append(c.trim());
}
env.put("CLASSPATH", classPathEnv.toString());
/**=====6.准备启动命令=====**/
List<String> commands = new ArrayList<String>() {{
add(ApplicationConstants.Environment.JAVA_HOME.$$() + "/bin/java -Xmx300m me.pq.yarn.MyAppMaster");
}};
/**=====7.构造am container运行资源+环境+脚本=====**/
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
localResources, env, commands, null, null, null);
// 准备am Container的运行环境
applicationSubmissionContext.setAMContainerSpec(amContainer);
/**=====8.设置am程序所需资源=====**/
int memory = 1024;
int vCores = 2;
applicationSubmissionContext.setResource(Resource.newInstance(memory, vCores));
/**=====9.提交并开始作业=====**/
yarnClient.submitApplication(applicationSubmissionContext);
/**=====10.查询作业是否完成=====**/
for (;;) {
Thread.sleep(500);
ApplicationReport applicationReport = yarnClient.getApplicationReport(appId);
YarnApplicationState state = applicationReport.getYarnApplicationState();
FinalApplicationStatus status = applicationReport.getFinalApplicationStatus();
if (state.equals(YarnApplicationState.FINISHED)) {
if (status.equals(FinalApplicationStatus.SUCCEEDED)) {
log.info("程序运行成功!");
break;
} else {
log.error("程序运行失败!");
break;
}
} else if (state.equals(YarnApplicationState.FAILED) || state.equals(YarnApplicationState.KILLED) ) {
log.error("程序运行失败!");
break;
}
log.info("计算中...");
}
}
/**
* 上传本地jar包到hdfs
* @param jarPath
* @param jarName
* @throws IOException
*/
private LocalResource addLocalToHdfs(String jarPath, String jarName) throws IOException, URISyntaxException, InterruptedException {
//获取文件系统
Configuration configuration = new Configuration();
//NameNode的ip和端口
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.100:8020"), configuration, "root");
// 目标路径
String dst =
"hello/" + jarName;
Path dstPath =
new Path(fs.getHomeDirectory(), dst);
// 上传
fs.copyFromLocalFile(new Path(jarPath), dstPath);
FileStatus scFileStatus = fs.getFileStatus(dstPath);
// 关闭
fs.close();
LocalResource scRsrc = LocalResource.newInstance(
URL.fromURI(dstPath.toUri()),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
scFileStatus.getLen(), scFileStatus.getModificationTime());
return scRsrc;
}
}
测试
应用写好了,上传应用的client也写好了,下面测一下
首先使用maven-assembly插件给程序打jar包
mvn clean package
其次,本地idea直接运行YarnClient的main方法
注意替换一下代码中的jar包地址和名称,以及AppMaster的全路径名,以及hadoop的ip地址等信息
MyYarnClient的运行结果idea输出如下
打开yarn-web再看一下日志
成功实现了一个运行在Yarn上的小程序!
分布式计算
以上,我们完成了一个简单的程序运行在yarn上,但其实这个应用程序实际上只在一个节点上实际运行了System.out.println
的代码,这就像去了一趟沃尔玛,买了瓶矿泉水
yarn的优势是可以让我们的计算程序分给多个机器节点去执行,我们继续改造一下AppMaster,实现如下功能:
- 添加两个子任务,子任务分别在HDFS中创建一个文件夹
- 两个子任务结束之后,再运行输出Hello World
ChildTask
首先编写子任务,我为了省事,直接和AppMaster放一个项目中了,很简单的代码,创建一个/child/+服务器hostName的文件夹
public class ChildTask {
public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException {
//获取文件系统
Configuration configuration = new Configuration();
//NameNode的ip和端口
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.100:8020"), configuration, "root");
// hostName
String hostName = NetUtils.getHostname();
// 创建一个文件夹
fs.mkdirs(new Path("/child/"+hostName));
fs.close();
}
}
AppMaster
接下来要改造AppMaster,原来只是输出Hello World,现在要向RM申请Container用来执行子任务
container请求
首先申请Container需要向RM申请,所以使用amRmClient即可发出请求
// 两个子任务,对应两个container
int childTaskNum = 2;
for (int i = 0; i < childTaskNum; i++) {
// 向rm申请一个1M内存,1个CPU的资源容器
int memory = 1024;
int vCores = 1;
AMRMClient.ContainerRequest containerRequest = new AMRMClient.ContainerRequest(Resource.newInstance(memory, vCores), null, null, Priority.UNDEFINED);
amRmClient.addContainerRequest(containerRequest);
}
rm回调
申请成功后,当rm分配出container时还要进行相关回调处理,所以amRmClient定义时要加上一个回调处理类
// rm回调处理器
AMRMClientAsync.AbstractCallbackHandler rmCallBackHandler = new RMCallBackHandler();
// 开启am-rm client,建立rm-am的通道,用于注册AM, allocListener负责处理AM的响应
AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient = AMRMClientAsync.createAMRMClientAsync(1000, rmCallBackHandler);
RMCallBackHandler是rm响应的处理器
private class RMCallBackHandler extends AMRMClientAsync.AbstractCallbackHandler {
重点要实现两个方法
- onContainersAllocated rm分配出containers的回调方法
- onContainersCompleted container运行结束的方法
onContainersCompleted
这个方法主要是子任务运行完成,我们在AppMaster加几个内部变量控制所有子任务完成再输出"Hello World"
// 充当锁
private Object lock = new Object();
// 任务个数
private int childTaskNum = 2;
// 已完成任务个数
private int childTaskCompletedNum = 0;
RMCallBackHandler的onContainersCompleted方法实现如下:
@Override
public void onContainersCompleted(List<ContainerStatus> statuses) {
for (ContainerStatus status : statuses) {
synchronized (lock) {
System.out.println(++childTaskCompletedNum + " container completed");
// 子任务全部完成
if (childTaskCompletedNum == childTaskNum) {
lock.notify();
}
}
}
}
doRun方法修改为如下
private void doRun(AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient) throws InterruptedException {
// 申请两个资源容器
for (int i = 0; i < childTaskNum; i++) {
// 向rm申请一个1M内存,1个CPU的资源容器
int memory = 1024;
int vCores = 1;
AMRMClient.ContainerRequest containerRequest = new AMRMClient.ContainerRequest(Resource.newInstance(memory, vCores), null, null, Priority.UNDEFINED);
amRmClient.addContainerRequest(containerRequest);
}
synchronized (lock) {
// 等待子任务完成
lock.wait();
}
System.out.println("HELLO WORLD");
}
到此即可实现申请两个container,两个container运行完后再执行输出"HELLO WORLD"
onContainersAllocated
这是RMCallBackHandler中要实现的重点方法,当container分配成功后要做什么?
思路很简单,container分配之后当然要在对应的容器上运行我们的子任务:ChildTask
,而子任务的运行一定是在container所指定的NM节点上,所以我们要提前初始化一个NM客户端:
加一个内部属性以供AppMaster整个类使用
NMClientAsyncImpl nmClientAsync;
此时AppMaster run
方法修改如下
public void run() {
try {
// rm回调处理器
AMRMClientAsync.AbstractCallbackHandler rmCallBackHandler = new RMCallBackHandler();
// 开启am-rm client,建立rm-am的通道,用于注册AM, allocListener负责处理AM的响应
AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient = AMRMClientAsync.createAMRMClientAsync(1000, rmCallBackHandler);
amRmClient.init(new Configuration());
amRmClient.start();
String hostName = NetUtils.getHostname();
// 注册至RM
amRmClient.registerApplicationMaster(hostName, -1, null);
// 初始化nmClient
nmClientAsync = new NMClientAsyncImpl(new NMCallBackHandler());
nmClientAsync.init(conf);
nmClientAsync.start();
// 运行程序
doRun(amRmClient);
// 解除注册
amRmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "SUCCESS", null);
// am-rm客户端关闭
amRmClient.stop();
// nm客户端关闭
nmClientAsync.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
其中NMCallBackHandler是一个NM响应的Callback,可以通过实现其方法在container声明周期加入一些逻辑
private class NMCallBackHandler extends NMClientAsync.AbstractCallbackHandler {
}
接下来就是实现onContainersAllocated
,代码如下
@Override
public void onContainersAllocated(List<Container> containers) {
try {
for (Container container : containers) {
System.out.println("container allocated, Node=" + container.getNodeHttpAddress());
// 构建AM<->NM客户端并开启
// 还是YarnClient containerLaunchContext那一套,这把直接去HDFS系统取文件,因为和YarnClient打包到一个jar上传
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>() {{
//NameNode的ip和端口
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.100:8020"), conf, "root");
URI appUri = new URI("/user/root/hello/my-yarn-app.jar");
FileStatus fileStatus = fs.getFileStatus(new Path(appUri));
put("my-yarn-app.jar", LocalResource.newInstance(
URL.fromURI(appUri),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
fileStatus.getLen(), fileStatus.getModificationTime()));
}};
Map<String, String> env = new HashMap<>();
StringBuilder classPathEnv = new StringBuilder(ApplicationConstants.Environment.CLASSPATH.$$())
.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
classPathEnv.append(c.trim());
}
env.put("CLASSPATH", classPathEnv.toString());
List<String> commands = new ArrayList<String>() {{
// 传入ip地址作为参数
add(ApplicationConstants.Environment.JAVA_HOME.$$() + "/bin/java -Xmx200m me.pq.yarn.ChildTask");
}};
ContainerLaunchContext containerLaunchContext = ContainerLaunchContext.newInstance(
localResources, env, commands, null, null, null);
// nm节点启动container
nmClientAsync.startContainerAsync(container, containerLaunchContext);
}
} catch (Exception e) {
e.printStackTrace();
}
}
代码就不详解了,和YarnClient提交的ContainerLaunchContext写法基本一致(最终运行me.pq.yarn.ChildTask而不是MyAppMaster),最后使用NM客户端的startContainerAsync
方法让子任务运行在NM上
值得一提的是我的ChildTask和AppMaster都在一个jar包下,所以这里不用上传了,直接去HDFS取即可
测试
代码写完了,测试一下,mvn clean package然后执行MyYarnClient main
方法
idea输出
HDFS-WEB上看一下子任务的文件夹创建是否成功
可见文件夹创建出来了
YARN-WEB看一下AppMaster的日志
到此,实现了一个运行在yarn上的简单分布式计算程序~