MapReduce模型
MapReduce采用“分而治之”策略,一个大规模数据集进行分片,多个Map任务并行处理。实现“计算向数据靠拢”理念,而不比大量移动数据造成网络开销。
MapReduce采用Master/Slave架构,一个Master,若干Slave。Master运行JobTracker负责作业调度,Slave运行TaskTracker负责具体作业处理。
JobTracker
1、负责任务调度与资源监控。
2、监控Job和TaskTracker的健康状态,一旦失败,相应任务就要发生转移。
3、跟踪任务进度,汇报给调度器,调度器根据在资源空闲时,分配合适的任务。
TaskTracker
1、定期使用“心跳”向JobTracker报告任务进度,同时接受新任务。
2、使用“slot”等量划分资源,调度的基本单位,一个Task只有拥有一个“slot”才能执行,调度器就是把空闲的“slot”分配给Task,分为Map slot和Reduce slot。
Task
分为Map Task和Reduce Task,都由TaskTracker启动。
MapReduce执行过程
InputFormat对HDFS中的数据进行加载,进行split(逻辑分片,HDFS中的Block是物理分片),RR(RecordReader)将各个分片的数据从HDFS中读取出来以键值对输出作为Map函数(用户程序自己编写的逻辑)进行输入,输出中间结果进行Shufflc,传给Reduce函数输出最终结果。
Split
逻辑上进行分片,分片的依据用户可以自定义,但分片的数量决定了Map任务的数量,理想分片是HDFS的块。Reduce任务的数量通常是比集群中Reduce slot槽的总量略小一点。
Shufflc
分为Map端Shufflc和Reduce端Shufflc
Map端Shufflc
每个任务配一个缓存,溢写比例0.8
1、分区默认采用哈希函数
2、排序是默认操作
3、合并不能改变最终结果,不一定发生
4、Map任务全部结束前对溢写的文件(大于预定值可以再次合并)进行归并,得到一个大的本地文件
5、JobTracker会检测Map任务进度,通知Reduce任务来处理数据
Reduce端Shufflc
来自不同Map机器的数据先写入缓存,归并数据,对溢写文件进行归并,输入给Reduce任务。数据小的话不发生溢写直接给Reduce。
MapReduce编程(重写map和reduce任务,实现词频统计。)
package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public WordCount() {
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if(otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(WordCount.TokenizerMapper.class);
job.setCombinerClass(WordCount.IntSumReducer.class);
job.setReducerClass(WordCount.IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for(int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true)?0:1);
}
/**
*Reduce类
**/
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public IntSumReducer() {
}
public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
IntWritable val;
for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
val = (IntWritable)i$.next();
}
this.result.set(sum);
context.write(key, this.result);
}
}
/**
*Map类
**/
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
public TokenizerMapper() {
}
public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()) {
this.word.set(itr.nextToken());
context.write(this.word, one);
}
}
}
}
版权声明:本文为CSDN博主「隔壁阿源」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/weixin_41768073/java/article/details/82830833