1.数据准备
2.上传HDFS
3.执行Mapreduce分布式并行计算
3.1业务逻辑处理。
业务理解:通俗理解,我先查到单词,或者是字符串,更具这字符串,我可这到记录单词或者字符串的地址,然后在统计词频。与正序索引截然相反.
先使用map函数对文件切分成若干个split,若干个split对应若干个map任务,比如在路径D:/text/test.txt.的文件,这里的value值就是url+单词或者字符串。key是每一行的起始位置。
经过map处理后,进行reduce的分组处理。此时的key便只是url,然后相同的key生成一个values集合。
然后在使用写reduce对其进行统计词频。即是对values集合中相同元素的数目求和。输出结果
3.2具体操作步骤。
写InvertedIndex类使其继承Mapper类重写它的Map方法,对其做map处理,在这里写出对文件的分割逻辑。
然后在InvertedIndex类里面写InvertedIndexCombiner类继承Reduce类实现分组。把具有相同的key的value放在一起。
接着在InvertedIndex类里面写InvertedIndexreduce类继承Reduce类覆盖reduce方法,实现对单词的计数,此时key是单词或者字符串,value是url和词频。
接着写一个主函数,主函数里面写一个job的驱动用来加载map和reduce函数。执行mapreduce分布式并行计算。并把生成的文件写入到指定文件下。
4运行结果
控制台输出
5.原代码
package sort;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class InvertedIndex {
public static classInvertedIndexMap extends Mapper{
private Text valueInfo =new Text();
private Text keyInfo =new Text();
private FileSplit split;
public void map(Objectkey, Text value,Context context)
throwsIOException, InterruptedException {
//获取对所属的FileSplit对象
split = (FileSplit)context.getInputSplit();
StringTokenizer stk =new StringTokenizer(value.toString());
while(stk.hasMoreElements()) {
//key值由(单词:URI)组成
keyInfo.set(stk.nextToken()+":"+split.getPath().toString());
//词频
valueInfo.set("1");
context.write(keyInfo, valueInfo);
}
}
}
public static classInvertedIndexCombiner extends Reducer{
Text info = new Text();
public void reduce(Textkey, Iterable values,Context contex)
throws IOException, InterruptedException {
int sum = 0;
for (Text value :values) {
sum +=Integer.parseInt(value.toString());
}
int splitIndex =key.toString().indexOf(":");
//重新设置value值由(URI+:词频组成)
info.set(key.toString().substring(splitIndex+1) +":"+ sum);
//重新设置key值为单词
key.set(key.toString().substring(0,splitIndex));
contex.write(key,info);
}
}
public static classInvertedIndexReduce extends Reducer{
private Text result = newText();
public void reduce(Textkey, Iterable values,Context contex)
throwsIOException, InterruptedException {
//生成文档列表
String fileList = newString();
for (Text value :values) {
fileList +=value.toString()+";";
}
result.set(fileList);
contex.write(key, result);
}
}
public static voidmain(String[] args) throws IOException, InterruptedException,ClassNotFoundException {
Configuration conf = newConfiguration();
Job job = newJob(conf,"InvertedIndex");
job.setJarByClass(InvertedIndex.class);
job.setMapperClass(InvertedIndexMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setCombinerClass(InvertedIndexCombiner.class);
job.setReducerClass(InvertedIndexReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, newPath("D:/text/test.txt"));
FileOutputFormat.setOutputPath(job,new Path("E:/out.txt"));
System.exit(job.waitForCompletion(true)?0:1);
}
}
6.总结:
mapreduce的计算思想很简单,无非是先把任务分割,让多个tasktracker处理,然后tasktracker个自处理自己任务,等到把任务处理完,然后汇总给jobtracker,进行分组,计数排序等操作。整个流程中最难的也是最重要的,是业务理解,业务分析,还涉及到算法,比如这个例子中涉及的哈西算法。
所以mapreduce的精髓在于对业务的处理能力和思维逻辑能力。