引子
为什么需要HDFS?
因为一个物理计算机的存储已经hold不住我们庞大的数据集。
HDFS的特点是什么?
HDFS以流式数据访问模式来存储超大文件,运行于商用硬件集群上
1.超大文件:数量级MB、GB、TB等
2.流式数据访问模式:以块为单位进行读写。一次写入、多次读取。
3.高数据吞吐量,时间延迟不低
4.不能存储大量小文件:namenode的内存中存储HDFS中文件元信息,每个元信息大约占150B,因此HDFS能存储的文件总数受限于namenode的内存大小。
5.不支持多用户写入:HDFS中的文件只有一个writer
6.不能任意修改文件:写操作是追加模式
基础概念
数据块
作为独立的存储单元,读写最小单位。默认64MB,可在hdfs-site.xml中自定义。
块要比磁盘块(512B)大得多,是因为最小化寻址开销。磁盘传输数据耗时>定位这个块开始位置的耗时。然而块不能设置过大,是因为MR任务中,map任务通常一次处理一个块,如果块数量少,则并行map任务就少,job运行速度较慢。
再说说......
· 文件所有的块分布式存储在各个datanode上,
· 小于一个块默认大小的文件,不会占据整个块的空间。
namenode和datanode
namenode管理文件系统的命名空间和每个文件中各个块所在的数据节点信息。命名空间是HDFS的文件系统树以及树内所有目录和文件,以fsimage和editlog文件永久保存在本地磁盘上。块的存储信息在内存中,系统启动时由datanode上报。
datanode是HDFS的工作节点,负责存储并检索数据块,定期向namenode发送它们所存储的块的列表。
关于配置:
dfs.replication默认3,一个数据块存3份,HDFS会自动备份到3个不同的datanode上。
HDFS读写流程
读文件
【一句话版本】namenode告知客户端数据的块位置,让客户端联系datanode流式检索数据。
好处: namenode内存存储块索引信息,相应快;block分散在集群所有节点上,以便HDFS可扩展大量并发客户端。
瓶颈:随客户端数量增长,namenode的I\O成为瓶颈。
1. 【概括版】客户端调用DistributedFileSystem对象的open()方法,RPC调用namenode的GetBlockLocations()方法,namenode返回存有该文件所有block信息,包括其副本所在的所有datanode地址。
【细节版】客户端调用DistributedFileSystem.open(Path f, int bufferSize),open()函数中new了一个DFSInputStream对象;在DFSInputStream的构造函数中,openInfo()函数被调用,其主要从namenode中得到要打开的文件所对应的blocks的信息,通过callGetBlockLocations()实现,核心代码如下:
// openInfo():
LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
//callGetBlockLocations()中将发起一个RPC调用,返回 LocatedBlocks 对象
namenode.getBlockLocations(src, start, length);
LocatedBlocks 是一个链表,List<LocatedBlock> blocks,其中每个元素包含以下信息:
Block b:此block的信息
long offset:此block在文件中的偏移量
DatanodeInfo[] locs:此block位于哪些DataNode上
2. namenode接收到请求后,将进行一系列操作。RPC调用NameNode.getBlockLocations(),里面再调用namesystem.getBlockLocations(getClientMachine(), src, offset, length);
namesystem保存着namenode上的命名空间树,具体是一个INode链表,INode有两种子类:INodeFile和INodeDirectory。其中,INodeFile有成员变量BlockInfo blocks[],是此文件包含的block的信息。
getBlockLocations()具体步骤:1) 得到此文件的block信息; 2) 从offset开始,长度为length所涉及的blocks; 3) 找到各个block对应的、健康的datanode机器。返回LocatedBlocks对象。
3~5. 回到客户端,在DFSInputStream的构造函数通过RPC收到一串block信息(即LocatedBlocks对象)之后,DFSInputStream读取文件起始块的datanode地址,随即与距离最近的datanode建立Socket连接和读入流,客户端反复调用FSDataInputStream的read()读取block信息。
e.g.对于64M一个block的文件系统来说,欲读取从100M(offset)开始,长度为128M(length)的数据,则block列表包括第2,3,4块block。第2号block从36MB开始读取28MB,第3号block从0MB开始读取64MB....
到达block末端,DFSInputStream关闭与该datanode的连接,寻找下一个block的最佳datanode。
6.到达文件末端时,客户端对FSDataInputStream调用close()方法。
再说说...
遇到读失败,1) DFSInputStream和datanode的连接发生错误时,从距离次近的datanode读取,并将该节点记入“故障节点列表”,以防反复从该节点读。2)读取到一个损坏的block,先通知namenode,再从其他datanode读取该块的另一个副本。
写文件
【一句话版本】客户端向namenode申请创建文件,namenode分配datanode,客户端通过pipeline形式写数据,全部确认正常写入后通知namenode。
1.客户端通过调用DistributedFileSystem对象的create()方法,该方法生成一个FSDataOutputStream用于向新生成的文件中写入数据,其成员变量dfs的类型为DFSClient,DFSClient的create()函数中返回一个DFSOutputStream对象。在DFSOutputStream的构造函数中,做了两件重要的事情:一是通过RPC调用NameNode的create()来创建一个文件;二是streamer.start(),即启动了一个pipeline,用于写数据。
//以下为客户端调用的create public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { return new FSDataOutputStream(dfs.create(getPathName(f), permission, overwrite, replication, blockSize, progress, bufferSize), statistics); }
2. namenode 先在命名空间(在磁盘)中检查文件是否存在以及客户端是否有权限,再新建一个新文件的元信息到fsimage 中,就是命名空间,此时没有任何块与之对应。
3~5. 客户端调用DFSOutputStream对象的write()方法写数据。按照HDFS的设计,对block的数据写入使用的是pipeline的方式,也即将数据分成一个个的package,如果需要复制三分,分别写入DataNode 1, 2, 3,则会进行如下的过程:
1) 创建写入流,RPC调用namenode为文件分配block, 并设置block对应的DataNode。
2) 将block分成若干个chunk(512B),每N个chunk + 校验值形成一个package。package进入dataQueue。
3) 客户端将DataNode 2、3信息和 package 1写入DataNode 1,package 1从dataQueue移至ackQueue,等待确认。
4) 由DataNode 1负责将DataNode3信息和package1写入DataNode 2,同时客户端可以将pacage 2写入DataNode 1。package 2从dataQueue移至ackQueue,等待确认。
5) DataNode 2负责将package 1写入DataNode 3, 同时客户端可以将package 3写入DataNode 1,DataNode 1将package 2写入DataNode 2。package 3从dataQueue移至ackQueue,等待确认。
就这样将一个个package排着队的传递下去,直到所有的数据全部写入并复制完毕并且都接收到ACK确认包。
6~7.写完所有块之后,断开与DataNode 1的连接,客户端通知namenode,完成。
再说说....
遇到写失败,DataNode1故障时,1)关闭pipeline,2)把ackQueue中的所有数据包添加到dataQueue的头部, 3)为DataNode2中当前block指定一个新标识,通知namenode,以便DataNode1恢复后删除本block的残缺数据,4)将故障DataNode1从pipeline中删除,然后继续将剩余数据包写入正常节点。异步完成本block的副本复制。
关于“文件一致性”:在文件创建后,写完前,正在写入的block是读取不到的(e.g.读文件内容、获取文件大小)。除非调用HDFS的sync()方法强制所有缓存与数据节点同步。
参考文章:
《Hadoop- The Definitive Guide, 4th Edition》