本文包含如下内容:
① Pipeline的建立过程,以及下游节点如何给上游节点发Ack
② DFSOutputStream、DataStreamer的原理
③ Sender、BlockReceiver、PacketResponder的原理
作为引子,先从最上游谈起:
我们使用HDFS API创建文件,写文件时,首先会调用FileSystem的create方法,获得一个FSDataOutputStream流,然后通过用这个流来write数据即可。
别看API这么简单,这后面发生的事情可是十分复杂!比如这后面涉及到Client通过RPC调用在NameNode侧的文件系统目录中创建文件、addBlock,NameNode给客户端返回文件的LocatedBlock、客户端根据LocatedBlock创建输出流、Datanode建立Pipeline、Client发送Packet到Pipeline,Sender发送数据到DataXeicver等等。
接下来让我们一起来深入探索吧!
一、写数据Pipeline总览
我们跳过客户端调用RPC添加数据块,NameNode侧为数据块选择最优存放的DataNode列表这一步骤的讲解,直接来到已经选好DataNode列表后,客户端向列表中的DataNode写数据的步骤。
这个步骤需要客户端与DataNode列表之间建立一个Pipeline。如下图所示:
数据是以DFSPacket对象的形式封装的,一个Block可能由很多个Packet组成,Packet的具体格式参照HDFS源码中的DFSPacket.java的注释。
Client发送Packet给Pipeline中的第一个DataNode A,A收到数据后转发给B,B再转发给C。 当Pipeline中的最下游节点收到数据包后,会按照数据包传送方向的反方向发送对数据包的Ack信息。这个Ack信息是一个数据包的序列号,在Client侧是单调递增的。上图是从单个数据包的视角出发。我们来从多个数据包的视角出发来看,下图引自雅虎实验室的HDFS论文:
OK,从整体上看,整个写pipeline的过程很容易理解,但这里面的代码量非常多,涉及到Client侧、DataNode侧的很多线程类。后文中,我们将逐一击破。
二、DFSOutputStream的获取
首先来看看DFSClient中如何获取写数据的流:DFSOutputStream
/**
* Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
* Progressable, int, ChecksumOpt, InetSocketAddress[])} with the addition of
* ecPolicyName that is used to specify a specific erasure coding policy
* instead of inheriting any policy from this new file's parent directory.
* This policy will be persisted in HDFS. A value of null means inheriting
* parent groups' whatever policy.
*/
public DFSOutputStream create(String src, FsPermission permission,
EnumSet<CreateFlag> flag, boolean createParent, short replication,
long blockSize, Progressable progress, int buffersize,
ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes,
String ecPolicyName) throws IOException {
checkOpen();
final FsPermission masked = applyUMask(permission);
LOG.debug("{}: masked={}", src, masked);
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
src, masked, flag, createParent, replication, blockSize, progress,
dfsClientConf.createChecksum(checksumOpt),
getFavoredNodesStr(favoredNodes), ecPolicyName);
beginFileLease(result.getFileId(), result);
return result;
}
通过DFSOutputStream#newStreamForCreate方法获取到一个DFSOutputStream流对象。参数比较多,关注一下this代表DFSClient对象,还有favoredNodes列表代表优先选择的DataNode。
在newStreamForCreate方法中主要做两个工作:
①调用Namenode代理对象上的create方法,在NameNode文件系统中创建一个文件,并获得此文件的元数据信息HdfsFileStatus对象。
②将①中的HdfsFileStatus对象传入DFSOutputStream构造方法中,构造出一个DFSOutputStream流对象,同时启动DFSOutputStream对象中的DataStreamer线程。
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize, Progressable progress,
DataChecksum checksum, String[] favoredNodes, String ecPolicyName)
throws IOException {
try (TraceScope ignored =
dfsClient.newPathTraceScope("newStreamForCreate", src)) {
HdfsFileStatus stat = null;
// Retry the create if we get a RetryStartFileException up to a maximum
// number of times
boolean shouldRetry = true;
int retryCount = CREATE_RETRY_COUNT;
while (shouldRetry) {
shouldRetry = false;
try {
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
new EnumSetWritable<>(flag), createParent, replication,
blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
break;
} catch (RemoteException re) {
IOException e = re.unwrapRemoteException(
AccessControlException.class,
DSQuotaExceededException.class,
QuotaByStorageTypeExceededException.class,
FileAlreadyExistsException.class,
FileNotFoundException.class,
ParentNotDirectoryException.class,
NSQuotaExceededException.class,
RetryStartFileException.class,
SafeModeException.class,
UnresolvedPathException.class,
SnapshotAccessControlException.class,
UnknownCryptoProtocolVersionException.class);
if (e instanceof RetryStartFileException) {
if (retryCount > 0) {
shouldRetry = true;
retryCount--;
} else {
throw new IOException("Too many retries because of encryption" +
" zone operations", e);
}
} else {
throw e;
}
}
}
Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
// 构造一个DFSOutputStream对象
final DFSOutputStream out;
if(stat.getErasureCodingPolicy() != null) {
out = new DFSStripedOutputStream(dfsClient, src, stat,
flag, progress, checksum, favoredNodes);
} else {
out = new DFSOutputStream(dfsClient, src, stat,
flag, progress, checksum, favoredNodes, true);
}
// 启动DataStreamer线程
out.start();
return out;
}
}
那我们就一路跟踪DFSOutputStream的构造方法吧:
也是主要做了两件事:
①设置一下DFSOutputStream的成员变量的值,比如对应的fileId、src、对应的DFSClient对象、数据packet大小等等
②创建DataStreamer线程(外层去start这个线程)
/** Construct a new output stream for creating a file. */
protected DFSOutputStream(DFSClient dfsClient, String src,
HdfsFileStatus stat, EnumSet<CreateFlag> flag, Progressable progress,
DataChecksum checksum, String[] favoredNodes, boolean createStreamer) {
this(dfsClient, src, flag, progress, stat, checksum);
this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
bytesPerChecksum);
if (createStreamer) {
streamer = new DataStreamer(stat, null, dfsClient, src, progress,
checksum, cachingStrategy, byteArrayManager, favoredNodes,
addBlockFlags);
}
}
OK,到这里似乎断线了?不知道接下来改怎么看了?DataStreamer线程的run方法啊。为了让行文逻辑更顺畅,我进行了段落的重排,下面的输出流write数据这块代码是我DataStreamer源码后回来写的。
DataStreamer线程从dataQueue中提取数据包,并将数据包发送到将管道中的第一个datanode,然后把这个数据包从dataQueue移动到ackQueue。那我们不就得看看数据包是怎么添加到dataQueue中的。全局代码搜索dataQueue.add,发现dataQueue.addLast是我们想要的方法。通过查看调用关系:
最后你会发现:是在调用FSOutputSummer#write方法时,才会将数据封装成DFSPacket对象入到dataQueue里等待DataStreamer去发送给DataNode的。
而DFSOutputStream类恰恰就是FSOutputSummer的子类,且没有重写write方法。也就是说我们最后的write方法是走FSOutputSummer的write方法,也就会把数据封装成DFSPacket对象入队到dataQueue中了。
notes
①我们在使用HDFS API的时候,虽然流对象是FSDataOutputStream类型的,但是本质上底层的流还是DFSOutputStream的,FSDataOutputStream这个流是个包装流,是DFSClient#createWrappedOutputStream方法中把DFSOutputStream类包装成了FSDataOutputStream类。
②构造流的时候,我们根据配置参数获得到max packet的大小,然后再write数据的时候,由于流是FSOutputSummer的子类,会记录当前已经写了的字节数,如果达到配置设置的最大上限,那么就构造DFSPacket,入队到dataQueue中,等待DataStreamer线程进行处理。
三、DataStream类
DataStreamer是Client写数据的核心类。它本质上是个Thread类。它的主要功能在JavaDoc中描述的很详细,这里摘录一下:
/*********************************************************************
*
* The DataStreamer class is responsible for sending data packets to the
* datanodes in the pipeline. It retrieves a new blockid and block locations
* from the namenode, and starts streaming packets to the pipeline of
* Datanodes. Every packet has a sequence number associated with
* it. When all the packets for a block are sent out and acks for each
* if them are received, the DataStreamer closes the current block.
*
* The DataStreamer thread picks up packets from the dataQueue, sends it to
* the first datanode in the pipeline and moves it from the dataQueue to the
* ackQueue. The ResponseProcessor receives acks from the datanodes. When an
* successful ack for a packet is received from all datanodes, the
* ResponseProcessor removes the corresponding packet from the ackQueue.
*
* In case of error, all outstanding packets are moved from ackQueue. A new
* pipeline is setup by eliminating the bad datanode from the original
* pipeline. The DataStreamer now starts sending packets from the dataQueue.
*
*********************************************************************/
翻译一下,什么tmd叫tmd惊喜:
DataStreamer类负责给在pipeline中的datanode发送数据包。它从namenode检索一个新的blockid和block位置,并开始把数据包以流的形式发送到datanodes组成的pipeline中。每个数据包都有一个与之相关联的序列号。当一个块的所有包都被发送出去并且每个包的ack信息也都被收到,则DataStreamer会关闭当前块。
DataStreamer线程从dataQueue中提取数据包,并将数据包发送到将管道中的第一个datanode,然后把这个数据包从dataQueue移动到ackQueue。ResponseProcessor线程接收来自datanode的响应。当一个数据包的成功的ack信息从所有datanode发送过来时,ResponseProcessor会从ackQueue中移除相应的数据包。
在出现错误的情况下,所有未完成的数据包将从ackQueue中被移除。接着在原来的出错的pipeline中消除掉bad datanode的基础上构建一个新的pipeline。DataStreamer再继续开始发送dataQueue中的数据包。
OK,看完JavaDoc知道了DataStreamer的作用之后,来看下流程图和一些关键源码。
从整体上看,DataStream#run方法的执行逻辑如下图:
它是一个线程类,那我们就看它的run方法就好了。run方法很长,大概有200行代码左右,读者可以不用全看,只需关注我在代码中注释中抛出的问题即可,后面我会针对每个问题进行解答:
/*
* streamer thread is the only thread that opens streams to datanode,
* and closes them. Any error recovery is also done by this thread.
*/
@Override
public void run() {
long lastPacket = Time.monotonicNow();
TraceScope scope = null;
while (!streamerClosed && dfsClient.clientRunning) {
// if the Responder encountered an error, shutdown Responder
if (errorState.hasError()) {
closeResponder();
}
DFSPacket one;
try {
// process datanode IO errors if any
boolean doSleep = processDatanodeOrExternalError();
final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
synchronized (dataQueue) {
// wait for a packet to be sent.
long now = Time.monotonicNow();
// ①halfSocketTimeout和timeout时间是怎么回事?
while ((!shouldStop() && dataQueue.size() == 0 &&
(stage != BlockConstructionStage.DATA_STREAMING ||
now - lastPacket < halfSocketTimeout)) || doSleep) {
long timeout = halfSocketTimeout - (now-lastPacket);
timeout = timeout <= 0 ? 1000 : timeout;
timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
timeout : 1000;
try {
dataQueue.wait(timeout);
} catch (InterruptedException e) {
LOG.warn("Caught exception", e);
}
doSleep = false;
now = Time.monotonicNow();
}
if (shouldStop()) {
continue;
}
// get packet to be sent.
if (dataQueue.isEmpty()) {
one = createHeartbeatPacket();
} else {
try {
backOffIfNecessary();
} catch (InterruptedException e) {
LOG.warn("Caught exception", e);
}
one = dataQueue.getFirst(); // regular data packet
SpanId[] parents = one.getTraceParents();
if (parents.length > 0) {
scope = dfsClient.getTracer().
newScope("dataStreamer", parents[0]);
scope.getSpan().setParents(parents);
}
}
}
// get new block from namenode.
if (LOG.isDebugEnabled()) {
LOG.debug("stage=" + stage + ", " + this);
}
// ②如何建立起Pipeline的,Socket怎么创建的?
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
LOG.debug("Allocating new block: {}", this);
setPipeline(nextBlockOutputStream());
initDataStreaming();
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
LOG.debug("Append to block {}", block);
setupPipelineForAppendOrRecovery();
if (streamerClosed) {
continue;
}
initDataStreaming();
}
long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
if (lastByteOffsetInBlock > stat.getBlockSize()) {
throw new IOException("BlockSize " + stat.getBlockSize() +
" < lastByteOffsetInBlock, " + this + ", " + one);
}
if (one.isLastPacketInBlock()) {
// wait for all data packets have been successfully acked
synchronized (dataQueue) {
while (!shouldStop() && ackQueue.size() != 0) {
try {
// wait for acks to arrive from datanodes
dataQueue.wait(1000);
} catch (InterruptedException e) {
LOG.warn("Caught exception", e);
}
}
}
if (shouldStop()) {
continue;
}
stage = BlockConstructionStage.PIPELINE_CLOSE;
}
// send the packet
SpanId spanId = SpanId.INVALID;
synchronized (dataQueue) {
// move packet from dataQueue to ackQueue
if (!one.isHeartbeatPacket()) {
if (scope != null) {
spanId = scope.getSpanId();
scope.detach();
one.setTraceScope(scope);
}
scope = null;
dataQueue.removeFirst();
ackQueue.addLast(one);
packetSendTime.put(one.getSeqno(), Time.monotonicNow());
dataQueue.notifyAll();
}
}
LOG.debug("{} sending {}", this, one);
// write out data to remote datanode
// 此处是写,调了flush刷流
try (TraceScope ignored = dfsClient.getTracer().
newScope("DataStreamer#writeTo", spanId)) {
one.writeTo(blockStream);
blockStream.flush();
} catch (IOException e) {
// HDFS-3398 treat primary DN is down since client is unable to
// write to primary DN. If a failed or restarting node has already
// been recorded by the responder, the following call will have no
// effect. Pipeline recovery can handle only one node error at a
// time. If the primary node fails again during the recovery, it
// will be taken out then.
errorState.markFirstNodeIfNotMarked();
throw e;
}
lastPacket = Time.monotonicNow();
// update bytesSent
long tmpBytesSent = one.getLastByteOffsetBlock();
if (bytesSent < tmpBytesSent) {
bytesSent = tmpBytesSent;
}
if (shouldStop()) {
continue;
}
// ③如果是block的最后一个packet,则等待ackQueue中的Packet都被移除(代表接收到了DataNode的响应)
// Is this block full?
if (one.isLastPacketInBlock()) {
// wait for the close packet has been acked
synchronized (dataQueue) {
while (!shouldStop() && ackQueue.size() != 0) {
dataQueue.wait(1000);// wait for acks to arrive from datanodes
}
}
if (shouldStop()) {
continue;
}
endBlock();
}
if (progress != null) { progress.progress(); }
// This is used by unit test to trigger race conditions.
if (artificialSlowdown != 0 && dfsClient.clientRunning) {
Thread.sleep(artificialSlowdown);
}
} catch (Throwable e) {
// Log warning if there was a real error.
if (!errorState.isRestartingNode()) {
// Since their messages are descriptive enough, do not always
// log a verbose stack-trace WARN for quota exceptions.
if (e instanceof QuotaExceededException) {
LOG.debug("DataStreamer Quota Exception", e);
} else {
LOG.warn("DataStreamer Exception", e);
}
}
lastException.set(e);
assert !(e instanceof NullPointerException);
errorState.setInternalError();
if (!errorState.isNodeMarked()) {
// Not a datanode issue
streamerClosed = true;
}
} finally {
if (scope != null) {
scope.close();
scope = null;
}
}
}
closeInternal();
}
OK,上面我们抛出了三个问题:
① wait的参数timeout时间是怎么确定的?
为什么需要wait呢?看while的条件:
while ((!shouldStop() && dataQueue.size() == 0 &&
(stage != BlockConstructionStage.DATA_STREAMING ||
now - lastPacket < halfSocketTimeout)) || doSleep)
dataQueue的size是0,证明没有数据需要发送,因此不需要执行后面的发送逻辑,所以线程可以wait进入等待状态。至于为什么要用halfSocketTimeout这个值,我觉得单纯是HDFS这块开发者做的一个trade-off,你也可以减小这个值,这样无非就是多发送几个heartbeat Packet而已。(而且,最新的HDFS社区代码这里已经移除了wait halfSocketTimeout的逻辑)。
做个简单的数学计算
long timeout = halfSocketTimeout - (now-lastPacket);
timeout = timeout <= 0 ? 1000 : timeout;
那如果timeout>0实际上结束wait的时间是:
now + timeout = now + halfSocketTimeout - now + lastPacket = lastPacket + halfSocketTimeout;
也即保证在上一个Packet包发送后,在wait至少halfSocketTimeout时长。
②如何构建的DN网络连接,Pipeline
这块是重中之重,以create文件为例:
这里有个细节,需要了解一下。就是假设数据块存放的DataNode列表是:【A、B、C】三台。 Client首先和A节点建立Socket连接,然后A节点和B节点再建立Socket连接,B节点再和C节点建立Socket连接。这里实现的方法如下:
在构造参数的序列化对象时,涉及到生成发送到下一步的目标节点列表时,会调用PBHelperClient.convert方法,并传入原始列表和startIndex=1。看这个convert方法的实现,startIndex = 1的意思就是每次都从列表的第二个元素开始构造新的target列表。举个例子说明:开始是target DataNode列表是:【A、B、C】,客户端和A建立Socket连接时,发送的target DataNode列表就是:【B、C】了,这样A就可以和B建立连接,以此类推 B -> C。
OK,到这里我们就知道Pipeline的整个构造过程了。
③ pipeline下游的DataNode怎么给上游发送ack的,以及ackQueue这个数据结构相关的操作。 这块放到第三章讲。
三、BlockReceiver & PacketResponder
上面提到过,DataStream线程从dataQueue队列中取出待发送的DFSPacket对象时,会把packet加入到ackQueue中,表示此Packet需要等待pipeline中的DataNode都返回ACK信息。那DataNode是如果给上游的DataNode以及Client返回Ack信息的呢?下面我们就来看看。
在DataStreamer中不是调用了new Sender(xxx).writeBlock方法么?这个东西会被DataNode侧的DataXceiver类中的writeBlock方法处理。writeBlock方法中又会委托给BlockReceiver#receiveBlock方法。receiveBlock方法中启动了PacketResponder线程用来对接收到的packet进行响应。
观察一下receiveBlock方法的参数:
这三个流分别代表给下游datanode发送数据的输出流、接收下游datanode数据的输入流、以及回复上游数据的输出流。看一下PacketResponder的构造函数,将其中的两个流传入作为参数了:
type表示此responder是pipeline中的最后一个节点还是中间节点。
回到receiveBlock里,会有这样一行代码:
while (receivePacket() >= 0) { /* Receive until the last packet */ }
这是一个空循环,只要还有数据过来,那我就一直调用receivePacket方法。这个receivePacket方法返回的是接收的数据字节数。里面会从输入流中读取Packet的各种信息,然后入队,等待packet线程后台去处理,如下图:
那接下来就是看PacketResponeder怎么给上游的DataNode或者Client返回响应的吧。其实是在PacketResponeder线程的run方法中最终调用了sendAckUpstreamUnprotected方法给上游发送Ack,如下图所示:
OK,到这里我们也就知道了DataNode是如何回复Ack信息的了。