为什么要有Hadoop?
从计算机诞生到现今,积累了海量的数据,这些海量的数据有结构化、半结构化、非
结构的数据,并且这些海量的数据存储和检索就成为了一大问题。
我们都知道大数据技术难题在于一个数据复杂性、数据量、大规模的数据计算。
Hadoop就是为了解决这些问题而出现的。
Hadoop的诞生
DougCutting是Lucene的作者,当时Lucene面临和谷歌同样的问题,就是海量的数据存储和检索,于是就诞生了Nutch。
在这之后,谷歌的大牛就为解决这个问题发了三篇论文(GFS、Map-Reduce、BigTable),这三篇论文总体表达的意思就是部署多台廉价的服务器集群,通过分布式的方式将海量数据存储在这个集群上,然后利用集群上的所有机器进行数据计算,这样谷歌就不用买很多很贵的服务器,只需要把普通的机器组合在一起。
Doug Cutting等人就去研究这三篇论文,发现价值巨大,于是Doug Cutting等人在Nutch上实现了GFS和Map-Reduce,使得Nutch的性能飙升。
于是Doug Cutting等人就把这两部分纳入到Hadoop项目中,主要还是为了将Hadoop项目作为一个大数据整体化的解决方案。
所以为什么后面就出现了Hadoop而不是在Nutch上去做整体化大数据解决方案。
这三篇论文对应Hadoop的组件:
GFS-> HDFS 文件系统
Map-Reduce-> MR 计算框架
BigTable-> Hbase 数据库系统
什么是Hadoop?
Hadoop是Apache下的一个分布式系统基础架构,主要是为了解决海量数据存储和海量的数据计算问题。
在这个基础之上发展出了的更多的技术,使得Hadoop称为大数据技术生态圈之一。
Hadoop发行版本
1、Apache版本最原始的版本
2、Clodera版本,在大型互联网企业中用的比较多,软件免费,通过服务收费。
3、Hortonworks文档比较好
特点
高可靠:维护多个副本,假设计算元素和存储出现故障时,可以对失败节点重新分布处理
高扩展:在集群间分配任务数据,可方便的扩展数以千计的节点
高效性:并行工作
高容错:自动保存多个副本,并且能够对失败任务重新分配
Hadoop组成
HDFS:一个高可靠高吞吐量的分布式文件系统
NameNode(nn):存储文件的元数据,如:文件名、文件目录结构等信息
DataNode(dn):在文件系统存储文件块数据,以及数据的校验和,也就是真正存储文件内容的,只是文件大的时候会切割成一小块一小块的。
SecondayNameNode(2nn):用于监控HDFS状态的辅助后台程序,每隔一段时间就获取HDFS的快照,就是备份和监控状态
Yarn:作业调度与集群资源管理框架。(Hadoop2.0加入)
ResourceManager(rm):处理客户端请求、启动和监控MRAppMaster、监控NodeManager,以及资源分配和调度。
NodeManager(nn):单个节点上的资源管理、处理来自ResourceManager的命令,处理来自MRAppMaster的命令。
MRAppMaster:数据切分、为应用程序申请资源,并分配内部任务、任务监控和容错。
Container:对任务运行环境的抽象,封装了CPU、内存等多维资源以及环境变量、启动命令等任务运行相关信息(hadoop内部文件操作命令和Liunx差不多)
MapReduce:分布式离线并行计算框架。
Map阶段:并行处理数据
Reduce阶段:对Map阶段处理的结果数据进行汇总
Common:支持其他模块的工具模块。
理解Hadoop组成
有一个建筑工地的建造时间很紧急,设立了一个支持小组,支援各个小分队(Common),首先1000包水泥,这些水泥要进行存储(HDFS),假设这些水泥有防水的和不防水的,防水的水泥存到仓库1(HDFS-dn),不防水的存储到仓库2(HDFS-dn),那么就要进行记录,哪些水泥存放到哪里了(HDFS-nn),因为赶工期担心水泥可能会因为潮湿那些问题,出现不可用,所以又准备了1000包水泥,并且每天都要对这些水泥进行检查(HDFS-2nn)。
如果一个小分队要领取水泥就要和工地仓储管理人员申请,仓储管理人员同意了,就要向公司申请人员来搬水泥(Yarn-MRAppMaster),开始调动这些人员搬运水泥(Yarn-rm),小分队领取到了水泥之后,开始决定给修外墙的多少包水泥(Yarn-nm)。
修外墙小组就开始拿着水泥干活了(MapReduce-Map),直到整栋楼的外墙修好了(MapReduce-Reduce),第N栋也是如此(MapReduce-Map)。
Hadoop内为什么要如此划分?
数据存放在Hadoop,那么Hadoop必然需要对数据进行管理,如果没有一个专门管理数据存储的组件或数据运算的组件,全部都融合在一个东西里面就会显得很臃肿,并且组件之间只需要通过接口进行沟通,那么各自的组件就可以仅仅自身的需求做优化等,那么就不会影响到其他的组件。
各自的组件只需要做好自己的事情,对外提供接口接收相应的数据及返回数据,只要符合我组件规范的就运行,不符合就不运行,而不需要关心其他,专心做自己的事情,也可以使得组件之间可以单独的运行。
Hadoop目录
bin:程序级命令(hdfs、Yarn等)
etc:配置文件
include:类库等文件
lib:类库等文件
libexec:类库等文件
sbin:hadoop系统命令(关闭、启动等)
share:官方提供的案例等
Hadoop运行模式
本地模式:不需要启动单独进程,直接运行,一般测试和开发使用,一台机器就可以运行,如果是在Liunx,跑的是本地,可以直接通过命令运行相应的jar包。
伪分布式模式:等同于分布式,但只有一个节点,具有集群的配置信息和运行,由于伪分布式只有一台机器,可以不启动Yarn,那么也就算是Hadoop的HDFS启动了,直接运行MapReduce程序的话,结果都在HDFS上,不在是在本地,如果需要交由YARN上进行资源调度和分配任务,则需要配置Yarn地址,以及指定数据获取方式。
完全分布式模式:多个节点一起运行,可以指定不同节点干不同的活,比如机器1干NameNode的活,机器2干ResourceManger的活。
注意:启动NameNode时,DataNode会记录NameNode信息(id),当缓存的NameNode记录删除了,这个时候启动就会报错,这个时候就需要将NameNode格式化(删除所有数据),之后在重新启动。
HDFS
HDFS是什么?
HDFS就是一个分布式文件存储系统,通过目录树来定位文件,由于分布式特点那么集群中的服务器就有各自的角色。
特点
低成本:由于是众多服务器组成的,那么在某服务器挂了,只需要付出一台廉价的服务器。
高容错性:HDFS是由众多服务器实现的分布式存储,每个文件都会有冗余备份,那么如果存储数据的某个服务器挂了,那么还有备份的数据,允许服务器出现故障。
高吞吐量:HDFS是一次写多次读的访问模型,不允许修改文件,并简化了数据的一致性问题。
就近原则:在数据附近执行程序,也体现出来移动计算比移动数据效率高。
可移植性:HDFS可以实现不同平台之间的移植。
应用场景
一次写入,多次读取,且不支持文件的修改。
适合数据分析场景,不适合网盘应用。
HDFS数据块
HDFS的文件在物理上是分块存储的,1.x版本的数据块默认大小是64MB,2.x版本的数据块默认块大小是128MB,这个值是可以通过配置参数(dfs.blocksize)进行调整的。
HDFS的块比磁盘的块大,目的就在于要减少寻址的开销(标准:寻址时间只占传输时间的1%),如果块设置的够大,从磁盘传输数据的时间明显就大于定位这个块开始位置所需要的文件,因此传输一个由多个块组成的文件的时间取决于磁盘传输速率。
HDFS常用命令(和Liunx差不多)
基本命令:hadoop fs
查看帮助:hadoop fs 或 hadoop fs -help(详情)
创建目录:hadoop fs -mkdir /usr
查看目录信息:hadoop fs -ls /usr
本地剪切,粘贴到集群:hadoop fs -moveFromLocal test.txt /usr/
追加一个文件到已存在文件的末尾:hadoop fs -appendToFile test2.txt /usr/test.txt
显示文件内容:hadoop fs -cat /usr/test.txt
显示一个文件末尾:hadoop fs -tail /usr/ test.txt
以字符形式打印一个文件内容:hadoop fs -text /usr/test.txt
修改文件所属权限(-chgrp、-chomd、chown)(liunx一样用法):hadoop fs -chmod777 /usr/test.txt
从本地复制到hdfs:hadoopfs -copyFormLocal text.txt /usr/test
hdfs复制到本地:hadoop fs -copyToLocal /usr/ text.txt ./
从hdfs路径拷贝到hdfs另一个路径:hadoop fs -cp /usr/dir1 /usr/dir2
在hdfs目录中移动文件:hadoop fs -mv /usr/test.txt /usr/dir
从hdfs下载文件到本地:hadoop fs -get /usr/test.txt ./
合并下载多个文件:hadoop fs -getmerge /usr /*.txt ./result.txt
上传文件等于copyFormLocal:hadoop fs -put test.txt /usr
删除文件或文件夹:hadoop fs -rmr /usr/test.txt
删除空目录:hadoop fs -rmdir /usr/test3
统计文件系统可用空间信息(-h格式化信息):hadoop fs -df -h
统计文件夹大小信息:hadoop fs -du -h /
统计制定目录下的文件节点数据量(嵌套级,当前文件个数,大小):hadoop fs -count -h /usr
设置文件的副本数:hadoop fs -setrep 3 /usr/test.txt
NameNode
NameNode和SecondaryNameNode工作机制
第一阶段:NameNode的工作
1、第一次启动namenode格式化后,创建fsimage和edits文件,如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
2、客户端对元数据进行操作请求
3、NameNode记录操作日志,更新滚动日志。
4、NameNode在内存中对数据进行操作
第二阶段:Secondary NameNode的工作
1、Secondary NameNode询问NameNode是否需要checkpoint,直接带回NameNode检查结果。
2、Secondary NameNode请求执行checkpoint
3、NameNode滚动正在写的eits日志
4、将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode
5、Secondary NameNode加载编辑日志和镜像文件到内存并且合并
6、生成新的镜像文件fsimage.chkpoint
7、拷贝fsimage.chkpoint到NameNode
8、NameNode将fsimage.chkpoint重命名为fsimage
说明
Fsimage文件:HDFS文件系统元数据的一个永久检查点,其中包含HDFS文件系统所有目录和文件,以及node序列化信息。
Edits文件:存放HDFS文件系统的所有更新操作,文件系统客户端执行的所有写操作日志都会记录到edits文件。
Secondary
NameNode:在主NameNode挂了,可以从Secondary NameNode中恢复数据,但是由于同步的条件限制,会出现数据不一致。
DataNode
工作机制
集群安全模式
NameNode启动时,受限将镜像文件加载进去内存,并编辑日志文件中的各项操作,一旦内存中成功建立文件系统元数据镜像,则创建一个新的fsimage文件和一个空的编辑日志。
此时的NameNode开始监听DataNode请求,但此刻,NameNode是运行在安全模式,则此时NameNode文件系统对于客户端来说是只可读。
系统中数据块文件并不是由NameNode维护的,而是以块列表的形式存储在DataNode,在系统正常操作期间,NameNode会在内存中保留所有块位置影像信息。
在安全模式下,各个DataNode会向NameNode发送最新的块列表信息,NameNode了解到足够多的块信息之后,即可高效运行文件系统。
如果满足最小复本条件,NameNode会在30秒后就退出安全模式,最小复本条件指的是整个文件系统中99%的块都满足最小复本级别,在启动一个刚刚格式化的HDFS集群时,因为系统中还没有块,所以NameNode不会进入安全模式。
集群启动完成后自动退出安全模式。
安全模式的应用场景
银行对账、维护。
Java操作HDFS
Demo
public static void main(String[] args) throws IllegalArgumentException, IOException,
InterruptedException, URISyntaxException {
//配置信息
Configurationconfiguration = new Configuration();
//获取文件系统
FileSystemfileSystem= FileSystem.get(new URI("hdfs://hadoop102:8020"), configuration, "levi");
//拷贝本地文件到集群
fileSystem.copyFromLocalFile(new Path("e:/hdfs/test.txt"), new Path("/usr/hdfs/test.txt"));
//关闭
fileSystem.close();
}
HDFS数据流
IO流写流程
IO流方式上传文件 (Java)
public void fileUpload() throws IOException, InterruptedException,
URISyntaxException {
//配置
Configurationconfiguration = new Configuration();
//文件系统
FileSystemfileSystem= FileSystem.get(new URI("hdfs://hadoop102:8020"),configuration,"levi");
//获取输出流(上传到服务器) - 服务器
FSDataOutputStreamfsDataOutputStream = fileSystem.create(new Path("/usr/hdfs/test03.txt"));
//文件输入流(本地上传)
FileInputStreamfileInputStream = new java.io.FileInputStream(new File("E:/hdfs/test03.txt"));
//流对接
IOUtils.copyBytes(fileInputStream, fsDataOutputStream, configuration);
fsDataOutputStream.hflush();
IOUtils.closeStream(fileInputStream);
IOUtils.closeStream(fsDataOutputStream);
//关闭
fileSystem.close();
}
IO流读流程
IO流方式下载文件 (Java)
public void readFile() throws IOException, InterruptedException,
URISyntaxException {
//配置
Configurationconfiguration = new Configuration();
//文件系统
FileSystemfileSystem= FileSystem.get(new URI("hdfs://hadoop102:8020"), configuration, "levi");
//输入流(下载) 服务器
FSDataInputStreamfsDataInputStream = fileSystem.open(new Path("/usr/hdfs/hadoop-2.7.2.tar.gz"));
//输出(本地)
FileOutputStreamfileOutputStream = new FileOutputStream(new File("E:/hdfs/block.txt"));
//流对接
//--- 第一块
/*byte[]buf= newbyte[1024];
for (inti = 0; i <1024*128; i++) {
fsDataInputStream.read(buf);
fileOutputStream.write(buf);
}
//关闭
fsDataInputStream.close();
fileOutputStream.close();*/
//--- 第二块
fsDataInputStream.seek(1024 * 1024 * 128);//定位这个位置开始读
IOUtils.copyBytes(fsDataInputStream, fileOutputStream, 1024);
IOUtils.closeStream(fileOutputStream);
IOUtils.closeStream(fsDataInputStream);
fileSystem.close();
}
副本节点选择
在海量数据的处理中,节点之间的数据传输速率是很重要,特别是在带宽很稀缺的情况下,而节点和节点之间的距离越远,那么必然会影响数据的传输。
在成千的服务器集群中,Hadoop是怎么选择副本节点呢?
低版本Hadoop
第一个副本在客户端所处的节点上,但是如果客户端是在集群外,随机选取一个节点
第二个副本和第一个副本位于不同机架的随机节点上,也就是不和第一个副本在相同机架。
第三个副本和第二个副本位于相同机架,节点随机
Hadoop2.5版本以上
第一个副本在客户端所处节点上。如果客户端在集群外,随机选一个
第二个副本和第一个副本位于相同机架,随机节点
第三个副本位于不同机架,节点随机
HDFS误区
小文件存储
每个文件均按照块存储,每个块的元数据存储在NamNode的内存中(一个文件/目录/文件块一般占有150字节的元数据内存空间),因此Hadoop存储小文件会非常低效,因为大量小文件会耗尽NameNode中大部分内存,但存储小文件所需要的磁盘容量和存储这些文件原始内容所需要的磁盘空间相比也不会增多。
例如:上传一个文件1MB,那么这个文件会在HDFS中的一个块存储着,这个块默认是128MB,那么是不是占用了128MB的磁盘空间呢?
每一个块128MB只是HDFS的逻辑上的划分,所以在磁盘占用空间还是1MB,只有当一个或多个文件在一个块内超过128MB,之后将这个文件进行切割。
副节点处理
HDFS是先把当前这个节点处理完,在去处理副本节点的。
回收站
回收站默认是不启用的,在core-site.xml文件中的配置fs.trash.interval默认是为0.
HDFS全过程
MapReduce
MapReduce是什么?
MapReduce是一个分布式运算程序的编程框架,是用户开发基于Hadoop的数据分析应用的核心框架。
MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发的运行在一个Hadoop集群上。
作用
由于硬件资源限制,海量数据无法在单机上处理,单机版程序扩展到集群进行分布式运算,增加程序的复杂度和开发难度。
MapReduce框架就是要使得开发人员开源将绝大部分工作集中在业务逻辑的开发上,而分布式运算的复杂性交由MapReduce来处理。
特点
适合数据复杂度运算
不适合算法复杂度运算
不适合实时计算、流式计算
核心思想
分布式的运算程序最少需要分成两个阶段:
第一个阶段:MapTask并发实例,完全并行运行,互不相干
第二个阶段:ReduceTask并发实例,互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出
MapReduce编程模型只能包含一个Map阶段和Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。
总结
Map:并行处理任务(运算)。
Reduce:等待相关的所有Map处理完任务,在将任务数据汇总输出。
MRAppMaster:负责整个程序的过程调度和状态协调。
MapReduce进程
一个完整的MapReduce程序在分布式允许时有三类实例进程:
MRAppMaster:负责整个程序的过程调度和状态协调。
MapTask:负责Map阶段的整个数据处理流程。
ReduceTask:负责Reduce阶段的整个数据处理流程。
序列化
序列化就是把内存中的对象转换成字节序列(或其他数据传输协议),以便于存储(持久化)和网络传输。
而序列化就是Map到Reducer的桥梁。
Java序列化是一个重量级的序列化框架(Serializable),使用这个框架进行序列化后会附带很多额外信息(各种校验信息、header等),不便于网络传输,所以Hadoop自己开发了一套序列化机制(Writable),精确、高效。
Java类型Hadoop
Writable类型
booleanBooleanWritable
byteByteWritable
intIntWritable
floatFloatWritable
longLongWritable
doubleDoubleWritable
stringText
mapMapWritable
arrayArrayWritable
备注:自定义的反序列类中的write方法和read方法中DataOutput和DataInput这两个类所提供的方法中,对应Java类型String的方法,分别是writeUTF()和readUTF()。
实例(统计单词)
public class WordCountMapper extends Mapper<LongWritable, Text, Text,
IntWritable> {
private Text key = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//读取每一行
Stringline = value.toString();
//切割出每一个单词
String []words = line.split("\t");
//将读取到的每一个单词都写出,并且值都为1,因为是在map计算完后到reduce进行汇总,形成Key 多个Value
for (String word : words) {
//每次文件内的读取一行都调用一次map,那样就形成了调用多次map,那样的话就不用创建多个key对象了
this.key.set(word);
context.write(this.key, new IntWritable(1));
}
}
}
public class WorkCountReducer extends Reducer<Text, IntWritable, Text,
IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
//这里就形成 多个值的汇总结果,那么将这个值多个进行汇总后,统一归并到一个key,就形成了一个key对应多个value
int count = 0;
for (IntWritable value : values) {
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
public class WordCountDriver {
public static void main(String[] args) throws Exception {
//配置
Configurationconfiguration = new Configuration();
//任务运行
Jobjob= Job.getInstance(configuration);
job.setJarByClass(WordCountDriver.class);
//运算类和汇总类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WorkCountReducer.class);
//运算和汇总输入和输出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//最终输出
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//运算文件的输入和结果输出
FileInputFormat.setInputPaths(job, new Path("E:/hadooptest/mapreduce/input"));
FileOutputFormat.setOutputPath(job, new Path("E:/hadooptest/mapreduce/output"));
//提交
job.submit();
//等待
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
程序流程分析
1、MapReduce程序读取输入目录存放的相应文件。
2、客户端在submit方法执行之前,获取到待处理的数据信息,让后根据急群众参数配置形成一个任务分配规划。
1、建立连接
2、创建提交任务的代理(本地:LocalRunner、远程:YarnRunner)
3、创建给集群提交数据的stag路径
4、获取到任务id,并创建任务路径
5、获取到任务jar包,拷贝jar包到集群(这个jar就是程序运行的业务代码)
6、计算切片,生成切片规划文件
computeSliteSize(Math.max(minSize,Math.max(maxSize,blocksize)))=blocksize=128MB
7、提交任务,返回提交状态
3、客户端提交job.split、jar包、job.xml等文件给Yarn,Yarn中的resourcemanager启动MRAppMater。
4、MRAppMater启动后根据job的描述信息,计算出需要的MapTask实例数量,然后向集群申请机器,启动相应数量的Map Task进程。
5、MapTask利用客户指定的InputFormat来读取数据,形成KV对。
6、MapTask将输入KV对传递给客户定义的map()方法,做逻辑运算。
7、map()运算完毕后将运算结果的KV对,手机到MapTask缓存。
8、MapTask缓存中的KV对按照K分区排序后不断写到磁盘文件。
9、MRAppMaster监控到所有MapTask进程任务完成后,会根据用户指定的参数启动相应数量的ReduceTask进程,并告知ReduceTask进程要处理的数据分区。
10、ReduceTask进程启动后,根据MRAppMaster告知待处理数据所在位置,从N台MapTask运行所在的机器上获取到N个MapTask输出结果文件,并在本地运行重新归并排序,按照相同Key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算。
11、ReduceTask运算完毕后,调用客户指定的OuputFormat将结果数据输出(文件)到外部存储。
说明:
切片是逻辑上的切片
规划文件就是里面描述了切多少个片,每个片是怎么样的。
数据切片
MapTask的并行任务是否越多越好?并行度是如何决定的?MapTask到底开多少个合适?
1、一个job的map()阶段并行度(MapTask开几个),由客户端在提交job时决定。
2、每一个Split切片分配一个MapTask并行实例处理。
3、默认情况下切片大小=块大小(blocksize)
4、切片时不考虑数据集整体,而是针对每一个文件单独切片(这个是逻辑上的划分)
切片流程
1、获取到数据存储目录
2、找到要便利处理目录下的每一个文件
3、读取第一个文件test.txt(257MB)
1、获取文件大小
2、计算分片大小,每次切片时,都要判断剩下的部分是否大于块大小的1.1倍,大于就在划分一个块切片
切片:
第一块:128MB
第二块:129MB / 128MB = 1.0078125
1.0078125< 1.1 =不在切片,反之继续切
源码:computeSliteSize(Math.max(minSize,Math.max(naxSize,blocksize)));
3、将切片信息写到一个切片规划文件(说明文件)中
4、整个切片的核心过程在于getSplit()方法(看submit()源码)中完成,数据切片只是逻辑上对输入数据进行切片,并不会在磁盘上,将文件切分进行存储。
InputSplit只是记录了分片的元数据信息。比如:起始位置、长度、所在的节点列表等。
注意:块是HDFS上物理存储的数据,切片只是逻辑上的划分。
5、提交切片规划文件(说明文件)到Yarn上,Yarn上的MrAppMaster就根据切片规划文件(说明文件)计算开启的MapTask个数(多少个切片就多少个MapTask)。
FileInputFormat中默认的切片机制
1、简单按照文件内容长度切片
2、切片大小,默认是块大小
3、切片时不考虑数据集整体性,而是逐个文件的单独切片,循环遍历每一个文件。
MaxSize(切片最大值):如果比块大小还小,则会让切片变小。
MinSize(切片最小值):如果比块大小还大,则会让切片变得比块还大。
假设:块大小128MB
MaxSize设为100MB
切片后的存储占块大小100MB
小文件切片处理
如果有大量的小文件,而每一个文件都是一个单独的切片,都会各自交给一个MapTask处理,那么需要开启大量的MapTask,则会产生大量的MapTask,导致处理效率低下。
解决方案
1、在数据处理前端,先把小文件合并成大文件,在上传到HDFS做后续分析
2、如果已经有大量的小文件存在HDFS,使用CombineFileInputFormat进行处理,CombineFileInputFormat的切片逻辑跟TextFileInputFormat不同,他可以将多个小文件逻辑上规划到一个切片中,这样多个小文件就可以交给一个MapTask。
3、优先满足最小切片大小,不超过最大切片大小的前提下。
文件合并
//-------- 使用提供的自定义类,指定切片大小
job.setInputFormatClass(CombineTextInputFormat.class);
//最大输入切片大小,一个文件的大小是4M就开始切,算法是1.1倍
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
//最小输入切片大小,多个文件合并到了一起,到了2M就切,算法是1.1倍,优先满足最小切片大小
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);
备注:在运行日志中查找number of就可以看到了
Shuffle机制
1、在MapReduce中,Map阶段处理的数据如何传递给Reduce阶段额,是MapReduce框架中关机的一个流程,这个流程就叫Shuffle。
2、Shuffle(洗牌、发牌):核心就是数据分区、排序、缓存
3、MapTaks输出处理结果数据,分发给ReduceTask并在分发过程中对数据按照Key进行分区和排序。
Shuffle机制缓存流程图
Shuffle是MapReduce处理流程中一个过程,每一个步骤都是分散在各个MapTask和ReduceTask节点上。
MapReduce详细运行流程
总结
MapReduce详细运行流程图就是一个流水线一般的作业,从左向右过去,而在开发的过程中,需要使用到什么组件,这些组件会起到什么作用,在哪一个时间起作用,都可以在这个图中详细的描述
分区
自定义分区
public classMyPartitionerextends Partitioner<Text, FlowBean>{
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
//拿到手机号码前三位
StringphoneNum = key.toString().substring(0, 3);
//建立5个分区,从0开始
int partition = 4;
//判断
if("135".equals(phoneNum)) {
partition = 0;
}else if("136".equals(phoneNum)) {
partition = 1;
}else if("137".equals(phoneNum)) {
partition = 2;
}else if("138".equals(phoneNum)) {
partition = 3;
}
return partition;
}
}
//设置分区类
//如果没有设置分区,那么则会按照块大小去计算什么时候进行分区
job.setPartitionerClass(MyPartitioner.class);
//设置ReduceTask数量
job.setNumReduceTasks(5);
总结
reduce数量小于分区数量就会报错。
reduce数量是1,那么则所有结果输出到一个文件内,即便配置了分区也不会去跑分区的代码(执行分区)
reduce数量大于分区数量,输出的其他文件为空
分区数量 = reduce数量,按照分区数量输出结果文件数量
分区就是对map的结果数据进行二次处理,从而再去决定是否影响输出的reduce结果输出。
排序
MapTask和ReduceTask均会对数据(按Key排序)进行排序,这个操作属于Hadoop默认行为,任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
对于MapTask,他会把处理的结果暂时放到一个缓冲区,当缓冲区使用率达到了阈值就对缓冲区的数据进行排序,并将这些有序的数据写到磁盘上,而当数据处理完后,他会对磁盘上所有文件进行一次合并,将这些文件合并成一个有序的文件。
对于ReduceTask,他从每个MapTask远程拷贝相应的数据文件,如果文件大小超过一定阈值,则放到磁盘上,否则放到内存中,如果磁盘上文件数目达到一定阈值,则进行一次合并,生成一个更大的文件,如果内存文件大小或数目超过阈值,则进行合并后将数据写出到磁盘上,当所有的数据拷贝完毕后,再统一的对内存核磁盘上的所有文件进行一次合并。
自定义排序
public class FlowBean implements WritableComparable<FlowBean>
{
private Long sum;
public FlowBean() {
super();
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(sum);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
sum = dataInput.readLong();
}
public Long getSum() {
return sum;
}
public void setSum(Long sum) {
this.sum = sum;
}
@Override
public String toString() {
return sum.toString();
}
@Override
public int compareTo(FlowBean o) {
return this.sum > o.getSum() ? -1 : 1;
}
}
总结
Shullt规定Key是要进行排序的,如果作为Key是必须要实现WritableComparable接口的。
Combiner合并
ReducrTask是接收总的MapTask结果,Combiner在每一个MapTask运行的,对每每个MapTask的结果汇总(局部汇总),将MapTask汇总后之后进行压缩传输,可以减少网络传输量。
但是Combiner的前提是不能影响到最终的业务逻辑,如果是累加求和是没有问题的,如果是求平均值就有问题的。
如:
1、在每一个MapTask进行求平均值之后在ReduceTask再求一次平均值,结果是不一样的。
2、将MapTask的数据全部汇总到ReduceTask之后再求平均值。
这两种结果是不一样的。
自定义Combiner合并
public class WordCountCombiner extends Reducer<Text, IntWritable, Text,
IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Contextcontext)throws IOException, InterruptedException {
int count = 0;
for (IntWritable intWritable : values) {
count += intWritable.get();
}
context.write(key, new IntWritable(count));
}
}
//reduce是接收总的MapTask汇总,combiner在每一个maptask运行的,对每一个maptask汇总
//如:每一个maptask都进行汇总,之后进行压缩传输
job.setCombinerClass(WordCountCombiner.class);
分组
就是对分区排序好的数据,在进行一次合并分类开来,再一次合并的话,就有个比较标识,如果两个数据标识是一样的,就认为是一组数据,最后过滤去重,最终得到有哪些组。
自定义分组
public classOrderGoupingComparatorextends WritableComparator {
protected OrderGoupingComparator() {
super(OrderBean.class, true);
}
@Override
public intcompare(WritableComparablea,WritableComparableb) {
OrderBeanabean = (OrderBean) a;
OrderBeanbbean = (OrderBean) b;
// 将orderId相同的bean都视为一组
return abean.getOrderId().compareTo(bbean.getOrderId());
}
}
//设置Reduce端分组
job.setGroupingComparatorClass(OrderGoupingComparator.class);
//分区
job.setPartitionerClass(OrderPartition.class);
job.setNumReduceTasks(3);
自定义InputFormat
对小文件的输入进行合并处理。
1、设置文件不可切割
2、读取到整个文件,并且整个文件的数据作为value输出给MapTask(将分片传进去去读取后,将读取到的所有分片数据合并给到MapTask)
3、MapTask在对合并后的数据做操作
public class DistriutedCacheMapper extends Mapper<LongWritable, Text, Text,
NullWritable>{
private Map<String, String> map = new HashMap<>();
private Text key = new Text();
@Override
protected void setup(Mapper<LongWritable, Text,
Text, NullWritable>.Context context)
throws IOException, InterruptedException {
//获取缓存文件,这个文件给加载进了hadoop系统了,在缓存中的根,可以直接通过名字调用
BufferedReaderbufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(new File("pd.txt"))));
Stringline;
while(StringUtils.isNotEmpty(line = bufferedReader.readLine())) {
//数据处理
String []strings = line.split("\t");
//将数据放到缓存集合中
map.put(strings[0], strings[1]);
}
bufferedReader.close();
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//读取到order的每一行
String[]strings = value.toString().split("\t");
StringorderId = strings[0];
Stringname = map.get(orderId);
this.key.set(value.toString() + "\t" + name);
context.write(this.key, NullWritable.get());
}
}
自定义OutputFormat
获取到ReduceTask的运行结果,自定义要输出的结果数据和文件
public class FilterRecordWriter extends RecordWriter<Text,
NullWritable>{
private FileSystem fileSystem;
private FSDataOutputStream aCreate;
private FSDataOutputStream oCreate;
publicFilterRecordWriter(TaskAttemptContext job) {
try {
fileSystem= FileSystem.get(job.getConfiguration());
//创建输出文件路径
PathaPath = new Path("E:\\hadooptest\\mapreduce\\a.log");
PathoPath = new Path("E:\\hadooptest\\mapreduce\\o.log");
//创建输出流
aCreate = fileSystem.create(aPath);
oCreate = fileSystem.create(oPath);
}catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
if(key.toString().contains("levi")) {
aCreate.write(key.toString().getBytes());
}else {
oCreate.write(key.toString().getBytes());
}
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
if(null != aCreate) {
aCreate.close();
}
if(null != oCreate) {
oCreate.close();
}
}
}
//设置输出类为自定义输出类
job.setOutputFormatClass(FliterOutputFormat.class);
//虽然自定义了一个输出,但是还是要输出,因为有个成功状态标识文件要输出,不然会报错
FileOutputFormat.setOutputPath(job, new Path("E:\\hadooptest\\mapreduce\\output"));
计数器
Hadoop为每一个作业维护了若干个内置计算器,以描述多项指标,例如:某些计数器记录已处理的字节数等。
计数器的使用
context.getCounter("counterGroup组名","countera变量"),increment(1);
说明:计数器的结果在程序运行后的控制台日志中可查看
总结
HDFS根据配置在各个节点存储数据,并且存储相应的副本数据。
MapReduce就是在需要执行无论是MapTask或ReduceTask的时候,会先去ResouceManager去询问,任务要在哪里运行,其实ResourceManger就是看要运行这个任务的输入数据在哪个节点,从而去告知这个节点执行任务,那么就形成了直接移动计算,而不是移动数据的方式。
因为数据可能存储在服务器1或服务器2…服务器,那么不需要移动数据,负责执行任务的服务器,到指定的路径,下载要运算的任务jar包,直接在本地运行,那么当数据非常大的时候就不用去移动数据。
YARN
Yarn是什么?
Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,在之前说了,可以配置MapReduce在Yarn之上运行,所以MapReduce等运算程序则相当于运行于操作系统之上的应用程序。
Yarn机制
1、不需要知道用户提交的程序运行机制,只要符合Yarn规范的资源请求机制即可使用Yarn,Spark、Storm等运算框架都可以整合在Yarn上运行,意味着与用户程序完全解耦。
2、只提供运算资源的调度,程序向Yarn申请资源,Yarn负责分配资源
3、Yarn总的资源调度是ResourceManager,提供运算资源的角色叫NodeManager。
4、Yarn作为一个通用的资源调度平台,企业以前存在的各种运算集群都可以整合在一个物理集群上,提高资源利用率,方便数据共享。
Yarn作业流程
1、客户端将MapReduce程序提交到客户端所在的节点。
2、YarnRunner就向RsourceManager申请一个Application。
3、RsourceManager内部运行一下,看看哪个节点离提交申请节点近,以及系统资源等,内部运行完了,就将应用程序资源路径返回给YarnRunner。
4、程序就将运行程序所需要的资源提交到HDFS上。
5、程序资源提交完后,申请运行MRAppMaster。
6、RsourceManager将用户请求转化为一个task(任务),并寻找最适合的NodeManager,并将任务分配给这个NodeManager。
7、NodeManager领取到任务后,创建容器(Container),并产生MRAppMaster。
8、MRAppMaster向RsourceManager申请运行N个MapTask容器(切片文件中有说明)。
9、RsourceManager又寻找了一下,将MapTask分配给另外两个NodeManager,这两个NodeManager领取到任务,并且创建容器(Container)。
10、RsourceManager告知申请运行MapTask容器的NodeManger,向那两个接受到任务的NodeManager发送程序启动脚本,这两个NodeManger就分别启动MapTask,MapTask对数据进行分区排序。
11、MRAppMaster看程序都跑完了,赶紧申请2个容器,运行ReduceTask。
12、ReduceTask的容器向MapTask容器获取相应分区的数据,并执行任务。
13、程序运行完毕后,MapResource会向RsourceManager注销自己。
Hadoop – HelloWorld
准备
1、三台机器
2、ssh
3、防火墙
配置
JAVA_HOME
hadoop-env.sh
yarn-env.sh
mapred-env.sh
core-site.xml
<!-- 指定HDFS中NameNode的地址-->
fs.defaultFS
hdfs://hadoop-senior00-levi.com:8082
<!-- 指定hadoop运行时产生文件的存储目录-->
hadoop.tmp.dir
/opt/module/hadoop-2.5.0-cdh5.3.6/data/tmp
yarn-site.xml
<!-- reducer获取数据的方式-->
yarn.nodemanager.aux-services
mapreduce_shuffle
<!-- 指定YARN的ResourceManager的地址-->
yarn.resourcemanager.hostname
hadoop-senior01-levi.com
<!-- 任务历史服务-->
yarn.log.server.url
http://hadoop-senior00-levi.com:19888/jobhistory/logs/
hdfs-site.xml
<!-- 指定seconddaryNameNode地址,主要这个是避免NameNode挂了 -->
dfs.namenode.secondary.http-address
hadoop-senior02-levi.com:50090
<!-- 指定name.dir,默认就是,但是避免未启用,设置一下-->
dfs.namenode.name.dir
/opt/module/hadoop-2.5.0-cdh5.3.6/data/tmp/name
mapred-site.xml
<!-- 指定mr运行在yarn上-->
mapreduce.framework.name
yarn
<!-- 配置 MapReduce JobHistory Server 地址 ,默认端口10020-->
mapreduce.jobhistory.address
hadoop-senior00-levi.com:10020
<!-- 配置 MapReduce JobHistory Server web ui 地址, 默认端口19888-->
mapreduce.jobhistory.webapp.address
hadoop-senior00-levi.com:19888
slaves
hadoop-senior00-levi.com
hadoop-senior01-levi.com
hadoop-senior02-levi.com
Hadoop-HA
Hadoop为什么要有HA?
我们都知道NameNode是存储了所有数据的路径,在Hadoop第一个版本是没有HA,单台的NameNode节点挂了,那么整个数据就没办法访问了,
那个的工程师就自己写一个脚本去解决这个问题,定时的拷贝NameNode的fsimage和edits到别的服务器,但是数据量大的时候,拷贝就很慢了,而且工程师半夜正在和周公下棋的时候,NameNode挂了,那就很尴尬了。
虽然可以到第二天早上来恢复,但是数据量那么大的时候,太慢了,满足不了需求。
所以Hadoop为了解决这个问题,在后面的版本继承了HA(高可用)。
Hadoop-HA是什么?
Hadoop-HA(高可用)就是在一台服务器挂了,第二台服务器可以马上顶上去。
两个基本问题:
1、第一台服务器和第二台服务器的数据必须要同步。
Hadoop-HA通过edits-log的变化,来将数据写入到JournalNode节点里面去,以分享给其他的NameNode。
2、要解决第一台和第二台服务器同时启用的情况,在这种情况下,子节点怎么提交数据,会提交到两台服务器,但是又会出现抢占资源的情况,(给一个人送东西和给两个人送东西所耗费的体力是不言而喻的),
这个问题在Hadoop-HA中称为脑裂,借助第三方框架(Zookeeper)实现隔离机制来解决脑裂这个问题。
Hadoop–HA 实现
NameNode
hdfs-site.xml
dfs.replication
3
dfs.namenode.secondary.http-address
hadoop104:50090
dfs.namenode.checkpoint.period
120
dfs.namenode.name.dir
/opt/module/hadoop-2.7.2/data/tmp/dfs/name
dfs.hosts
/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts
dfs.hosts.exclude
/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts.exclude
dfs.nameservices
mycluster
dfs.ha.namenodes.mycluster
nn1,nn2
dfs.namenode.rpc-address.mycluster.nn1
hadoop102:8020
dfs.namenode.rpc-address.mycluster.nn2
hadoop103:8020
dfs.namenode.http-address.mycluster.nn1
hadoop102:50070
dfs.namenode.http-address.mycluster.nn2
hadoop103:50070
dfs.namenode.shared.edits.dir
qjournal://hadoop102:8485;hadoop103:8485;hadoop104:8485/mycluster
dfs.ha.fencing.methods
sshfence
dfs.ha.fencing.ssh.private-key-files
/home/levi/.ssh/id_rsa
dfs.journalnode.edits.dir
/opt/module/hadoop/data/jn
dfs.permissions.enable
false
dfs.client.failover.proxy.provider.mycluster
org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
dfs.ha.automatic-failover.enabled
true
core-site.xml
fs.defaultFS
hdfs://hadoop102:8020
-->
fs.defaultFS
hdfs://mycluster
hadoop.tmp.dir
/opt/module/hadoop-2.7.2/data/tmp
fs.trash.interval
1
hadoop.http.staticuser.user
levi
ha.zookeeper.quorum
hadoop102:2181,hadoop103:2181,hadoop104:2181
ResourceManager
hdfs-site.xml
yarn.resourcemanager.hostname
hadoop103
-->
yarn.nodemanager.aux-services
mapreduce_shuffle
yarn.log-aggregation-enable
true
yarn.log.server.url
http://hadoop103:19888/jobhistory/logs/
yarn.log-aggregation.retain-seconds
86400
yarn.resourcemanager.ha.enabled
true
yarn.resourcemanager.cluster-id
cluster-yarn1
yarn.resourcemanager.ha.rm-ids
rm1,rm2
yarn.resourcemanager.hostname.rm1
hadoop103
yarn.resourcemanager.hostname.rm2
hadoop104
yarn.resourcemanager.zk-address
hadoop102:2181,hadoop103:2181,hadoop104:2181
yarn.resourcemanager.recovery.enabled
true
yarn.resourcemanager.store.class
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore