MapReduce作业运行流程
Map-Reduce的处理过程主要涉及下面四个部分:
- 客户端Client:用于提交Map-reduce任务job
- JobTracker:协调整个job的运行,其为一个Java进程,其main class为JobTracker
- TaskTracker:运行此job的task,处理input split,其为一个Java进程,其main class为TaskTracker
- HDFS:hadoop分布式文件系统,用于在各个进程间共享Job相关的文件
流程分析
- 在客户端启动一个作业
- 向JobTracker请求一个JobID
- 将运行作业所需要的资源文件复制到hdfs上,包括MapReduce程序打包的jar文件,配置文件和客户端计算多的输入划分信息,这些文件都存放在JobTracker专门为该作业创建的文件夹中,文件夹名为该作业的Job ID。 jar文件默认会有10个副本,(mapred.submit.replication属性控制), 输入划分信息告诉了JobTracker应该为这个作业启动多少个map任务等信息。
- JobTracker接收到作业后,将其放在一个作业队列里,等待作业调度器对其进行调度,当作业调度器根据自己的调度算法调度到该作业时,会根据输入划分信息为每个划分创建一个map任务,并将map任务分配给TaskTracker执行,对于map和reduce任务,TaskTracker会根据主机核的数量和内存的大小有固定的map槽和reduce槽,map任务不是随随随便地分配给某个TaskTracker的,这里有个概念叫:数据本地化,将map任务分配给含有该map处理的数据块的TaskTracker上,同时将程序jar包复制到该TaskTracker上来运行,这叫运算移动,数据不移动
- TaskTracker每隔一段时间会给JobTracker发送一个心跳,告诉JobTracker他依然在运行,同时心跳中还携带例如当前map任务完成的进度等信息,当JobTracker收到作业的最后一个任务完成信息时,便把该作业设置成“成功”,当JobClient查询状态时,它将得知作业已经完成。
流程细节讲解
1. 任务提交
- JobClient.runJob()创建一个新的JobClient实例,调用其submitJob()函数。
向JobTracker请求一个新的job ID
检测此job的output配置
计算此job的input splits
将Job运行所需的资源拷贝到JobTracker的文件系统中的文件夹中,包括job jar文件,job.xml配置文件,input splits
通知JobTracker此Job已经可以运行了
提交任务后,runJob每隔一秒钟轮询一次job的进度,将进度返回到命令行,直到任务运行完毕。
2. 任务初始化
当JobTracker收到submitJob调用的时候,将此任务放到一个队列中,job调度器将从队列中获取任务并初始化任务。
初始化首先创建一个对象来封装job运行的tasks, status以及progress。
在创建task之前,job调度器首先从共享文件系统中获得JobClient计算出的input splits。
其为每个input split创建一个map task。
每个task被分配一个ID。
3. 任务分配
TaskTracker周期性的向JobTracker发送heartbeat。
在heartbeat中,TaskTracker告知JobTracker其已经准备运行一个新的task,JobTracker将分配给其一个task。
在JobTracker为TaskTracker选择一个task之前,JobTracker必须首先按照优先级选择一个Job,在最高优先级的Job中选择一个task。
TaskTracker有固定数量的位置来运行map task或者reduce task。
默认的调度器对待map task优先于reduce task
当选择reduce task的时候,JobTracker并不在多个task之间进行选择,而是直接取下一个,因为reduce task没有数据本地化的概念。
4. 任务执行
TaskTracker被分配了一个task,下面便要运行此task。
首先,TaskTracker将此job的jar从共享文件系统中拷贝到TaskTracker的文件系统中。
TaskTracker从distributed cache中将job运行所需要的文件拷贝到本地磁盘。
其次,其为每个task创建一个本地的工作目录,将jar解压缩到文件目录中。
其三,其创建一个TaskRunner来运行task。
TaskRunner创建一个新的JVM来运行task。
被创建的child JVM和TaskTracker通信来报告运行进度。
4.1 Map的过程
MapRunnable从input split中读取一个个的record,然后依次调用Mapper的map函数,将结果输出。
map的输出并不是直接写入硬盘,而是将其写入缓存memory buffer。
当buffer中数据的到达一定的大小,一个后台线程将数据开始写入硬盘。
在写入硬盘之前,内存中的数据通过partitioner分成多个partition。
在同一个partition中,后台线程会将数据按照key在内存中排序。
每次从内存向硬盘flush数据,都生成一个新的spill文件。
当此task结束之前,所有的spill文件被合并为一个整的被partition的而且排好序的文件。
reducer可以通过http协议请求map的输出文件,tracker.http.threads可以设置http服务线程数。
4.2 Reduce的过程
当map task结束后,其通知TaskTracker,TaskTracker通知JobTracker。
对于一个job,JobTracker知道TaskTracer和map输出的对应关系。
reducer中一个线程周期性的向JobTracker请求map输出的位置,直到其取得了所有的map输出。
reduce task需要其对应的partition的所有的map输出。
reduce task中的copy过程即当每个map task结束的时候就开始拷贝输出,因为不同的map task完成时间不同。
reduce task中有多个copy线程,可以并行拷贝map输出。
当很多map输出拷贝到reduce task后,一个后台线程将其合并为一个大的排好序的文件。
当所有的map输出都拷贝到reduce task后,进入sort过程,将所有的map输出合并为大的排好序的文件。
最后进入reduce过程,调用reducer的reduce函数,处理排好序的输出的每个key,最后的结果写入HDFS。
细说Map、Reduce任务中Shuffle和排序的过程
Map端
每个输入分片会让一个map任务来处理,map输出的结果会暂时放在一个环形内存缓冲区中,(该缓冲区的大小默认为100m,由io.sort.mb属性控制),当该缓冲区快要溢出时,(默认为缓冲区大小的80%,由io.sort.spill.percent属性控制),会在本地文件系统中创建一个溢出文件,将该缓冲区的数据写入这个文件。
spill线程过程:
在把缓冲区的数据写到磁盘前,会对它进行一个二次快速排序,首先根据所属的partition排序,然后每个partition中再按key排序,输出包括一个索引文件和数据文件,如果设定了Combiner,将在排序输出的机场上进行,Combier就是一个MiniReducer,它在执行map任务的节点本身运行,对map的输出做一次简单的reduce,是map的输出更紧凑,更少的数据会被写入磁盘和传送到reducer,spill文件保存在mapred.local.dir指定的目录下,所有的spill文件会被归并为一个文件,map将删除所有的临时spill文件,并告知TaskTracker任务已经完成。Reducers通过HTTP来获取对应的数据,用来传输partitions数据的工作线程个数由tasktracker.http.threads控制,这个设定是针对每一个TaskTracker的,并不是单个map,默认值为40,在运行大作业的大集群上可以增大以提升数据传输速率。在写入磁盘之前,线程首先根据reduce任务的数目将数据划分为相同数目的分区,也就是一个reduce任务对应一个分区的数据。这样做是为了避免有些reduce任务分配到大量数据,有些reduce任务却分到很少数据,甚至没有分到数据的尴尬局面,其实分区就是对数据进行hash的过程,然后对每个分区中的数据进行排序,此时如果设置了Combiner,将排序后的结果进行Combine操作,这样做是让尽可能少的数据写入磁盘。
当map任务输出最后一个记录时,可能会有很多的溢出文件,这是需要将这些文件合并,合并的过程中会不断地进行排序和combine,合并文件的目的是:尽量减少下一复制阶段网络传输的数据量。 最后合并成了一个已分区且已排序的文件,为了减少网络传输的数据量,这里可以将数据压缩,只要将mapred.compress.map.out设置为true就可以了。
将分区中的数据拷贝给相对应的reduce任务,有人可能会问,reduce任务怎么知道分区数据的呢? 其实map runner一直和其父TaskTracker保持联系,而TaskTracker又一直和JobTracker保持心跳,所以JobTracker保存了整个集群中的宏观信息,只要reduce任务向JobTracker获取对应的map输出位置就OK了。
map端的流程结束了,那到底什么是Shuffle呢,map任务的输出数据传输到reduce任务所在节点的过程,叫做shuffle。Shuffle的中文意思是洗牌,我们可以这么看: 一个map产生的数据,结果通过hash过程分区却分配给了不同的reduce任务,是不是一个队数据洗牌的过程。
Reduce端
接下来运行ReduceTask,其中的fetcher线程会从map端以http方式获取相应的文件分区,完成复制map的输出后,reducer就开始排序,运行merger把复制过来的文件存储在本地磁盘。
- Reduce会接收不同map任务传来的数据,并且每个map传来的数据都是有序的,如果reduce端接受的数据量相当小,则直接存储在内存中(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,表示用作此用途的堆空间的百分比), 如果数据量超过了该缓冲区大小的一定比列(mapred.job.shuffle.merge.percen决定),则对数据合并后溢出写到磁盘中。
- 随着溢出文件的增多,后台线程会将他们合并成一个更大的有序文件,这样做是为了给后面的合并节省时间,其实不管在map端还是reduce端,map reduce都是反复地执行排序、合并操作,排序是hadoop的灵魂
下面给出通用的mapreduce数据流图:
当然,有些作业中我们可能根本不需要有reduce任务,所有工作在map任务并行执行完之后就完毕了,例如Hadoop提供的并行复制工作distcp,其内部实现就是采用一个只有Mapper,没有Reducer的MapReduce作业,在map完成文件复制之后作业就完成了,如下图所示
从上面可以清楚看出MapReduce程序的流程和设计思路:
- 首先用户程序 (JobClient) 提交了一个 job,job 的信息会发送到 Job Tracker 中,Job Tracker 是 Map-reduce 框架的中心,他需要与集群中的机器定时通信 (heartbeat), 需要管理哪些程序应该跑在哪些机器上,需要管理所有 job 失败、重启等操作。
- TaskTracker 是 Map-reduce 集群中每台机器都有的一个部分,他做的事情主要是监视自己所在机器的资源情况。
- TaskTracker 同时监视当前机器的 tasks 运行状况。TaskTracker 需要把这些信息通过 heartbeat 发送给 JobTracker,JobTracker 会搜集这些信息以给新提交的 job 分配运行在哪些机器上
可以看出原来的map-reduce框架简单明了,在最初推出的几年,也得到了众多的成功案例,获得业界广泛的支持和肯定,但随着分布式系统集群的规模和其工作负荷的增长,原框架的问题逐渐浮出水面,主要的问题集中如下:
- JobTracker 是 Map-reduce 的集中处理点,存在单点故障。
- JobTracker 完成了太多的任务,造成了过多的资源消耗,当 map-reduce job 非常多的时候,会造成很大的内存开销,潜在来说,也增加了 JobTracker fail 的风险,这也是业界普遍总结出老 Hadoop 的 Map-Reduce 只能支持 4000 节点主机的上限。
- 在 TaskTracker 端,以 map/reduce task 的数目作为资源的表示过于简单,没有考虑到 cpu/ 内存的占用情况,如果两个大内存消耗的 task 被调度到了一块,很容易出现 OOM。
- 在 TaskTracker 端,把资源强制划分为 map task slot 和 reduce task slot, 如果当系统中只有 map task 或者只有 reduce task 的时候,会造成资源的浪费,也就是集群资源利用的问题。
- 源代码层面分析的时候,会发现代码非常的难读,常常因为一个 class 做了太多的事情,代码量达 3000 多行,,造成 class 的任务不清晰,增加 bug 修复和版本维护的难度。
JobTracker单点故障解决方案:
通过zk的选举机制解决MapReduce的单点故障,当JobTracker节点宕机时,能够在一台备用的JobTracker节点上启动JobTracker进程,并使用虚拟IP机制将虚拟IP(vip)指向备用JobTracker节点。
新 Hadoop Yarn 框架原理及运作机制(MR2)
从业界使用分布式系统的变化趋势和 hadoop 框架的长远发展来看,MapReduce 的 JobTracker/TaskTracker 机制需要大规模的调整来修复它在可扩展性,内存消耗,线程模型,可靠性和性能上的缺陷。在过去的几年中,hadoop 开发团队做了一些 bug 的修复,但是最近这些修复的成本越来越高,这表明对原框架做出改变的难度越来越大。
为从根本上解决旧 MapReduce 框架的性能瓶颈,促进 Hadoop 框架的更长远发展,从 0.23.0 版本开始,Hadoop 的 MapReduce 框架完全重构,发生了根本的变化。新的 Hadoop MapReduce 框架命名为 MapReduceV2 或者叫 Yarn
重构根本的思想是将 JobTracker 两个主要的功能分离成单独的组件,这两个功能是资源管理和任务调度 / 监控,新的资源管理器全局管理所有应用程序计算资源的分配,每一个应用的 ApplicationMaster 负责相应的调度和协调,每一个应用的 ApplicationMaster 是一个详细的框架库,它结合从 ResourceManager 获得的资源和 NodeManager 协同工作来运行和监控任务。ResourceManager 支持分层级的应用队列,这些队列享有集群一定比例的资源。从某种意义上讲它就是一个纯粹的调度器,它在执行过程中不对应用进行监控和状态跟踪。同样,它也不能重启因应用失败或者硬件错误而运行失败的任务。ResourceManager 是基于应用程序对资源的需求进行调度的 ; 每一个应用程序需要不同类型的资源因此就需要不同的容器。资源包括:内存,CPU,这同现 Mapreduce 固定类型的资源使用模型有显著区别,它给集群的使用带来负面的影响。资源管理器提供一个调度策略的插件,它负责将集群资源分配给多个队列和应用程序。调度插件可以基于现有的能力调度和公平调度模型。NodeManager 是每一台机器框架的代理,是执行应用程序的容器,监控应用程序的资源使用情况 (CPU,内存,硬盘,网络 ) 并且向调度器汇报。每一个应用的 ApplicationMaster 的职责有:向调度器索要适当的资源容器,运行任务,跟踪应用程序的状态和监控它们的进程,处理任务的失败原因。