Hadoop核心组件——MR(Map-Reduce)
Hadoop分布式计算框架Map-Reduce
Map-Reduce流程分四步:split->map->shuffle->reduce
上图是一个简单的例子,统计一个文本中各单词出现的数量。
Map-Reduce的Split大小
- max.split(100M)
- min.split(10M)
- block(64M)
- max(min.split,min(max.split,block))
max(min.split,min(max.split,block))在100与64中取小的为64,在64与10中取大的为64正好是一个block的大小。执行时正好是一个block大小,避免了数据的移动。
Mapper - Map-Reduce的思想就是“分而治之”
Mapper负责分,即把复杂的任务分解成若干个“简单的任务”执行 - “简单的任务”有几个含义
1.数据或计算规模相对于原任务要大大缩小
2.就近计算,即会被分配到存放了所需数据的节点进行计算
3.这些小人物可以并行计算,彼此间几乎没有依赖关系
Hadoop计算框架Reducer - 对map阶段的结果进行汇总
- Reducer的数目由mapred-site.xml配置文件里的项目mapred.reduce.tasks决定,缺省值为1,用户可以覆盖之(一般不会这么做,修改配置文件相当于修改整个集群,因此一般选择在程序中设置Reducer的数目)
Hadoop 计算框架Shuffler
- 每个map task都有一个内存缓冲区(默认是100MB),储存着map的输出结果
- 当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘(Spill)
- 溢写是有单独线程来完成,不影响往缓冲区写map结果的线程(spill percent,默认是0.8)
-
当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)
map端 - 假如client设置过Combiner,那么现在就是使用Combiner的时候了,将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量
- 当整个map task结束后再对磁盘中的这个map task产生的所有临时文件做合并(Merge),对于“wird1”就是像这样的:{“word1”,[5,8,2,...]},假如有Combiner,{word1[15]},最终产生一个文件
- reduce 从tasktracker copy数据
- copy过来的数据会放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置
-
merge有三种形式:1)内存到内存 2)内存到磁盘 3)磁盘到磁盘。merge从不同tasktracker上拿到的数据。
reduce端
MapReduce的架构1.x
在hadoop2.x中,我们通常搭建高可用环境,因此SecondaryNameNode变成一个standby状态的namenode有zookeeper集群负责维护,Job Tracker改名为ResourceManager,Task Tracker改名为nodemanager。
- 一主多从架构
- 主JobTracker:ResourceManager
负责调度分配每一个子任务task运行于TaskTracker上,如果发现有失败的task就重新分配其任务到其他节点。每一个hadoop集群中只一个JobTracker,一般它运行在Master节点上。 - 从TaskTracker:NodeManager
TaskTracker主动与JobTracker通信,接收作业,并负责直接执行每一个任务,为了减少网络带宽TaskTracker最好运行在HDFS的DataNode上。
Map-Reducer高可用环境搭建
我们用node3和node4作为resoucemanager
首先,执行stop-dfs.sh停掉hadoop集群
mapred-site.xml(改文件可能叫mapred-site.xml.template,需要重命名)
添加
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
yarn-site.xml:
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>cluster1</value>
</property>
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>
<!--配置rousource节点>
<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>node3</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>node4</value>
</property>
<!--配置zookeeper集群>
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>node3:2181,node4:2181,node5:2181</value>
</property>
改完之后同步配置文件。
然后执行:start-all.sh 启动整个hadoop
然后再node3和node4上执行jps看看是否有resourcemanager,由于start-all.sh脚本有bug因此没有启动需要手动启动,命令yarn-daemon.sh start resourcemanager
nodemanager默认为node3-node5,因为我们在slaves中配置过datanode,该文件也同时配置了nodemanager。
启动之后我们可以访问node3和node4的8088端口查看resourcemanager状态