自行整理, 学习用途, 侵知删歉
一.MapReduce定义
- MP是一种编程模型
- 记录导向的数据处理方式(键值对)
- 帮助多节点下的任务分配
- 由Map和Reduce两部分组成
Mapper每次对一个独立记录进行处理
Reducer汇集Mapper送来的结果 -
Map后的数据通过shuffle和sort送往Reduce
- 自动并行和分布
- 容错功能
- 具备状态和监控工具
二.基本概念
- 每一个Mapper处理一个HDFS单独的input split
- Hadoop一次发送一条记录给Mapper代码
- 每一个记录有一个键值对
- 中间环节的数据由Mapper保存到本地磁盘
- 在shuffle和sort过程, 所有有相同中间环节key的value都被转换到相同的Reducer
开发者确定Reducer的数量 - Reducer传送每一个key和相对应的一组values
- Reducer的输出写到HDFS
举例:
数据源:
行数 | 内容 |
---|---|
1202 | the cat sat on the mat |
1225 | the aardvark sat on the sofa |
Mapper:
// assume input is a set of text files
// k is a byte offset
// v is the line for that offset
let map(k, v) =
foreach word in v:
emit(word, 1)
产生结果:
(the, 1), (cat, 1), (sat, 1), (on, 1), (the, 1), (mat, 1), (the, 1), (aardvark, 1), (sat, 1), (on, 1), (the, 1), (sofa, 1)
经过shuffle sort后输入给Reducer的结果为:
(aardvark, [1])
(cat, [1])
(mat, [1])
(on, [1, 1])
(sat, [1, 1])
(sofa, [1])
(the, [1, 1, 1, 1])
Reducer
// k is a word, vals is a list of 1s
let reduce(k, vals) =
sum = 0
foreach (v in vals):
sum = sum + v
emit (k, sum)
最终输出为[保存至HDFS]:
(aardvark, 1)
(cat, 1)
(mat, 1)
(on, 2)
(sat, 2)
(sofa, 1)
(the, 4
三.YARN 集群架构
Yarn : Yet Another Resource Negotiator
使用`ResourceManager/NodeManager 结构
支持: MapReduce v2, Impala, Spark, Giraph等处理框架
MapReduce v1:
使用'JobTracker/TaskTracker'结构
1. YARN 守护进程
- ResourceManager 每个集群1个
- 启动程序启动, 规划分配slave节点的资源, 分配Container
- NodeManager 每个slave节点1个
- 启动程序进程, 管理slave节点的资源
- JobHistoryServer 每个集群1个
- job的数据和元数据归档
然后和HDFS结合一下:
MapReduce术语:
Job
Mapper, Reducer, 需要处理的输入
也可以叫做一个应用
Task
一个工作独立的任务
一个Job被分成许多task: 每一个task是一个Map
或者是Reduce
task
一个task
在slave中的一个Container
中运行
Client
一个把job提交给ResourceManager的程序, 也可以指运行程序的机器
Container
- 由ResourceManager分配
- 在slave node需要一定的资源(内存, cpu)
-
应用程序在1个或多个Container内运行
** Application Master**
- 每一个应用有一个Application Master
- 框架/application specific (MRAppMaster for MapReduce)
- 在slave的
container
中运行 -
会请求更多的container来运行application tasks
** ResourceManager**
- 管理节点
跟进NodeManager的心跳
- 运行一个scheduler
决定资源如何分配 - 管理container
创建一个ApplicationMaster的container, 并追踪心跳
管理ApplicationMaster的资源请求
当container失效后,或者应用完成后解除分配
管理集群级别的安全
Nodemanager
- 和ResourceManager通信
注册和提供节点的资源信息
发送心跳和container的状态
- 管理进程
启动ApplicationMaster
启动应用程序进程
监控container使用的资源, 杀死错误的进程
汇总应用的日志并保存在HDFS中
运行辅助服务
维护节点级别的安全
流程:
- client提交任务, ResourceManager提交一个Application Master到 NodeManager
- Application Master启动其他container[slave nodes]里面的Map tasks
- slave nodes里的Map tasks运行
- Application Master启动其他container[slave nodes]里面的Reduce tasks
- slave nodes里的Reduce tasks运行
6.完成后Application Master告诉Job History Server记录
7.Application Master告诉Resource Manager收回资源, 关闭Application Manager
日志汇集
log文件存放在slave节点本地
Yarn提供了应用Log汇集的服务:
先由NodeManager把本地日志放到HDFS
根据job(application)汇总log
可以根据HDFS客户端, 命令行, Yarn Web UI访问
本地资源
- MapReduce任务需要访问本地资源文件[slave节点上的硬盘]
- 用于Map task的输出[中间数据]
-
分布的内存[cache] : the JAR file contains the program binary to run
辅助auxiliary
服务
配置NodeManager可以运行辅助服务[为提供服务的持续的应用]
在NodeManager的JVM中运行
MapReduce的shuffle和sort会用到
Shuffle
shuffule包括Map阶段的spill
和Reduce阶段的copy
和sort
spill
: Map输出的结果不断的写到一个环形内存中, 默认80%后保存到HDFS中Map任务如果输出数据量很大,可能会进行好几次Spill,out文件和Index文件会产生很多,分布在不同的磁盘上。最后把这些文件进行合并的merge, merge过程创建一个叫file.out的文件和一个叫file.out.Index的文件用来存储最终的输出和索引.
Reduce侧的
copy
和sort
copy
:Reduce任务通过HTTP向各个Map任务拖取它所需要的数据, 在内存中每个Map对应一块数据,当内存中存储的Map数据占用空间达到一定程度的时候,开始启动内存中merge,把内存中的数据merge输出到磁盘上一个文件中.
(Merge)Sort
这里使用的Merge和Map端使用的Merge过程一样。Map的输出数据已经是有序的,Merge进行一次合并排序,所谓Reduce端的sort过程就是这个合并的过程。一般Reduce是一边copy一边sort,即copy和sort两个阶段是重叠而不是完全分开的。
申请资源
ApplicationMaster向ResourceManager请求资源, 请求的格式如下:
流程简图:
client提交程序后, ResourceManager会在一个slaveNode上创建AppMaster, AppMaster会向ResourceManager请求资源, 请求到的资源会以container的形式分配, container会提交给NodeManager. NodeManager启动tasks.
容错处理
task失败?
AppliacationMaster会重试task, 4次重试失败后会停止响应
ApplicationMaster失败?
如果应用挂了, 或者ApplicationMaster没有发送心跳, ResourceManager会重启整个应用, [最多2次重试]
备选项:Job recovery
设为false: 所有tasks会重跑
设为true: AppMaster会回溯所有tasks的状态, 只有未完成的task会重跑
NodeManager失败?
如果NodeManager没有发送心跳了, 这个Node就会从active列表中移除;
这个节点上的tasks也会定为失败;
如果有ApplicationMaster的节点挂了, 这整个应用都算挂了
ResourceManager失败?
那就没有应用,或者tasks可以启动了;
可以设置成high availability(HA)