最近在阅读Hadoop的源码,为了加深理解,就将其记录下来。
我们都知道,Hadoop主要由MapReduce实现,YARN,以及HDFS组成。所以,我会依次阅读MapReduce的实现,以及YARN,最后是HDFS的实现。
在这篇文章中,我们先介绍提交一个作业的时候发生了什么。
当我们运行一个作业的时候,我们都是通过:$HADOOP_HOME/bin/hadoop your.jar main_class input output这么一条命令来运行的。
通过查看$HADOOP_HOME/bin/hadoop这个文件的内容,我们能够发现,它是运行了一个org.apache.hadoop.util.RunJar这么一个类。
那么这个类都做了什么呢?
其实没什么,打开它的源码,我们能够看到,只不过就是解压我们指定的jar文件,然后运行主类。
这里有一点需要注意的是,有的jar文件是会包含META-INF/MANIFEST.MF这么一个文件的。而这个文件中可以指定Jar文件的主类。
从org.apache.hadoop.util.RunJar这个类的内容中,我们可以看到,如果在META-INF/MANIFEST.MF中包含了一个跟我们通过命令行指定的主类不相同的主类,那么是会优先选择META-INF/MANIFEST.MF中指定的那个主类的。
而我们编写的MapReduce作业中,一般主类中,都是通过Job对象来进行一些配置,并通过job.waitForCompletion()方法来等待其完成,并输出中间的过程信息。
关于“作业提交”这个阶段的具体过程,在《Hadoop: The Definitive Guide》这本书中,有一副非常清晰的过程图,这里我们将它摘取过来:
就算不看源码,只看这幅图片,也能对这个过程有一个清晰的认识。
在这篇文章中,我们主要介绍上图中的第1, 2, 3, 4步。后面的步骤,我们会在后面的文章中介绍。
我们首先看一下Job的waitForCompletion(boolean verbose)方法的实现。
在这个方法中,最重要的调用就是submit()这一行。
我们看一下submit()方法的实现。
其中connect()方法,会根据你的配置文件,实例化一个Cluster类的对象,并将其赋给Job.cluster这个字段。我们可以通过Cluster来跟YARN这种集群资源管理系统交互。
这是因为Cluster中有一个ClientProtocol类型的字段,ClientProtocol就是用于Client和集群资源管理系统交互的工具,ClientProtocol有一个很重要的实现,就是YARNRunner。
我们可以看到,在submit方法中,获取到了一个JobSubmitter。然后通过JobSubmitter的submitJobInternal()方法来正式提交任务。
关于submitJobInternal()这个方法,在其注释中,就写的非常清楚了。
我们可以看到,它做了这么几件事情:
- 检查作业的输入和输出
- 计算InputSplit
- 验证
- 将作业的Jar文件,配置文件,以及一些其他的文件保存到DistributedCache中
- 将作业提交的JobTracker上,并监控它的状态
我们这里主要介绍这么三点:
- 计算InputSplit
- 将作业的Jar文件,配置文件,以及一些其他的文件保存到DistributedCache中
- 将作业提交的JobTracker上,并监控它的状态
我们先来看第二点-将作业的Jar文件、配置文件、以及一些其他的文件保存到DistributedCache中。
在这一点上,最重要的是一个叫做copyAndConfigureFiles()的方法,这个方法会将我们的作业的Jar文件,以及在命令行中,通过-files,-libjars,-archives指定的文件,加入到DistributedCache中。而DistributedCache实际上,就是DFS中的一个特殊的文件夹。它的命名规则是,/tmp/hadoop-yarn/staging/<your_username>/.staging/<job_id>/files
在把它们加到DistributedCache中之后,还会为它们设置权限等。并将路径信息保存到Configuration对象中。
还有很多跟这个作业相关的内容,也会被添加到这个跟此次作业相关的文件夹当中,比如,后面我们将会看到的,关于InputSplit的一些信息。
在这个方法内部,通过调用writeSplits()方法,来进行分片。
我们可以看到,就是调用了InputFormat的getSplits()方法而已,最后再对各个InputSplit进行排序,将最大的排到前面。
在这个方法的最后,我们可以看到,调用了ClientProtocol的submitJob()方法来提交作业。
这个方法中,最重要的就是调用createApplicationSubmissionContext()方法来构造ApplicationSubmissionContext以及ContainerLaunchContext这两个非常重要的对象了。
这两个对象为什么重要呢?
- ApplicationSubmissionContext这个对象,包含了YARN ResourceManager启动ApplicationMaster的全部信息。ApplicationMaster也是一个非常重要的组件,它相当于Mapper和Reducer中间的桥梁。
- ContainerLaunchContext这个对象,包含了YARN NodeManager启动一个Container所需要的全部信息,包括需要运行的命令,以及CLASSPATH等。
在构造好这两个对象之后,将ContainerLaunchContext包含在ApplicationSubmissionContext中,然后通过Protobuf这种RPC调用,将ApplicationSubmissionContext发送给YARN ResourceManager,来启动一个任务。
然后,再通过获取任务状态的RPC调用,来获取这个任务的状态。
一个任务的状态,有这么几种:
- NEW:表示任务刚被创建
- NEW_SAVING:表示任务已经被保存
- SUBMITTED:表示任务已经被提交
- ACCEPTED:表示任务已经被调度器接受
- RUNNING:表示任务正在运行
- FINISHED:表示任务已经正常完成
- FAILED:表示任务失败
- KILLED:表示任务已经被用户或者管理员取消
只要任务不是处于NEW或者NEW_SAVING状态,那么就给Client返回。也就是说,不管任务是成功还是失败,都给Client返回。
然后,客户端再通过不断向YARN ResourceManager轮循任务的状态,在控制台输出进度等。
思考
从Hadoop的实现中,可以看到,我们可以很轻松的添加其他的集群资源管理系统,可以替换掉YARN,而采用Mesos,或者Kubernetes等。
但是,目前Hadoop的实现中,只支持YARN。
参考资料
《Hadoop: The Definitive Guide》