随着数据量越来越大, 在 一个操作系统管辖的范围存不下了, 那么就 分配到更多的操作系统管理的磁盘中, 但是不方便管理和维护,迫切需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统。 分布式文件系统是一种允许文件通过网络在多台主机上分享的 文件的系统,可让多机器上的多用户分享文件和存储空间。分布式文件管理系统很多,HDFS 只是其中一种。适用于一次写入、多次查询的情况,不支持并发写情况,小文件不合适。因为小文件也占用一个块,小文件越多(1000个1k文件)块越 多,NameNode压力越大。
基本概念
元数据节点和数据节点
元数据节点(NameNode)的作用是 管理文件目录结构,接受用户的操作请求,是管理数据节点的。名字节点维护两套数据, 一套 是文件 目录与数据块之间的关系 , 另一套 是 数据块与节点之间的关系 。 前一套 数据是 静态的 ,是存放在磁盘上的, 通过fsimage和edits文件来维护 ; 后一套 数据是 动态的 ,不持久放到到磁盘的,每当集群启动的时候,会自动建立这些信息,所以一般都放在内存中。 所以他是整个文件系统的 管理节点。 它维护着整个文件系统的 文件目录树,文件/目录的 元信息和每个文件对应的 数据块列表。接收用户的操作请求 。
文件包括:
① fsimage (文件系统镜像):元数据镜像文件。存储某一时段NameNode内存 元数据信息。
② edits: 操作日志文件。
③ fstime: 保存最近一次checkpoint的时间
以上这些文件是保存在linux的文件系统中
Namenode 特点
<1>是一种允许文件 通过网络在多台主机上分享的文件系统,可让多机器上的多用户分享文件和存储空间。
<2>通透性。让实际上是通过网络来访问文件的动作,由程序与用户看来,就像是访问本地的磁盘一般。
<3>容错。即使系统中有某些节点脱机,整体来说系统仍然可以持续运作而不会有数据损失。
<4>适用于 一次写入、 多次查询的情况,不支持并发写情况,小文件不合适
既然NameNode维护这么多的信息,那么 这些信息都存放在哪里呢?
在hadoop源代码中有个文件叫做 hdfs-site.xml ,有两个配置信息,一个是 dfs.name.dir, 另一个是dfs.name.edits.dir 。这两个文件表示的是 NameNode的核心文件fsimage和edits的存放位置,
我们可以进入linux文件系统
执行命令 cd /usr/local/hadoop/conf,more core-site.xml
可以看出,这 两个文件的存储位置 是在linux文件系统的/usr/local/hadoop/tmp/dfs/name目录下。
NameNode的核心文件 fsimage和 edits的存放在current目录下, 与此同时 name目录下有一个文件 in_use.lock 而查看其内容的时候发现,内容为空,也就是说只能有一个Namenode进程能够访问该目录,读者可以自己试一下,当没有开启hadoop时,该目录下是没有文件 in_use.lock 的,当hadoop启动以后才会生成该文件。
文件 fsimage 是NameNode的核心文件
这个文件非常重要,丢失的话,Namenode无法使用, 那么如何防止该文件丢失而造成不良后果呢。我可以下再次看一下hdfs-default.xml中的一段代码,如下图所示
由其中的描述可知,该变量,决定DFS NameNode 的NameTable(fsimage)应该在本地文件系统上的存储位置。如果这是 一个用逗号分隔的列表的目录,那么nametable,会被复复制到所有的目录中,来冗余(备份来保证数据的安全性)。 如${hadoop.tmp.dir}/dfs/name,~/name2,~/name3,~/name4。那么fsimage会分别复制到~/name1,~/name2,~/name3,~/name4 目录中。所以这些目录一般是在不同的机器,不同的磁盘,不同的文件夹上,总之越分散越好,这样能保证数据的安全性。
看一下edits的描述
查看一下 hdfs-default.xml 中的一段代码,如下图所示,
由其中的描述可知,该变量,决定DFSNameNode的 存储事务文件(edits)在本地文件系统上的位置。 如果这是一个以逗号分隔的目录列表,那么,事务文件会被复制所有的目录中,来冗余。默认值是dfs.name.dir一样。(edit保存事务过程)
HDFS的基本结构之 DataNode
数据节点(datanode)就是负责存储数据文件的。
block: 如果一个文件非常大,比如100GB,那么怎么存储在DataNode中呢?DataNode在存储数据的时候是按照block为单位读写数据的。block是hdfs读写数据的基本单位。
假设文件大小是100GB,从字节位置0开始,每64MB字节划分为一个block,依此类推,可以划分出很多的block。每个block就是64MB大小
由上图可知,类中的属性没有一个是可以存储数据的。 所以block本质上是一个 逻辑概念,意味着block里面不会真正的存储数据,只是划分文件的。
为什么一定要划分为64MB大小呢?
因为这是在默认配置文件中设置的,我们查看 core-default.xml 文件,如下图所示。
上图中的参数ds.block.name指的就是block的大小,值是67 108 864字节,可以换算为64MB。如果我们不希望使用64MB大小,可以在core-site.xml中覆盖该值。注意单位是字节。
目录结构
DataNode是按block来划分文件的
那么划分后的文件到底存放在哪里哪?我们查看文件core-default.xml,如下图所示。
参数 dfs.data.dir的值就是 block存放在linux文件系统中的位置。变量 hadoop.tmp.dir的值 前面已经介绍了,是 /usr/local/hadoop/tmp ,那么 dfs.data.dir 的完整路径是/usr/local/hadoop/tmp/dfs/data
上传一个文件
我们打开另一个Linux终端,上传一个文件 jdk-6u24-linux-i586.bin,文件大小为 84927175k,如下图所示。
然后我们可以在原来终端,查看上传文件,就是在该Linux文件系统的/usr/local/hadoop/tmp/dfs/data目录下, 如下图所示
上图中以 “blk_”开头的文件就是 存储数据的block。这里的命名是有规律的,除了block文件外,还有后 缀是“meta”的文件 ,这是block的源数据文件,存放一些元数据信息。因此,上图中只有2个block文件。
注意:我们从linux 磁盘上传一个完整的文件到hdfs 中,这个文件在linux 是可以看到的,但是上传到hdfs 后,就不会有一个对应的文件存在,而是被划分成很多的block 存在的。
HDFS的基本结构之 SecondaryNode
HA的一个解决方案。但不支持热备。配置即可。由于数据操作越多edits文件膨胀越大,但不能让他无限的膨胀下去,所以要把日志过程转换出来 放到fsimage中。由于NameNode要接受用户的操作请求,必须能够快速响应用户请求,为了保证NameNode的快速响应给用户,所以将此项工 作交给了 SecondaryNode ,所以他也备份一部分fsimage的一部分内容。
执行过程:从NameNode上 下载元数据信息(fsimage,edits),然后把二者合并,生成新的fsimage,在本地保存,并将其推送到NameNode,同时重置NameNode的edits.默认在安装在NameNode节点上,但这样...不安全!
HDFS架构与设计
HDFS 采用Master/Slave的架构来存储数据,这种架构主要由四个部分组成,分别为HDFS Client、NameNode、DataNode和Secondary NameNode。
Client:就是客户端。
1、切分文件:文件上传 HDFS 的时候,Client 将文件切分成 一个一个的Block,然后进行存储。
2、与 NameNode 交互,获取文件的位置信息。
3、与 DataNode 交互,读取或者写入数据。
4、Client 提供一些命令来管理 HDFS,比如启动或者关闭HDFS。
5、Client 可以通过一些命令来访问 HDFS。
NameNode:就是 master,它是一个主管、管理者。
1、管理 HDFS 的名称空间(namespace)。Namenode维护文件系统的namespace,一切对namespace和文件属性进行修改的都会被namenode记录下来
2、管理数据块(Block)映射信息。Namenode全权管理block的复制,它周期性地从集群中的每个Datanode接收心跳包和一个Blockreport。心跳包的接收表示该Datanode节点正常工作,而Blockreport包括了该Datanode上所有的block组成的列表
3、文件系统元数据的持久化。 Namenode在内存中保存着整个文件系统namespace和文件Blockmap的映像。这个关键的元数据设计得很紧凑,因而一个带有4G内存的 Namenode足够支撑海量的文件和目录。当Namenode启动时,它从硬盘中读取Editlog和FsImage,将所有Editlog中的事务作用(apply)在内存中的FsImage ,并将这个新版本的FsImage从内存中flush到硬盘上,然后再truncate这个旧的Editlog,因为这个旧的Editlog的事务都已经作用在FsImage上了。这个过程称为checkpoint。在当前实现中,checkpoint只发生在Namenode启动时,在不久的将来我们将实现支持周期性的checkpoint
3、配置副本策略,
4、处理客户端读写请求。
DataNode:就是Slave,NameNode 下达命令,DataNode 执行实际的操作。
1、存储实际的数据块。
2、执行数据块的读/写操作。
Secondary NameNode:并非 NameNode 的热备份(两个节点同时运行,一个挂掉了切换另一个)。当NameNode 挂掉的时候,它并不能马上替换 NameNode 并提供服务。
1、辅助 NameNode,分担其工作量。
2、定期合并 fsimage和fsedits,并推送给NameNode。
3、在紧急情况下,可辅助恢复 NameNode。
设计理解
Hadoop 主要由HDFS(Hadoop Distributed File System)和MapReduce 引擎两部分组成。最底部是HDFS,它存储Hadoop 集群中所有存储节点上的文件。
HDFS 可以执行的操作有创建、删除、移动或重命名文件等,架构类似于传统的分级文件系统,HDFS 包括唯一的NameNode,它在HDFS 内部提供元数据服务;DataNode 为HDFS 提供存储块。由于NameNode 是唯一的,这也是HDFS 的一个弱点(单点失败)。一旦NameNode 故障,后果可想而知。
错误检测和快速、自动的恢复是HDFS的核心架构目标
HDFS应用对文件要求的是write-one-read-many访问模型
通信协议: 所有的HDFS通讯协议都是构建在TCP/IP协议上。客户端通过一个可配置的端口连接到Namenode,通过ClientProtocol与 Namenode交互。而Datanode是使用DatanodeProtocol与Namenode交互。从ClientProtocol和 Datanodeprotocol抽象出一个远程调用(RPC),在设计上,Namenode不会主动发起RPC,而是是响应来自客户端和 Datanode 的RPC请求。
HDFS中的读写流程
读文件流程
客户端(client)首先调用FileSystem的open()函数打开它想要打开的文件,对于HDFS来说就是通过DistributedFileSystem实例通过RPC调用请求元数据节点,得到组成文件的前几个数据块信息。对于每一个数据块,元数据节点返回保存数据块的datanode地址(是分批次获取的,每次是几个数据块),这些datanode会按照与客户端的接近度来排序,(如果客户端节点自己就是存放了目标数据块的datanode,就优先从本节点读取)。
DistributedFileSystem返回FSDataInputStream(支持文件seek的输入流)给客户端,客户端就能从流中读取数据了,FSDataInputStream中封装了一个管理了datanode与namenode IO的DFSInputStream。
客户端调用read()方法开始读取数据,存储了文件前几个块的地址的DFSInputStream,就会链接存储了第一个块的第一个(最近的)datanode,然后DFSInputStream就通过重复调用read()方法,数据就从datanode流向了客户端,当该datanode中最后一个快的读取完成了,DFSInputStream会关闭与datanode的连接,然后为下一块寻找最佳节点。这个过程对客户端是透明的,在客户端那边就像是读取了一个连续不断的流。
块是顺序读取的,通过DFSInputStream在datanode上打开新的连接去作为客户端读取流,同样它也会请求namenode来获取下一批所需要的块所在的datanode地址。当客户端完成了读取就在FSDataInpuStream上调用close()方法结束整个流程。
在读取过程中,如果FSDataInputStream在和一个datanode进行交流时出现了一个错误,它就去试一试下一个最接近的块,同时也会记住刚才发生错误的datanode,之后便不会再在这个datanode上进行没必要的尝试。 DFSInputStream 也会在 datanode 上传输出的数据上核查检查数(checknums).如果损坏的块被发现了,DFSInputStream 就试图从另一个拥有备份的 datanode 中去读取备份块中的数据。
在这个设计中一个重要的方面是客户端直接从datanode上检索数据,并通过namenode指导来得到每一个块的最佳datanode。这种设计HDFS拓展大量的并发客户端,因为数据传输只是与集群上的所有datanode展开,namenode仅仅只需要服务于获取块位置的请求。而块位置信息是存放在内存中,所以效率很高,如果不这样设计,随着客户端数据量的增加,数据服务就会很快成为一个瓶颈。
写数据流程
使用HDFS提供的客户端Client,向远程的Namenode发起RPC请求
Namenode会检查要创建的文件是否已经存在,创建者是否有权限进行操作,成功则会文件创建一个记录,否则会让客户端抛出异常,
当客户端开始写入文件的时候,客户端会将文件切分成多个packets,并在内部以数据队列“data queue"形式管理这些packets, 然后向Namenode申请blocks,获取用来存储replications的合适的datanode列表,列表的大小根据Namenode中的replication值而定。
开始以pipeline的形式将packet写入所有的replications中,客户端把packet以流的方式写入第一个datanode,该datanode把该packet存储之后,再将其传递给在此pipeline的下一个datanode,直到最后一个datanode,这种写数据的方式呈流水线形式。
最后一个datanode成功存储之后会返回一个ack packet(确认队列),在pipeline里传递给客户端,在客户端的开发库内部维护者“ack queue”,成功收到datanode返回的ack packet后会从“ack queue”移除相应的 packet。
如果传输过程中, 有某个datanode出现了故障, 那么当前的pipeline会被关闭, 出现故障的datanode会从当前的pipeline中移除, 剩余的block会继续剩下的datanode中继续以pipeline的形式传输, 同时Namenode会分配一个新的datanode, 保持replications设定的数量。
客户端完成数据的写入后,会对数据流调用close(),关闭数据流。
只要写入了dfs.replication.min的复本数( 默认为1),写操作就会成功, 并且这个块可以在集群中异步复制, 直到达到其目标复本数(replication的默认值为3),因为namenode已经知道文件由哪些块组成, 所以它在返回成功前只需要等待数据块进行最小量的复制
从上面的图中,我们可以清楚的看出NameNode对应于用户的三个动作分别
以create、 addBlock、 complete来进行相关的处理
NameNode的create动作主要是为客户端传过来的文件名在HDFS的Namesystem中申请一个名字空间,并为之建立一个相应的iNode,当然,这个iNode的状态是underConstruction,然后为这个客户创建一个该文件的独占锁,以防止其它的客户端对这个文件同时写。
NameNode的addBlock动作主要是为文件创建一个新的Block,并为这个Block的副本分配存储DataNode节点,最后给客户端返回一个LocatedBlock对象,该对象包含Block的副本应该存放的位置。在这里,NameNode节点此时并不保存该Block的副本位置,而是等到成功接收该Block的数据节点自动报告时它才正式记录该Block的一个副本的位置,这样做是由于HDFS不能保证Block一开始分配的数据节点都能成功结束Block
NameNode的complete动作就是更改与当前文件节点相关的状态,同时释放文件的文件锁。另外,NameNode还要判断文件的所有Blocks的副本是否已满足,对于还不满足的Blocks, NameNode将其放入neededReplications队列中,让其它的后台线程来负责这些Block的副本情况。 block是datanode存放数据的基本单位。
HDFS中常用到的命令
1、hadoop fs
hadoop fs -ls /
hadoop fs -lsr
hadoop fs -mkdir /user/hadoop
hadoop fs -put a.txt /user/hadoop/
hadoop fs -get /user/hadoop/a.txt /
hadoop fs -cp src dst
hadoop fs -mv src dst
hadoop fs -cat /user/hadoop/a.txt
hadoop fs -rm /user/hadoop/a.txt
hadoop fs -rmr /user/hadoop/a.txt
hadoop fs -text /user/hadoop/a.txt
hadoop fs -copyFromLocal localsrc dst 与hadoop fs -put功能类似。
hadoop fs -moveFromLocal localsrc dst 将本地文件上传到hdfs,同时删除本地文件。
写流程
package com.zouxxyy.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
public class HDFSClient {
private FileSystem fs;
@Before
public void before() throws IOException, InterruptedException {
// 获取HDFS的抽象对象
fs = FileSystem.get(URI.create("hdfs://server-2:9000"), new Configuration(), "xxx");
}
@Test
public void put() throws IOException, InterruptedException {
Configuration configuration = new Configuration();
configuration.setInt("dfs.replication", 1);
fs = FileSystem.get(URI.create("hdfs://server-2:9000"), configuration, "xxx");
// 本地文件上传到HDFS
fs.copyFromLocalFile(new Path("data/input/wordCount/1.txt"), new Path("/"));
}
@Test
public void get() throws IOException{
// HDFS文件下载到本地
fs.copyToLocalFile(new Path("/1.txt"), new Path("./"));
}
@Test
public void rename() throws IOException{
// HDFS重命名
fs.rename(new Path("/1.txt"), new Path("/2.txt"));
}
@Test
public void delete() throws IOException{
// HDFS删除
boolean delete = fs.delete(new Path("/1.txt"), true);
if (delete) {
System.out.println("删除成功");
}
else{
System.out.println("删除失败");
}
}
@Test
public void append() throws IOException{
// HDFS 文件追加测试
FSDataOutputStream append = fs.append(new Path("/1.txt"), 1024);
FileInputStream open = new FileInputStream("data/input/wordCount/1.txt");
IOUtils.copyBytes(open, append, 1024, true);
}
@Test
public void ls() throws IOException{
// fileStatuses包含文件和文件夹
FileStatus[] fileStatuses = fs.listStatus(new Path("/"));
for (FileStatus fileStatus : fileStatuses) {
if(fileStatus.isFile()) {
System.out.println("文件:");
System.out.println(fileStatus.getPath());
System.out.println(fileStatus.getOwner());
}
else {
System.out.println("文件夹:");
System.out.println(fileStatus.getModificationTime());
System.out.println(fileStatus.getPermission());
}
}
}
@Test
public void listFiles() throws IOException {
// 注意listFiles方法只能得到文件
RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path("/"), true);
while (files.hasNext()) {
LocatedFileStatus file = files.next();
System.out.println("===========================");
System.out.println(file.getPath());
System.out.println("块信息:");
BlockLocation[] blockLocations = file.getBlockLocations();
for (BlockLocation blockLocation : blockLocations) {
String[] hosts = blockLocation.getHosts();
System.out.print("块在: ");
for(String host : hosts) {
System.out.println(host + " ");
}
}
}
}
@After
public void after() throws IOException {
fs.close();
}
}
重新format namenode后,datanode无法正常启动
原因是datanode的clusterID 和 namenode的clusterID 不匹配。
执行hdfs namenode -format后,current目录会删除并重新生成,其中VERSION文件中的clusterID也会随之变化,而datanode的VERSION文件中的clusterID保持不变,造成两个clusterID不一致。
所以为了避免这种情况,可以再执行的namenode格式化之后,删除datanode的current文件夹,或者修改datanode的VERSION文件中出clusterID与namenode的VERSION文件中的clusterID一样,然后重新启动datanode。