HDFS的设计
HDFS以流式数据访问模式来存储超大文件,运行于商用硬件集群上。
超大文件:指的是几百MB、几百GB甚至几百TB大小的文件。
流式数据访问:HDFS构建思路:一次写入、多次读取时做高效的访问模式
商用硬件:Hadoop是设计运行在通用商用硬件的集群上的,因此对于庞大的集群而言,节点发生故障的几率非常高。HDFS遇到故障时,被设计成能够继续运行且不让用户察觉到明显的中断。
低时延的数据访问:HDFS是为高数据吞吐量应用优化的,这可能会以提高时延为代价,对低时延的访问需求,HBase是更好的选择。
大量的小文件:namenode将文件系统的元数据存储在内存中,因此该文件系统所能存储的文件总数受限于namenode的内存容量。
多用户写入,任意修改文件:HDFS中的文件写入支持单个写入者,而且写操作总是以“只添加”方式在文件末尾写数据,不支持多个写入者的操作。
HDFS的组成
数据块
HDFS上的文件被划分为块大小的多个分块(chunk),作为独立的存储单元,默认大小为120MB,HDFS的块比磁盘块大的目的是为了最小化寻址开销,如果块足够大,从磁盘传输数据的时间会明显大于定位这个块开始位置所需的时间。HDFS中小于一个块大小的文件不会占据整个块空间。对分布式文件系统的块进行抽象的好处如下:
一个文件的大小可以大于网络中任意一个磁盘的容量。文件的所有块不需要存储在同一个磁盘上,因此它们可以利用集群上的任意一个磁盘进行存储。
使用抽象块而非整个文件作为存储单元,大大简化了存储子系统的设计。因为块的大小时固定的,因此计算单个磁盘能存储多少个块就相对容易。
namenode和datanode
namenode为管理节点,用于管理文件系统的命名空间,维护着文件系统树及整棵树内所有的文件和目录。这些信息以命名空间镜像文件和编辑日志文件的形式永久保存在本地磁盘上。为实现namenode的高可用,可以备份那些组成文件系统元数据持久状态的文件或者运行一个辅助namenode。
datanode是文件系统的工作节点,根据需要存储并检索数据块,定期向namenode发送它们所存储的块的列表。
块缓存
对于访问频繁的文件,其对应的块会被显示地缓存在DataNode的内存中,以堆外缓存的形式存在。作业调度器通过缓存块的datanode上运行任务,可以实现高性能读操作。
联邦HDFS:
联邦HDFS允许系统通过添加namenode实现扩展,其中每个namenode负责管理文件系统命名空间中的一部分。
HDFS的高可用
Hadoop2对HDFS提供了高可用支持,通过配置一对活动-备用namenode,当活动namenode失效,备用namenode就好接管它的任务并开始服务于来自客户端的请求,不会有任何明显中断。架构需要作如下调整。
namenode之间需要通过高可用共享存储实现编辑日志的共享。
datanode需要同时向两个namenode发送数据块处理报告,因为数据块的映射信息存储在namenode的内存中而非硬盘。
客户端需要使用特定的机制来处理namenode的实效问题,且该机制是透明的。
实现高可用共享存储有:NFS过滤器或者群体日志管理器QJM
故障切换
系统中存在一个故障转移控制器,管理着将活动namenode转换为备用namenode的转换过程。每个namenode运行着一个轻量级的故障转移控制器,其工作是通过一个简单的心跳机制来监视宿主namenode是否失效,并在namenode失效时进行故障转移。
规避
确保先前活动的namenode不会执行危害系统并导致系统崩溃的操作,该方式即为规避。
HDFS的Java 接口实现
从Hadoop URL读取数据
通过URLStreamHandler实例以标准输出方式显示Hadoop文件系统的文件:
public class URLCat{
static{
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}
public static void main(String[] args){
InputStream in = null;
try{
in = new URL("hdfs://host/path").openStream();
IOUtils.copyBytes(in,System.out,4096,false);
}finally{
IOUtils.closeStream(in);
}
}
}
通过FileSystem API读取数据
Hadoop文件系统通过Hadoop Path对象(而非java.io.File对象)来代表文件,可以将路径视为一个Hadoop文件系统URI,如:hdfs://localhost/user/tom/quangle.txt
使用FileSystem以标准输出格式显示Hadoop文件系统中的文件
public class FileSystemCat{
public static void main(String[] args){
String uri = "hdfs://localhost/user/tom/abc.txt";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri),conf);
InputSteam in = null;
try{
in = fs.open(new Path(uri));
IOUtils.copyBytes(in,System.out,4096,false);
}finally{
IOUtils.closeStream(in);
}
}
}
FSDataInputStream对象:
FileSystem对象中的open()方法返回的是FSDataInputStream对象,而不是标准的java.io对象。该对象支持随机访问,可以从流的任意位置读取数据。其源码如下:
package org.apache.hadoop.fs;
public class FSDataInputStream extends DataInputStream
implements Seekable,PositionedReadable{
//...
}
其中Seekable接口支持在文件中找到指定位置,并提供一个查询当前位置相当于文件起始位置偏移量的查询方法
public interface Seekable{
void seek(long pos) throws IOException;
long getPos() throws IOExceptiom;
}
调用seek()来定位大于文件长度的位置会引发IOException异常
FSDataInputStream类也实现了PositionedReadable接口,从一个指定偏移量处读取文件的一部分:
public interface PositionedReadable{
//read()方法从文件的指定position处读取至少length字节的数据
//并存入buffer缓冲区的指定偏移量offset处
//返回值是实际读到的字节数
public int read(long position,byte[] buffer,int offset,int length) throws IOException;
//readFully()方法将指定length长度的字节数据读取到buffer中
public void readFully(long position,byte[] buffer,int offset,int length)throws IOException;
public void readFully(long position,byte[] buffer) throws IOException;
}
以上方法会保留文件当前偏移量,并且是线程安全的。但seek()方法是一个相对高开销的操作,需要慎用。
写入数据
将本地文件复制到hdfs:重要方法为create()方法
public class FileCopyWithProgress{
public static void main(String[] args)throws Exception{
String localSrc = "c://tmp/abc.txt";
String dst = "hdfs://localhost/usr/tmp/abc.txt";
InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(dst),conf);
OutputStream out = fs.create(new Path(dst),new Progressable(){
public void progress(){
System.out.print(".");
}
});
IOUtils.copyBytes(in,out,4096,true);
}
}
其中,FileSystem实例的create()方法返回FSDataOutputStream对象,该对象与FSDataInputStream类似,实现如下:
package org.apache.hadoop.fs;
public class FSDataOutputStream extends DataOutputStream implements Syncable{
public long getPos()throws IOException{
//...
}
....
}
FSDataOutputStream类不允许在文件中定位,这是因为hdfs只允许对一个已打开的文件顺序写入,或在现有文件的末尾追加数据(append方法)
创建目录
FileSystem实例提供了创建目录的方法:
public boolean mkdir(Path f)throws IOException;
通常不需要显示创建目录,调用create()方法写入文件时会自动创建父目录。
查询文件系统
1、文件元数据:FileStatus类封装了文件系统中文件的目录和元数据,包括长度、块大小、复本、修改时间、所有者以及权限信息,FileSystem的getFileStatus()方法用于获取文件或者目录的FileStatus对象。检查文件或者目录是否存在调用exists()方法
public boolean exists(Path f)throws IOException
2、列出文件:FileSystem类提供的listStatus()方法来实现列出目录中的内容功能
public FileStatus[] listStatus(Paht f)throws IOException
public FileStatus[] listStatus(Paht f,PathFilter filter)throws IOException
public FileStatus[] listStatus(Paht[] files)throws IOException
public FileStatus[] listStatus(Paht[] files,PathFilter filter)throws IOException
3、文件模式:Hadoop为执行通配提供了两个FileSystem方法:
public FileStatus[] globStatus(Path pathPattern)throws IOException
public FileStatus[] globStatus(Path pathPattern,PahtFilter filter)throws IOException
4、PathFilter对象:PathFilter与java.io.FileFilter一样,是Path对象而不是file对象
PathFilter用于排除匹配正则表达式的路径
public class RegexExcludePathFilter implements PathFilter{
private final String regex;
public RegexExcludePathFilter(String regex){
this.regex = regex;
}
public boolean accept(Path path){
return !path.toString.matches(regex);
}
}
自定义过滤器,结合3中的globStatus()方法,可以实现大多数的文件名匹配。
删除数据
使用FileSystem的delete()方法可以永久性删除文件或目录
public boolean delete(Path f,boolean recursive) throws IOException
如果f是一个文件或者空目录,那么recursive的值则会别忽略,只有在recrusive的值为true时,非空目录及其内容才会被删除,否则抛出IOException异常。
数据流
文件读取剖析
1.客户端通过调用FileSystem对象的open()方法打开希望读取的文件,对于HDFS而言,该对像是一个DistributedFileSystem的一个实例。
2.DistributedFileSystem通过使用RPC(远程过程调用)来调用namenode,以确定文件起始块的位置。对于每一个块,namenode返回存有该块副本的datanode地址。DistributedFileSystem类返回一个FSDataInputStream对象给客户端以便读取数据,FSDataInputStream类封装DFSInputStream对象,该对象管理着datanode和namenode的IO。
3.客户端对FSDataInputStream输入流调用read()方法。存储着文件起始几个块的datanode地址的DFSInputStream随机连接距离最近的文件中第一个块所在的datanode。
4.对数据流反复调用read()方法,将数据从datanode传输到客户端。
5.到达块的末端时,DFSInputStream关闭与该datanode的连接,然后寻找下一个块的最佳datanode。所有这些对于客户端都是透明的,在客户看来它一直在读取一个连续的流。
6.客户端从流中读取数据时,块是按照打开DFSInputStream与datanode新建连接的顺序读取的,也会根据需要询问namenode来检索下一批数据块的datanode的位置,一旦完成读取,就对FSDataInputStream调用close()方法。
故障处理:在读取数据的时候,如果DFSInputStream与datanode通信时遇到错误,会尝试从这个块的另外一个最邻近datanode读取数据,它也记住那个故障datanode,以保证以后不会反复读取该节点上后续的块。DFSInputStream也会通过校验和确认从datanode发来的数据是否完整,如果发现有损坏块,DFSInputStream会试图从其他datanode读取其复本,也会将被损坏的块通知namenode。
设计重点:客户端可以直接连接到datanode检索数据,且namenode告知客户端每个块所在的最佳datanode。由于数据流分散在集群中的所有datanode,所以这种设计能使HDFS扩展到大量的并发客户端。同时namenode只需要响应块位置的请求(这些信息存在namenode的内存上,性能高),无需响应数据请求。
文件写入剖析
1.客户端通过对DistributedFileSystem对象调用create()来新建文件。
2.DistributedFileSystem对namenode创建一个RPC调用,在文件系统的命名空间创建一个文件,此时该文件还没有相应的数据块。namenode执行各种不同的检查以确保这个文件不存在以及客户端有新建该文件的权限。检查通过,namenode则为创建新文件立即一条记录;否则创建失败并向客户端抛出一个IOException异常。
DistributedFileSystem向客户端返回一个FSDataOutputStream对象,由此客户端可以开始写入数据。与读取事件一样,FSDataOutputStream封装一个DFSoutputStream对象,该对象负责处理datanode和namenode之间的通信。
3.客户端写入数据,DFSOutputStream将它分成一个个数据包(大化小),并写入内部队列,成为数据队列(data queue)DataStreamer处理数据队列,它的责任是挑选出适合存储数据复本的一组datanode,并根据此来要求datanode分配新的数据块,这一组datanode构成一个管线。
4.管线内部节点将数据包流式传输,假设复本数为3,则管线中有3个节点,DataStreamer将数据包流式传输到管线中的第一个datanode,该datanode存储数据包,并将它发送到管线中的第二个datanode,同样,第二个存储该数据包且发送到管线中的第三个datanode(最后一个)。
5.DFSOutputStream也维护着一个内部数据包队列来等待datanode的收到确认回执,称为确认队列ack queue。收到管道中所有datanode确认信息后,该数据包才会从确认队列中删除。
6.客户端完成数据写入后,对数据流调用close()方法。该操作将剩余的所有数据包写入datanode管线,并在联系到namenode告知其文件写入完成之前,等待确认(7)
故障处理:如果任何datanode在数据写入期间发生故障,则执行以下操作:
首先关闭管线,确认把队列中的所有数据包都添加回数据队列的最前端,以确保故障节点下游的datanode不会漏掉任何一个数据包
为存储在另一正常datanode的当前数据块指定一个新的标识,并将该标识传送给namenode,以便故障datanode在恢复后可以删除存储的部分数据块。
从管线中删除故障datanode,基于两个正常datanode构建一条新管线。
余下的数据块写入管线中正常的datanode
namenode注意到块复本量不足时,会在另一个节点创建一个新的复本。
复本布局策略:Hadoop的默认布局策略是运行在客户端的节点上方第1个复本(如果客户端运行在集群外,则随机选一个节点,不过系统会避免挑选存储过满或太忙的节点);第2个复本放在与第1个复本不同且随机另外挑选的机架中的节点(离架),第3个复本与第2个复本放在同一个机架上,且随机选择另一个节点。一旦选定复本的放置位置,就根据网络拓扑创建一个管线。
一致模型
文件系统的一致模型(coherency model)描述了文件读写的数据可见性。HDFS为性能牺牲了一些POSIX要求。新建一个文件后,它能在文件系统的命名空间中立即可见,但写入文件的内容却不保证能立即可见,为此,Hadoop提供一种强行将所有缓存刷新到datanode中的方法,即对FSDataOutputStream调用hflush()方法。当hflush()方法返回成功后,对所有新的reader而已,HDFS能保证文件中到目前为止写入的数据均能到达所有datanode的写入管道,并且对新的reader均可见。
Path p = new Path("p");
FSDataOutputStream out = fs.create(p);
out.write("content".getBytes("utf-8"));
out.hflush();
...
注意:hflush()不保证datanode已经将数据写到磁盘上,进确保数据在datanode的内存中,为确保数据写入磁盘,可以用hsync()替代。
在HDFS中关闭文件out.close()时,隐含执行hflush()方法。
并行复制
Hadoop自带一个distcp程序,可以并行从Hadoop文件系统中复制出大量数据,也可以将大量数据复制到Hadoop中,distcp的一种用法是替代hadoop fs -cp
% hadoop distcp file1 file2
% hadoop distcp dir1 dir2</pre>
参考资料:
- 《Hadoop权威指南》