文件创建流程

NameNodeRpcServer 实现了ClientProtocol协议,rpc调用create方法;create方法主要是做一些参数检查,active/standby nn检查,以及metrics监控项统计,然后进入startFile方法中

  public HdfsFileStatus create(String src, FsPermission masked,
      String clientName, EnumSetWritable<CreateFlag> flag,
      boolean createParent, short replication, long blockSize, 
      CryptoProtocolVersion[] supportedVersions)
      throws IOException {
    // check namenode 是否启动
    checkNNStartup();
    String clientMachine = getClientMachine();
    if (stateChangeLog.isDebugEnabled()) {
      stateChangeLog.debug("*DIR* NameNode.create: file "
                         +src+" for "+clientName+" at "+clientMachine);
    }
    // 检查src path是否满足条件,最大长度不超过8000个字符,level深度不超过1000
    if (!checkPathLength(src)) {
      throw new IOException("create: Pathname too long.  Limit "
          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
    }
    namesystem.checkOperation(OperationCategory.WRITE);
    HdfsFileStatus fileStatus = namesystem.startFile(src, new PermissionStatus(
        getRemoteUser().getShortUserName(), null, masked),
        clientName, clientMachine, flag.get(), createParent, replication,
        blockSize, supportedVersions);
    metrics.incrFilesCreated();
    metrics.incrCreateFileOps();
    return fileStatus;
  }

startFile函数中会先访问缓存,判断本次请求是否是一次重试请求,根据clientId和callId来标示同一次rpc调用,如果是,则判断上一次请求是否成功,将缓存结果直接返回,否则,进入startFileInt函数,在finally中,我们可以看到会缓存成功的结果;如果创建失败,则执行logAuditEvent打印审计日志

  /**
   * Create a new file entry in the namespace.
   * 
   * For description of parameters and exceptions thrown see
   * {@link ClientProtocol#create}, except it returns valid file status upon
   * success
   */
  HdfsFileStatus startFile(String src, PermissionStatus permissions,
      String holder, String clientMachine, EnumSet<CreateFlag> flag,
      boolean createParent, short replication, long blockSize, 
      CryptoProtocolVersion[] supportedVersions)
      throws AccessControlException, SafeModeException,
      FileAlreadyExistsException, UnresolvedLinkException,
      FileNotFoundException, ParentNotDirectoryException, IOException {
    HdfsFileStatus status = null;
    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
        null);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return (HdfsFileStatus) cacheEntry.getPayload();
    }
    
    try {
      status = startFileInt(src, permissions, holder, clientMachine, flag,
          createParent, replication, blockSize, supportedVersions,
          cacheEntry != null);
    } catch (AccessControlException e) {
      logAuditEvent(false, "create", src);
      throw e;
    } finally {
      RetryCache.setState(cacheEntry, status != null, status);
    }
    return status;
  }

startFileInt
blockMananger的verifyReplication用来check副本数是否在配置范围内,否则抛出异常
这个地方做了一些加密判断,没有很了解,后面再看,我们继续往下看

  private HdfsFileStatus startFileInt(final String srcArg,
      PermissionStatus permissions, String holder, String clientMachine,
      EnumSet<CreateFlag> flag, boolean createParent, short replication,
      long blockSize, CryptoProtocolVersion[] supportedVersions,
      boolean logRetryCache)
      throws AccessControlException, SafeModeException,
      FileAlreadyExistsException, UnresolvedLinkException,
      FileNotFoundException, ParentNotDirectoryException, IOException {
    String src = srcArg;
    if (NameNode.stateChangeLog.isDebugEnabled()) {
      StringBuilder builder = new StringBuilder();
      builder.append("DIR* NameSystem.startFile: src=" + src
              + ", holder=" + holder
              + ", clientMachine=" + clientMachine
              + ", createParent=" + createParent
              + ", replication=" + replication
              + ", createFlag=" + flag.toString()
              + ", blockSize=" + blockSize);
      builder.append(", supportedVersions=");
      if (supportedVersions != null) {
        builder.append(Arrays.toString(supportedVersions));
      } else {
        builder.append("null");
      }
      NameNode.stateChangeLog.debug(builder.toString());
    }
    if (!DFSUtil.isValidName(src)) {
      throw new InvalidPathException(src);
    }
    blockManager.verifyReplication(src, replication, clientMachine);

    boolean skipSync = false;
    HdfsFileStatus stat = null;
    FSPermissionChecker pc = getPermissionChecker();
    if (blockSize < minBlockSize) {
      throw new IOException("Specified block size is less than configured" +
          " minimum value (" + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY
          + "): " + blockSize + " < " + minBlockSize);
    }
    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
    boolean create = flag.contains(CreateFlag.CREATE);
    boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
    boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST);

    waitForLoadingFSImage();

    /**
     * If the file is in an encryption zone, we optimistically create an
     * EDEK for the file by calling out to the configured KeyProvider.
     * Since this typically involves doing an RPC, we take the readLock
     * initially, then drop it to do the RPC.
     * 
     * Since the path can flip-flop between being in an encryption zone and not
     * in the meantime, we need to recheck the preconditions when we retake the
     * lock to do the create. If the preconditions are not met, we throw a
     * special RetryStartFileException to ask the DFSClient to try the create
     * again later.
     */
    CryptoProtocolVersion protocolVersion = null;
    CipherSuite suite = null;
    String ezKeyName = null;
    EncryptedKeyVersion edek = null;

    if (provider != null) {
      readLock();
      try {
        src = dir.resolvePath(pc, src, pathComponents);
        INodesInPath iip = dir.getINodesInPath4Write(src);
        // Nothing to do if the path is not within an EZ
        final EncryptionZone zone = dir.getEZForPath(iip);
        if (zone != null) {
          protocolVersion = chooseProtocolVersion(zone, supportedVersions);
          suite = zone.getSuite();
          ezKeyName = zone.getKeyName();

          Preconditions.checkNotNull(protocolVersion);
          Preconditions.checkNotNull(suite);
          Preconditions.checkArgument(!suite.equals(CipherSuite.UNKNOWN),
              "Chose an UNKNOWN CipherSuite!");
          Preconditions.checkNotNull(ezKeyName);
        }
      } finally {
        readUnlock();
      }

      Preconditions.checkState(
          (suite == null && ezKeyName == null) ||
              (suite != null && ezKeyName != null),
          "Both suite and ezKeyName should both be null or not null");

      // Generate EDEK if necessary while not holding the lock
      edek = generateEncryptedDataEncryptionKey(ezKeyName);
      EncryptionFaultInjector.getInstance().startFileAfterGenerateKey();
    }

    // Proceed with the create, using the computed cipher suite and 
    // generated EDEK
    BlocksMapUpdateInfo toRemoveBlocks = null;
    writeLock();
    try {
      checkOperation(OperationCategory.WRITE);
      checkNameNodeSafeMode("Cannot create file" + src);
      src = dir.resolvePath(pc, src, pathComponents);
      toRemoveBlocks = startFileInternal(pc, src, permissions, holder, 
          clientMachine, create, overwrite, createParent, replication, 
          blockSize, isLazyPersist, suite, protocolVersion, edek, logRetryCache);
      stat = dir.getFileInfo(src, false,
          FSDirectory.isReservedRawName(srcArg), true);
    } catch (StandbyException se) {
      skipSync = true;
      throw se;
    } finally {
      writeUnlock();
      // There might be transactions logged while trying to recover the lease.
      // They need to be sync'ed even when an exception was thrown.
      if (!skipSync) {
        getEditLog().logSync();
        if (toRemoveBlocks != null) {
          removeBlocks(toRemoveBlocks);
          toRemoveBlocks.clear();
        }
      }
    }

    logAuditEvent(true, "create", srcArg, null, stat);
    return stat;
  }

startFileInternal

  • FSDirectory.getInodesInPath4Write 返回了一个数据结构INodesInPath,即在目录上的所有Inode节点,默认都是INodeDirectory;
  • 这里会先check该路径上最后一个层级是否已经有目录,如果是目录重名的话抛出异常;
  • 如果是文件的话,则需要检查是否设置了overwrite,如果设置了,则先调用dir.delete删掉该文件,然后统计待remove掉的block块;
  • 如果设置了递归建立不存在目录,则执行递归建立目录;
  • 最后调用dir.addFile
  private BlocksMapUpdateInfo startFileInternal(FSPermissionChecker pc, 
      String src, PermissionStatus permissions, String holder, 
      String clientMachine, boolean create, boolean overwrite, 
      boolean createParent, short replication, long blockSize, 
      boolean isLazyPersist, CipherSuite suite, CryptoProtocolVersion version,
      EncryptedKeyVersion edek, boolean logRetryEntry)
      throws FileAlreadyExistsException, AccessControlException,
      UnresolvedLinkException, FileNotFoundException,
      ParentNotDirectoryException, RetryStartFileException, IOException {
    assert hasWriteLock();
    // Verify that the destination does not exist as a directory already.
    final INodesInPath iip = dir.getINodesInPath4Write(src);
    final INode inode = iip.getLastINode();
    if (inode != null && inode.isDirectory()) {
      throw new FileAlreadyExistsException(src +
          " already exists as a directory");
    }

    FileEncryptionInfo feInfo = null;

    final EncryptionZone zone = dir.getEZForPath(iip);
    if (zone != null) {
      // The path is now within an EZ, but we're missing encryption parameters
      if (suite == null || edek == null) {
        throw new RetryStartFileException();
      }
      // Path is within an EZ and we have provided encryption parameters.
      // Make sure that the generated EDEK matches the settings of the EZ.
      final String ezKeyName = zone.getKeyName();
      if (!ezKeyName.equals(edek.getEncryptionKeyName())) {
        throw new RetryStartFileException();
      }
      feInfo = new FileEncryptionInfo(suite, version,
          edek.getEncryptedKeyVersion().getMaterial(),
          edek.getEncryptedKeyIv(),
          ezKeyName, edek.getEncryptionKeyVersionName());
      Preconditions.checkNotNull(feInfo);
    }

    final INodeFile myFile = INodeFile.valueOf(inode, src, true);
    if (isPermissionEnabled) {
      if (overwrite && myFile != null) {
        if (!dir.doPermissionCheckPatch(pc, src, FsAction.WRITE))
          checkPathAccess(pc, src, FsAction.WRITE);
      }
      /*
       * To overwrite existing file, need to check 'w' permission 
       * of parent (equals to ancestor in this case)
       */
      if (!dir.doPermissionCheckPatch(pc, src, FsAction.WRITE))
        checkAncestorAccess(pc, src, FsAction.WRITE);
    }

    if (!createParent) {
      verifyParentDir(src);
    }

    try {
      BlocksMapUpdateInfo toRemoveBlocks = null;
      if (myFile == null) {
        if (!create) {
          throw new FileNotFoundException("Can't overwrite non-existent " +
              src + " for client " + clientMachine);
        }
      } else {
        if (overwrite) {
          toRemoveBlocks = new BlocksMapUpdateInfo();
          List<INode> toRemoveINodes = new ChunkedArrayList<INode>();
          long ret = dir.delete(src, toRemoveBlocks, toRemoveINodes, now());
          if (ret >= 0) {
            incrDeletedFileCount(ret);
            removePathAndBlocks(src, null, toRemoveINodes, true);
          }
        } else {
          // If lease soft limit time is expired, recover the lease
          recoverLeaseInternal(myFile, src, holder, clientMachine, false);
          throw new FileAlreadyExistsException(src + " for client " +
              clientMachine + " already exists");
        }
      }

      checkFsObjectLimit();
      INodeFile newNode = null;

      // Always do an implicit mkdirs for parent directory tree.
      Path parent = new Path(src).getParent();
      if (parent != null && mkdirsRecursively(parent.toString(),
              permissions, true, now())) {
        newNode = dir.addFile(src, permissions, replication, blockSize,
                              holder, clientMachine);
      }

      if (newNode == null) {
        throw new IOException("Unable to add " + src +  " to namespace");
      }
      leaseManager.addLease(newNode.getFileUnderConstructionFeature()
          .getClientName(), src);

      // Set encryption attributes if necessary
      if (feInfo != null) {
        dir.setFileEncryptionInfo(src, feInfo);
        newNode = dir.getInode(newNode.getId()).asFile();
      }

      setNewINodeStoragePolicy(newNode, iip, isLazyPersist);

      // record file record in log, record new generation stamp
      getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
      NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added {}" +
          " inode {} holder {}", src, newNode.getId(), holder);
      return toRemoveBlocks;
    } catch (IOException ie) {
      NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: " + src + " " +
          ie.getMessage());
      throw ie;
    }
  }

addFile这里主要调用的方法是addChild,会对目录做quota检查,然后将InodeFile添加到父亲InodeDirectory的children里,这里会将InodeFile增加UnderConstruct属性,标示是正在构建的Inode节点

  INodeFile addFile(String path, PermissionStatus permissions,
                    short replication, long preferredBlockSize,
                    String clientName, String clientMachine)
    throws FileAlreadyExistsException, QuotaExceededException,
      UnresolvedLinkException, SnapshotAccessControlException, AclException {

    long modTime = now();
    INodeFile newNode = newINodeFile(namesystem.allocateNewInodeId(),
        permissions, modTime, modTime, replication, preferredBlockSize);
    newNode.toUnderConstruction(clientName, clientMachine);

    boolean added = false;
    writeLock();
    try {
      added = addINode(path, newNode, permissions.getPermission());
    } finally {
      writeUnlock();
    }
    if (!added) {
      NameNode.stateChangeLog.info("DIR* addFile: failed to add " + path);
      return null;
    }

    if(NameNode.stateChangeLog.isDebugEnabled()) {
      NameNode.stateChangeLog.debug("DIR* addFile: " + path + " is added");
    }
    return newNode;
  }

addChild

  private boolean addChild(INodesInPath iip, int pos, INode child,
                           FsPermission modes, boolean checkQuota)
      throws QuotaExceededException {
    final INode[] inodes = iip.getINodes();
    // Disallow creation of /.reserved. This may be created when loading
    // editlog/fsimage during upgrade since /.reserved was a valid name in older
    // release. This may also be called when a user tries to create a file
    // or directory /.reserved.
    if (pos == 1 && inodes[0] == rootDir && isReservedName(child)) {
      throw new HadoopIllegalArgumentException(
          "File name \"" + child.getLocalName() + "\" is reserved and cannot "
              + "be created. If this is during upgrade change the name of the "
              + "existing file or directory to another name before upgrading "
              + "to the new release.");
    }
    // The filesystem limits are not really quotas, so this check may appear
    // odd. It's because a rename operation deletes the src, tries to add
    // to the dest, if that fails, re-adds the src from whence it came.
    // The rename code disables the quota when it's restoring to the
    // original location becase a quota violation would cause the the item
    // to go "poof".  The fs limits must be bypassed for the same reason.
    if (checkQuota) {
      verifyMaxComponentLength(child.getLocalNameBytes(), inodes, pos);
      verifyMaxDirItems(inodes, pos);
    }
    // always verify inode name
    verifyINodeName(child.getLocalNameBytes());
    
    final Quota.Counts counts = child.computeQuotaUsage();
    updateCount(iip, pos,
        counts.get(Quota.NAMESPACE), counts.get(Quota.DISKSPACE), checkQuota);
    boolean isRename = (child.getParent() != null);
    final INodeDirectory parent = inodes[pos-1].asDirectory();
    boolean added;
    try {
      added = parent.addChild(child, true, iip.getLatestSnapshotId());
    } catch (QuotaExceededException e) {
      updateCountNoQuotaCheck(iip, pos,
          -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
      throw e;
    }
    if (!added) {
      updateCountNoQuotaCheck(iip, pos,
          -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
    } else {
      iip.setINode(pos - 1, child.getParent());
      if (!isRename) {
        copyINodeDefaultAcl(child, modes);
      }
      addToInodeMap(child);
    }
    return added;
  }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,100评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,308评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,718评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,275评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,376评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,454评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,464评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,248评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,686评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,974评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,150评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,817评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,484评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,140评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,374评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,012评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,041评论 2 351

推荐阅读更多精彩内容