Hadoop-MapReduce2.x-yarn框架
1.mapreduce1.0的不足
- JobTracker 是 Map-reduce 的集中处理点,存在单点故障。
- JobTracker 完成了太多的任务,造成了过多的资源消耗,当 map-reduce job 非常多的时候,会造成很大的内存开销,潜在来说,也增加了 JobTracker fail 的风险,这也是业界普遍总结出老 Hadoop 的 Map-Reduce 只能支持 4000 节点主机的上限。
- 在 TaskTracker 端,以 map/reduce task 的数目作为资源的表示过于简单,没有考虑到 cpu/内存的占用情况,如果两个大内存消耗的 task 被调度到了一块,很容易出现 OOM。
- 在 TaskTracker 端,把资源强制划分为 map task slot 和 reduce task slot, 如果当系统中只有 map task 或者只有 reduce task 的时候,会造成资源的浪费,也就是前面提过的集群资源利用的问题。源代码层面分析的时候,会发现代码非常的难读,常常因为一个 class 做了太多的事情,代码量达 3000 多行造成 class 的任务不清晰,增加 bug 修复和版本维护的难度。
- 从操作的角度来看,现在的 Hadoop MapReduce 框架在有任何重要的或者不重要的变化 ( 例如 bug 修复,性能提升和特性化 ) 时,都会强制进行系统级别的升级更新。更糟的是,它不管用户的喜好,强制让分布式集群系统的每一个用户端同时更新。这些更新会让用户为了验证他们之前的应用程序是不是适用新的 Hadoop 版本而浪费大量时间。
2.Hadoop2.x中新方案YARN+MapReduce
首先的不要被YARN给迷惑住了,它只是负责资源调度管理,而MapReduce才是负责运算的家伙,所以YARN != MapReduce2.这是大师说的:
YARN并不是下一代MapReduce(MRv2),下一代MapReduce与第一代MapReduce(MRv1)在编程接口、数据处理引擎(MapTask和ReduceTask)是完全一样的, 可认为MRv2重用了MRv1的这些模块,不同的是资源管理和作业管理系统,MRv1中资源管理和作业管理均是由JobTracker实现的,集两个功能于一身,而在MRv2中,将这两部分分开了, 其中,作业管理由ApplicationMaster实现,而资源管理由新增系统YARN完成,由于YARN具有通用性,因此YARN也可以作为其他计算框架的资源管理系统,不仅限于MapReduce,也是其他计算框架(Spark).
看上图我们可以知道Hadoop1中mapreduce可以说是啥事都干,而Hadoop2中的MapReduce的话则是专门处理数据分析.而YARN的话则做为资源管理器存在.
在Hadoop2中将JobTracker两个主要的功能分离成单独的组件,这两个功能是资源管理和任务调度/监控。新的资源管理器全局管理所有应用程序计算资源的分配,每一个应用的 ApplicationMaster 负责相应的调度和协调。一个应用程序无非是一个单独的传统的 MapReduce 任务或者是一个 DAG( 有向无环图 ) 任务。ResourceManager 和每一台机器的节点管理服务器能够管理用户在那台机器上的进程并能对计算进行组织。
1.事实上,每一个应用的ApplicationMaster是一个详细的框架库,它结合从ResourceManager获得的资源和 NodeManagr 协同工作来运行和监控任务。
2.在上图中ResourceManager支持分层级的应用队列,这些队列享有集群一定比例的资源。从某种意义上讲它就是一个纯粹的调度器,它在执行过程中不对应用进行监控和状态跟踪。同样,它也不能重启因应用失败或者硬件错误而运行失败的任务。
ResourceManager 是基于应用程序对资源的需求进行调度的 ; 每一个应用程序需要不同类型的资源因此就需要不同的容器。资源包括:内存,CPU,磁盘,网络等等。可以看出,这同现 Mapreduce 固定类型的资源使用模型有显著区别,它给集群的使用带来负面的影响。资源管理器提供一个调度策略的插件,它负责将集群资源分配给多个队列和应用程序。调度插件可以基于现有的能力调度和公平调度模型。
3.在上图中 NodeManager 是每一台机器框架的代理,是执行应用程序的容器,监控应用程序的资源使用情况 (CPU,内存,硬盘,网络 ) 并且向调度器汇报。
4.在上图中,每一个应用的 ApplicationMaster的职责有:向调度器索要适当的资源容器,运行任务,跟踪应用程序的状态和监控它们的进程,处理任务的失败原因。
1.客户端的mapreduce程序通过hadoop shell提交到hadoop的集群中.
2.程序会通过RPC通信将打成jar包的程序的有关信息传递给Hadoop集群中RM(ResourceManager),可称为领取JOBID的过程
3.RM更加提交上来的信息给任务分配一个唯一的ID,同时会将run.jar的在HDFS上的存储路径发送给客户端.
4.客户端得到那个存储路径之后,会相应的拼接出最终的存放路径目录,然后将run.jar分多份存储在HDFS目录中,默认情况下备份数量为10份.可配置.
5.客户端提交一些配置信息,例如:最终存储路径,JOB ID等.
6.RM会将这些配置信息放入一个队列当中,所谓的调度器.至于调度的算法,则不必深究.
7.NM(NodeManager)和RM是通过心跳机制保持着通信的,NM会定期的向RM去领取任务.
8.RM会在任意的一台或多台的NM中,启动任务监控的进程Application Master.用来监控其他NM中YARN CHild的执行的情况
9.NM在领取到任务之后,得到信息,会去HDFS的下载run.jar.然后在本地的机器上启动YARN Child进程来执行map或者reduce函数.map函数的处理之后的中间结果数据会放在本地文件系统中的.
10.在结束程序之后,将结果数据写会HDFS中.整个流程大概就是这样子的.
3.执行过程
作业提交
Job实例调用submit()方法提交一个作业,这个方法会创建一个JobSubmitter实例(图中第1步)。提交完作业后,waitForCompletion(如果代码中调用了的话)方法会每隔一秒查看作业的状态并且将变化(如果有的话)打印到终端。
在JobSubmitter中会做如下事情:
向Yarn RM中请求一个新的应用ID,分配给这个MR作业(第2步)。 检查作业的输出需求。例如,如果输出目录已经存在,则不提交作业并且抛出一个异常。 计算作业的输入切片(准确说是切片信息)。如果切片不能被计算出来(例如输入目录不存在),同样作业不会被提交并且发出一个异常。 拷贝各种作业资源(例如作业的jar包,配置文件,切片信息等)到分布式文件系统下的一个被命名为job ID的目录下(第3步)。 调用submitApplicatiion方法提交作业到Yarn RM(第4步)。
作业初始化
当Yarn RM收到了submitApplicatiion方法中的请求。它将这个请求交给Yarn调度器。调度器会分配一个容器,然后Yarn RM在这个容器中启动一个AM(在NM管理下)(5a和5b)。
AM是一个java应用,它的主类叫做MRAppMaster。这个类会创建一些记录对象,用来获取来自任务的报告(第6步)。然后会从分布式文件系统中获取切片信息(第7步)。为每一个分片创建一个map任务,而reduce任务的个数由mapreduce.job.reduces属性或者setNumReduceTasks方法指定。同样任务的ID也在这个时候被分配。
AM也需要决定如何运行这些任务。如果作业太小,AM则把所有的任务跑在一个JVM中。这种作业被称为uber任务。
任务分配
如果作业不是uber任务,AM则会从RM那里为每一个map和reduce任务请求一个容器(第8步)。当然为map请求容器的优先级要高于reduce。事实上,reduce任务的请求直到至少5%的map任务完成才会被提出。
还需要注意到, map任务会尽量遵守locality约束,而reduce任务则可以运行在集群的任意地方。
任务执行
一旦RM给某个任务分配了某个特定节点上的容器,AM通过NM启动这个容器(9a和9b)。这个任务是一个Java应用并且主类叫做YarnChild。在运行这个任务之前,它会先从分布式文件系统中获取各种资源(第10步)。最后,它运行这个任务(第11步)。
状态更新
当任务运行时,它记录了自己的进度。对于map任务,是自己处理的分片的比例。对于reduce任务,稍微复杂一点,需要把整个过程分为三个部分,即shuffle和sort的三个阶段:copy, sort 和reduce,然后再估算。
每个任务,会每三秒通过一个脐带(umbilical)接口向AM报告自己的状态,而client则每一秒从AM那里收到最新的进度。
作业完成
当AM收到最后一个任务完成的消息时,它会把作业的状态变为“成功“。Job实例则获得这个状态后,告诉用户并且它的waitForCompletion方法会返回值,同时各种统计信息会被打印出来。
稍微提一点的是,通过配置mapreduce.job.end-notification.url property参数,AM也会发送HTTP通知。
最后AM和任务的容器会清理它们的状态。以及一些其他的收尾工作也在这时候被完成。