I. Write Flow Overview
- The RPC protocol between the client and the namenode is ClientProtocol.
- locateFollowingBlock issues an RPC asking the namenode to allocate a new block and return the datanodes that should be written to.
- The client sends a writeBlock request to the first datanode via createBlockOutputStream.
- If a datanode in the pipeline fails during the write, a new datanode can be added once the replacement policy is satisfied, and the data already written to the surviving datanodes is copied to the new node via transfer. The client also calls updateBlockForPipeline so the namenode bumps the block's generation stamp, which lets datanodes later discard the stale replicas.
- When all replicas are written successfully, e.g. three replicas written in order client -> dn1 -> dn2 -> dn3, dn3 sends an ack to dn2, which forwards it upstream until it reaches the client.
- How the pipeline is established (for example, with three replicas):
  - The client opens a socket to the first datanode (dn1), sends a writeBlock request, and synchronously reads dn1's reply via DataXceiver.replyOut.
  - dn1 then opens a socket to dn2, forwards the writeBlock request, and synchronously reads dn2's reply via DataXceiver.replyOut.
  - dn2 does the same with dn3 and synchronously reads dn3's reply via DataXceiver.replyOut.
  - Finally, each datanode calls blockReceiver.receiveBlock to receive the packets flowing through the pipeline. (A minimal client-side sketch that triggers this whole flow follows this list.)
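The whole flow above is driven by an ordinary client write. Below is a minimal sketch using only the public FileSystem API (the path and payload are made up for illustration); calling write/hflush/close on the returned stream is what makes DFSOutputStream and its DataStreamer run the pipeline just described.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();          // picks up core-site.xml / hdfs-site.xml
    try (FileSystem fs = FileSystem.get(conf);
         FSDataOutputStream out = fs.create(new Path("/tmp/demo.txt"))) {
      // each write() fills chunks/packets in DFSOutputStream; the DataStreamer thread
      // drains the dataQueue and pushes the packets down the datanode pipeline
      out.write("hello pipeline".getBytes("UTF-8"));
      out.hflush();  // force the current packet out through the pipeline
    }                // close() sends the last packet and waits for all acks
  }
}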
II. DataStreamer Constructor
1. Constructor definition
This constructor (used for append) works out how much of the last checksum chunk is already in use. If that chunk still has free space, appendChunk is set to true, the checksum buffer is resized via setChecksumBufSize, and computePacketChunkSize recomputes the packet/chunk sizes so that the next packet carries exactly one chunk that completes the partial chunk.
/**
* Construct a data streamer for appending to the last partial block
* @param lastBlock last block of the file to be appended
* @param stat status of the file to be appended
* @param bytesPerChecksum number of bytes per checksum
* @throws IOException if error occurs
*/
private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
int bytesPerChecksum) throws IOException {
isAppend = true;
stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
block = lastBlock.getBlock();
bytesSent = block.getNumBytes();
accessToken = lastBlock.getBlockToken();
isLazyPersistFile = isLazyPersist(stat);
long usedInLastBlock = stat.getLen() % blockSize;
int freeInLastBlock = (int)(blockSize - usedInLastBlock);
// calculate the amount of free space in the pre-existing
// last crc chunk
// how much of the last checksum chunk has already been used
int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
// how much free space is left in that chunk
int freeInCksum = bytesPerChecksum - usedInCksum;
// if there is space in the last block, then we have to
// append to that block
if (freeInLastBlock == blockSize) {
throw new IOException("The last block for file " +
src + " is full.");
}
// if the last chunk is only partially filled, set appendChunk = true
if (usedInCksum > 0 && freeInCksum > 0) {
// if there is space in the last partial chunk, then
// setup in such a way that the next packet will have only
// one chunk that fills up the partial chunk.
//
computePacketChunkSize(0, freeInCksum);
setChecksumBufSize(freeInCksum);
appendChunk = true;
} else {
// if the remaining space in the block is smaller than
// that expected size of of a packet, then create
// smaller size packet.
//
computePacketChunkSize(Math.min(dfsClient.getConf().writePacketSize, freeInLastBlock),
bytesPerChecksum);
}
// setup pipeline to append to the last block XXX retries??
setPipeline(lastBlock);
errorIndex = -1; // no errors yet.
if (nodes.length < 1) {
throw new IOException("Unable to retrieve blocks locations " +
" for last block " + block +
"of file " + src);
}
}
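To make the chunk arithmetic in this constructor concrete, here is a small self-contained example; the file length, block size, and bytes-per-checksum values are invented for illustration, and only the arithmetic mirrors the code above.
public class AppendChunkMath {
  public static void main(String[] args) {
    long blockSize = 128L * 1024 * 1024;  // e.g. dfs.blocksize = 128 MB
    int bytesPerChecksum = 512;           // e.g. dfs.bytes-per-checksum = 512
    long fileLen = 1000;                  // current length of the file being appended

    long usedInLastBlock = fileLen % blockSize;              // 1000 -> last block is not full
    long freeInLastBlock = blockSize - usedInLastBlock;      // so append to this block is allowed
    int usedInCksum = (int) (fileLen % bytesPerChecksum);    // 1000 % 512 = 488
    int freeInCksum = bytesPerChecksum - usedInCksum;        // 512 - 488 = 24

    // usedInCksum > 0 && freeInCksum > 0, so appendChunk = true and the next packet
    // will carry exactly one 24-byte chunk that completes the partial checksum chunk.
    System.out.println("freeInLastBlock=" + freeInLastBlock
        + " usedInCksum=" + usedInCksum + " freeInCksum=" + freeInCksum);
  }
}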
2. DataStreamer.run implementation
- If the dataQueue is empty and the heartbeat interval has not yet elapsed, the thread waits for the client to enqueue data into dataQueue.
- If the dataQueue is still empty after the wait, an empty heartbeat packet is created (apparently to keep the connection to the downstream datanodes alive).
- Otherwise a packet is taken from the head of the queue.
- While the pipeline is in the initial PIPELINE_SETUP_CREATE state, nextBlockOutputStream asks the namenode which datanodes to write to and has it allocate a new block, persisting the allocation to the edit log.
- initDataStreaming then starts the ResponseProcessor daemon thread, which handles the acks.
- If the packet is the last one of the block (isLastPacketInBlock), the block is full; the streamer waits (in 1-second intervals) until all outstanding acks have been confirmed by the ResponseProcessor, then switches the pipeline state to PIPELINE_CLOSE to mark the block as finished.
- If there is no error and the packet is not a heartbeat, it is moved from dataQueue to ackQueue and then written out with writeTo(blockStream).
/* DataStreamer
* streamer thread is the only thread that opens streams to datanode,
* and closes them. Any error recovery is also done by this thread.
*/
@Override
public void run() {
long lastPacket = Time.monotonicNow();
TraceScope scope = NullScope.INSTANCE;
while (!streamerClosed && dfsClient.clientRunning) {
// if the Responder encountered an error, shutdown Responder
if (hasError && response != null) {
try {
//response is also a daemon thread; it processes the write acks coming back from the datanodes
response.close();
response.join();
response = null;
} catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e);
}
}
DFSPacket one;
try {
// process datanode IO errors if any
boolean doSleep = false;
//a datanode write failed or a datanode is restarting
if (hasError && (errorIndex >= 0 || restartingNodeIndex.get() >= 0)) {
//on error, move the packets from ackQueue back to dataQueue;
//setupPipelineForAppendOrRecovery then rebuilds the pipeline and removes any failed datanode from nodes[]
doSleep = processDatanodeError();
}
synchronized (dataQueue) {
// wait for a packet to be sent.
//wait for the client to enqueue data
//socketTimeout defaults to 60 seconds
long now = Time.monotonicNow();
while ((!streamerClosed && !hasError && dfsClient.clientRunning
&& dataQueue.size() == 0 &&
(stage != BlockConstructionStage.DATA_STREAMING ||
stage == BlockConstructionStage.DATA_STREAMING &&
now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
timeout = timeout <= 0 ? 1000 : timeout;
timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
timeout : 1000;
try {
dataQueue.wait(timeout);
} catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e);
}
doSleep = false;
now = Time.monotonicNow();
}
if (streamerClosed || hasError || !dfsClient.clientRunning) {
continue;
}
// get packet to be sent.
//queue is empty: send an empty heartbeat packet (keeps the idle pipeline alive)
if (dataQueue.isEmpty()) {
one = createHeartbeatPacket();
assert one != null;
} else {
//take a packet from the head of the queue
one = dataQueue.getFirst(); // regular data packet
long parents[] = one.getTraceParents();
if (parents.length > 0) {
scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0]));
// TODO: use setParents API once it's available from HTrace 3.2
// scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS);
// scope.getSpan().setParents(parents);
}
}
}
// get new block from namenode.
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
if(DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Allocating new block");
}
//nextBlockOutputStream asks the namenode for the target datanodes and a new block, persisted to the edit log;
//an IOException is thrown if writing to the first datanode fails
setPipeline(nextBlockOutputStream());
//start the ResponseProcessor thread to handle the ackQueue and move the stage to BlockConstructionStage.DATA_STREAMING, i.e. data can now be transferred
initDataStreaming();
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
if(DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Append to block " + block);
}
setupPipelineForAppendOrRecovery();
initDataStreaming();
}
long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
if (lastByteOffsetInBlock > blockSize) {
throw new IOException("BlockSize " + blockSize +
" is smaller than data size. " +
" Offset of packet in block " +
lastByteOffsetInBlock +
" Aborting file " + src);
}
//last packet: the block is full, so the acks tracked by the response thread can be drained
if (one.isLastPacketInBlock()) {
// wait for all data packets have been successfully acked
synchronized (dataQueue) {
//wait until all earlier packets have been acked before sending this one
while (!streamerClosed && !hasError &&
ackQueue.size() != 0 && dfsClient.clientRunning) {
try {
// wait for acks to arrive from datanodes
dataQueue.wait(1000);
} catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e);
}
}
}
if (streamerClosed || hasError || !dfsClient.clientRunning) {
continue;
}
//before sending the last packet, switch the pipeline state to CLOSE: this block is finished
stage = BlockConstructionStage.PIPELINE_CLOSE;
}
// send the packet
Span span = null;
//if this is not a heartbeat packet, move it from dataQueue to ackQueue
synchronized (dataQueue) {
// move packet from dataQueue to ackQueue
if (!one.isHeartbeatPacket()) {
span = scope.detach();
one.setTraceSpan(span);
dataQueue.removeFirst();
ackQueue.addLast(one);
dataQueue.notifyAll();
}
}
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DataStreamer block " + block +
" sending packet " + one);
}
// write out data to remote datanode
TraceScope writeScope = Trace.startSpan("writeTo", span);
try {
//write the packet bytes to the output stream
one.writeTo(blockStream);
blockStream.flush();
} catch (IOException e) {
// HDFS-3398 treat primary DN is down since client is unable to
// write to primary DN. If a failed or restarting node has already
// been recorded by the responder, the following call will have no
// effect. Pipeline recovery can handle only one node error at a
// time. If the primary node fails again during the recovery, it
// will be taken out then.
tryMarkPrimaryDatanodeFailed();
throw e;
} finally {
writeScope.close();
}
lastPacket = Time.monotonicNow();
// update bytesSent
long tmpBytesSent = one.getLastByteOffsetBlock();
if (bytesSent < tmpBytesSent) {
bytesSent = tmpBytesSent;
}
if (streamerClosed || hasError || !dfsClient.clientRunning) {
continue;
}
// Is this block full?
if (one.isLastPacketInBlock()) {
// wait for the close packet has been acked
synchronized (dataQueue) {
while (!streamerClosed && !hasError &&
ackQueue.size() != 0 && dfsClient.clientRunning) {
dataQueue.wait(1000);// wait for acks to arrive from datanodes
}
}
if (streamerClosed || hasError || !dfsClient.clientRunning) {
continue;
}
//the last packet has been acked: every datanode in the pipeline wrote successfully,
//so close this block and reset the stage to PIPELINE_SETUP_CREATE so a new block can be requested
endBlock();
}
if (progress != null) { progress.progress(); }
// This is used by unit test to trigger race conditions.
if (artificialSlowdown != 0 && dfsClient.clientRunning) {
Thread.sleep(artificialSlowdown);
}
} catch (Throwable e) {
// Log warning if there was a real error.
if (restartingNodeIndex.get() == -1) {
DFSClient.LOG.warn("DataStreamer Exception", e);
}
if (e instanceof IOException) {
//an I/O error occurred
setLastException((IOException)e);
} else {
setLastException(new IOException("DataStreamer Exception: ",e));
}
hasError = true;
if (errorIndex == -1 && restartingNodeIndex.get() == -1) {
// Not a datanode issue
streamerClosed = true;
}
} finally {
scope.close();
}
}
closeInternal();
}
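The dataQueue/ackQueue hand-off that run() depends on is easy to lose in all the error handling. The sketch below is my own stripped-down model of the same wait/notify protocol (class and method names are invented); it is not the Hadoop source, but it shows how the writer thread, the streamer, and the responder cooperate.
import java.util.LinkedList;

// Simplified model of DataStreamer's dataQueue/ackQueue hand-off (illustrative only).
public class QueueHandoffSketch {
  private final LinkedList<String> dataQueue = new LinkedList<>();
  private final LinkedList<String> ackQueue = new LinkedList<>();

  // writer side: DFSOutputStream eventually enqueues a finished packet like this
  public void enqueue(String packet) {
    synchronized (dataQueue) {
      dataQueue.addLast(packet);
      dataQueue.notifyAll();                   // wake up the streamer
    }
  }

  // streamer side: wait for data, move the packet to ackQueue, then "send" it
  public String takeForSending(long heartbeatIntervalMs) throws InterruptedException {
    synchronized (dataQueue) {
      while (dataQueue.isEmpty()) {
        dataQueue.wait(heartbeatIntervalMs);   // run() sends a heartbeat packet on timeout
        if (dataQueue.isEmpty()) {
          return "HEARTBEAT";                  // not tracked in ackQueue, like isHeartbeatPacket()
        }
      }
      String packet = dataQueue.removeFirst();
      ackQueue.addLast(packet);                // must be acked before it is forgotten
      dataQueue.notifyAll();
      return packet;
    }
  }

  // responder side: on a good ack drop the packet; on error push everything back
  public void onAck() {
    synchronized (dataQueue) {
      if (!ackQueue.isEmpty()) {
        ackQueue.removeFirst();
      }
      dataQueue.notifyAll();
    }
  }

  public void onPipelineError() {
    synchronized (dataQueue) {
      dataQueue.addAll(0, ackQueue);           // same idea as processDatanodeError()
      ackQueue.clear();
      dataQueue.notifyAll();
    }
  }
}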
III. DataStreamer.nextBlockOutputStream and Related Methods
1. DataStreamer.nextBlockOutputStream implementation
- Failed datanodes are kept in an exclude cache so they are not chosen again.
- locateFollowingBlock issues an RPC to the namenode to obtain the datanodes to write to.
- createBlockOutputStream opens an output stream to the first datanode in the pipeline and sends the writeBlock request.
- If createBlockOutputStream fails, the client RPCs abandonBlock so the namenode gives up on this block, adds the failed datanode to the exclude cache, and retries; if all three attempts fail, an IOException is thrown. During recovery, processDatanodeError later removes the faulty datanode from nodes[].
/**
* Open a DataOutputStream to a DataNode so that it can be written to.
* This happens when a file is created and each time a new block is allocated.
* Must get block ID and the IDs of the destinations from the namenode.
* Returns the list of target datanodes.
*/
private LocatedBlock nextBlockOutputStream() throws IOException {
LocatedBlock lb = null;
DatanodeInfo[] nodes = null;
StorageType[] storageTypes = null;
//dfs.client.block.write.retries defaults to 3: writing the block to the first datanode is retried at most three times
int count = dfsClient.getConf().nBlockWriteRetry;
boolean success = false;
ExtendedBlock oldBlock = block;
do {
hasError = false;
lastException.set(null);
errorIndex = -1;
success = false;
//collect the datanodes that must not be used (previously failed nodes kept in the exclude cache)
DatanodeInfo[] excluded =
excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
.keySet()
.toArray(new DatanodeInfo[0]);
block = oldBlock;
//RPC to the namenode: allocate a new block and return the datanodes to write to
lb = locateFollowingBlock(excluded.length > 0 ? excluded : null);
block = lb.getBlock();
block.setNumBytes(0);
bytesSent = 0;
accessToken = lb.getBlockToken();
nodes = lb.getLocations();
storageTypes = lb.getStorageTypes();
//
// Connect to first DataNode in the list.
//open an output stream to the first datanode in the pipeline and send the writeBlock request
success = createBlockOutputStream(nodes, storageTypes, 0L, false);
//on failure, abandon the block, add the bad datanode to excludedNodes, and retry
if (!success) {
DFSClient.LOG.info("Abandoning " + block);
dfsClient.namenode.abandonBlock(block, fileId, src,
dfsClient.clientName);
block = null;
DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]);
excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
}
} while (!success && --count >= 0);
if (!success) {
throw new IOException("Unable to create new block.");
}
return lb;
}
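The excludedNodes field used above is not a plain set but an expiring cache, so a datanode that failed only transiently becomes eligible again after a while (the expiry interval is configurable on the client; 10 minutes by default). Below is a minimal sketch of that idea using Guava's CacheBuilder, with invented class and method names; only the getAllPresent snapshot pattern is taken from the method above.
import java.util.concurrent.TimeUnit;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

// Illustrative sketch: an expiring exclude list for failed datanodes.
public class ExcludedNodesSketch {
  // entries drop out automatically, so a datanode that only failed transiently
  // becomes a candidate again after the expiry window
  private final Cache<DatanodeInfo, DatanodeInfo> excludedNodes = CacheBuilder.newBuilder()
      .expireAfterWrite(10, TimeUnit.MINUTES)
      .build();

  void exclude(DatanodeInfo dn) {
    excludedNodes.put(dn, dn);
  }

  DatanodeInfo[] currentlyExcluded() {
    // same pattern as nextBlockOutputStream(): snapshot the keys into an array for addBlock()
    return excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
        .keySet().toArray(new DatanodeInfo[0]);
  }
}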
2. DataStreamer.locateFollowingBlock implementation
This method sends an RPC over ClientProtocol (addBlock) asking the namenode for the target datanodes and persisting the new block to the edit log. If the previously committed block has not yet reached the required replication, the call is retried, 5 times by default.
private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) throws IOException {
//dfs.client.block.write.locateFollowingBlock.retries defaults to 5
int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
long sleeptime = 400;
while (true) {
long localstart = Time.monotonicNow();
while (true) {
try {
//ClientProtocol RPC to the namenode: pick the target datanodes and persist the new block to the edit log
return dfsClient.namenode.addBlock(src, dfsClient.clientName,
block, excludedNodes, fileId, favoredNodes);
} catch (RemoteException e) {
IOException ue =
e.unwrapRemoteException(FileNotFoundException.class,
AccessControlException.class,
NSQuotaExceededException.class,
DSQuotaExceededException.class,
UnresolvedPathException.class);
if (ue != e) {
throw ue; // no need to retry these exceptions
}
//the previously committed block has not reached the required replication yet, so wait and retry
if (NotReplicatedYetException.class.getName().
equals(e.getClassName())) {
if (retries == 0) {
throw e;
} else {
--retries;
DFSClient.LOG.info("Exception while adding a block", e);
long elapsed = Time.monotonicNow() - localstart;
if (elapsed > 5000) {
DFSClient.LOG.info("Waiting for replication for "
+ (elapsed / 1000) + " seconds");
}
try {
DFSClient.LOG.warn("NotReplicatedYetException sleeping " + src
+ " retries left " + retries);
Thread.sleep(sleeptime);
sleeptime *= 2;
} catch (InterruptedException ie) {
DFSClient.LOG.warn("Caught exception ", ie);
}
}
} else {
throw e;
}
}
}
}
}
ExtendedBlock getBlock() {
return block;
}
DatanodeInfo[] getNodes() {
return nodes;
}
Token<BlockTokenIdentifier> getBlockToken() {
return accessToken;
}
private void setLastException(IOException e) {
lastException.compareAndSet(null, e);
}
}
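locateFollowingBlock doubles its sleep on every NotReplicatedYetException (400 ms, 800 ms, 1600 ms, ...). The same retry-with-backoff pattern, pulled out into a small standalone helper for clarity, is sketched below with invented names; it is not Hadoop code.
import java.io.IOException;
import java.util.concurrent.Callable;

// Generic retry-with-exponential-backoff helper mirroring locateFollowingBlock's loop.
public class BackoffRetry {
  static <T> T retry(Callable<T> call, int maxRetries, long initialSleepMs) throws Exception {
    long sleep = initialSleepMs;
    int retries = maxRetries;
    while (true) {
      try {
        return call.call();
      } catch (IOException e) {               // stands in for NotReplicatedYetException
        if (retries-- == 0) {
          throw e;                            // give up after maxRetries attempts
        }
        Thread.sleep(sleep);
        sleep *= 2;                           // back off: 400, 800, 1600, ... ms
      }
    }
  }

  public static void main(String[] args) throws Exception {
    // usage example with a call that fails twice before succeeding
    final int[] attempts = {0};
    String result = retry(() -> {
      if (attempts[0]++ < 2) {
        throw new IOException("not replicated yet");
      }
      return "block allocated";
    }, 5, 400);
    System.out.println(result + " after " + attempts[0] + " attempts");
  }
}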
3. DataStreamer.createBlockOutputStream implementation
- This method opens the output stream to the first datanode of the pipeline.
- The writeBlock request is sent over the socket via Sender.
- If a datanode reports that it is restarting, a restart flag is recorded and the caller later removes that node from the pipeline.
// connects to the first datanode in the pipeline
// Returns true if success, otherwise return failure.
//
private boolean createBlockOutputStream(DatanodeInfo[] nodes,
StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
if (nodes.length == 0) {
DFSClient.LOG.info("nodes are empty for write pipeline of block "
+ block);
return false;
}
Status pipelineStatus = SUCCESS;
String firstBadLink = "";
boolean checkRestart = false;
if (DFSClient.LOG.isDebugEnabled()) {
for (int i = 0; i < nodes.length; i++) {
DFSClient.LOG.debug("pipeline = " + nodes[i]);
}
}
// persist blocks on namenode on next flush
persistBlocks.set(true);
int refetchEncryptionKey = 1;
while (true) {
boolean result = false;
DataOutputStream out = null;
try {
assert null == s : "Previous socket unclosed";
assert null == blockReplyStream : "Previous blockReplyStream unclosed";
//open a socket to the first datanode
s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
//datanode write timeout: by default 8 minutes plus 5 seconds per datanode in the pipeline
long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(s);
IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.SMALL_BUFFER_SIZE));
blockReplyStream = new DataInputStream(unbufIn);
//
// Xmit header info to datanode
//
BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage;
// We cannot change the block length in 'block' as it counts the number
// of bytes ack'ed.
ExtendedBlock blockCopy = new ExtendedBlock(block);
blockCopy.setNumBytes(blockSize);
//pinning flags for the target datanodes
boolean[] targetPinnings = getPinnings(nodes, true);
// send the request
//send the writeBlock op over the socket
new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
nodes.length, block.getNumBytes(), bytesSent, newGS,
checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
(targetPinnings == null ? false : targetPinnings[0]), targetPinnings);
// receive ack for connect
//receive the connect-ack from the downstream pipeline
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
PBHelper.vintPrefixed(blockReplyStream));
//status of the pipeline setup
pipelineStatus = resp.getStatus();
firstBadLink = resp.getFirstBadLink();
// Got an restart OOB ack.
// If a node is already restarting, this status is not likely from
// the same node. If it is from a different node, it is not
// from the local datanode. Thus it is safe to treat this as a
// regular node error.
//if a datanode is restarting, throw an IOException; processDatanodeError will later drop the failed node from nodes[]
if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
restartingNodeIndex.get() == -1) {
checkRestart = true;
throw new IOException("A datanode is restarting.");
}
String logInfo = "ack with firstBadLink as " + firstBadLink;
DataTransferProtoUtil.checkBlockOpStatus(resp, logInfo);
assert null == blockStream : "Previous blockStream unclosed";
blockStream = out;
result = true; // success
restartingNodeIndex.set(-1);
hasError = false;
} catch (IOException ie) {
if (restartingNodeIndex.get() == -1) {
DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
}
if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to "
+ nodes[0] + " : " + ie);
// The encryption key used is invalid.
refetchEncryptionKey--;
dfsClient.clearDataEncryptionKey();
// Don't close the socket/exclude this node just yet. Try again with
// a new encryption key.
continue;
}
// find the datanode that matches
if (firstBadLink.length() != 0) {
for (int i = 0; i < nodes.length; i++) {
// NB: Unconditionally using the xfer addr w/o hostname
if (firstBadLink.equals(nodes[i].getXferAddr())) {
errorIndex = i;
break;
}
}
} else {
assert checkRestart == false;
errorIndex = 0;
}
// Check whether there is a restart worth waiting for.
//check whether a datanode restart is worth waiting for
if (checkRestart && shouldWaitForRestart(errorIndex)) {
restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
Time.monotonicNow();
restartingNodeIndex.set(errorIndex);
errorIndex = -1;
DFSClient.LOG.info("Waiting for the datanode to be restarted: " +
nodes[restartingNodeIndex.get()]);
}
hasError = true;
setLastException(ie);
result = false; // error
} finally {
if (!result) {
IOUtils.closeSocket(s);
s = null;
IOUtils.closeStream(out);
out = null;
IOUtils.closeStream(blockReplyStream);
blockReplyStream = null;
}
}
return result;
}
}
IV. DataStreamer.processDatanodeError Error Handling
1. processDatanodeError implementation
- This method closes all streams and moves every packet in ackQueue back to the front of dataQueue.
- If pipeline recovery has already failed 5 times for the same packet, streamerClosed is set to true and recovery is abandoned.
// If this stream has encountered any errors so far, shutdown
// threads and mark stream as closed. Returns true if we should
// sleep for a while after returning from this call.
//
private boolean processDatanodeError() throws IOException {
if (response != null) {
DFSClient.LOG.info("Error Recovery for " + block +
" waiting for responder to exit. ");
return true;
}
closeStream();
// move packets from ack queue to front of the data queue
synchronized (dataQueue) {
dataQueue.addAll(0, ackQueue);
ackQueue.clear();
}
// Record the new pipeline failure recovery.
if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
lastAckedSeqnoBeforeFailure = lastAckedSeqno;
pipelineRecoveryCount = 1;
} else {
// If we had to recover the pipeline five times in a row for the
// same packet, this client likely has corrupt data or corrupting
// during transmission.
if (++pipelineRecoveryCount > 5) {
DFSClient.LOG.warn("Error recovering pipeline for writing " +
block + ". Already retried 5 times for the same packet.");
lastException.set(new IOException("Failing write. Tried pipeline " +
"recovery 5 times without success."));
streamerClosed = true;
return false;
}
}
//setupPipelineForAppendOrRecovery rebuilds the pipeline state; any failed datanode is removed from nodes[]
boolean doSleep = setupPipelineForAppendOrRecovery();
if (!streamerClosed && dfsClient.clientRunning) {
if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
// If we had an error while closing the pipeline, we go through a fast-path
// where the BlockReceiver does not run. Instead, the DataNode just finalizes
// the block immediately during the 'connect ack' process. So, we want to pull
// the end-of-block packet from the dataQueue, since we don't actually have
// a true pipeline to send it over.
//
// We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
// a client waiting on close() will be aware that the flush finished.
synchronized (dataQueue) {
DFSPacket endOfBlockPacket = dataQueue.remove(); // remove the end of block packet
Span span = endOfBlockPacket.getTraceSpan();
if (span != null) {
// Close any trace span associated with this Packet
TraceScope scope = Trace.continueSpan(span);
scope.close();
}
assert endOfBlockPacket.isLastPacketInBlock();
assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1;
lastAckedSeqno = endOfBlockPacket.getSeqno();
dataQueue.notifyAll();
}
endBlock();
} else {
initDataStreaming();
}
}
return doSleep;
}
2. setupPipelineForAppendOrRecovery implementation
- This method builds the pipeline for an append operation.
- It also performs recovery when writing through the pipeline fails (the case discussed in this article).
- It waits for a restarting datanode to come back up.
- It removes the failed datanode from the pipeline and, if the replacement policy is satisfied, calls addDatanode2ExistingPipeline to add a new datanode.
- It asks the namenode for a new generation stamp via updateBlockForPipeline, so the stale replicas left on the failed datanode can later be deleted.
/**
* Open a DataOutputStream to a DataNode pipeline so that
* it can be written to.
* This happens when a file is appended or data streaming fails
* It keeps on trying until a pipeline is setup
* It rebuilds the pipeline state; any failed datanode is removed from nodes[].
*/
private boolean setupPipelineForAppendOrRecovery() throws IOException {
// check number of datanodes
if (nodes == null || nodes.length == 0) {
String msg = "Could not get block locations. " + "Source file \""
+ src + "\" - Aborting...";
DFSClient.LOG.warn(msg);
setLastException(new IOException(msg));
streamerClosed = true;
return false;
}
boolean success = false;
long newGS = 0L;
while (!success && !streamerClosed && dfsClient.clientRunning) {
// Sleep before reconnect if a dn is restarting.
// This process will be repeated until the deadline or the datanode
// starts back up.
if (restartingNodeIndex.get() >= 0) {
// 4 seconds or the configured deadline period, whichever is shorter.
// This is the retry interval and recovery will be retried in this
// interval until timeout or success.
long delay = Math.min(dfsClient.getConf().datanodeRestartTimeout,
4000L);
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {
lastException.set(new IOException("Interrupted while waiting for " +
"datanode to restart. " + nodes[restartingNodeIndex.get()]));
streamerClosed = true;
return false;
}
}
boolean isRecovery = hasError;
// remove bad datanode from list of datanodes.
// If errorIndex was not set (i.e. appends), then do not remove
// any datanodes
//
if (errorIndex >= 0) {
StringBuilder pipelineMsg = new StringBuilder();
for (int j = 0; j < nodes.length; j++) {
pipelineMsg.append(nodes[j]);
if (j < nodes.length - 1) {
pipelineMsg.append(", ");
}
}
if (nodes.length <= 1) {
lastException.set(new IOException("All datanodes " + pipelineMsg
+ " are bad. Aborting..."));
streamerClosed = true;
return false;
}
DFSClient.LOG.warn("Error Recovery for block " + block +
" in pipeline " + pipelineMsg +
": bad datanode " + nodes[errorIndex]);
failed.add(nodes[errorIndex]);
DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
//
arraycopy(nodes, newnodes, errorIndex);
final StorageType[] newStorageTypes = new StorageType[newnodes.length];
arraycopy(storageTypes, newStorageTypes, errorIndex);
final String[] newStorageIDs = new String[newnodes.length];
arraycopy(storageIDs, newStorageIDs, errorIndex);
//
//rebuild the pipeline without the failed datanode (it is removed from nodes[])
setPipeline(newnodes, newStorageTypes, newStorageIDs);
// Just took care of a node error while waiting for a node restart
if (restartingNodeIndex.get() >= 0) {
// If the error came from a node further away than the restarting
// node, the restart must have been complete.
if (errorIndex > restartingNodeIndex.get()) {
restartingNodeIndex.set(-1);
} else if (errorIndex < restartingNodeIndex.get()) {
// the node index has shifted.
restartingNodeIndex.decrementAndGet();
} else {
// this shouldn't happen...
assert false;
}
}
if (restartingNodeIndex.get() == -1) {
hasError = false;
}
lastException.set(null);
errorIndex = -1;
}
// Check if replace-datanode policy is satisfied.
//If the policy is not satisfied, the write simply continues with the remaining
//datanodes and the namenode re-replicates the under-replicated block after it is finalized.
/*
* The policy is controlled by dfs.client.block.write.replace-datanode-on-failure.policy (DEFAULT by default).
*
* DEFAULT condition:
* Let r be the replication number.
* Let n be the number of existing datanodes.
* Add a new datanode only if r >= 3 and either
* (1) floor(r/2) >= n; or
* (2) r > n and the block is hflushed/appended.
*
* In other words: r is the replication factor and n the number of datanodes still alive
* in the pipeline; the failed datanode is replaced only if r >= 3 and either
* floor(r/2) >= n, or r > n and hflush/append has been called.
*/
if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(blockReplication,
nodes, isAppend, isHflushed)) {
try {
//add a new datanode to the pipeline and copy the data already written to it via transfer()
addDatanode2ExistingPipeline();
} catch(IOException ioe) {
if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
throw ioe;
}
DFSClient.LOG.warn("Failed to replace datanode."
+ " Continue with the remaining datanodes since "
+ DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY
+ " is set to true.", ioe);
}
}
// get a new generation stamp and an access token
//RPC for a new generation stamp so datanodes can later delete the stale replicas
LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
newGS = lb.getBlock().getGenerationStamp();
accessToken = lb.getBlockToken();
// set up the pipeline again with the remaining nodes
if (failPacket) { // for testing
success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
failPacket = false;
try {
// Give DNs time to send in bad reports. In real situations,
// good reports should follow bad ones, if client committed
// with those nodes.
Thread.sleep(2000);
} catch (InterruptedException ie) {}
} else {
//re-open the output stream to the first datanode and resume writing the block
success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
}
if (restartingNodeIndex.get() >= 0) {
assert hasError == true;
// check errorIndex set above
if (errorIndex == restartingNodeIndex.get()) {
// ignore, if came from the restarting node
errorIndex = -1;
}
// still within the deadline
if (Time.monotonicNow() < restartDeadline) {
continue; // with in the deadline
}
// expired. declare the restarting node dead
restartDeadline = 0;
int expiredNodeIndex = restartingNodeIndex.get();
restartingNodeIndex.set(-1);
DFSClient.LOG.warn("Datanode did not restart in time: " +
nodes[expiredNodeIndex]);
// Mark the restarting node as failed. If there is any other failed
// node during the last pipeline construction attempt, it will not be
// overwritten/dropped. In this case, the restarting node will get
// excluded in the following attempt, if it still does not come up.
if (errorIndex == -1) {
errorIndex = expiredNodeIndex;
}
// From this point on, normal pipeline recovery applies.
}
} // while
if (success) {
// update pipeline at the namenode
ExtendedBlock newBlock = new ExtendedBlock(
block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
nodes, storageIDs);
// update client side generation stamp
block = newBlock;
}
return false; // do not sleep, continue processing
}
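The replacement behaviour checked above is driven entirely by client-side configuration. Below is a hedged example of overriding it programmatically; the key names follow hdfs-default.xml, while the chosen values are only illustrative.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Example client configuration for the replace-datanode-on-failure policy.
public class ReplaceDatanodePolicyExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // master switch for adding a replacement datanode on pipeline failure
    conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true);
    // DEFAULT applies the r/2 rule described above; ALWAYS / NEVER are the alternatives
    conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "DEFAULT");
    // with best-effort=true the write keeps going even if no replacement can be found
    conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.best-effort", true);

    try (FileSystem fs = FileSystem.get(conf)) {
      // writes through this FileSystem instance will use the policy configured above
      System.out.println("fs = " + fs.getUri());
    }
  }
}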
3. addDatanode2ExistingPipeline implementation
This method RPCs the namenode's getAdditionalDatanode to obtain one new datanode that replaces the failed one.
private void addDatanode2ExistingPipeline() throws IOException {
if (DataTransferProtocol.LOG.isDebugEnabled()) {
DataTransferProtocol.LOG.debug("lastAckedSeqno = " + lastAckedSeqno);
}
/*
* Is data transfer necessary? We have the following cases.
*
* Case 1: Failure in Pipeline Setup
* - Append
* + Transfer the stored replica, which may be a RBW or a finalized.
* - Create
* + If no data, then no transfer is required.
* + If there are data written, transfer RBW. This case may happens
* when there are streaming failure earlier in this pipeline.
*
* Case 2: Failure in Streaming
* - Append/Create:
* + transfer RBW
*
* Case 3: Failure in Close
* - Append/Create:
* + no transfer, let NameNode replicates the block.
*/
if (!isAppend && lastAckedSeqno < 0
&& stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
//no data have been written
return;
} else if (stage == BlockConstructionStage.PIPELINE_CLOSE
|| stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
//pipeline is closing
return;
}
//get a new datanode
final DatanodeInfo[] original = nodes;
final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
src, fileId, block, nodes, storageIDs,
failed.toArray(new DatanodeInfo[failed.size()]),
1, dfsClient.clientName);
setPipeline(lb);
//find the new datanode
final int d = findNewDatanode(original);
//transfer replica
final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
final DatanodeInfo[] targets = {nodes[d]};
final StorageType[] targetStorageTypes = {storageTypes[d]};
//the new datanode has joined the pipeline; copy the data already written from an existing replica to it via transfer
transfer(src, targets, targetStorageTypes, lb.getBlockToken());
}
4. transfer implementation
The already-written replica data is copied to the new datanode over a socket.
//copy the data that was already successfully written to another (newly added) datanode
//src: a datanode that already holds the data; targets: the newly added datanode(s)
private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
final StorageType[] targetStorageTypes,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
//transfer replica to the new datanode
Socket sock = null;
DataOutputStream out = null;
DataInputStream in = null;
try {
sock = createSocketForPipeline(src, 2, dfsClient);
final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(sock);
IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock,
unbufOut, unbufIn, dfsClient, blockToken, src);
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.SMALL_BUFFER_SIZE));
in = new DataInputStream(unbufIn);
//send the TRANSFER_BLOCK request
new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
targets, targetStorageTypes);
out.flush();
//ack
BlockOpResponseProto response =
BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
if (SUCCESS != response.getStatus()) {
throw new IOException("Failed to add a datanode");
}
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
IOUtils.closeSocket(sock);
}
}