1.概述
MapReduce是一个分布式计算框架(编程模型),最初由由谷歌的工程师开发,基于GFS的分布式计算框架, 主要用于搜索领域,解决海量数据的计算问题。后来Cutting根据《Google Mapreduce》,设计了基于HDFS的Mapreduce分布式计算框架。
MR框架对于程序员的最大意义在于,不需要掌握分布式计算编程,不需要考虑分布式编程里可能存在的种种难题,比如任务调度和分配、文件逻辑切块、位置追溯等工作。这样,软件工程师能够把大部分精力放在核心业务层面上,大大简化了分布式程序的开发和调试周期。
MR由两个阶段组成:Mapper和Reducer,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。这两个函数的形参是key、value对,表示函数的输入信息。
2.MapReduce框架的节点组成结构
2.1 JobTracker / ResourceManager工作职能:
A. 知道管理哪些机器,即管理哪些NodeManager。
B. 要有检测机制,能够检测到NodeManager的状态变换,通过RPC心跳来实现。
C. 任务的分配和调度,ResourceManager能够做到细粒度的任务分配,比如某一个任务需要占用多大内存,需要多少计算资源。
注:ResourceManager是hadoop2.0版本之后引入了yarn,有yarn来管理hadoop之后,jobtracker就被替换成了ResourceManager
2.2 TaskTracker / NodeManager工作职能:
A. 能够收到ResourceManager发过来的任务,并进行任务的处理。这里处理任务指的是Map任务或Reduce任务。
3.Map、Reduce的执行步骤
1. map任务处理
读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。
写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
对输出的key、value进行分区。
对相同分区的数据,按照key进行排序(默认按照字典顺序进行排序)、分组。相同key的value放到一个集合中。
(可选)分组后的数据进行归约。
2. reduce任务处理
对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。这个过程并不是map将数据发送给reduce,而是reduce主动去获取数据。
对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。Hello 3
把reduce的输出保存到文件中(hdfs)。
4. Hadoop2.x的MR执行流程
一. 作业提交:
1. Job的submit()方法创建一个内部的JobSubmitter实例,并调用其submitJobInternal方法(步骤1)。提交作业后,waitForCompletion()调用monitorAndPrintJob()方法每秒轮询作业进度,如果发现自上次报告后有改变,便把进度报告给控制台。作业完成后,如果成功,就显示计数器;如果失败,这将导致作业失败的错误记录到控制台。
JobSubmitter所实现的作业提交过程如下所述:
2. 向资源管理器请求一个新应用的ID,用于MapReduce作业ID。
3. 作业客户端检查作业的输出说明,计算输入分片并将作业资源(包括作业Jar、配置和分片信息)复制到HDFS(步骤3)
4. 通过调用资源管理器上的submitApplication()方法提交作业(步骤4)
二. 作业初始化
5. 资源管理器ResourceManager收到调用他的submitApplication()消息后,便将请求传递给调度器(scheduler)。调度器分配一个容器,然后资源管理器在节点管理器的管理下载容器中启动应用程序的master进程(步骤5a和5b)
6. MapReduce作业的application master是一个Java应用程序,它的主类时MRAppMaster。它对作业进行初始化:通过创建多个簿记对象以保持对作业进度的跟踪,因为它将接受来自任务的进度和完成报告(步骤6)。
7. 接下来,它接受来自共享文件系统的在客户端计算的输入分片(步骤7)。对每一个分片创建一个map任务对象以及由mapreduce.
job.reduces属性确定的多个reduce任务对象。
三. 任务分配
8. AppMaster为该作业中的所有map任务和reduce任务向资源管理器请求容器。
四. 任务执行
9. 一旦资源管理器的调度器为任务分配了容器,AppMaster就通过与节点管理器NodeManager通讯来启动容器(步骤9a和9b)。
10. 该任务由主类为YarnChild的Java应用程序执行。在它允许任务之前,首先将任务需要的资源本地化,包括作业的配置、JAR文件和所有来自分布式缓存的文件.
11. 最后运行map任务或reduce任务。
五. 进度和状态更新
在YARN下运行时,任务每3秒钟通过umbilical接口向APPMaster汇报进度和状态。客户端每一秒钟(通过mapreduce.client.
Progressmonitor.pollinterval设置)查询一次AppMaster以接收进度更新,通常都会向用户显示。
六. 作业完成
除了向AppMaster查询进度外,客户端每5秒还通过调用Job的waitForCompletion()来检测作业是否完成。查询的间隔可以通过mapreduce.client.completion.pollinterval属性进行设置。作业完成后,AppMaster和任务容器清理器工作状态。
5. MR的序列化机制
由于集群工作过程中需要用到RPC操作,所以想要MR处理的对象的类必须可以进行序列化/反序列化操作。
Hadoop并没有使用Java原生的序列化,它的底层其实是通过AVRO实现序列化/反序列化,并且在其基础上提供了便捷API。
6. Partitioner -- 分区
分区操作是shuffle操作中的一个重要过程,作用就是将map的结果按照规则分发到不同reduce中进行处理,从而按照分区得到多个输出结果
Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类
HashPartitioner是mapreduce的默认partitioner。计算方法是
reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
Integer.MAX_VALUE)=2147483647 转换成二进制值是1111111111111111111111111111111
key.hashCode() & 2147483647 可以保证结果是非负数
注:默认情况下,reduceTask数量为1
很多时候MR自带的分区规则并不能满足我们需求,为了实现特定的效果,可以需要自己来定义分区规则。
7. sort 排序
Map执行过后,在数据进入reduce操作之前,数据将会按照mokey进行排序,利用这个特性可以实现大数据场景下排序的需求
8. Combiner -- 合并
每一个MapperTask可能会产生大量的输出,combiner的作用就是在MapperTask端对输出先做一次合并,以减少传输到reducerTask的数据量。
combiner是实现在Mapper端进行key的归并,combiner具有类似本地的reducer功能。
如果不用combiner,那么,所有的结果都是reducer完成,效率会相对低下。使用combiner,先完成在Mapper的本地聚合,从而提升速度。
9.Shuffle
shuffle是mapreduce中最重要的一个环节也是调优的重点
1.Mapper
每个MapperTask有一个环形内存缓冲区,用于存储map任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spill.percent),一个后台线程把内容写到(spill)磁盘的指定目录(mapred.local.dir)下的新建的一个溢出写文件。
写磁盘前,要partition,sort,Combiner。如果有后续的数据,将会继续写入环形缓冲区中,最终写入下一个溢出文件中。
等最后记录写完,合并全部溢出写文件为一个分区且排序的文件。
如果在最终合并时,被合并的文件大于等于3个,则合并完会再执行一次Combiner,否则不会。
2.Reducer
Reducer通过Http方式得到输出文件的分区。
NodeManager为分区文件运行Reduce任务。复制阶段把Map输出复制到Reducer的内存或磁盘。一但Map任务完成,Reduce就开始复制输出。
排序阶段合并map输出。然后走Reduce阶段逻辑
3. Mapper数量
Mapper的数量在默认情况下不可直接控制干预,Mapper的数量由输入的大小和个数决定。
在默认情况下,最终input占据了多少block,就应该启动多少个Mapper。
可以通过配置mapred.min.split.size来控制split的size的最小值。