1、HDFS文件读写流程
1.1、剖析文件读取过程
1.2、剖析文件写入过程
1、HDFS文件读写流程:
作为一个文件系统,文件的读和写是最基本的需求,这一部分我们来了解客户端是如何与HDFS进行交互的,也就是客户端与HDFS,以及构成HDFS的两类节点(namenode和datanode)之间的数据流是怎样的。
1.1、剖析文件读取过程:
客户端从HDFS读取文件,其内部的读取过程实际是比较复杂的,可以用下图来表示读取文件的基本流程。
对于客户端来说,首先是调用FileSystem对象的open()方法来打开希望读取的文件,然后HDFS会返回一个文件输入流FSDataInputStream ,客户端对这个输入流调用read()方法,读取数据,一旦完成读取,就对这个输入流调用close()方法关闭,这三个过程对应图中的步骤1、3、6。
以上三个步骤是从客户端的角度来分析的,实际上,要实现文件读取,HDFS内部还需要比较复杂的机制来支持,而这些过程都是对客户端透明的,所以客户端感受不到,在客户看来就像是在读取一个连续的流。
具体的,从HDFS的角度来说:
- 客户端调用的FileSystem对象的open()方法,这个FileSystem对象实际上是分布式文件系统DistributedFileSystem的一个实例,DistributedFileSystem通过远程过程调用(RPC)来调用namenode,以获得文件起始块的位置(步骤2,namenode返回存有该数据块副本的datanode的地址)。当然,由于HDFS保存了一个数据块的多个副本(默认是3),所以满足请求的datanode地址不止一个,此时会根据它们与客户端的距离来排序,优先选择距离近的datanode,如果该客户端本身就是一个datanode,该客户端就可以从本地读取数据(比如:mapReduce就利用了这里的数据本地化优势)。
- open方法完成后,DistributedFileSystem类返回一个FSDataInputStream(支持文件定位的输入流)对象给客户端以便于读取数据。这个类转而封装为DFSInputStream对象,该对象管理着datanode和namenode的I/O。
- 这个DFSInputStream存储着文件起始几个块的datanode地址,因此,客户端对这个输入流调用read()方法就可以知道到哪个datanode(网络拓扑中距离最近的)去读取数据,这样,反复调用read方法就可以将数据从datanode传输到客户端(步骤4)。到达一个块的末端时,会关闭和这个datanode的连接,寻找下一个块的最佳datanode,重复这个过程。
当然,上面我们说DistributedFileSystem只存储着文件起始的几个块,在读取过程中,也会根据需要再次询问namenode来获取下一批数据块的datanode地址。一旦客户端读取完成,就调用close方法关闭数据流。
如果在读取过程中,datanode遇到故障,很明显,输入流只需要从另外一个保存了该数据块副本的最近datanode读取即可,同时记住那个故障datanode,以后避免从那里读取数据。
总结:
以上就是HDFS的文件读取过程,从这个过程的分析中我们可以看出:其优势在于客户端可以直接连接到datanode进行数据的读取,这样由于数据分散在不同的datanode上,就可以同时为大量并发的客户端提供服务。而namenode作为管理节点,只需要响应数据块位置的请求,告知客户端每个数据块所在的最佳datanode即可(datanode的位置信息存储在内存中,非常高效的可以获取)。这样使得namenode无需进行具体的数据传输任务,否则namenode在客户端数量多的情况下会成为瓶颈。
1.2、剖析文件写入过程:
接下来我们分析文件写入的过程,重点考虑的情况是如何新建一个文件、如何将数据写入文件并最后关闭该文件。
然而,具体的,从HDFS的角度来看,这个写数据的过程就相当复杂了:
- 首先客户端通过DistributedFileSystem上的create()方法指明一个预创建的文件的文件名;
- DistributedFileSystem会对namenode创建一个RPC调用,在文件系统的命名空间中新建一个文件,此时还没有相应的数据块(步骤2)。namedata接收到这个RPC调用后,会进行一系列的检查,确保这个文件不存在,并且这个客户端有新建文件的权限,如果检查通过,namenode会为该文件创建一个新的记录,否则的话文件创建失败,客户端得到一个IOException异常,最后DistributedFileSystem返回一个FSDataOutputStream以供客户端写入数据,与FSDataInputStream类似,FSDataOutputStream封装了一个DFSOutputStream用于处理namenode与datanode之间的通信;
- 当客户端开始写数据时(DFSOutputStream把写入的块数据时会将其拆分成64k的数据包(packet), 放入一个中间队列——数据队列(data queue)中去。DataStreamer从数据队列中取数据,同时向namenode申请一个新的block来存放它已经取得的数据。namenode选择一系列合适的datanode(个数由文件的replica数决定)构成一个管道线(pipeline),这里我们假设replica为3,所以管道线中就有三个datanode;
- DataSteamer把数据流式的写入到管道线中的第一个datanode中,第一个datanode再把接收到的数据转到第二个datanode中,以此类推;
- DFSOutputStream同时也维护着另一个中间队列——确认队列(ack queue),确认队列中的包只有在得到管道线中所有的datanode的确认以后才会被移出确认队列。上面的DataStreamer 线程会从 dataQueue 队列中取出 Packet 对象,通过底层 IO 流发送到 pipeline 中的第一个 DataNode,然后继续将所有的包转到第二个 DataNode 中,以此类推。发送完毕后,这个 Packet 会被移出 dataQueue,放入 DFSOutputStream 维护的确认队列 ackQueue 中,该队列等待下游 DataNode 的写入确认。当一个包已经被 pipeline 中所有的 DataNode 确认了写入磁盘成功,这个数据包才会从确认队列中移除;
如果某个datanode在写数据的时候挂掉了,下面这些对用户透明的步骤会被执行:
- 管道线关闭,所有确认队列上的数据会被挪到数据队列的首部重新发送,这样可以确保管道线中挂掉的datanode下游的datanode不会因为挂掉的datanode而丢失数据包;
- 在还在正常运行的datanode上的当前block上做一个标志,这样当挂掉的datanode重新启动以后namenode就会知道该datanode上哪个block是刚才宕机时残留下的局部损坏block,从而可以把它删掉;
- 已经挂掉的datanode从管道线中被移除,未写完的block的其他数据继续被写入到其他两个还在正常运行的datanode中去,namenode知道这个block还处在under-replicated状态(也即备份数不足的状态)下,然后他会安排一个新的replica从而达到要求的备份数,后续的block写入方法同前面正常时候一样。有可能管道线中的多个datanode挂掉(虽然不太经常发生),但只要dfs.replication.min(默认为1)个replica被创建,我们就认为该创建成功了。剩余的replica会在以后异步创建以达到指定的replica数;
当客户端完成写数据后,它会调用close()方法。这个操作会冲洗(flush)所有剩下的package到pipeline中。
等待这些package确认成功,然后通知namenode写入文件成功。这时候namenode就知道该文件由哪些block组成(因为DataStreamer向namenode请求分配新block,namenode当然会知道它分配过哪些blcok给给定文件),它会等待最少的replica数被创建,然后成功返回。
注意:hdfs在写入的过程中,有一点与hdfs读取的时候非常相似,就是"DataStreamer在写入数据的时候,每写完一个datanode的数据块,都会重新向nameNode申请合适的datanode列表。这是为了保证系统中datanode数据存储的均衡性"。
hdfs写入过程中,datanode管线的确认应答包并不是每写完一个datanode,就返回一个确认应答,而是一直写入,直到最后一个datanode写入完毕后,统一返回应答包。如果中间的一个datanode出现故障,那么返回的应答就是前面完好的datanode确认应答,和故障datanode的故障异常。这样我们也就可以理解,在写入数据的过程中,为什么数据包的校验是在最后一个datanode完成。