1.架构:
1.app--->2.jobclient--->3.jobtracker (namenode)
4.tasktracker (datanode)
2.工作流程
input--〉split->map->统计->sort->group-->reduce-->output
split->map
....
MapReduce作业的处理流程:
按照时间顺序包括:
输入分片(input split)
map阶段
combiner阶段
shuffle阶段和
reduce阶段
(1)输入分片(input split)
--在进行map计算之前,mapreduce会根据输入文件计算输入分片(input split),每个输入分片(input split)针对一个map任务
输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组,输入分片(input split)往往和hdfs的block(块)关系很密切
--eg:假如我们设定hdfs的块的大小是64mb,如果我们输入有三个文件,大小分别是3mb、65mb和127mb,那么mapreduce会把3mb文件分为一个输入分片(input split),65mb则是两个输入分片(input split)而127mb也是两个输入分片(input split)
即我们如果在map计算前做输入分片调整,例如合并小文件,那么就会有5个map任务将执行,而且每个map执行的数据大小不均,这个也是mapreduce优化计算的一个关键点。
(2)map阶段
--程序员编写好的map函数了,因此map函数效率相对好控制,而且一般map操作都是本地化操作也就是在数据存储节点上进行;
(3)combiner阶段
--combiner阶段是程序员可以选择的,combiner其实也是一种reduce操作,因此我们看见WordCount类里是用reduce进行加载的。Combiner是一个本地化的reduce操作,它是map运算的后续操作,主要是在map计算出中间文件前做一个简单的合并重复key值的操作。combiner操作是有风险的,使用它的原则是combiner的输入不会影响到reduce计算的最终输入。
(4)shuffle阶段
将map的输出作为reduce的输入的过程就是shuffle了
(5)reduce阶段
程序员编写的,最终结果是存储在hdfs上的。
Intermediate files(on local disk)存储在硬盘中,安全,但有个效率的问题。
数据:
hello world
hello liu
hi dora
hello xiaozhang
Mapper{
map(k1,v1)->context {k2,v2}
k1-offset偏移量 int
v1- 一行 hello world->string.split('\\s')
for()
k2=hello
v2=1
}
Reducer{
reduce(k2,v2)--->k3,v3
k2=hello
v2=iterator{1,1,1}
k3=hello v3=sum
for(v2){
sum+=v2[i]
}
}
3.JobTracker的单点故障
问题:jobtracker和hdfs的namenode一样也存在单点故障。为什么hadoop的做的文件系统和mapreduce计算框架都是高容错的,但是最重要的管理节点的故障机制却如此不好?
答:主要是namenode和jobtracker在实际运行中都是在内存操作,而做到内存的容错就比较复杂了,只有当内存数据被持久化后容错才好做,namenode和jobtracker都可以备份自己持久化的文件,但是这个持久化都会有延迟,因此真的出故障,任然不能整体恢复,另外hadoop框架里包含zookeeper框架,zookeeper可以结合jobtracker,用几台机器同时部署jobtracker,保证一台出故障,有一台马上能补充上,不过这种方式也没法恢复正在跑的mapreduce任务。