引言
“在古时候,人们用牛来拉重物。当一头牛拉不动一根圆木时,人们从来没有考虑过要想方设法培育
出一种更强壮的牛。
同理,我们也不该想方设法打造什么超级计算机,而应该千方百计综合利用更多计算机来解决问题。”
——葛蕾丝-霍珀(Grace Hopper)
大数据应用案例分析
大数据框架简介
1. 离线部分
-》Hadoop
- HDFS:分布式文件系统,存储海量数据
- MapReduce:分布式计算框架,编程模型
- YARN:一个集群资源管理系统,允许任何分布式程序(不仅仅是MapReduce)
基于Hadoop集群的数据而运行
-》HBase:非关系型数据库
-》Hive:类SQL语句,数据仓库,用于分析
2. 协作框架
-》Zookeeper:分布式协调服务
-》Sqoop:导入和导出数据
-》Flume:日志抓取
-》Oozie:调度框架
3. 实时部分
-》Spark
大数据
1. 数据是以字节来衡量大小
-》1Byte = 8bit
-》1024B = 1M
-》1024M = 1G
-》1024G = 1T
-》1024T = 1P
-》1024P = 1E
2. 数据来源:
-》用户行为数据(推荐系统)
- 搜索习惯,关键字
- 消费记录,支付宝
-》业务数据
- 公司内部产生的数据
-》爬虫技术采集
- python,java语言实现对网页的抓取
-》生产机器上的日志文件
- 日志文件
3. “大数据胜于好算法”:
-》对于某些应用(譬如根据以往的偏好来推荐电影和音乐),不论算法有多牛,基于小数据的
推荐效果往往都不如基于大量可用数据的一般算法的推荐效果。
4. 现在有了大量数据,我们必须想方设法好好地存储和分析这些数据
Hadoop发展史
2002年,Apache Nutch 网页抓取,数十亿存储瓶颈
2003年,GFS论文
2004年,Nutch开发NDFS,既HDFS前身
2004年,Google发表MapReduce论文
2005年,Nutch应用MR,主要算法转移到MR和NDFS运行
2006年,MapReduce和NDFS从Nutch分离,形成Hadoop作为一个独立Lucene子项目
2006年,Doug Cutting加入Yahoo
2008年2月,Yahoo在1万个内核的集群上部署了Hadoop
2008年4月,Yahoo对1T数据进行排序,耗时209秒,集群有910个节点
2008年11月,Google对1T数据排序用了68秒
2009年5月,Yahoo对1T数据排序用了62秒
初识Hadoop
<a href="http://hadoop.apache.org/" target="_blank" rel="noopener noreferrer">Apache Hadoop 官方网站</a>
概念性的理论
1. hadoop2.x和1.x版本的区别
- Hadoop Common: The common utilities that support the other Hadoop modules.
- Hadoop Distributed File System (HDFS?): A distributed file system that provides high-throughput access to application data.
- Hadoop YARN: A framework for job scheduling and cluster resource management.
- Hadoop MapReduce: A YARN-based system for parallel processing of large data sets.
2. 2.x的版本中添加了Yarn平台
3. hadoop三大发行版本
- Apache apache顶级项目http://hadoop.apache.org/
- CDH cloudera
- HDP hortonworks
4. 分布式:
- 分布储存
- 分布计算
5. 课程中用的是2.7.3的版本
- 常用版本:2.2.0, 2.5.0, 2.6.5, 2.7.3
Hadoop三种运行模式
1. Local (Standalone) Mode 本地模式 开发人员debug调试使用 local把文件存到本地的文件系统中
2. Pseudo-Distributed Mode 伪分布式 开发人员debug调试使用 在本地搭建HDFS,伪分布式,完全分布
3. Fully-Distributed Mode 完全分布式(集群) 生产环境使用 3,5,7,50,如何搭建,!!HA!!高可用性,比如突然有个节点出问题,保证集群还可用
Hadoop环境部署-JDK部分
1. 先修改权限
chown -R beifeng:beifeng /opt/
2. 解压JDK到指定的目录下,目录任意,建议不要装在某个用户主目录下
tar -zxvf jdk-7u67-linux-x64.tar.gz -C /opt/modules/
3. 添加环境变量
修改vi /etc/profile文件,配置jdk环境变量
#JAVA_HOME
export JAVA_HOME=/opt/modules/jdk1.7.0_67
export PATH=$PATH:$JAVA_HOME/bin
source /etc/profile生效文件
4. 验证是否配置成功:java -version
jps命令可以查看java 进程
echo $JAVA_HOME
Hadoop伪分布式环境部署-Hadoop部分
1. 解压Hadoop到指定目录下
tar -zxvf hadoop-2.5.0.tar.gz -C /opt/modules/
2. 清理Hadoop的目录,将hadoop/share/doc目录删除,节省磁盘空间,通过这个命令查看df -h
3. 修改hadoop/etc/hadoop/hadoop-env.sh文件
修改hadoop/etc/hadoop/mapred-env.sh文件
修改hadoop/etc/hadoop/yarn-env.sh文件
指定Java安装路径
export JAVA_HOME=/opt/modules/jdk1.7.0_67
4. 注意:hadoop中的四个核心模块对应四个默认配置文件
指定默认的文件系统为HDFS,文件系统的访问入口,namenode所在的机器
9000端口是早期Hadoop 1.x使用的,现在Hadoop 2.x使用的是8020
端口号用于节点直接内部通信,使用RPC通信机制
5. 修改hadoop/etc/hadoop/core-site.xml文件
<property>
<name>fs.defaultFS</name>
<value>hdfs://hostname:8020</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/modules/hadoop-2.7.3/data/tmp</value>
</property>
6. 注意:/tmp表示临时存储目录,系统每次重启会按照脚本预先设置好的删除里面的文件
重新自定义系统生成的文件路径,/tmp会被清空,无法保证数据文件安全性
7. 修改hadoop/etc/hadoop/hdfs-site.xml文件
指定HDFS文件存储的副本数个数,默认是3个,这里是单台机器就设置为1,这个数字要小于datanode的节点数
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
8. 修改hadoop/etc/hadoop/slaves文件
指定从节点的机器位置,添加主机名即可
hostname 比如:bd1.ibeifeng.com
9. 格式化namenode
bin/hdfs namenode -format
10. 启动命令
sbin/hadoop-daemon.sh start namenode
sbin/hadoop-daemon.sh start datanode
11. 查看HDFS外部UI界面
bigdata-04或者IP地址 跟上50070端口号,外部通信http
dfs.namenode.http-address 50070
12. 测试HDFS环境
创建文件夹,HDFS中有用户主目录的概念,和Linux一样
bin/hdfs dfs -mkdir -p ibf_test/iuput
13. 上传文件到HDFS
bin/hdfs dfs -put etc/hadoop/core-site.xml etc/hadoop/hdfs-site.xml /
14. 读取HDFS的文件
bin/hdfs dfs -text /core-site.xml
15. 下载文件到本地(指定下载到哪里,同时可以重命名成get-site.xml)
bin/hdfs dfs -get /core-site.xml /home/beifeng/get-site.xml
HDFS的缺陷
1. HDFS存储的文件是不能够被修改的
2. HDFS不支持多用户并发写入
3. HDFS不适合存储大量小文件
yarn的配置
1. 修改hadoop/etc/hadoop/mapred-site.xml文件
指定mapreduce计算模型运行在yarn上
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
2. 修改hadoop/etc/hadoop/yarn-site.xml文件
指定启动运行mapreduce上的nodemanager的运行服务
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
3. 指定resourcemanager主节点机器,可选项,不一定要配置,默认是本机,但是指定了之后在其他机器上启动,就会报错
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hostname</value>
</property>
4. 启动yarn
sbin/yarn-daemon.sh start resourcemanager
sbin/yarn-daemon.sh start nodemanager
5. 查看yarn外部web界面
bigdata-04或者IP地址 跟上8088端口号,外部通信http
6. 测试环境,运行一个mapreduce,wordcount单词统计案例
一个mapreduce分为五个阶段
input -> map() -> shuffle -> reduce() -> output
步骤:将mapreduce运行在yarn上,需要打jar包
新建一个数据文件,用于测试mapreduce
将数据文件从本地上传到HDFS
bin/hdfs dfs -put /opt/datas/1.txt /user/beifeng/iuput/
使用官方提供的示例jar包:share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0.jar
7. 运行
bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0.jar wordcount /user/beifeng/1.txt /user/beifeng/output
application_1500824570525_0001
0001表示第一个job
1500824570525表示Unixtime(格林威治时间)
HDFS架构
1. 数据块block
2. 每个块默认大小:128MB,大小可以用户自定义修改
3. 如果要修改就写到hdfs-site.xml中
<property>
<name>dfs.blocksize</name>
<value>134217728</value>
<description>
The default block size for new files, in bytes.
You can use the following suffix (case insensitive):
k(kilo), m(mega), g(giga), t(tera), p(peta), e(exa) to specify the size (such as 128k, 512m, 1g, etc.),
Or provide complete size in bytes (such as 134217728 for 128 MB).
</description>
</property>
4. 500MB,默认大小:128MB
- 128MB
- 128MB
— 128MB
- 128MB(12MB)
5. 如果一个文件的大小小于块的大小,是不会占据整个块的空间的
6. 存储模式:
首先,HDFS默认会分块,大小是128M(这个值是可以设置的)
那么HDFS上面的块的大小可以大于128吗,答案是肯定的。
有不同的方式去设定
1)通过HDFS的API的create方法,可以指定创建的文件块的大小(可以任意大小)
2)hive当中也可以在hive-site当中设定,hive输出的块的大小(可以大于128M)
3)也会有其他的方法,这里就不一一列举
但是,当我储存一个129MB的文件的时候,存了几块!?
存储了2块!
第一块128M,第二块1M
计算数据:
HDFS上的文件进行mapreduce计算,默认情况下一个map当中会有128M(和块大小一样)的数据输入
所以这里就涉及到我的一个129M的文件会启动几个Map任务来操作
答案是:1个
mapreduce有这样的机制,最后一个文件的输入如果小于128*1.1(其实就是可以多10%)
那么只会启动一个Map来执行这个job,避免了第一个Map跑了128M的数据,第二个Map只跑了1M的数据的尴尬
这种情况只会在最后一块出现
再举个例子,比如522M的文件,分成几个Map来处理呢?
第一个map-》128M
第二个map-》128M
第三个map-》128M
第四个map-》138M ——》138小于128*1.1,所以这里就不会再开启一个map来处理最后剩余的那10M的数据
直接在最后一个map当中把所有138M的数据输入!!谨记
HDFS不适合存储大量的小文件
可以考虑合并大文件,效果不明显
阿里巴巴开源了TFS淘宝文件系统,参考了HDFS
7. 保证数据安全性机制
副本数
一份文件写多份备份,写到不同机器节点上
文件切分成块之后,对于每个块的备份
8. 放置策略
第一个block块的副本,如果client客户端在集群中的某台机器,那么第一个就放在这台
如果client不在集群中,那么第一个块就随机放置
第二个block块的副本,会放置在和第一个不同的机架的node节点上,随机的
第三个block块的副本,会放置在和第二个相同机架的不同的node节点上,随机的
其他的随便放
负载均衡,均匀分布
数据块的扫描机制
HDFS文件生成key,定期检查,生成KEY,如果块被损坏,当你执行操作的时候就会报错
块的修复(需要人工参与)
把这个块所在的机器节点停掉(有可能是磁盘坏了,或者磁盘满了,也有可能是进程原因)
HDFS上的节点
主节点:namenode
管理元数据
文件属性
名称
位置
权限
数据块
....
元数据是存储在namenode内存中
元数据在本地也有备份,fsimage镜像文件
namenode在启动的时候会去读取加载fsimage镜像文件
edits称作编辑日志文件,用于记录用户对于HDFS所有的行为操作
namenode在启动的时候还会去读取加载edits编辑日志文件
edits越来越大,考虑将fsimage和edits合并
secondarynamenode进行合并,功能
合并文件
减少下一次namenode启动时间
namenode在重新启动之后会读取新的合并的文件
生成新的fsimage镜像文件和edits编辑日志文件
原来的初始化的两个文件就没有用了
配置
修改hdfs-site.xml,指定机器以及外部交互端口号
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>class-bigdata01.ibeifeng.com:50090</value>
</property>
-》启动
sbin/hadoop-daemon.sh start secondarynamenode
从节点:datanode
物理磁盘存储数据的
会和nodemanager部署在一起,通过slaves配置文件来指定
注意:HDFS数据块,存储在linux的/opt/modules/hadoop-2.5.0/data/tmp/dfs/data/current/BP-275988769-192.168.163.104-1504420277120/current/finalized/路径下
优化配置(在hdfs-site中)
1. 单独指定fsimage文件存放的路径
注意:你可以自己选择路径,也可以不修改使用默认的
<property>
<name>dfs.namenode.name.dir</name>
<value>file://${hadoop.tmp.dir}/dfs/name</value>
<description>Determines where on the local filesystem the DFS name node
should store the name table(fsimage). If this is a comma-delimited list
of directories then the name table is replicated in all of the
directories, for redundancy. </description>
</property>
2. 单独指定edits文件的路径
<property>
<name>dfs.namenode.edits.dir</name>
<value>${dfs.namenode.name.dir}</value>
<description>Determines where on the local filesystem the DFS name node
should store the transaction (edits) file. If this is a comma-delimited list
of directories then the transaction file is replicated in all of the
directories, for redundancy. Default value is same as dfs.namenode.name.dir
</description>
</property>
3. 指定datanode数据本地路径
<property>
<name>dfs.datanode.data.dir</name>
<value>file://${hadoop.tmp.dir}/dfs/data</value>
<description>Determines where on the local filesystem an DFS data node
should store its blocks. If this is a comma-delimited
list of directories, then data will be stored in all named
directories, typically on different devices.
Directories that do not exist are ignored.
</description>
</property>
MapReduce historyserver
1. 历史服务器:查看已经运行完成的应用记录
修改mapred-site.xml
指定historyserver的地址,内部和外部通信端口号,如果不指定默认是本机
historyserver是一个轻量级的服务,可以部署在任意一台节点上
2. 配置(在mapred-site.xml中):
注意:这里的端口号不能随便修改
<property>
<name>mapreduce.jobhistory.address</name>
<value>bigdata-04:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>bigdata-04:19888</value>
</property>
日志聚合功能
1. 修改yarn-site.xml
指定开启聚合功能
指定日志存放在HDFS上的时间期限,一般建议3-7天左右,存放在HDFS的/tmp/用户之下
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>106800</value>
</property>
2. 重启yarn进程
3. 指定存放已经完成的Hadoop的作业记录
(接下来的3、4、5的操作可以使用默认的值,不用修改,只是让大家看下,是可以修改的)
<property>
<name>mapreduce.jobhistory.done-dir</name>
<value>${yarn.app.mapreduce.am.staging-dir}/history/done</value>
</property>
4. 指定存放的正在运行的Hadoop作业记录
<property>
<name>mapreduce.jobhistory.intermediate-done-dir</name>
<value>${yarn.app.mapreduce.am.staging-dir}/history/done_intermediate</value>
<description></description>
</property>
5. 提交job作业记录的目录位置
<property>
<name>yarn.app.mapreduce.am.staging-dir</name>
<value>/tmp/hadoop-yarn/staging</value>
<description>The staging dir used while submitting jobs.</description>
</property>
HDFS权限检测
1. 取消HDFS权限检测功能
2. 修改hdfs-site.xml
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
常见问题
1. 格式化问题
一般情况格式化一次即可
bin/hdfs namenode -format
生成fsimage
操作HDFS之后生成edits
多次格式化之前,需要清空hadoop.tmp.dir参数下设置的路径下的所有文件和目录
2. 出现错误
一定要学会查看日志文件,查看logs/下的对应进程的.log后缀的文件
【框架名】【用户名】【进程名】【主机名】【文件后缀】
hadoop-beifeng-datanode-bigdata-04.log
查看最新的日志记录,最新时间
3. clusterID
[dfs/name/current]:namenode
clusterID=CID-6172ab83-9c0a-4da0-8d14-d5e14a2c57cd
[dfs/data/current]:datanode
clusterID=CID-6172ab83-9c0a-4da0-8d14-d5e14a2c57cd
集群ID不一致
第一种解决方法:修改VERSION文件,以namenode的ID为准,改为一致即可
第二种解决方法:重新格式化生成
4. pid进程号(注意权限问题)
系统进程每次启动之后会有一个pid编号,每次启动会随机生成
多个用户启动进程,会记录不同用户的pid进程编号
建议不要多个用户混用
如果出现pid进程编号多个的话,直接删除rm /tmp/*.pid
再用同一个用户去启动
5. host主机名与IP不一致
检查core-site
检查/etc/hosts
检查ip
6. 细节问题,能复制的尽量复制,不要手打,容易出错
权限错乱问题解决思路
1. cd /tmp hadoop-root-namenode.pid hadoop-root-datanode.pid 带root的全部删掉
2. 用root身份chown ibeifeng:ibeifeng -R hadoop-2.7.3
3. cd /opt/modules/hadoop-2.7.3
rm -rf logs/
4. cd /opt/modules/hadoop-2.7.3/data(如果前三步操作完,可以成功启动,那就不需要后续步骤)
rm -rf dfs/
注意:如果不想删掉datanode上的数据,就这么做:
1.namenode格式化成功之后,到这个目录(/opt/modules/hadoop-2.7.3/data/dfs/name/current/VERSION)下把clusterID复制出来
2.把datanode的VERSION文件里的clusterID更新成刚刚格式化成功的namenode的clusterID,VERSION文件在这个目录/opt/modules/hadoop-2.7.3/data/dfs/data/current/VERSION
3. bin/hdfs namenode -format
hadoop web界面打开失败的解决方案
1.首先jps,查看相应的进程有没有启动(namenode,resourcemanager)
2.其次,如果你用的是hostname:50070 或 hostname:8088,主机名+端口,检查windows的C:\Windows\System32\drivers\etc\hosts文件有没有做网络映射
3.centos里检查防火墙(iptables)是否有关闭
service iptables status 查看防火墙状态
如果没有关闭,service iptables stop
chkconfig iptables off 关闭防火墙启动服务
【SSH免密码登录】
1、ssh-keygen -t rsa
id_rsa -》私钥
id_rsa.pub -》公钥
ssh-copy-id bigdata-01
ssh-copy-id bigdata-02
ssh-copy-id bigdata-03
自己也要给自己发送公钥和私钥
authorized_keys -》将公钥保存到文件中远程拷贝到其他机器上保存
known_hosts -》记录秘钥信息
2、报错解决:
如果没有生效,就删除.ssh目录下所有文件,重新生成
或者直接删除.ssh目录,生成方式ssh-keygen,就会生成.ssh目录,不要使用mkdir
3、配置了SSH之后,就不需要输入密码,直接可以启动多个节点的服务进程(sbin/start-dfs.sh)
【本地库无法加载的警告】
WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
这个版本的压缩包被其他用户编译了,但是编译者的编译环境和我们自己的环境有不一样
解决方法:
下载官方的源码包
hadoop-2.5.0-src.tar.gz
1、编译Hadoop源码
2、替换hadoop/lib下的native本地库文件
3、将编译好的native拷贝到lib下
4、通过bin/hadoop checknative检验是否成功
【搭建eclipse环境(Linux)】
1、安装配置maven
2、解压
3、修改/etc/profile添加环境变量
#MAVEN_HOME
export MAVEN_HOME=/opt/modules/apache-maven-3.0.5
export PATH=$PATH:$MAVEN_HOME/bin
4、验证maven是否生效
mvn -version
5、解压eclipse
在虚拟机图形界面中执行启动eclipse
./eclipse
6、创建一个.m2的文件夹用于存放settings.xml和maven仓库文件夹
【搭建eclipse环境(windows)】
【Mapreduce & yarn】
特点:
1、分布式并行计算
2、主要核心功能:排序,默认的排序方式是按照key进行排序
概念定义:
1、MapReduce执行流程涉及到Client、ResourceManager、NodeManager、ApplicationMaster、Container、Task
2、其中Client是提交Mapreduce的机器
3、ApplicationMaster是负责该Job调度的进程,一个job一个applicationMaster
4、Container是资源表示形式
5、Task是运行在NodeManager上的进程,使用到资源就是Container
6、resourcemanager是管理整个集群的资源
7、nodemanager是单个节点的资源管理
提交流程:
1、Clinet向RM申请资源,RM上有所有NM的节点资源信息,RM将资源信息(NM的hostname、以及分配的内存和CPU大小)发送给Client
2、Client根据请求到资源信息发送到对应的NM,NM中产生Container对象,然后在Container对象中调用相关代码,启动AM
3、AM开始获取job相关设置信息,获得得到map task数量(由InputFormat的getSplits方法决定)和reduce task数量(由参数mapreduce.job.reduces影响)
4、然后AM向RM申请Map Task运行的资源(一个task就需要申请一个container),RM将分配的资源发送给AM,AM远程调用NM的相关方法启动对应的Container,并在Container中启动对应的Map Task
5、当一个Map Task执行完成后,会通知AM进程,当前Map Task执行完成;当总Map Task中有5%执行完成,AM向RM申请reduce task运行资源(一个task需要一个container)
6、RM将资源信息发送给AM,AM在对应的NM节点启动对应的Container,并在Container中运行对应的reduce task任务
7、当reduce task任务执行完成后,会AM进程,当所有的reduce task执行完成,AM通知client表示程序执行完成
疑问一:map task在哪儿运行,是由谁来决定的?
这个东西是由RM返回给AM资源的时候决定的
RM上会有全部CPU和内存,已使用CPU和内存,RM会根据底层写好的算法,返回NM的信息
所以说如果一个NM上只有一个Map Task任务,一个maptask默认使用1核cpu,使用1G内存,那这个Map Task任务只处理该task本身的数据,也就是说该任务处理的数据可以是当前节点的,也可以是其它节点上的数据
RM在分配资源的时候,会尽可能的将Map Task所运行的资源(Container所在机器的NM的hostname等)的放到数据节点上,这样的话AM在启动RM的时候就会在数据节点上启动处理该数据的task任务,该机制叫做mapreduce的数据本地化机制
但是如果资源不够,就会选择其他机器,所以主要还是使用资源来做选择哪台机器执行Task的判断)
疑问二:怎么设定map和reduce的数量?
MapReduce有两套API,org.apache.hadoop.mapred(旧API)和org.apache.hadoop.mapreduce(新API),开发时基本用新的API
map和reduce的数量受两个操作的影响,split和partition,一个split就是对应一个maptask,一个partition对应一个reduce数据输入
控制map数量:
新版本mapreduce的textInputFormat使用参数:mapreduce.input.fileinputformat.split.maxsize和mapreduce.input.fileinputformat.split.minsize来控制split也就是map的数量
公式:split_size = max(minsize,min(maxsize,blocksize))
默认maxsize为Long.MaxValue,minsize为0,所以默认map大小等于blocksize,也就是128M
如果要增多map数量,就将maxsize的值设置比blocksize小
如果要减少map数量,就将minsize的值设置比blocksize大
提交job:
hadoop jar XXXX.jar wordcont -Dmapreduce.input.fileinputformat.split.maxsize=xxx -Dmapreduce.input.fileinputformat.split.minsize=xxx
或者通过api设置
FileInputFormat.setMaxInputSplitSize(job, 20971520l);
FileInputFormat.setMinInputSplitSize(job, 1000);
mapreduce流程
1、输入(HDFS、hbase、mysql...)
2、map(数据的切分和清洗)
3、shuffle
4、reduce
5、输出(HDFS、hbase、hive、MySQL....)
两套API(新旧):
org.apache.hadoop.mapred -》 Hadoop 1.x
org.apache.hadoop.mapreduce -》 Hadoop 2.x
数据格式:
input:
hadoop hive
<0,hadoop hive>
map:
<hadoop,1>
<hive,1>
<hadoop,1>
<hadoop,1>
<hive,1>
<hadoop,1>
shuffle:
<key,list(values)>
<hadoop,List(1,1,1,1)>
<hive,List(1,1)>
reduce:
<hadoop,4>
<hive,2>
intpu<k1,v1> --> map<k2,v2> -->shuffle<k2,list<v2,v2>> -->reduce<k3,v3> -->output
【MR输入和输出类】
输入:InputFormat
TextInputFormat
SequenceFileInputFormat
DBInputFormat
KeyValueTextInputFormat
输出:OutputFormat
TextOutputFormat(LineRecordWriter)
SequenceFileOutputFormat、DBOutputFormat
因为离线分析,一般来说都是处理日志,以行分割的数据,所以用默认的TextInputFormat就可以,有特殊需求自己写类就行
【MR shuffle】
1、分为map端shuffle和reduce端shuffle
2、发生的阶段是map的输出到达reduce输入之前的中间阶段
3、wordcount 单词统计
4、输入input hadoop ,spark
<0,hadoop spark>
5、map()
<hadoop,1>,<spark,1>
6、map output -》 shuffle
7、数据往内存中写入,环形缓冲区
8、内存默认大小:100MB,用户自定义大小
<property>
<name>mapreduce.task.io.sort.mb</name>
<value>100</value>
<description>The total amount of buffer memory to use while sorting
files, in megabytes. By default, gives each merge stream 1MB, which
should minimize seeks.</description>
</property>
当map()被调用次数过多的时候,代表内存中的空间会占用越来越大
当内存中的空间达到了80%,默认是80%,用户可以自定义
当达到80%就会触发spill溢写操作
<property>
<name>mapreduce.map.sort.spill.percent</name>
<value>0.80</value>
<description>The soft limit in the serialization buffer. Once reached, a
thread will begin to spill the contents to disk in the background. Note that
collection will not block if this threshold is exceeded while a spill is
already in progress, so spills may be larger than this threshold when it is
set to less than .5</description>
</property>
9、溢写阶段
溢写会单独开启一个线程(将数据写入磁盘中)
溢写过程中map可以继续往剩余的内存空间中继续写入,互不影响
如果写入的速度快于溢写的速度,那么会造成阻塞,就会等待溢写完成
10、分区partition
默认hashpartiton算法,哈希值取余的方式
java当中的各种运算符见:http://blog.csdn.net/is_zhoufeng/article/details/8112199
1、reduce数目设置
时间是检验真理的唯一标准
reduce的数目设置影响运行总时间
假设:
20个reduce运行时间10分钟
10个reduce运行时间11分钟
选择数目少的,尽量的减少集群资源开销
2、分区的同时有排序sort
对于分区进行排序,这个阶段还是处于内存中的
溢写到本地磁盘,在本地会形成一个溢写文件
如果数据量很大的情况下,就会有多个溢写文件
合并merge,将溢写文件做一个合并
算法:归并方式
在map中的shuffle甚至可以有combiner
<property>
<name>mapreduce.cluster.local.dir</name>
<value>${hadoop.tmp.dir}/mapred/local</value>
<description>The local directory where MapReduce stores intermediate
data files. May be a comma-separated list of
directories on different devices in order to spread disk i/o.
Directories that do not exist are ignored.
</description>
</property>
=======================map端shuffle==============================
=======================reduce端shuffle===========================
APPmaster通知reduce进行拷贝数据
reduce开启线程去对应的map上拷贝数据,通过http网络方式进行传输
reduce将数据写入内存中
做sort排序,分组group,将相同key的value放在一起
<hadoop,list(1,1,1)>
reduce input
【本地程序打jar运行在yarn】
注意:程序必须指定主类
jar需要上传到Linux上,然后通过命令行运行
输入和输出路径在程序中指定,也可以在命令中指定
【MR程序的优化】
combiner,属于map的shuffle阶段
在map端设置combiner可以优化reduce输入的数据量
最主要的作用:
减轻了网络和磁盘IO的开销
<hadoop,1><hadoop,1><hadoop,1><hadoop,1><hadoop,1><hadoop,1><hadoop,1><hadoop,1>
<hadoop,10>
combiner并不是所有的MR程序都适合
combiner虽然是可选项但是默认情况下是不启用的
适合场景:
不能更改最终的计算结果
适合做累加的、最大值、最小值等
combiner和reduce区别:
combiner对象是对于单个map来说的,只是处理单台机器生成的数据
reduce对象是对于多个map来说的,所以如果有聚合,reduce阶段是不可避免的
combiner获取的数据
<hadoop,list(1,1,1)>
combiner输出的数据
<hadoop,3>
【克隆虚拟机】
-》克隆之前需要关闭所有的服务进程
-》然后关闭虚拟机
-》选择虚拟机【管理】【克隆】
-》选择完整克隆
克隆虚拟机或直接拷贝磁盘文件
## 稍后安装操作系统
## 使用现有虚拟磁盘
## 生成新的MAC地址
## 调整网络地址
观察这个配置文件信息
$ vi /etc/sysconfig/network-scripts/ifcfg-eth0
修改mac地址
$ vi /etc/udev/rules.d/70-persistent-net.rules
具体流程看word文档(克隆虚拟机)
完全分布式安装
1、
角色分配
组件 linux01 linux02 linux03
HDFS namenode Secondarynamenode
HDFS datanode datanode datanode
Yarn resourcemanger
Yarn nodemanager nodemanager nodemanager
history historyserver
2、环境准备
2.1系统和软件【3台】
CentOS 6.5 hadoop 2.5.0 jdk1.70—67
2.2 配置IP和DNS(root)
配置静态IP
DNS
//检查主机映射
$ cat /etc/hosts
//检查主机名
$cat /etc/sysconfig/network
//检查IP和DNS
$ cat /etc/sysconfig/network-scripts/ifcfg-eth0
2.3关闭防火墙 (3台) (root)
# service iptables stop
# chkconfig iptables off
关闭Linux安全子系统
# vi /etc/sysconfig/selinux
2.4创建相同普通用户名和密码 【3台】
# useradd hadoop
# echo 123456 | passwd --stdin hadoop
2.5配置主机映射 【三台都需要需要添加】
# vi /etc/hosts
192.168.7.9 hadoop.senior01
192.168.7.10 hadoop.senior02
192.168.7.11 hadoop.senior03
2.6卸载自带的jdk
# rpm -qa | grep jdk
# rpm -e --nodeps tzdata-java-2012j-1.el6.noarch
# rpm -e --nodeps java-1.6.0-openjdk-1.6.0.0-1.50.1.11.5.el6_3.x86_64
# rpm -e --nodeps java-1.7.0-openjdk-1.7.0.9-2.3.4.1.el6_3.x86_64
3、配置NTP服务
*.把Linux01作为整个集群的时间同步服务器
*.集群中所有其他服务器都来这台服务器Linux01同步时间
1.检查每台服务器所在的时区
$ date -R --检查当前系统时区
Thu, 16 Feb 2017 17:06:18 +0800
# rm -rf /etc/localtime ---如果时区不是+0800
# ln -s /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
2.安装ntp服务
# rpm -qa | grep ntp --查看ntp软件包是否已安装
#yum -y install ntp --如果没有那就需要安装ntp
3.修改ntp的配置文件(Linux01)
# vi /etc/ntp.conf
*去掉下面这行下面的#,并把网段修改成自己的网段
restrict 192.168.7.0 mask 255.255.255.0 nomodify notrap
*注释掉一下几行
#server 0.centos.pool.ntp.org iburst
#server 1.centos.pool.ntp.org iburst
#server 2.centos.pool.ntp.org iburst
#server 3.centos.pool.ntp.org iburst
把下面前面两行的#号去掉,如果没有,需要手动去添加
server 127.127.1.0 #local clock
fudge 127.127.1.0 stratum 10
4.启动ntp服务(默认是开启)Linux01
# service ntpd start
# chkconfig ntpd on
5.同步服务器的时间(Linux01)
# ntpdate cn.pool.ntp.org -->操作这一步时关闭ntp服务
16 Feb 17:14:40 ntpdate[26564]: step time server 188.39.37.91 offset -12.669996 sec
6.如果另外两台的ntp的进程开启,那么需要关闭
# service ntpd stop
# chkconfig ntpd off
7.第2、3台向第一台同步时间
# ntpdate hadoop.senior01
16 Feb 17:43:27 ntpdate[2554]: adjust time server 192.168.7.9 offset -0.001412 sec
8.制定周期性时间同步计划任务(第2、3台-Linux02 、Linux03)
## 每10分钟同步一次服务器时间
*/10 * * * * /usr/sbin/ntpdate hadoop.senior01
# date -s "19:05:56 2017/2/16"
4、配置免密钥登陆
使用ssh 登陆的时候不需要用户名和密码
$ ssh-keygen
* 回车,产生的当前主机的公钥和私钥
//分发密钥(要向3台都发送)
$ ssh-copy-id hadoop.senior01
$ ssh-copy-id hadoop.senior02
$ ssh-copy-id hadoop.senior03
(Linux01~03 )
$ ssh-keygen
$ ssh-copy-id hadoop.senior01
$ ssh-copy-id hadoop.senior02
$ ssh-copy-id hadoop.senior03
分发完成会在用户主目录下的.ssh目录生成以下文件:
authorized_keys id_rsa id_rsa.pub known_hosts
如果配置错误可以先删除.ssh目录,重新做一遍
==========================================
3.====core-site.xml====
<!--指定第一台做namenode-->
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop.senior01:8020</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/modules/hadoop-2.5.0/data</value>
</property>
=========hdfs-site.xml=====
<!-- 分布式副本数设置为3 -->
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<!-- secondarynamenode主机名 -->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>hadoop.senior02:50090</value>
</property>
<!-- namenode的web访问主机名:端口号 -->
<property>
<name>dfs.namenode.http-address</name>
<value>hadoop.senior01:50070</value>
</property>
<!-- 关闭权限检查用户或用户组 -->
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
============yarn-site.xml=======
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hadoop.senior02</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>86400</value>
</property>
=========================mapred-site.xml============
$ cp mapred-site.xml.template mapred-site.xml
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.jobhistory.address</name>
<value>hadoop.senior01:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hadoop.senior01:19888</value>
</property>
HA环境部署
为了能够实时同步Active和Standby两个NameNode的元数据信息(实际上editlog),需提供一个共享存储系统,
可以是NFS、QJM(Quorum Journal Manager)或者Bookeeper,Active Namenode将数据写入共享存储系统,而
Standby监听该系统,一旦发现有新数据写入,则读取这些数据,并加载到自己内存中,以保证自己内存状态与
Active NameNode保持基本一致,如此这般,在紧急情况下standby便可快速切为active namenode。
一、HA集群规划(5台)
bigdata1.ibeifeng.com namenode resourcemanager zkfc
bigdata2.ibeifeng.com namenode resourcemanager zkfc
bigdata3.ibeifeng.com datanode nodemanager zookeeper journalnode
bigdata4.ibeifeng.com datanode nodemanager zookeeper journalnode
bigdata5.ibeifeng.com datanode nodemanager zookeeper journalnode
二、环境准备
1.主机名(5台PC)
2.hosts文件
3.网络配置
4.防火墙
5.设置SSH免登陆
6.时间同步
三、安装部署
1.安装JDK(5台PC都要安装JDK)
配置环境变量
2.安装zk集群
1)安装zk
2)配置zoo.cfg文件
dataDir=/opt/modules/zookeeper-3.4.5/zkdata
server.1=bigdata3.ibeifeng.com:2888:3888
server.2=bigdata4.ibeifeng.com:2888:3888
server.3=bigdata5.ibeifeng.com:2888:3888
3)创建zkdata目录,在zkdata目录下创建myid文件,编辑myid文件内容
就是此台server的zk的id号
4)启动三台zkServer
$ bin/zkServer.sh start ##启动服务
$ bin/zkServer.sh status ##查看状态
3.配置hadoop集群
1)安装hadoop
2)配置环境文件
hadoop-env.sh
mapred-env.sh
yarn-env.sh
export JAVA_HOME=/opt/modules/jdk1.8.0_151
3)配置文件
【core-site.xml】
<!-- 指定hdfs的nameservice为ns1 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://ns1</value>
</property>
<!-- 指定hadoop临时目录 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/modules/hadoop-2.7.3/data/tmp</value>
</property>
<!--指定zookeeper运行的节点-->
<property>
<name>ha.zookeeper.quorum</name>
<value>bigdata3.ibeifeng.com:2181,bigdata4.ibeifeng.com:2181,bigdata5.ibeifeng.com:2181</value>
</property>
【hdfs-site.xml】
<!--指定hdfs的nameservice为ns1,需要和core-site.xml中的保持一致 -->
<property>
<name>dfs.nameservices</name>
<value>ns1</value>
</property>
<!-- ns1下面有两个NameNode,分别是nn1,nn2 -->
<property>
<name>dfs.ha.namenodes.ns1</name>
<value>nn1,nn2</value>
</property>
<!-- nn1的RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.ns1.nn1</name>
<value>bigdata1.ibeifeng.com:9000</value>
</property>
<!-- nn1的http通信地址 -->
<property>
<name>dfs.namenode.http-address.ns1.nn1</name>
<value>bigdata1.ibeifeng.com:50070</value>
</property>
<!-- nn2的RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.ns1.nn2</name>
<value>bigdata2.ibeifeng.com:9000</value>
</property>
<!-- nn2的http通信地址 -->
<property>
<name>dfs.namenode.http-address.ns1.nn2</name>
<value>bigdata2.ibeifeng.com:50070</value>
</property>
<!-- 指定NameNode的元数据在JournalNode上的存放位置 -->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://bigdata3.ibeifeng.com:8485;bigdata4.ibeifeng.com:8485;bigdata5.ibeifeng.com:8485/ns1</value>
</property>
<!-- 指定JournalNode在本地磁盘存放数据的位置 -->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/opt/modules/hadoop-2.7.3/data/journal</value>
</property>
<!-- 配置隔离机制方法,多个机制用换行分割,即每个机制暂用一行
sshfence:当Active出问题后,standby切换成Active,此时,原Active又没有停止服务,这种情况下会被强制杀死进程。
shell(/bin/true):NN Active和它的ZKFC一起挂了,没有人通知ZK,ZK长期没有接到通知,standby要切换,此时,standby调一个shell(脚本内容),这个脚本返回true则切换成功。
-->
<property>
<name>dfs.ha.fencing.methods</name>
<value>
sshfence
shell(/bin/true)
</value>
</property>
<!-- 使用sshfence隔离机制时需要ssh免登陆 -->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/home/[whoami]/.ssh/id_rsa</value>
</property>
<!-- 配置sshfence隔离机制超时时间 -->
<property>
<name>dfs.ha.fencing.ssh.connect-timeout</name>
<value>30000</value>
</property>
<!--开启故障自动转移-->
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.client.failover.proxy.provider.ns1</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
【mapred-site.xml】
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
【yarn-site.xml】
<!-- 指定nodemanager启动时加载server的方式为shuffle server -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!--启用resourcemanager ha-->
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>rmcluster</value>
</property>
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>bigdata1.ibeifeng.com</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>bigdata2.ibeifeng.com</value>
</property>
<!--指定zookeeper集群的地址-->
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>bigdata3.ibeifeng.com:2181,bigdata4.ibeifeng.com:2181,bigdata5.ibeifeng.com:2181</value>
</property>
<!--启用自动恢复-->
<property>
<name>yarn.resourcemanager.recovery.enabled</name>
<value>true</value>
</property>
<!--指定resourcemanager的状态信息存储在zookeeper集群-->
<property>
<name>yarn.resourcemanager.store.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>
【slaves】
bigdata3.ibeifeng.com
bigdata4.ibeifeng.com
bigdata5.ibeifeng.com
4)分发配置文件到其他机器
scp etc/hadoop/core-site.xml etc/hadoop/hdfs-site.xml etc/hadoop/yarn-site.xml etc/hadoop/slaves bigdata2.ibeifeng.com:/opt/modules/hadoop-2.7.3/etc/hadoop
scp etc/hadoop/core-site.xml etc/hadoop/hdfs-site.xml etc/hadoop/yarn-site.xml etc/hadoop/slaves bigdata3.ibeifeng.com:/opt/modules/hadoop-2.7.3/etc/hadoop
scp etc/hadoop/core-site.xml etc/hadoop/hdfs-site.xml etc/hadoop/yarn-site.xml etc/hadoop/slaves bigdata4.ibeifeng.com:/opt/modules/hadoop-2.7.3/etc/hadoop
scp etc/hadoop/core-site.xml etc/hadoop/hdfs-site.xml etc/hadoop/yarn-site.xml etc/hadoop/slaves bigdata5.ibeifeng.com:/opt/modules/hadoop-2.7.3/etc/hadoop
4.启动hadoop集群
##注意:每一个步骤都严格按照以下步骤执行
1)如果有,删除hadoop中的tmp目录和journal目录(bigdata1)
$ rm -rf data/
$ rm -rf logs/
2)启动zk(bigdata3,bigdata4,bigdata5上执行)
$ bin/zkServer.sh start
3)启动journalnode(bigdata3,bigdata4,bigdata5上执行)
$ sbin/hadoop-daemon.sh start journalnode
4)格式化hdfs(bigdata1上执行)
$ bin/hdfs namenode -format ##格式化
5)同步nn1的元数据信息(bigdata2上执行)
$ bin/hdfs namenode -bootstrapStandby ##同步数据
6)格式化ZKFC(bigdata1上执行)
$ bin/hdfs zkfc -formatZK
7)启动hdfs(bigdata1上执行)
$ sbin/start-dfs.sh
8)启动yarn
$ sbin/start-yarn.sh (bigdata1上执行)
$ sbin/yarn-daemon.sh start resourcemanager (bigdata2上执行)
9)查看web
http://bigdata1.ibeifeng.com:50070/ ##standby
http://bigdata2.ibeifeng.com:50070/ ##standby
10)手动切换namenode状态
$ bin/hdfs haadmin -transitionToActive nn1 ##切换成active
$ bin/hdfs haadmin -transitionToStandby nn1 ##切换成standby
11)查看状态
$ bin/hdfs haadmin -getServiceState nn1 #查看nn1状态
$ bin/hdfs haadmin -getServiceState nn2 #查看nn2状态
$ bin/yarn rmadmin -getServiceState rm1 #查看rm1状态
$ bin/yarn rmadmin -getServiceState rm2 #查看rm2状态