切片逻辑之CombineFileInputFormat --用于输入文件含有大量小文件的情况

将文件按照maxSplitSize进行逻辑切片,划分为若干部分!(在minSplitSizeNode(同一节点的数据块)=0&minSplitSizeRac(同一机架的数据块)=0的情况下)
    判断待切部分是否  <  maxSplitSize,如果小于整个作为 1 part
    maxSplitSize <待切部分 < maxSplitSize * 2, 将待切部分均分为2 part
    待切部分  > maxSplitSize * 2, 先切出 maxSplitSize作为一部分,再循环判断!
    将多个part进行组合,只要大小超过maxSplitSize,这些part就作为1片!
如果minSplitSizeNode&minSplitSizeRack不为零,则还要另行做判断

1.getSplit()

@Override
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    long minSizeNode = 0;
    long minSizeRack = 0;
    long maxSize = 0;
    Configuration conf = job.getConfiguration();

    // 通过setxxxSplitSize()方法设置的参数值会覆盖掉从配置文件中读取的参数值
    if (minSplitSizeNode != 0) {
      minSizeNode = minSplitSizeNode;
    } else {
      minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
    }
    if (minSplitSizeRack != 0) {
      minSizeRack = minSplitSizeRack;
    } else {
      minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
    }
    if (maxSplitSize != 0) {
      maxSize = maxSplitSize;
    } else {

      //如果maxSize没有配置,整个Node生成一个Split
      maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
    
    }
    if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
      throw new IOException("Minimum split size pernode " + minSizeNode +
                            " cannot be larger than maximum split size " +
                            maxSize);
    }
    if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
      throw new IOException("Minimum split size per rack " + minSizeRack +
                            " cannot be larger than maximum split size " +
                            maxSize);
    }
    if (minSizeRack != 0 && minSizeNode > minSizeRack) {
      throw new IOException("Minimum split size per node " + minSizeNode +
                            " cannot be larger than minimum split " +
                            "size per rack " + minSizeRack);
    }

    //获取输入路径中的所有文件
    List<FileStatus> stats = listStatus(job);
    List<InputSplit> splits = new ArrayList<InputSplit>();
    if (stats.size() == 0) {
      return splits;    
    }

    // 迭代为每个过滤池中的文件生成切片
   //一个切片中的数据块只可能来自于同一个过滤池,但可以来自同一个过滤池中的不同文件
    for (MultiPathFilter onepool : pools) {
      ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();

      
      //获取满足当前过滤池实例onepool的所有文件myPaths
      for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
        FileStatus p = iter.next();
        if (onepool.accept(p.getPath())) {
          myPaths.add(p); // add it to my output set
          iter.remove();
        }
      }
      //为mypaths中的文件生成切片
      getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
    }

    //为不属于任何过滤池的文件生成切片
    getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);

    //free up rackToNodes map
    rackToNodes.clear();
    return splits;    
  }

2.源码:getMoreSplits()
无论是满足某过滤池实例 onePool 条件的文件,还是不属于任何过滤池的文件,可以笼统地理解为 "一批文件",getMoreSplits()就是为这一批文件生成切片的。

/**
   * Return all the splits in the specified set of paths
   */
  private void getMoreSplits(JobContext job, List<FileStatus> stats,
                             long maxSize, long minSizeNode, long minSizeRack,
                             List<InputSplit> splits)
    throws IOException {
    Configuration conf = job.getConfiguration();

    //OneFileInfo类:代表一个文件 
    OneFileInfo[] files;

  
    //rackToBlocks:机架和数据块的对应关系,即某一个机架上有哪些数据块;
    HashMap<String, List<OneBlockInfo>> rackToBlocks = 
                              new HashMap<String, List<OneBlockInfo>>();

    //blockToNodes:数据块与节点的对应关系,即一块数据块的“拷贝”位于哪些节点
    HashMap<OneBlockInfo, String[]> blockToNodes = 
                              new HashMap<OneBlockInfo, String[]>();

    //nodeToBlocks:节点和数据块的对应关系,即某一个节点上有哪些数据块;
    HashMap<String, Set<OneBlockInfo>> nodeToBlocks = 
                              new HashMap<String, Set<OneBlockInfo>>();
  
    files = new OneFileInfo[stats.size()];
    if (stats.size() == 0) {
      return; 
    }

   /**
    * 迭代这"一批文件",为每一个文件构建OneFileInfo对象
    * OneFileInfo对象在构建过程中维护了上述三个对应关系的信息。
    * 迭代完成之后,即可以认为数据块、节点、机架相互之间的对应关系已经建立完毕
    * 接下来可以根据这些信息生成切片
    */
    long totLength = 0;
    int i = 0;
    for (FileStatus stat : stats) {
      files[i] = new OneFileInfo(stat, conf, isSplitable(job, stat.getPath()),
                                 rackToBlocks, blockToNodes, nodeToBlocks,
                                 rackToNodes, maxSize);
      totLength += files[i].getLength();
    }

    //切片的形成过程
    createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength, 
                 maxSize, minSizeNode, minSizeRack, splits);
  }

3.源码:createSplits()

  @VisibleForTesting
  void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
                     Map<OneBlockInfo, String[]> blockToNodes,
                     Map<String, List<OneBlockInfo>> rackToBlocks,
                     long totLength,
                     long maxSize,
                     long minSizeNode,
                     long minSizeRack,
                     List<InputSplit> splits                     
                    ) {

    //保存当前切片所包含的数据块
    ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();

    //保存当前切片的大小
    long curSplitSize = 0;
    
    int totalNodes = nodeToBlocks.size();
    long totalLength = totLength;

    Multiset<String> splitsPerNode = HashMultiset.create();
    Set<String> completedNodes = new HashSet<String>();
    
    while(true) {
      // it is allowed for maxSize to be 0. Disable smoothing load for such cases

      //逐个节点(数据块)形成切片
      // process all nodes and create splits that are local to a node. Generate
      // one split per node iteration, and walk over nodes multiple times to
      // distribute the splits across nodes. 
      for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks
          .entrySet().iterator(); iter.hasNext();) {
        Map.Entry<String, Set<OneBlockInfo>> one = iter.next();
        
        String node = one.getKey();
        
        // Skip the node if it has previously been marked as completed.
        if (completedNodes.contains(node)) {
          continue;
        }

        Set<OneBlockInfo> blocksInCurrentNode = one.getValue();

        // for each block, copy it into validBlocks. Delete it from
        // blockToNodes so that the same block does not appear in
        // two different splits.
        Iterator<OneBlockInfo> oneBlockIter = blocksInCurrentNode.iterator();
        while (oneBlockIter.hasNext()) {
          OneBlockInfo oneblock = oneBlockIter.next();
          
          // Remove all blocks which may already have been assigned to other
          // splits.
          if(!blockToNodes.containsKey(oneblock)) {
            oneBlockIter.remove();
            continue;
          }
        
          validBlocks.add(oneblock);
          blockToNodes.remove(oneblock);
          curSplitSize += oneblock.length;

          // if the accumulated split size exceeds the maximum, then
          // create this split.

          //如果数据块累积大小大于或等于maxSize,则形成一个切片
          if (maxSize != 0 && curSplitSize >= maxSize) {
            // create an input split and add it to the splits array
            addCreatedSplit(splits, Collections.singleton(node), validBlocks);
            totalLength -= curSplitSize;
            curSplitSize = 0;

            splitsPerNode.add(node);

            // Remove entries from blocksInNode so that we don't walk these
            // again.
            blocksInCurrentNode.removeAll(validBlocks);
            validBlocks.clear();

            // Done creating a single split for this node. Move on to the next
            // node so that splits are distributed across nodes.
            break;
          }

        }
        if (validBlocks.size() != 0) {
          // This implies that the last few blocks (or all in case maxSize=0)
          // were not part of a split. The node is complete.
          
          // if there were any blocks left over and their combined size is
          // larger than minSplitNode, then combine them into one split.
          // Otherwise add them back to the unprocessed pool. It is likely
          // that they will be combined with other blocks from the
          // same rack later on.
          // This condition also kicks in when max split size is not set. All
          // blocks on a node will be grouped together into a single split.

          // 如果剩余数据块大小大于或等于minSizeNode,则将这些数据块构成一个切片;
       // 如果剩余数据块大小小于minSizeNode,则将这些数据块归还给blockToNodes,交由后期“同一机架”过程处理

          if (minSizeNode != 0 && curSplitSize >= minSizeNode
              && splitsPerNode.count(node) == 0) {
            // haven't created any split on this machine. so its ok to add a
            // smaller one for parallelism. Otherwise group it in the rack for
            // balanced size create an input split and add it to the splits
            // array
            addCreatedSplit(splits, Collections.singleton(node), validBlocks);
            totalLength -= curSplitSize;
            splitsPerNode.add(node);
            // Remove entries from blocksInNode so that we don't walk this again.
            blocksInCurrentNode.removeAll(validBlocks);
            // The node is done. This was the last set of blocks for this node.
          } else {
            // Put the unplaced blocks back into the pool for later rack-allocation.
            for (OneBlockInfo oneblock : validBlocks) {
              blockToNodes.put(oneblock, oneblock.hosts);
            }
          }
          validBlocks.clear();
          curSplitSize = 0;
          completedNodes.add(node);
        } else { // No in-flight blocks.
          if (blocksInCurrentNode.size() == 0) {
            // Node is done. All blocks were fit into node-local splits.
            completedNodes.add(node);
          } // else Run through the node again.
        }
      }

      // Check if node-local assignments are complete.
      if (completedNodes.size() == totalNodes || totalLength == 0) {
        // All nodes have been walked over and marked as completed or all blocks
        // have been assigned. The rest should be handled via rackLock assignment.
        LOG.info("DEBUG: Terminated node allocation with : CompletedNodes: "
            + completedNodes.size() + ", size left: " + totalLength);
        break;
      }
    }
    //逐个机架(数据块)形成切片
    // if blocks in a rack are below the specified minimum size, then keep them
    // in 'overflow'. After the processing of all racks is complete, these 
    // overflow blocks will be combined into splits.
    //overflowBlocks用于保存“同一机架”过程处理之后剩余的数据块
    ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
    Set<String> racks = new HashSet<String>();

    // Process all racks over and over again until there is no more work to do.
    while (blockToNodes.size() > 0) {

      // Create one split for this rack before moving over to the next rack. 
      // Come back to this rack after creating a single split for each of the 
      // remaining racks.
      // Process one rack location at a time, Combine all possible blocks that
      // reside on this rack as one split. (constrained by minimum and maximum
      // split size).

      //依次处理每个机架 
      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = 
           rackToBlocks.entrySet().iterator(); iter.hasNext();) {

        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
        racks.add(one.getKey());
        List<OneBlockInfo> blocks = one.getValue();

        // for each block, copy it into validBlocks. Delete it from 
        // blockToNodes so that the same block does not appear in 
        // two different splits.
        boolean createdSplit = false;

        //依次处理该机架的每个数据块
        for (OneBlockInfo oneblock : blocks) {
          if (blockToNodes.containsKey(oneblock)) {
            validBlocks.add(oneblock);
            blockToNodes.remove(oneblock);
            curSplitSize += oneblock.length;
      
            // if the accumulated split size exceeds the maximum, then 
            // create this split.如果数据块累积大小大于或等于maxSize,则形成一个切片
            if (maxSize != 0 && curSplitSize >= maxSize) {
              // create an input split and add it to the splits array
              addCreatedSplit(splits, getHosts(racks), validBlocks);
              createdSplit = true;
              break;
            }
          }
        }

        // if we created a split, then just go to the next rack
        if (createdSplit) {
          curSplitSize = 0;
          validBlocks.clear();
          racks.clear();
          continue;
        }

        if (!validBlocks.isEmpty()) {

          //如果剩余数据块大小大于或等于minSizeRack,则将这些数据块构成一个切片
          if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
            // if there is a minimum size specified, then create a single split
            // otherwise, store these blocks into overflow data structure
            addCreatedSplit(splits, getHosts(racks), validBlocks);
          } else {
            // There were a few blocks in this rack that 
            // remained to be processed. Keep them in 'overflow' block list. 
            // These will be combined later.
  
            //如果剩余数据块大小小于minSizeRack,则将这些数据块加入overflowBlocks
            overflowBlocks.addAll(validBlocks);
          }
        }
        curSplitSize = 0;
        validBlocks.clear();
        racks.clear();
      }
    }

    assert blockToNodes.isEmpty();
    assert curSplitSize == 0;
    assert validBlocks.isEmpty();
    assert racks.isEmpty();

    //遍历并累加剩余数据块
    for (OneBlockInfo oneblock : overflowBlocks) {
      validBlocks.add(oneblock);
      curSplitSize += oneblock.length;

      // This might cause an exiting rack location to be re-added,
      // but it should be ok.
      for (int i = 0; i < oneblock.racks.length; i++) {
        racks.add(oneblock.racks[i]);
      }

      // if the accumulated split size exceeds the maximum, then 
      // create this split.
      // 如果剩余数据块大小大于或等于maxSize,则将这些数据块构成一个切片
      if (maxSize != 0 && curSplitSize >= maxSize) {
        // create an input split and add it to the splits array
        addCreatedSplit(splits, getHosts(racks), validBlocks);
        curSplitSize = 0;
        validBlocks.clear();
        racks.clear();
      }
    }

    //剩余数据块形成一个切片
    if (!validBlocks.isEmpty()) {
      addCreatedSplit(splits, getHosts(racks), validBlocks);
    }
  }

参考文献:
解读:CombineFileInputFormat类

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,295评论 6 512
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,928评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,682评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,209评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,237评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,965评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,586评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,487评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,016评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,136评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,271评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,948评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,619评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,139评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,252评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,598评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,267评论 2 358