HADOOP框架
大数据技术解决的是什么问题?
- 解决海量数据的存储和计算
Hadoop的广义和狭义之分
- 狭义的Hadoop:
- Hadoop框架,三部分组成:
- HDFS:分布式文件系统,解决存储问题
- MapReduce:分布式离线计算框架,解决计算问题
- Yarn:资源调度框架
- Hadoop框架,三部分组成:
- 广义的Hadoop:生态圈,包括Hadoop框架以及辅助框架
- Flume:日志采集
- Sqoop:关系型数据库数据的采集,导出
- Hive:深度依赖Hadoop框架完成计算
- Hbase:大数据领域的数据库
- Kafka:高吞吐消息中间件
大数据简介
大数据定义
- 指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合
- 需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产
大数据特点
大量(Volume)
- 1PB = 1024TB
- 1EB = 1024PB
- 1ZB = 1024EB
高速(Velocity)
- 数据的创建、存储、分析都要求被高速处理
- 实时完成个性化推荐
多样(Variety)
- 数据形式和来源多样化
真实(Veracity)
- 确保数据的真实性,才能保证数据分析的正确性
低价值(Value)
- 数据价值密度相对较低
- 结合业务逻辑并通过强大的机器算法来挖掘数据价值
大数据应用场景
- 仓储物流
- 电商零售
- 个性推荐
- 汽车-无人驾驶
- 人工智能
Hadoop简介
Hadoop定义
Hadoop 是一个适合大数据的分布式存储和计算平台。
Hadoop的发行版本
- Apache Hadoop
- 0.x 系列版本:Hadoop当中最早的一个开源版本,在此基础上演变而来的1.x以及2.x的版本
- 1.x 版本系列:Hadoop版本当中的第二代开源版本,主要修复0.x版本的一些bug等
- 2.x 版本系列:架构产生重大变化,引入了yarn平台等许多新特性
- 3.x 版本系列:EC技术、YARN的时间轴服务等新特性
- CDH版本
- HDP版本
Hadoop 优缺点
优点
- Hadoop具有存储和处理数据能力的高可靠性
- 通过可用的计算机集群分配数据,完成存储和计算任务,这些集群可以方便地扩展到数以
- 能够在节点之间进行动态地移动数据,并保证各个节点的动态平衡,处理速度非常快,具有高效性
- 能够自动保存数据的多个副本,并且能够自动将失败的任务重新分配,具有高容错性。
缺点
-
Hadoop不适用于低延迟数据访问。 -
Hadoop不能高效存储大量小文件。 -
Hadoop不支持多用户写入并任意修改文件
Apache Hadoop 重要组成
Hadoop=HDFS(分布式文件系统)+MapReduce(分布式计算框架)+Yarn(资源协调框架)+Common模块
Hadoop HDFS
一个高可靠、高吞吐量的分布式文件系统,Hadoop Distribute File System
-
分而治之,把一个很大的数据存储进行拆分,数据切割成数据块,由节点分别存储
image.png - HDFS是Master/Slave架构:
- 存储过程:把一个很大的数据存储进行拆分,数据切割成数据块
- 获取过程:向
NameNode请求,获取到之前存入文件的块以及块所在的DateNode信息,分别下载,合并
- NameNode(nn):存储文件的元数据,元数据记录了文件的块列表以及块所在
DateNode节点信息 - SecondaryNameNode(2nn):辅助NameNode更好的工作,用来监控HDFS状态的辅助后台程序,每隔一段时间获取HDFS元数据快照
- DateNode(dn):在本地文件系统存储文件块数据,以及块数据的校验
注意:NN,2NN,DN这些既是角色名称,进程名称,代指电脑节点名称
Hadoop MapReduce
一个分布式的离线并行计算框架
- 拆解任务、分散处理、汇整结果
-
MapReduce计算 =Map阶段 +Reduce阶段-
Map阶段就是“分”的阶段,并行处理输入数据; -
Reduce阶段就是“合”的阶段,对Map阶段结果进行汇总
-
-
每个节点负责一个切片工作,
image.png
Hadoop YARN
作业调度与集群资源管理的框架
image.png
- ResourceManager(rm):处理客户端请求、启动/监控
ApplicationMaster、监控NodeManager、资源分配与调度 - NodeManager(nm):单个节点上的资源管理、处理来自
ResourceManager的命令、处理来自ApplicationMaster的命令; - ApplicationMaster(am):数据切分、为应用程序申请资源,并分配给内部任务、任务监控与容错
- Container:对任务运行环境的抽象,封装了CPU、内存等多维资源以及环境变量、启动命令等任务运行相关的信息。
Hadoop Common
支持其他模块的工具模块(Configuration、RPC、序列化机制、日志操作)
Apache Hadoop 完全分布式集群搭建
- Hadoop框架是
Java编写,需要JVM环境 - JDK版本:JDK8
- Hadoop搭建方式:
- 单机模式:单节点模式,非集群,生产不会使用这种方式
- 单机伪分布式模式:单节点,多线程模拟集群的效果,生产不会使用这种方式
- 完全分布式模式:多台节点,真正的分布式Hadoop集群的搭建(生产环境建议使用这种方式)
集群规划
| 框架 | os1 | os2 | os3 |
|---|---|---|---|
| HDFS | NameNode、DataNode | DataNode | SecondaryNameNode、DataNode |
| YARN | NodeManager | ResourceManager、NodeManager | NodeManager |
Hadoop目录
├── bin # 对Hadoop进行操作的相关命令,如hadoop,hdfs等
├── etc # Hadoop的配置文件目录,入hdfs-site.xml,core-site.xml等
├── include
├── lib # Hadoop本地库(解压缩的依赖)
├── libexec
├── LICENSE.txt
├── NOTICE.txt
├── README.txt
├── sbin # 存放的是Hadoop集群启动停止相关脚本,命令
└── share # Hadoop的一些jar,官方案例jar,文档等
集群配置[1]
Hadoop集群配置 = HDFS集群配置 + MapReduce集群配置 + Yarn集群配置
HDFS集群配置
- 配置文件在
hadoop/etc/hadoop下 - 配置xml时,配置内容放在
<configuration></configuration>中 - 都先在os1上配置
- 将JDK路径明确配置给HDFS(修改hadoop-env.sh)
vi hadoop-env.sh
export JAVA_HOME=$JAVA_HOME
- 指定NameNode节点以及数据存储目录(修改core-site.xml)
vi core-site.xml
<!-- 指定HDFS中NameNode的地址 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://os1:9000</value>
</property>
<!-- 指定Hadoop运行时产生文件的存储目录 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/servers/hadoop-2.9.2/data/tmp</value>
</property>
- 指定SecondaryNameNode节点(修改hdfs-site.xml)
vi hdfs-site.xml
<!-- 指定Hadoop辅助名称节点主机配置 -->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>os3:50090</value>
</property>
<!--副本数量 -->
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
- 指定DataNode从节点(修改etc/hadoop/slaves文件,每个节点配置信息占一行)
vi slaves
os1
os2
os3
MapReduce集群配置
- 将JDK路径明确配置给MapReduce(修改mapred-env.sh)
vi mapred-env.sh
export JAVA_HOME=$JAVA_HOME
- 指定MapReduce计算框架运行Yarn资源调度框架(修改mapred-site.xml)
mv mapred-site.xml.template mapred-site.xml
vi mapred-site.xml
<!-- 指定MR运行在Yarn上 -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
Yarn集群配置
- 将JDK路径明确配置给Yarn(修改yarn-env.sh)
vi yarn-env.sh
export JAVA_HOME=$JAVA_HOME
- 指定ResourceManager的Master节点所在计算机节点(修改yarn-site.xml)
vi yarn-site.xml
<!-- 指定YARN的ResourceManager的地址 -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>os3</value>
</property>
<!-- Reducer获取数据的方式 -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
- 指定NodeManager节点(会通过slaves文件内容确定)
- 配置
slave
分发配置
使用rsync远程同步工具
yum install -y rsync
rsync语法
rsync -rvl $pdir/$fname $user@$host:$pdir/$fname
## -r 递归
## -v 显示复制过程
## -l 拷贝符号连接
rsync-script自动化脚本
#!/bin/bash
#1 获取命令输入参数的个数,如果个数为0,直接退出命令
paramnum=$#
if((paramnum==0)); then
echo no params;
exit;
fi
#2 根据传入参数获取文件名称
p1=$1
file_name=`basename $p1`
echo fname=$file_name
#3 获取输入参数的绝对路径
pdir=`cd -P $(dirname $p1); pwd`
echo pdir=$pdir
#4 获取用户名称
user=`whoami`
#5 循环执行rsync
for((host=2; host<4; host++)); do
echo ------------------- os$host --------------
rsync -rvl $pdir/$file_name $user@os$host:$pdir
done
使用:rsync-script /opt/servers/hadoop-2.9.2
启动集群
注意:
如果集群是第一次启动,需要在Namenode所在节点格式化NameNode,非第一次不用执行格式化Namenode操作
hadoop namenode -format
单节点启动
- 在os1上启动
NameNode
hadoop-daemon.sh start namenode
- 在os1,os2,os3上启动
DataNode
hadoop-daemon.sh start datanode
Yarn集群单节点启动
- 在os3上启动
ResourceManager
yarn-daemon.sh start resourcemanager
- 在os1,os2,os3启动
NodeManager
yarn-daemon.sh start nodemanager
集群群起
- os1启动dfsh
start-dfs.sh
- os3启动yarn
start-yarn.sh
集群测试
HDFS 分布式存储
- os1上传文件,os2下载
- 使用网页查看:{os1-IP}:50070/explorer.html#/test/input
# 创建hdfs目录
hdfs dfs -mkdir -p /test/input
cd /root
vi test.txt
# os1上传
hdfs dfs -put /root/test.txt /test/input
# os2下载
hdfs dfs -get /root/test.txt /test/input
MapReduce 分布式计算
- 在
HDFS文件系统根目录下面创建一个wcinput文件夹
hdfs dfs -mkdir /wcinput
- 本地
/root下创建一个wc.txt文件
cd /root
vi wc.tx
# 文件中输入
hadoop mapreduce yarn
hdfs hadoop mapreduce
mapreduce yarn os
os
os
- 上传
wc.txt到HDFS目录/wcinput下
hdfs dfs -put wc.txt /wcinput
- 使用
hadoop jar执行wordcount方法
hadoop jar $HADOOP_PATH/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.9.2.jar wordcount /wcinput
/wcoutput
- 查看结果
hdfs dfs -cat /wcoutput/part-r-00000
- 使用网页查看
{os3-IP}:8088/cluster
配置历史服务器
- 配置
mapred-site.xml
# vi mapred-site.xml
<property>
<name>mapreduce.jobhistory.address</name>
<value>os1:10020</value>
</property>
<!-- 历史服务器web端地址 -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>os1:19888</value>
</property>
- 分发到其他节点
rsync-script mapred-site.xml
- 启动历史服务器
mr-jobhistory-daemon.sh start historyserver
- 查看JobHistory
http://os1:19888/jobhistory
配置日志的聚集
日志聚集
应用(Job)运行完成以后,将应用运行日志信息从各个task汇总上传到HDFS系统上
- 优势:可以方便的查看到程序运行详情,方便开发调试。
- 注意:开启日志聚集功能,需要重新启动
NodeManager、ResourceManager和HistoryManager。
日志聚集开启步骤
- 配置
yarn-site.xml
# vi yarn-site.xml
<!-- 日志聚集功能使能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<!-- 日志保留时间设置7天 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>
- 分发
yarn-site.xml到集群其它节点
rsync-script yarn-site.xml
- 关闭
NodeManager、ResourceManager和HistoryManager
# os3中
stop-yarn.sh
# os1
mr-jobhistory-daemon.sh stop historyserver
- 启动
NodeManager、ResourceManager和HistoryManager
# os3
start-yarn.sh
mr-jobhistory-daemon.sh start historyserver
- 删除
HDFS上已经存在的输出文件
hdfs dfs -rm -R /wcoutput
- 执行
WordCount程序
hadoop jar $HADOOP_PATH/share/hadoop/mapreduce/hadoopmapreduce-examples-2.9.2.jar wordcount /wcinput /wcoutput
HDFS分布式文件系统
HDFS简介
HDFS (全称:Hadoop Distribute File System,Hadoop 分布式文件系统)是 Hadoop 核心组成,是分布式存储服务。
-
HDFS是分布式文件系统中的一种
HDFS的重要概念
-
HDFS通过统一的命名空间目录树来定位文件 - 是分布式的,集群中的服务器有各自的角色
典型的 Master/Slave 架构
-
HDFS集群往往是一个NameNode(HA架构会有两个NameNode,联邦机制)+多个DataNode组成; -
NameNode是集群的主节点,DataNode是集群的从节点。
分块存储(block机制)
-
HDFS中的文件在物理上是分块存储(block) - 块的大小可以通过配置参数来规定,默认
128M,自动切分
命名空间(NameSpace)
-
HDFS支持传统的层次型文件组织结构。 - 用户或者应用程序可以创建目录,然后将文件保存在这些目录里
- 文件系统名字空间的层次结构和大多数现有的文件系统类似:用户可以创建、删除、移动或重命名文件
-
Namenode负责维护文件系统的名字空间,任何对文件系统名字空间或属性的修改都将会被记录下来 -
HDFS提供给客户单一个抽象目录树,访问形式:hdfs://namenode的hostname:port/test/input,例如:hdfs://os1:9000/test/input
NameNode元数据管理
- 我们把目录结构及文件分块位置信息叫做元数据。
-
NameNode的元数据记录每一个文件所对应的block信息(block的id,以及所在的DataNode节点的信息)
DataNode数据存储
文件的各个block的具体存储管理由 DataNode 节点承担。
一个block会有多个DataNode来存储,DataNode会定时向NameNode来汇报自己持有的block信息
副本机制
为了容错,文件的所有block都会有副本。
每个文件的block大小和副本系数都是可配置的。
应用程序可以指定某个文件的副本数目。
副本系数可以在文件创建的时候指定,也可以在之后改变。
副本数量默认是3个
一次写入,多次读出
-
HDFS是设计成适应一次写入,多次读出的场景,且不支持文件的随机修改。 (支持追加写入,不只支持随机更新) -
HDFS适合用来做大数据分析的底层存储服务,并不适合用来做网盘等应用(修改不方便,延迟大,网络开销大,成本太高)
HDFS 架构

NameNode(nn):HDFS集群的管理者,Master
- 维护管理
HDFS的命名空间(NameSpace) - 维护副本策略
- 记录文件块(
Block)的映射信息 - 负责处理客户端读写请求
DateNode:Slave节点
- 保存实际的数据块
- 负责数据块的读写
Client:客户端
- 上传文件到
HDFS的时候,Client负责将文件切分成Block,然后进行上传
请求·NameNode·交互,获取文件的位置信息 - 读取或写入文件,与
DataNode交互 -
Client可以使用一些命令来管理HDFS或者访问HDFS
HDFS 客户端操作
Shell 命令行操作HDFS
基本语法
# hadoop fs [具体命令]
# 查看命令帮助
hadoop fs -help rm
# 显示目录信息
hadoop fs -ls /
# 在HDFS上创建目录
hadoop fs -mkdir -p /study/bigdata
# 从本地剪切粘贴到HDFS
cat > test1.txt << EOF
1
2
EOF
hadoop fs -moveFromLocal test1.txt /study/bigdata
# 追加一个文件到已经存在的文件末尾
cat > test2.txt << EOF
3
4
EOF
hadoop fs -appendToFile test2.txt /study/bigdata/test1.txt
# 显示文件内容
hadoop fs -cat /study/bigdata/test1.txt
# 统计文件夹的大小信息
hadoop fs -du -s -h /study/bigdata
# 设置HDFS中文件的副本数量,实际副本数量取决于datanode数量
hadoop fs -setrep 10 /study/bigdata/test1.txt
HDFS读写解析

- 客户端通过
Distributed FileSystem向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址。 - 挑选一台
DataNode(就近原则,然后随机)服务器,请求读取数据。 -
DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验)。 - 客户端以
Packet为单位接收,先在本地缓存,然后写入目标文件
HDFS写数据流程

- 客户端通过
Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已存在,父目录是否存在。 -
NameNode返回是否可以上传。 - 客户端请求第一个
Block上传到哪几个DataNode服务器上。 -
NameNode返回3个DataNode节点,分别为dn1、dn2、dn3。 - 客户端通过
FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成。 -
dn1、dn2、dn3逐级应答客户端。 - 客户端开始往
dn1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet为单位,dn1收到一个Packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个确认队列等待确认。 - 当一个
Block传输完成之后,客户端再次请求NameNode上传第二个Block的服务器。(重复执行3-7步)
NN与2NN
HDFS元数据管理机制
NameNode如何管理和存储元数据?
- 计算机中存储数据2种:内存或者磁盘
- 如果元数据存储磁盘:存储磁盘无法面对客户端对元数据信息的任意的快速低延迟的响应,但是安全性高
- 如果元数据存储内存:可以高效的查询以及快速响应客户端的查询请求,数据保存在内存,如果断点,内存中的数据全部丢失。
- 解决方案:内存+磁盘,
NameNode内存+FsImage的文件(磁盘)
磁盘和内存中元数据如何划分?
- 如果一模一样:
client如果对元数据进行增删改操作,需要保证两个数据的一致性。FsImage文件操作起来效率也不高
解决方案:两个合并成完整数据,NameNode引入了一个edits文件(日志文件:只能追加写入)edits文件记录的是client的增删改操作
元数据管理流程图

- 特点:编辑日志生成快,fsimage恢复快、生成慢
- 2NN辅助NN管理维护元数据,帮助生成fsimage
- 第一阶段:
NameNode启动- 第一次启动
NameNode格式化后,创建Fsimage和Edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。 - 客户端对元数据进行增删改的请求。
-
NameNode记录操作日志,更新滚动日志。 -
NameNode在内存中对数据进行增删改。
- 第一次启动
- 第二阶段:
Secondary NameNode工作-
Secondary NameNode询问NameNode是否需要CheckPoint(检查点)。直接带回NameNode是否执行检查点操作结果。 -
Secondary NameNode请求执行CheckPoint。 -
NameNode滚动正在写的Edits日志。 - 将滚动前的编辑日志和镜像文件拷贝到
Secondary NameNode。 -
Secondary NameNode加载编辑日志和镜像文件到内存,并合并。 - 生成新的镜像文件
fsimage.chkpoint。 - 拷贝
fsimage.chkpoint到NameNode。 -
NameNode将fsimage.chkpoint重新命名成fsimage
-
fsimage与Edits文件解析
- 当使用
NameNode格式化之后,在目录hadoop工作目录/opt/servers/hadoop-2.9.2/data/tmp/dfs/name/current下生成一系列文件
image.png - seen_txid:记录编辑日志最大滚动编号,用于
NN管理edits文件,防止edits文件丢失后导致元数据错误 - VERSION:记录namenode的一些版本号信息,如
CusterId(当前集群唯一标识)、namespaceID(命名空间id)、blookpoolID(块池id)等 - edits_xxx_*文件:编辑日志滚动后生成
- fsimage文件:
namenode中关于元数据的镜像,一般称为检查点,这里包含了HDFS文件系统所有目录以及文件相关信息(Block数量,副本数量,权限等信息)
fsimage文件内容
查看oiv命令
hdfs oiv -p 文件类型 -i 镜像文件 -o 转换后文件输出路径
hdfs oiv -p XML -i fsimage_0000000000000000265 -o
/opt/lagou/servers/fsimage.xml
- 为什么fimage没有记录
datenode的信息?- 在内存元数据中记录
datanode信息 - 集群启动时,加载fsimage和edits文件,都没有块对应的
datanode信息 - 集群启动时有安全模式,安全模式会让
datanode汇报自己的持有的块信息,不全元数据
- 在内存元数据中记录
Edits文件内容‘
查看oev命令
hdfs oev -p 文件类型 -i编辑日志 -o 转换后文件输出路径
-
edits中只记录了更新相关的操作,查询或者下载文件并不会记录在内 -
NN启动时,加载fsimage和未经合并的edits文件,NN通过判断fsimage后编号过滤掉之前已经合并的edits文件(编号小于等于的文件)
checkpoint周期
默认设置:hdfs-default.xml
<!-- 定时一小时 -->
<property>
<name>dfs.namenode.checkpoint.period</name>
<value>3600</value>
</property>
<!-- 一分钟检查一次操作次数,3当操作次数达到1百万时,SecondaryNameNode执行一次 -->
<property>
<name>dfs.namenode.checkpoint.txns</name>
<value>1000000</value>
<description>操作动作次数</description>
</property>
<property>
<name>dfs.namenode.checkpoint.check.period</name>
<value>60</value>
<description> 1分钟检查一次操作次数</description>
</property >
NN故障处理
- 如果元数据出现丢失损坏如何恢复呢?
- 将
2NN的元数据拷贝到NN的节点下,此种方式会存在元数据的丢失。 - 搭建
HDFS的HA(高可用)集群,解决NN的单点故障问题!!(借助Zookeeper实现HA,一个Active的NameNode,一个是Standby的NameNode)
Hadoop的限额与归档以及集群安全模式
HDFS文件限额配置
数量限额
#创建hdfs文件夹
hdfs dfs -mkdir -p /user/root/test_setting
# 给该文件夹下面设置最多上传两个文件,上传文件,发现只能上传一个文件
hdfs dfsadmin -setQuota 2 /user/root/test_setting
# 清除文件数量限制
hdfs dfsadmin -clrQuota /user/root/test_setting
空间大小限额
# 限制空间大小4KB
hdfs dfsadmin -setSpaceQuota 4k /user/root/test_setting
#上传超过4Kb的文件大小上去提示文件超过限额
hdfs dfs -put /export/softwares/xxx.tar.gz /user/root/test_setting
#清除空间限额
hdfs dfsadmin -clrSpaceQuota /user/root/test_setting
#查看hdfs文件限额数量
hdfs dfs -count -q -h /user/root/test_setting
MapReduce编程框架
MapReduce思想
- 解决大数据量的任务计算、任务处理
-
MapReduce核心思想:大数量计算也是"分而治之",充分利用了并行处理的优势 -
MapReduce任务过程是分为两个处理阶段:-
Map阶段:Map阶段的主要作用是“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。Map阶段的这些任务可以并行计算,彼此间没有依赖关系。-
Map阶段,编程人员只需关注业务逻辑,无需关心原始数据切分,读取问题,以及任务调度、分配问题;
-
-
Reduce阶段:主要作用是“合”,即对Map阶段的结果进行全局汇总。
-
官方WordCount案例源码解析
* `Reduce`阶段,编程人员无需关心如何获取结果,只需要关心如何汇总数据
官方WordCount源码分析
- 反编译
hadoop-mapreduce-examples-2.9.2.jar,WordCount.class下WordCount方法 -
TokenizerMapper类,对应Map阶段
image.png -
IntSumReducer类,对应Reduce阶段
image.png - 运行作业
Main方法
image.png
MapReduce计算类基本组成
- Mapper类
- Reducer类
- 运行作业的代码(Driver)
Hadoop序列化
为什么进行序列化?
- 序列化主要是我们通过网络通信传输数据时或者把对象持久化到文件,需要把对象序列化成二进制的结构
为什么Hadoop要选择建立自己的序列化格式而不使用java自带serializable?
- 序列化在分布式程序中非常重要,在Hadoop中,集群中多个节点的进程间的通信是通过
RPC(远程过程调用:Remote Procedure Call)实现;RPC将消息序列化成二进制流发送到远程节点,远程节点再将接收到的二进制数据反序列化为原始的消息,因此RPC往往追求如下特点:- 紧凑:数据更紧凑,能充分利用网络带宽资源
- 快速:序列化和反序列化的性能开销更低
-
Hadoop使用的是自己的序列化格式Writable,它比java的序列化serialization更紧凑速度更快。一个对象使用Serializable序列化后,会携带很多额外信息比如校验信息,Header,继承体系等。
MapReduce编程规范及示例编写
Mapper类
- 用户自定义一个
Mapper类继承Hadoop的Mapper类 -
Mapper的输入数据是KV对的形式(类型可以自定义) -
Map阶段的业务逻辑定义在map()方法中 -
Mapper的输出数据是KV对的形式(类型可以自定义)
注意:
map()方法是对输入的一个KV对调用一次
Reducer类
- 用户自定义
Reducer类要继承Hadoop的Reducer类 -
Reducer的输入数据类型对应Mapper的输出数据类型(KV对) -
Reducer的业务逻辑写在reduce()方法中
注意:
Reduce()方法是对相同K的一组KV对调用执行一次
Driver阶段
- 创建提交
YARN集群运行的Job对象 - 其中封装了
MapReduce程序运行所需要的相关参数入输入数据路径,输出数据路径等,也相当于是一个YARN集群的客户端 - 主要作用就是提交我们MapReduce程序运行。
------ 跳过
MapReduce原理分析
MapTask运行机制详解
-
沙盒机制
image.png - 首先,读取数据组件
InputFormat(默认TextInputFormat)会通过getSplits方法对输入文件进行逻辑切片规划得到splits,有多少个split就对应启动多少个MapTask。split与block的对应关系默认是一对一;
- 首先,读取数据组件
- 将输入文件切分为
splits之后,由RecordReader对象(默认LineRecordReader)进行读取,以\n作为分隔符,读取一行数据,返回<key,value>。Key表示每行首字符偏移值,value表示这一行文本内容;
- 将输入文件切分为
- 读取
split返回<key,value>,进入用户自己继承的Mapper类中,执行用户重写的map函数。RecordReader读取一行这里调用一次
- 读取
-
map逻辑完之后,将map的每条结果通过context.write进行collect数据收集。在collect中,会先对其进行分区处理,默认使用HashPartitioner。
-
-
- 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集
map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。
- 环形缓冲区其实是一个数组,数组中存放着
key、value的序列化数据和key、value的元数据信息,包括partition、key的起始位置、value的起始位置以及value的长度。环形结构是一个抽象概念。 - 缓冲区是有大小限制,默认是
100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响
- 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集
-
- 当溢写线程启动后,需要对这80MB空间内的
key做排序(Sort)。排序是MapReduce模型默认的行为,排序算法使用quicksort
- 如果
job设置过Combiner,这时会对所有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用 -
Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果,Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景
- 当溢写线程启动后,需要对这80MB空间内的
- 合并溢写文件,每次溢写会在磁盘上生成一个临时文件,当整个数据处理结束之后开始对磁盘中的临时文件进行
merge合并,因为最终的文件只有一个,合并后进行归并排序,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量
- 合并溢写文件,每次溢写会在磁盘上生成一个临时文件,当整个数据处理结束之后开始对磁盘中的临时文件进行
注意:
Map阶段的所有排序都是对key进行的
进入缓冲区,按key的hashcode进行分区
MapTask的并行度
-
MapTask的并行度决定Map阶段的任务处理并发度,从而影响到整个Job的处理速度
思考:
MapTask并行任务是否越多越好呢?
答:源码中,默认切片大小128M,但是切片如果在10%增幅下,也当成一个切片;因为MR并行度越高消耗资源也越高,同样的,block是否也会有同样算法?- 哪些因素影响了
MapTask并行度?
MapTask并行度决定机制
- 数据块:Block是HDFS物理上把数据分成一块一块。
- 切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片
进行存储 - 切片大小
切片大小128 = block块大小
- 问题:
a文件300M,b文件100M,2个文件都存入HDFS,并作为某MR输入数据,需要对MR任务进行split,请问MapTask并行度是多少?
a文件:0-127M,128-256M,256-300M
b文件:0-100M
总共4个split,MapTask并行度为4
- 注意:
要移动计算,不要移动数据,移动数据代价高
ReduceTask工作机制

-
Reduce大致分为copy、sort、reduce三个阶段。 -
copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据- 在此过程中会启动两个
merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据进行merge。
- 在此过程中会启动两个
- 待数据
copy完成之后,开始进行sort阶段,sort阶段主要是执行
finalMerge操作, - 完成之后就是
reduce阶段,调用用户定义的reduce函数进行处理。
ReduceTask详细步骤
-
Copy阶段,简单地拉取数据。-
Reduce进程启动一些数据copy线程(Fetcher) - 通过
HTTP方式请求maptask获取属于自己的文件。
-
-
Merge阶段。- 这里的
merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。 -
Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。 - merge有三种形式:
- 内存到内存;
- 默认不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的
merge。与map端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。
- 默认不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的
- 内存到磁盘;
- 磁盘到磁盘。
- 第二种
merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。
- 第二种
- 内存到内存;
- 合并排序
- 把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。
- 对排序后的键值对调用
reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中
- 这里的
ReduceTask并行度
MapTask并行度取决于split数量,split大小默认blocksize
-
ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,ReduceTask数量的决定是可以直接手动设置:
// 默认值是1,手动设置为4
job.setNumReduceTasks(4);
- 注意:
-
ReduceTask=0,表示没有Reduce阶段,输出文件数和MapTask数量保持一致; -
ReduceTask数量不设置默认就是一个,输出文件数量为1个; - 如果数据分布不均匀,可能在
Reduce阶段产生数据倾斜;
数据倾斜 —— 某个ReduceTask处理的数据量远远大于其他节点
-
Shuffle机制
map阶段处理的数据如何传递给reduce阶段,是MapReduce框架中最关键的一个流程,这个流程就叫shuffle,
*MapTask的map之后,到ReduceTask的reduce之前的数据处理过程称为Shuffle流程
- shuffle:洗牌、发牌——(核心机制:数据分区,排序,分组,
combine,合并等过程)
MapReduce的分区与reduceTask的数量
- 在
MapReduce中,通过我们指定分区,会将同一个分区的数据发送到同一个reduce当中进行处理(默认是key相同去往同个分区),例如我们为了数据的统计,我们可以把一批类似的数据发送到同一个reduce当中去,在同一个reduce当中统计相同类型的数据 - 如何才能保证相同
key的数据去往同个reduce呢?只需要保证相同key的数据分发到同个分区即可。 - MR程序shuffle机制默认就是这种规则,
MR程序默认使用的HashPartitioner,保证了相同的key去往同个分区
YARN资源调度
主要角色
-
ResourceManager(rm):处理客户端请求、启动/监控ApplicationMaster、监控NodeManager、资源分配与调度; -
NodeManager(nm):单个节点上的资源管理、处理来自ResourceManager的命令、处理来自ApplicationMaster的命令; -
ApplicationMaster(am):数据切分、为应用程序申请资源,并分配给内部任务、任务监控与容错。 -
Container:对任务运行环境的抽象,封装了CPU、内存等多维资源以及环境变量、启动命令等任务运行相关的信息。
Yarn任务提交(工作机制)

作业提交过程之YARN
- 作业提交
- 第1步:
Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业。 - 第2步:
Client向RM申请一个作业id。 - 第3步:
RM给Client返回该job资源的提交路径和作业id。 - 第4步:
Client提交jar包、切片信息和配置文件到指定的资源提交路径。 - 第5步:
Client提交完资源后,向RM申请运行MrAppMaster。
- 第1步:
- 作业初始化
- 第6步:当
RM收到Client的请求后,将该job添加到容量调度器中。 - 第7步:某一个空闲的
NM领取到该Job。 - 第8步:该
NM创建Container,并产生MRAppmaster。 - 第9步:下载
Client提交的资源到本地。
- 第6步:当
- 任务分配
- 第10步:
MrAppMaster向RM申请运行多个MapTask任务资源。 - 第11步:
RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
- 第10步:
- 任务运行
- 第12步:
MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。 - 第13步:
MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。 - 第14步:
ReduceTask向MapTask获取相应分区的数据。 - 第15步:程序运行完毕后,
MR会向RM申请注销自己。
- 第12步:
- 进度和状态更新
-
YARN中的任务将其进度和状态返回给应用管理器, 客户端每秒(通过
mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新, 展示给用户。
-
- 作业完成
- 除了向应用管理器请求作业进度外, 客户端每5秒都会通过调用
waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。 - 作业完成之后, 应用管理器和
Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。
- 除了向应用管理器请求作业进度外, 客户端每5秒都会通过调用
Yarn调度策略
- Hadoop作业调度器主要有三种:
-
FIFO:按照任务到达的时间顺序 -
Capacity Scheduler(2.9.2版本默认):-
Capacity调度器允许多个组织共享整个集群,每个组织可以获得集群的一部分计算能力; - 通过为每个组织分配专门的队列,然后再为每个队列分配一定的集群资源
- 在一个队列内部,资源的调度是采用的是先进先出
-
-
Fair Scheduler(CDH版本默认使用的调度器)
-
-
集群配置 ↩







