1 切片与MapTask并行度决定机制
- MapTask并行度决定Map阶段的任务处理并发度,进而影响Job的处理速度
- MapTask并行度决定机制
数据块:blocks是hdfs在磁盘上对数据进行的划分;
数据切片:数据切片仅仅是在逻辑上对数据进行划分,不会在磁盘上对数据进改变
2.1 一个Job的Map的并行度由客户端在提交Job时的切片数决定
2.2 每的split切片分配一个MapTask并行实例处理
2.3 默认情况下,切片大小=blocksize
2.4 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
2. FileinputFormat
简单的按照文件长度和切片大小进行切片
例:file1 300M ,file2 10M;此时切片情况:file1.split1 0-128M,file1.split2 128-256M,file1.split3 256-320M,file2.split 0-10M。切成四片
切片大小的公式 MATH.max(minSize,Math.min(maxSize,blockSize));
切片大小 调大改变minSize调小改变maxSize
获取切片信息API:String Name = inputSplit.getPath().getName();
FileSplit inputSplit = (FileSplit) context.getInputSplit();
3. CombineTextInputFormat
用于小文件过多的场景,可以将多个小文件在逻辑上规划为一个大文件;将多个小文件交给一个大文件处理。
切片方法:
| 虚拟存储过程 | |
|---|---|
| a.txt 1.7M | 1.7M<4M 划分一块 |
| b.txt 5.1M | 5.1M>4M但是小于2* 4M 平均划分成两块 |
| c.txt 3.4M | 3.4M<4M 划分一块 |
| d.txt 3.4M | 6.8M>4M但是小于2* 4M 平均划分成两块 |
切片过程:(a)判断虚拟存储的文件大小是否大于setMaxInputSplitSize的值大于大于等于则单独形成一个切片。
(b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
4. TextInputFormat
TextInputFormat是FileInputFormat的默认实现类。按行读取记录;读取到的记录存在<k,v>中。
k:整个文件中每行起始字节偏移量,LongWriter类型。
v:每行的内容,不包括终止符(回车符、换行符),Text类型
5. KeyValueTextInputFormat
按照指定分隔符的形式,按行分割字符串。
如果一行当中存在多个指定分隔符,只有第一个有效。
相关代码public class KVmap extends Mapper<Text, Text, Text, IntWritable> { IntWritable v = new IntWritable(1); @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { context.write(key, v); } }public class KVReduce extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text k, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException { int sum = 0; for (IntWritable v : values) { sum +=v.get(); } IntWritable i = new IntWritable(sum); context.write(k, i); } }public class KVDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " "); Job job = Job.getInstance(conf); job.setJarByClass(KVDriver.class); job.setMapperClass(KVmap.class); job.setReducerClass(KVReduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setInputFormatClass(KeyValueTextInputFormat.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); } }输入文件信息:身边有人来有人走。 有人去去就回。 有人头也不回。 我会释怀。 我知道, 有人走了, 就会再有来人替代他。 多年后, 或许我还会记得, 有人在我的世界里出现过。 或许潮起潮落, 或许波澜未起。
结果:身边有人来有人走。 1
6. NLineInputFormat
如果使用NLineInputFormat,代表每个map处理inputsplit不再按block快去划分,而是按NLineInputFormat指定的行数来划分。输入文件的行数/N=切片数。
public class NLineInputFormatMap extends
Mapper<LongWritable, Text, Text, IntWritable>{
String[] strs;
IntWritable lon = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException
{
strs = value.toString().split(" ");
for (String str : strs) {
Text text = new Text(str);
context.write(text, lon );
}
}
}
public class NLineInputFormatReduce extends Reducer<Text,
IntWritable, Text, IntWritable>{
IntWritable value = new IntWritable();
@Override
protected void reduce(Text arg0,
Iterable<IntWritable> arg1,Context arg2) throws
IOException, InterruptedException {
int i = 0;
for (IntWritable IntWritable : arg1) {
i += IntWritable.get();
}
value.set(i);
arg2.write(arg0, value);
}
}
Configuration conf = new Configuration();
//1. 获取job对象
Job job = Job.getInstance(conf);
NLineInputFormat.setNumLinesPerSplit(job, 3);
//设置jar存储位置
job.setJarByClass(NLineInputFormatDriver.class);
//3.关联map和reduce
job.setMapperClass(NLineInputFormatMap.class);
job.setReducerClass(NLineInputFormatReduce.class);
//4.设置map阶段输出的key和value
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//5.设置最终数据的输出形式
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置文件的输入形式
job.setInputFormatClass(NLineInputFormat.class);
//6设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//7.提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);