MapTask并行度机制
Map阶段的并行取决于切片
FileInputFormat当中有getSplits方法,当中有compute
Math.max(minSize, Math.min(maxSize, blockSize));
blockSize在Hadoop2中默认是128M,minSize默认值是1。返回的就是blockSize,128M。这是默认的。
如果调整参数,使得maxSize比blockSize小的话,那么切片就会变小。minSize比blockSize大,切片比blockSize大。
但是,不论怎么调参数,都不能让多个小文件“划入”一个 split。
TextInputFormat源码:
public class TextInputFormat extends FileInputFormat<LongWritable, Text> implements JobConfigurable {
private CompressionCodecFactory compressionCodecs = null;
public TextInputFormat() {
}
public void configure(JobConf conf) {
this.compressionCodecs = new CompressionCodecFactory(conf);
}
protected boolean isSplitable(FileSystem fs, Path file) {
CompressionCodec codec = this.compressionCodecs.getCodec(file);
return null == codec ? true : codec instanceof SplittableCompressionCodec;
}
public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException {
reporter.setStatus(genericSplit.toString());
String delimiter = job.get("textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter) {
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
}
return new LineRecordReader(job, (FileSplit)genericSplit, recordDelimiterBytes);
}
}
从上面3个图可以看到,InputFormat是一个接口,FileInputFormat抽象类实现了InputFormat接口,然后TextInputFormat类继承了FileInputFormat。
TextInputFormat类中有一个getRecordReader方法,返回了一个LineRecordReader。
最终就是由LineRecordReader这个组件来一行行读取数据的。
读取之后将一行封装成一个<K, V>,K是这一行的起始偏移量,V是这一行的内容。读一行传给一次map,map就会源源不断的对数据进行处理。
按说处理结束之后就将数据写到磁盘当中,但是为了减少IO次数,先将要写入磁盘的数据写入内存缓冲区,然后由内存缓冲区分批次写到磁盘中。
从图中可以看到,向内存缓冲区中写的是分区组件,但是默认情况下只有一个ReduceTask,虽然具有这个组件,但是不会生效。对1进行取模都是0,没有分区。所以当对ReduceTask设置,>=2才会生效。
这个内存缓冲区实际上就是数组,这个数组是有限制的,默认是100M,向里写满就会溢出。溢出比0.8。也就是说如果是默认的话,写到超过80M的时候,就会向磁盘写。之所以要留出一部分空间,是为了后边的数据可以正常的向内存缓冲区中写数据。
spiller:溢出线程。
溢出这个过程是至少执行一次的。
溢出的过程中有sort这个行为,溢出一次排序一次,所以只能保证每个溢出的文件是排序好的。最后merge成一个大文件的时候还要进行排序。即把最终在磁盘上的多个溢出文件合并成一个最终完整文件。
这个分区且排序的文件会有一个索引文件,这个索引文件中会记录偏移量offset。这时就等着ReduceTask来拉取。
Combiner会影响到求均值,中位数,默认是没有的。
map调用多少次取决于文件有多少行。