若待切部分 <= maxSplitSize,则将其整体作为 1 个 part;
若 maxSplitSize < 待切部分 < maxSplitSize * 2,则将待切部分均分为 2 个 part;
若待切部分 >= maxSplitSize * 2,则先切出 maxSplitSize 大小作为一个 part,再对剩余部分循环执行上述判断。
public List<InputSplit> getSplits(JobContext job) throws IOException {
long minSizeNode = 0;
long minSizeRack = 0;
long maxSize = 0;
Configuration conf = job.getConfiguration();
// 通过setxxxSplitSize()方法设置的参数值会覆盖掉从配置文件中读取的参数值
if (minSplitSizeNode != 0) {
minSizeNode = minSplitSizeNode;
} else {
minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
if (minSplitSizeRack != 0) {
minSizeRack = minSplitSizeRack;
} else {
minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
if (maxSplitSize != 0) {
maxSize = maxSplitSize;
} else {
maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
throw new IOException("Minimum split size pernode " + minSizeNode +
" cannot be larger than maximum split size " +
if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
throw new IOException("Minimum split size per rack " + minSizeRack +
" cannot be larger than maximum split size " +
if (minSizeRack != 0 && minSizeNode > minSizeRack) {
throw new IOException("Minimum split size per node " + minSizeNode +
" cannot be larger than minimum split " +
"size per rack " + minSizeRack);
List<FileStatus> stats = listStatus(job);
List<InputSplit> splits = new ArrayList<InputSplit>();
if (stats.size() == 0) {
return splits;
// 迭代为每个过滤池中的文件生成切片
for (MultiPathFilter onepool : pools) {
ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();
for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
FileStatus p = iter.next();
if (onepool.accept(p.getPath())) {
myPaths.add(p); // add it to my output set
getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);
//free up rackToNodes map
return splits;
无论是满足某个过滤池实例 onepool 条件的文件,还是不属于任何过滤池的文件,都可以笼统地理解为“一批文件”;getMoreSplits() 的作用就是为这一批文件生成切片。
* Return all the splits in the specified set of paths
private void getMoreSplits(JobContext job, List<FileStatus> stats,
long maxSize, long minSizeNode, long minSizeRack,
List<InputSplit> splits)
throws IOException {
Configuration conf = job.getConfiguration();
OneFileInfo[] files;
HashMap<String, List<OneBlockInfo>> rackToBlocks =
new HashMap<String, List<OneBlockInfo>>();
HashMap<OneBlockInfo, String[]> blockToNodes =
new HashMap<OneBlockInfo, String[]>();
HashMap<String, Set<OneBlockInfo>> nodeToBlocks =
new HashMap<String, Set<OneBlockInfo>>();
files = new OneFileInfo[stats.size()];
if (stats.size() == 0) {
* 迭代这"一批文件",为每一个文件构建OneFileInfo对象
* OneFileInfo对象在构建过程中维护了上述三个对应关系的信息。
* 迭代完成之后,即可以认为数据块、节点、机架相互之间的对应关系已经建立完毕
* 接下来可以根据这些信息生成切片
long totLength = 0;
int i = 0;
for (FileStatus stat : stats) {
files[i] = new OneFileInfo(stat, conf, isSplitable(job, stat.getPath()),
rackToBlocks, blockToNodes, nodeToBlocks,
rackToNodes, maxSize);
totLength += files[i].getLength();
createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
maxSize, minSizeNode, minSizeRack, splits);
/*
 * Groups blocks into splits in three visible phases, handing each finished
 * group to addCreatedSplit together with `splits`:
 *   1. node-local: walk nodeToBlocks repeatedly, creating splits whose
 *      location is a single node;
 *   2. rack-local: while blockToNodes still has entries, walk rackToBlocks
 *      and create splits per rack, parking undersized leftovers in
 *      overflowBlocks;
 *   3. overflow: combine the parked overflow blocks into splits.
 *
 * Parameters as used below:
 *   nodeToBlocks - node name -> blocks on that node; the key becomes the
 *                  single location of a node-local split.
 *   blockToNodes - membership is used as the "block not yet assigned" test;
 *                  unplaced blocks are put back with oneblock.hosts.
 *   rackToBlocks - rack name -> blocks on that rack.
 *   totLength    - starting value for the running "length left" counter.
 *   maxSize      - size at which a split is cut; 0 disables this check.
 *   minSizeNode  - minimum size for a leftover node-local split; 0 disables.
 *   minSizeRack  - minimum size for a leftover rack-local split; 0 disables.
 *   splits       - list passed to addCreatedSplit for every created split.
 *
 * NOTE(review): this excerpt appears to have lost lines. No closing braces
 * are present, and nothing visible adds to or clears validBlocks/racks,
 * removes entries from blockToNodes, or leaves the loops. Restore the body
 * from the upstream source before compiling or relying on the exact flow.
 */
void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
Map<OneBlockInfo, String[]> blockToNodes,
Map<String, List<OneBlockInfo>> rackToBlocks,
long totLength,
long maxSize,
long minSizeNode,
long minSizeRack,
List<InputSplit> splits
) {
// Blocks collected for the split currently being built, and their total size.
ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
long curSplitSize = 0;
int totalNodes = nodeToBlocks.size();
// Length not yet placed into node-local splits.
long totalLength = totLength;
// splitsPerNode is consulted below to see whether a node already got a split.
Multiset<String> splitsPerNode = HashMultiset.create();
Set<String> completedNodes = new HashSet<String>();
while(true) {
// it is allowed for maxSize to be 0. Disable smoothing load for such cases
// process all nodes and create splits that are local to a node. Generate
// one split per node iteration, and walk over nodes multiple times to
// distribute the splits across nodes.
for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks
.entrySet().iterator(); iter.hasNext();) {
Map.Entry<String, Set<OneBlockInfo>> one = iter.next();
String node = one.getKey();
// Skip the node if it has previously been marked as completed.
// NOTE(review): the skip statement itself is not present in this excerpt.
if (completedNodes.contains(node)) {
Set<OneBlockInfo> blocksInCurrentNode = one.getValue();
// for each block, copy it into validBlocks. Delete it from
// blockToNodes so that the same block does not appear in
// two different splits.
Iterator<OneBlockInfo> oneBlockIter = blocksInCurrentNode.iterator();
while (oneBlockIter.hasNext()) {
OneBlockInfo oneblock = oneBlockIter.next();
// Remove all blocks which may already have been assigned to other
// splits.
if(!blockToNodes.containsKey(oneblock)) {
curSplitSize += oneblock.length;
// if the accumulated split size exceeds the maximum, then
// create this split.
if (maxSize != 0 && curSplitSize >= maxSize) {
// create an input split and add it to the splits array
addCreatedSplit(splits, Collections.singleton(node), validBlocks);
totalLength -= curSplitSize;
curSplitSize = 0;
// Remove entries from blocksInNode so that we don't walk these
// again.
// Done creating a single split for this node. Move on to the next
// node so that splits are distributed across nodes.
if (validBlocks.size() != 0) {
// This implies that the last few blocks (or all in case maxSize=0)
// were not part of a split. The node is complete.
// if there were any blocks left over and their combined size is
// larger than minSplitNode, then combine them into one split.
// Otherwise add them back to the unprocessed pool. It is likely
// that they will be combined with other blocks from the
// same rack later on.
// This condition also kicks in when max split size is not set. All
// blocks on a node will be grouped together into a single split.
// If the leftover blocks total at least minSizeNode, build one split from them;
// otherwise return them to blockToNodes so the later same-rack phase handles them.
if (minSizeNode != 0 && curSplitSize >= minSizeNode
&& splitsPerNode.count(node) == 0) {
// haven't created any split on this machine. so its ok to add a
// smaller one for parallelism. Otherwise group it in the rack for
// balanced size create an input split and add it to the splits
// array
addCreatedSplit(splits, Collections.singleton(node), validBlocks);
totalLength -= curSplitSize;
// Remove entries from blocksInNode so that we don't walk this again.
// The node is done. This was the last set of blocks for this node.
} else {
// Put the unplaced blocks back into the pool for later rack-allocation.
for (OneBlockInfo oneblock : validBlocks) {
blockToNodes.put(oneblock, oneblock.hosts);
curSplitSize = 0;
} else { // No in-flight blocks.
if (blocksInCurrentNode.size() == 0) {
// Node is done. All blocks were fit into node-local splits.
} // else Run through the node again.
// Check if node-local assignments are complete.
if (completedNodes.size() == totalNodes || totalLength == 0) {
// All nodes have been walked over and marked as completed or all blocks
// have been assigned. The rest should be handled via rackLock assignment.
LOG.info("DEBUG: Terminated node allocation with : CompletedNodes: "
+ completedNodes.size() + ", size left: " + totalLength);
// if blocks in a rack are below the specified minimum size, then keep them
// in 'overflow'. After the processing of all racks is complete, these
// overflow blocks will be combined into splits.
ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
// Rack names whose hosts (via getHosts) become the locations of the split being built.
Set<String> racks = new HashSet<String>();
// Process all racks over and over again until there is no more work to do.
while (blockToNodes.size() > 0) {
// Create one split for this rack before moving over to the next rack.
// Come back to this rack after creating a single split for each of the
// remaining racks.
// Process one rack location at a time, Combine all possible blocks that
// reside on this rack as one split. (constrained by minimum and maximum
// split size).
for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
rackToBlocks.entrySet().iterator(); iter.hasNext();) {
Map.Entry<String, List<OneBlockInfo>> one = iter.next();
List<OneBlockInfo> blocks = one.getValue();
// for each block, copy it into validBlocks. Delete it from
// blockToNodes so that the same block does not appear in
// two different splits.
boolean createdSplit = false;
for (OneBlockInfo oneblock : blocks) {
// Only blocks still present in blockToNodes are considered here.
if (blockToNodes.containsKey(oneblock)) {
curSplitSize += oneblock.length;
// if the accumulated split size exceeds the maximum, then
// create this split. That is, once the accumulated block size is
// greater than or equal to maxSize, a split is formed.
if (maxSize != 0 && curSplitSize >= maxSize) {
// create an input split and add it to the splits array
addCreatedSplit(splits, getHosts(racks), validBlocks);
createdSplit = true;
// if we created a split, then just go to the next rack
if (createdSplit) {
curSplitSize = 0;
if (!validBlocks.isEmpty()) {
if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
// if there is a minimum size specified, then create a single split
// otherwise, store these blocks into overflow data structure
addCreatedSplit(splits, getHosts(racks), validBlocks);
} else {
// There were a few blocks in this rack that
// remained to be processed. Keep them in 'overflow' block list.
// These will be combined later.
curSplitSize = 0;
// Expected state once the rack phase is over.
assert blockToNodes.isEmpty();
assert curSplitSize == 0;
assert validBlocks.isEmpty();
assert racks.isEmpty();
// Combine the overflow blocks into splits.
for (OneBlockInfo oneblock : overflowBlocks) {
curSplitSize += oneblock.length;
// This might cause an exiting rack location to be re-added,
// but it should be ok.
for (int i = 0; i < oneblock.racks.length; i++) {
// if the accumulated split size exceeds the maximum, then
// create this split.
// If the accumulated size of the overflow blocks is greater than or
// equal to maxSize, build one split from these blocks.
if (maxSize != 0 && curSplitSize >= maxSize) {
// create an input split and add it to the splits array
addCreatedSplit(splits, getHosts(racks), validBlocks);
curSplitSize = 0;
// Whatever is still collected after the loop goes into one last split.
if (!validBlocks.isEmpty()) {
addCreatedSplit(splits, getHosts(racks), validBlocks);