NameNode 主要负责创建客户端通过 RPC 请求准备写入的文件:NameNode 会把文件名保存到 namespace 中,然后把 HdfsFileStatus 返回给客户端;最后客户端在拿到 HdfsFileStatus 之后 new 一个 DFSOutputStream 实例,启动 DFSOutputStream 中的 DataStreamer 守护(Daemon)线程,并把该 DFSOutputStream 对象返回给调用方。
* Create an FSDataOutputStream at the indicated Path.
* @param f the file name to open
* @param overwrite if a file with this name already exists, then if true,
* the file will be overwritten, and if false an error will be thrown.
* @param bufferSize the size of the buffer to be used.
* @param replication required block replication for the file.
// Convenience overload without progress reporting: forwards to the
// six-argument create, passing null as the Progressable.
public FSDataOutputStream create(Path f,
boolean overwrite,
int bufferSize,
short replication,
long blockSize
) throws IOException {
return create(f, overwrite, bufferSize, replication, blockSize, null);
* Create an FSDataOutputStream at the indicated Path with write-progress
* reporting.
* @param f the file name to open
* @param overwrite if a file with this name already exists, then if true,
* the file will be overwritten, and if false an error will be thrown.
* @param bufferSize the size of the buffer to be used.
* @param replication required block replication for the file.
// Overload with progress reporting: computes the permission as the file
// default with the umask from the configuration applied, then delegates to
// the permission-taking create.
public FSDataOutputStream create(Path f,
boolean overwrite,
int bufferSize,
short replication,
long blockSize,
Progressable progress
) throws IOException {
return this.create(f, FsPermission.getFileDefault().applyUMask(
FsPermission.getUMask(getConf())), overwrite, bufferSize,
replication, blockSize, progress);
// Translates the boolean overwrite flag into a CreateFlag set
// ({CREATE, OVERWRITE} when true, {CREATE} otherwise) and delegates to the
// EnumSet-based create with a null checksum option.
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
// Trace note (step 1): object observed at this point while debugging,
// presumably the receiver of the call below:
// 1. DistributedFileSystem DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1307090268_1, ugi=lixun_XXX (auth:SIMPLE)]]
return this.create(f, permission,
overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
: EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
blockSize, progress, null);
然后调用 FileSystemLinkResolver.resolve() 方法,该方法中调用了 doCall 方法;如果创建流失败,会调用 next 方法继续调用 doCall,直到流创建成功为止,当然默认最多只能尝试创建 32 次。
// Creates the output stream through a FileSystemLinkResolver. The path is
// first passed through fixRelativePart; doCall creates the stream via the
// dfs client and wraps it, while next re-issues the create on the FileSystem
// it is handed.
public FSDataOutputStream create(final Path f, final FsPermission permission,
final EnumSet<CreateFlag> cflags, final int bufferSize,
final short replication, final long blockSize, final Progressable progress,
final ChecksumOpt checksumOpt) throws IOException {
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<FSDataOutputStream>() {
// Primary attempt: create the DFSOutputStream for the path name of p.
public FSDataOutputStream doCall(final Path p)
throws IOException, UnresolvedLinkException {
final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
cflags, replication, blockSize, progress, bufferSize,
// NOTE(review): the tail of the dfs.create(...) argument list is missing
// from this excerpt.
return dfs.createWrappedOutputStream(dfsos, statistics);
// Fallback: retry the create against the FileSystem fs with the same arguments.
public FSDataOutputStream next(final FileSystem fs, final Path p)
throws IOException {
return fs.create(p, permission, cflags, bufferSize,
replication, blockSize, progress, checksumOpt);
}.resolve(this, absF);
* Call {@link #create(String, FsPermission, EnumSet, boolean, short,
* long, Progressable, int, ChecksumOpt)} with <code>createParent</code>
* set to true.
// Convenience overload: delegates to the ten-argument create with
// createParent set to true and favoredNodes set to null.
public DFSOutputStream create(String src,
FsPermission permission,
EnumSet<CreateFlag> flag,
short replication,
long blockSize,
Progressable progress,
int buffersize,
ChecksumOpt checksumOpt)
throws IOException {
return create(src, permission, flag, true,
replication, blockSize, progress, buffersize, checksumOpt, null);
// Creates the file and returns its DFSOutputStream. A null permission falls
// back to the file default; the client umask is applied to it; the stream is
// obtained from DFSOutputStream.newStreamForCreate and then handed to
// beginFileLease together with its file id.
public DFSOutputStream create(String src,
FsPermission permission,
EnumSet<CreateFlag> flag,
boolean createParent,
short replication,
long blockSize,
Progressable progress,
int buffersize,
ChecksumOpt checksumOpt,
InetSocketAddress[] favoredNodes) throws IOException {
if (permission == null) {
permission = FsPermission.getFileDefault();
FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
if(LOG.isDebugEnabled()) {
LOG.debug(src + ": masked=" + masked);
// Ask the NameNode over RPC to create the HDFS file, then build the output
// stream for it.
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
src, masked, flag, createParent, replication, blockSize, progress,
buffersize, dfsClientConf.createChecksum(checksumOpt),
// NOTE(review): the final argument(s) of newStreamForCreate(...) are missing
// from this excerpt.
beginFileLease(result.getFileId(), result);
return result;
// Asks the NameNode to create src and wraps the returned HdfsFileStatus in a
// new DFSOutputStream. The create call is repeated while the NameNode reports
// RetryStartFileException and retryCount is still positive; once retries are
// exhausted an IOException is thrown, and any other unwrapped exception is
// rethrown as-is.
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize, Progressable progress, int buffersize,
DataChecksum checksum, String[] favoredNodes) throws IOException {
TraceScope scope =
dfsClient.getPathTraceScope("newStreamForCreate", src);
try {
HdfsFileStatus stat = null;
// Retry the create if we get a RetryStartFileException up to a maximum
// number of times
boolean shouldRetry = true;
int retryCount = CREATE_RETRY_COUNT;
while (shouldRetry) {
shouldRetry = false;
try {
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
new EnumSetWritable<CreateFlag>(flag), createParent, replication,
} catch (RemoteException re) {
IOException e = re.unwrapRemoteException(
if (e instanceof RetryStartFileException) {
if (retryCount > 0) {
// NOTE(review): no decrement of retryCount is visible in this excerpt;
// confirm against the full source that the retry budget is consumed here.
shouldRetry = true;
} else {
throw new IOException("Too many retries because of encryption" +
" zone operations", e);
} else {
throw e;
// The NameNode must have returned a status by the time the loop exits.
Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
flag, progress, checksum, favoredNodes);
// Start the DataStreamer daemon thread (the start call itself is not shown
// in this excerpt).
return out;
} finally {
/** Construct a new output stream for creating a file. */
private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
EnumSet<CreateFlag> flag, Progressable progress,
DataChecksum checksum, String[] favoredNodes) throws IOException {
this(dfsClient, src, progress, stat, checksum);
// Remember whether the caller asked for SYNC_BLOCK.
this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
// Compute the chunk size and how many chunks each packet holds. Per the
// original note: 127 chunks per packet by default, writePacketSize defaults
// to 65536 and bytesPerChecksum to 512 (the note said "521", presumably a
// typo) -- TODO confirm these defaults against the configuration.
computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
streamer = new DataStreamer(stat, null);
// Trace note: favoredNodes was null in the observed run.
if (favoredNodes != null && favoredNodes.length != 0) {
通过ClientProtocol协议向namenode server发起请求
// Client-side protocol stub for create: builds a CreateRequestProto, sends it
// through rpcProxy, and converts the response. Returns null when the response
// carries no file status; a ServiceException is converted via
// ProtobufHelper.getRemoteException and rethrown.
public HdfsFileStatus create(String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag,
boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions)
throws AccessControlException, AlreadyBeingCreatedException,
DSQuotaExceededException, FileAlreadyExistsException,
FileNotFoundException, NSQuotaExceededException,
ParentNotDirectoryException, SafeModeException, UnresolvedLinkException,
IOException {
// NOTE(review): the builder setter chain that populates the request is
// missing from this excerpt.
CreateRequestProto.Builder builder = CreateRequestProto.newBuilder()
CreateRequestProto req = builder.build();
try {
CreateResponseProto res = rpcProxy.create(null, req);
return res.hasFs() ? PBHelper.convert(res.getFs()) : null;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
// Server-side handler for the create call: rejects over-long paths, returns
// the cached payload when the retry cache already holds a successful entry,
// otherwise delegates to namesystem.startFile and records the outcome in the
// retry cache from the finally block.
@Override // ClientProtocol
public HdfsFileStatus create(String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag,
boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions)
throws IOException {
String clientMachine = getClientMachine();
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.create: file "
+src+" for "+clientName+" at "+clientMachine);
if (!checkPathLength(src)) {
throw new IOException("create: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
// Trace note: cacheEntry.isSuccess() returned false in the observed run.
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (HdfsFileStatus) cacheEntry.getPayload();
HdfsFileStatus status = null;
try {
PermissionStatus perm = new PermissionStatus(getRemoteUser()
.getShortUserName(), null, masked);
// Create the HDFS file in the namespace and get back its file status.
status = namesystem.startFile(src, perm, clientName, clientMachine,
flag.get(), createParent, replication, blockSize, supportedVersions,
cacheEntry != null);
} finally {
// Success is recorded as "status != null", with the status as payload.
RetryCache.setState(cacheEntry, status != null, status);
return status;
* Create a new file entry in the namespace.
* For description of parameters and exceptions thrown see
* {@link ClientProtocol#create}, except it returns valid file status upon
* success
// Thin wrapper around startFileInt: returns its status, and on
// AccessControlException writes a failed "create" audit event for src before
// rethrowing.
HdfsFileStatus startFile(String src, PermissionStatus permissions,
String holder, String clientMachine, EnumSet<CreateFlag> flag,
boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions, boolean logRetryCache)
throws AccessControlException, SafeModeException,
FileAlreadyExistsException, UnresolvedLinkException,
FileNotFoundException, ParentNotDirectoryException, IOException {
HdfsFileStatus status = null;
try {
status = startFileInt(src, permissions, holder, clientMachine, flag,
createParent, replication, blockSize, supportedVersions,
// NOTE(review): the last argument of startFileInt(...) is missing from this
// excerpt.
} catch (AccessControlException e) {
logAuditEvent(false, "create", src);
throw e;
return status;
// Validates the request (path name, replication, minimum block size), gathers
// encryption-zone parameters when a key provider is configured, then performs
// the create through startFileInternal and returns the new file's status.
// NOTE(review): lock acquire/release calls, several closing braces and some
// statements are not present in this excerpt; the comments below describe
// only what is visible.
private HdfsFileStatus startFileInt(final String srcArg,
PermissionStatus permissions, String holder, String clientMachine,
EnumSet<CreateFlag> flag, boolean createParent, short replication,
long blockSize, CryptoProtocolVersion[] supportedVersions,
boolean logRetryCache)
throws AccessControlException, SafeModeException,
FileAlreadyExistsException, UnresolvedLinkException,
FileNotFoundException, ParentNotDirectoryException, IOException {
String src = srcArg;
// Debug-only: assemble a description of the request parameters.
if (NameNode.stateChangeLog.isDebugEnabled()) {
StringBuilder builder = new StringBuilder();
builder.append("DIR* NameSystem.startFile: src=" + src
+ ", holder=" + holder
+ ", clientMachine=" + clientMachine
+ ", createParent=" + createParent
+ ", replication=" + replication
+ ", createFlag=" + flag.toString()
+ ", blockSize=" + blockSize);
builder.append(", supportedVersions=");
if (supportedVersions != null) {
} else {
if (!DFSUtil.isValidName(src)) {
throw new InvalidPathException(src);
blockManager.verifyReplication(src, replication, clientMachine);
boolean skipSync = false;
HdfsFileStatus stat = null;
FSPermissionChecker pc = getPermissionChecker();
// Reject block sizes below the configured minimum.
if (blockSize < minBlockSize) {
throw new IOException("Specified block size is less than configured" +
" minimum value (" + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY
+ "): " + blockSize + " < " + minBlockSize);
// Per the original note: pathComponents is null when src does not start
// with /.reserved.
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
// Trace note: in the observed run the flag set contained CREATE and OVERWRITE.
boolean create = flag.contains(CreateFlag.CREATE);
boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST);
* If the file is in an encryption zone, we optimistically create an
* EDEK for the file by calling out to the configured KeyProvider.
* Since this typically involves doing an RPC, we take the readLock
* initially, then drop it to do the RPC.
* Since the path can flip-flop between being in an encryption zone and not
* in the meantime, we need to recheck the preconditions when we retake the
* lock to do the create. If the preconditions are not met, we throw a
* special RetryStartFileException to ask the DFSClient to try the create
* again later.
CryptoProtocolVersion protocolVersion = null;
CipherSuite suite = null;
String ezKeyName = null;
EncryptedKeyVersion edek = null;
// Trace note: provider was null in the observed run, so this branch was skipped.
if (provider != null) {
try {
src = dir.resolvePath(pc, src, pathComponents);
INodesInPath iip = dir.getINodesInPath4Write(src);
// Nothing to do if the path is not within an EZ
final EncryptionZone zone = dir.getEZForPath(iip);
if (zone != null) {
protocolVersion = chooseProtocolVersion(zone, supportedVersions);
suite = zone.getSuite();
ezKeyName = zone.getKeyName();
"Chose an UNKNOWN CipherSuite!");
} finally {
(suite == null && ezKeyName == null) ||
(suite != null && ezKeyName != null),
"Both suite and ezKeyName should both be null or not null");
// Generate EDEK if necessary while not holding the lock
edek = generateEncryptedDataEncryptionKey(ezKeyName);
// Proceed with the create, using the computed cipher suite and
// generated EDEK
BlocksMapUpdateInfo toRemoveBlocks = null;
try {
checkNameNodeSafeMode("Cannot create file" + src);
try {
// Resolve the path again before the create itself.
src = dir.resolvePath(pc, src, pathComponents);
final INodesInPath iip = dir.getINodesInPath4Write(src);
toRemoveBlocks = startFileInternal(
pc, iip, permissions, holder,
clientMachine, create, overwrite,
createParent, replication, blockSize,
isLazyPersist, suite, protocolVersion, edek,
// NOTE(review): the last argument of startFileInternal(...) is missing from
// this excerpt.
stat = FSDirStatAndListingOp.getFileInfo(dir, src, false, FSDirectory.isReservedRawName(srcArg), true);
} finally {
} catch (StandbyException se) {
// Skip the sync step below when the failure is a StandbyException.
skipSync = true;
throw se;
} finally {
// There might be transactions logged while trying to recover the lease.
// They need to be sync'ed even when an exception was thrown.
if (!skipSync) {
if (toRemoveBlocks != null) {
// Remove the collected blocks via the block manager (the call itself is
// not shown in this excerpt).
logAuditEvent(true, "create", srcArg, null, stat);
return stat;
* Create a new file or overwrite an existing file<br>
* Once the file is create the client then allocates a new block with the next
* call using {@link ClientProtocol#addBlock}.
* <p>
* For description of parameters and exceptions thrown see
* {@link ClientProtocol#create}
// Performs the namespace changes for a create. Visible steps: reject a path
// that is an existing directory, run permission checks, validate encryption
// parameters against the encryption zone, handle an existing file (delete on
// overwrite, otherwise fail), create missing ancestors and add the file, then
// log the open to the edit log. Returns the blocks collected for removal when
// an existing file was overwritten, otherwise null.
// NOTE(review): several closing braces and call heads are missing from this
// excerpt; comments describe only the visible statements.
private BlocksMapUpdateInfo startFileInternal(FSPermissionChecker pc,
INodesInPath iip, PermissionStatus permissions, String holder,
String clientMachine, boolean create, boolean overwrite,
boolean createParent, short replication, long blockSize,
boolean isLazyPersist, CipherSuite suite, CryptoProtocolVersion version,
EncryptedKeyVersion edek, boolean logRetryEntry)
throws IOException {
assert hasWriteLock();
// Verify that the destination does not exist as a directory already.
final INode inode = iip.getLastINode();
final String src = iip.getPath();
if (inode != null && inode.isDirectory()) {
throw new FileAlreadyExistsException(src +
" already exists as a directory");
final INodeFile myFile = INodeFile.valueOf(inode, src, true);
if (isPermissionEnabled) {
if (overwrite && myFile != null) {
dir.checkPathAccess(pc, iip, FsAction.WRITE);
* To overwrite existing file, need to check 'w' permission
* of parent (equals to ancestor in this case)
dir.checkAncestorAccess(pc, iip, FsAction.WRITE);
if (!createParent) {
dir.verifyParentDir(iip, src);
FileEncryptionInfo feInfo = null;
final EncryptionZone zone = dir.getEZForPath(iip);
if (zone != null) {
// The path is now within an EZ, but we're missing encryption parameters
if (suite == null || edek == null) {
throw new RetryStartFileException();
// Path is within an EZ and we have provided encryption parameters.
// Make sure that the generated EDEK matches the settings of the EZ.
final String ezKeyName = zone.getKeyName();
if (!ezKeyName.equals(edek.getEncryptionKeyName())) {
throw new RetryStartFileException();
feInfo = new FileEncryptionInfo(suite, version,
ezKeyName, edek.getEncryptionKeyVersionName());
try {
BlocksMapUpdateInfo toRemoveBlocks = null;
if (myFile == null) {
// No existing file: the CREATE flag is required.
if (!create) {
throw new FileNotFoundException("Can't overwrite non-existent " +
src + " for client " + clientMachine);
} else {
if (overwrite) {
// Existing file with overwrite: delete it and collect its blocks and inodes.
toRemoveBlocks = new BlocksMapUpdateInfo();
List<INode> toRemoveINodes = new ChunkedArrayList<INode>();
long ret = FSDirDeleteOp.delete(dir, iip, toRemoveBlocks,
toRemoveINodes, now());
if (ret >= 0) {
iip = INodesInPath.replace(iip, iip.length() - 1, null);
removeLeasesAndINodes(src, toRemoveINodes, true);
} else {
// If lease soft limit time is expired, recover the lease
// NOTE(review): the head of the call these arguments belong to is missing
// from this excerpt.
iip, src, holder, clientMachine, false);
throw new FileAlreadyExistsException(src + " for client " +
clientMachine + " already exists");
// Per the original note: dfs.namenode.max.objects defaults to 0, which
// means no check (the check call is not shown in this excerpt).
INodeFile newNode = null;
// Always do an implicit mkdirs for parent directory tree.
Map.Entry<INodesInPath, String> parent = FSDirMkdirOp
.createAncestorDirectories(dir, iip, permissions);
if (parent != null) {
iip = dir.addFile(parent.getKey(), parent.getValue(), permissions,
replication, blockSize, holder, clientMachine);
newNode = iip != null ? iip.getLastINode().asFile() : null;
if (newNode == null) {
throw new IOException("Unable to add " + src + " to namespace");
// NOTE(review): the head of this call is missing from this excerpt.
.getClientName(), src);
// Set encryption attributes if necessary
if (feInfo != null) {
dir.setFileEncryptionInfo(src, feInfo);
// Re-fetch the inode by id after setting the encryption info.
newNode = dir.getInode(newNode.getId()).asFile();
setNewINodeStoragePolicy(newNode, iip, isLazyPersist);
// record file record in log, record new generation stamp
getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added {}" +
" inode {} holder {}", src, newNode.getId(), holder);
return toRemoveBlocks;
} catch (IOException ie) {
// Log the failure, then rethrow it unchanged.
NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: " + src + " " +
throw ie;