Programming environment:
Ubuntu 16.04 Kylin
Hadoop 3.2.0
openjdk version "1.8.0_191"
The complete code has been pushed to GitHub; feel free to fork it. GitHub link
Statement: creating this took real effort; do not copy or reprint without authorization.
II. Writing and Debugging the Program Locally
1. Mapper design:
Input:
<string line> --------- each line of the document, read in as a string
Processing step 1:
<tokenize: use a regex to remove every character that is not an English letter or digit, convert to lowercase, and split on spaces> ---------- yields the individual English words
Processing step 2:
<get the document's file name and append it to each word to form the output key (the two parts separated by a special character); this layout makes it easy to count each word's frequency in each document>
Processing step 3:
<set the value for every key to "1">
Output:
<<key1,1>,<key2,1>,<key3,1>...>
Example:
// Inverted index mapper class
public static class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {
    private static Text keyInfo = new Text();            // holds the word + document name, e.g. "hello,file1"
    private static final Text valueInfo = new Text("1"); // term frequency, initialized to 1

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // Tokenize: strip everything except letters and digits
        line = line.replaceAll("[^a-zA-Z0-9]", " ");
        line = line.replaceAll("\\s{2,}", " ").trim();
        line = line.toLowerCase();
        String[] fields = line.split(" ");                         // array of words
        FileSplit fileSplit = (FileSplit) context.getInputSplit(); // the file split this line came from
        String fileName = fileSplit.getPath().getName();           // file name taken from the split
        for (String field : fields) {
            if (!field.isEmpty()) { // split() never yields null, but it can yield empty strings
                // key = word + document name, e.g. "mapreduce,file1"
                keyInfo.set(field + "," + fileName);
                context.write(keyInfo, valueInfo);
            }
        }
    }
}
2. Combiner design
A single Reduce pass cannot both count term frequencies and build the document list, so a Combine step is added to handle the frequency counting.
Input:
<key, valuelist<1...>> ----- e.g. <word+','+filename, <1,1,1>>
Process:
<split the key on the special separator character; set the new key to the word alone; sum the 1s in the value list to get the term frequency; join the document name and the frequency into the new value, again separated by the special character>
Output:
<newKey,newValue> -------- e.g. <word, filename+','+countNumber>
// Inverted index combiner class
public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
    private static Text info = new Text();

    // Input:  <"mapreduce,file3", {1,1,...}>
    // Output: <"mapreduce", "file3,2">
    @Override // marks a method as an override so the compiler can verify it
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0; // term frequency
        for (Text value : values) {
            sum += Integer.parseInt(value.toString());
        }
        int splitIndex = key.toString().indexOf(",");
        // New value = document name + term frequency
        info.set(key.toString().substring(splitIndex + 1) + "," + sum);
        // New key = the word alone
        key.set(key.toString().substring(0, splitIndex));
        context.write(key, info);
    }
}
An example of how the key-value pairs change after map and combine:
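The transformation can be reproduced as a small standalone sketch with no Hadoop dependencies. The two sample documents file1 and file2 and their contents are made up for illustration; summing the 1s per `word,filename` key plays the role of the combiner:

```java
import java.util.Map;
import java.util.TreeMap;

public class MapCombineDemo {
    // Simulates map (emit <word,filename -> 1>) followed by combine (sum the 1s per key).
    static Map<String, Integer> mapAndCombine(String[][] docs) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String[] doc : docs) {
            for (String word : doc[1].split(" ")) {
                counts.merge(word + "," + doc[0], 1, Integer::sum); // e.g. key "hello,file1"
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[][] docs = { {"file1", "hello world hello"}, {"file2", "hello mapreduce"} };
        for (Map.Entry<String, Integer> e : mapAndCombine(docs).entrySet()) {
            int i = e.getKey().indexOf(',');
            // Combiner output: key = word, value = "filename,count"
            System.out.println(e.getKey().substring(0, i) + "\t"
                    + e.getKey().substring(i + 1) + "," + e.getValue());
        }
    }
}
```

Running it prints `hello  file1,2`, `hello  file2,1`, `mapreduce  file2,1`, `world  file1,1`, which is exactly the <word, filename+','+countNumber> shape the combiner section describes.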
3. Stop-word handling
The stop-word list is passed as a single string via config.set(). Before the MapReduce job starts, a helper method reads the stop-word file and joins the words into the string sword; the reducer then overrides setup() and retrieves the list with config.get():
public static String catStopWords(Configuration conf, String remoteFilePath) {
    Path remotePath = new Path(remoteFilePath);
    StringBuilder sword = new StringBuilder();
    try (FileSystem fs = FileSystem.get(conf);
         FSDataInputStream in = fs.open(remotePath);
         BufferedReader d = new BufferedReader(new InputStreamReader(in))) {
        String line;
        while ((line = d.readLine()) != null) {
            line = line.replaceAll("[^a-zA-Z0-9]", "");
            if (!line.isEmpty()) { // readLine() already returned non-null; skip blank lines instead
                sword.append(line).append(",");
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    return sword.toString();
}
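The per-line transform and the round trip back through split() can be exercised without HDFS. The sketch below inlines the file contents as a string instead of reading from a BufferedReader, but applies the same strip-and-join logic as catStopWords and then splits the result the way the reducer's setup() does (the sample stop words are made up):

```java
public class StopWordsDemo {
    // Mirrors catStopWords' inner loop: strip non-alphanumerics, join non-empty lines with commas.
    static String joinStopWords(String fileContents) {
        StringBuilder sword = new StringBuilder();
        for (String line : fileContents.split("\n")) {
            line = line.replaceAll("[^a-zA-Z0-9]", "");
            if (!line.isEmpty()) {
                sword.append(line).append(",");
            }
        }
        return sword.toString();
    }

    public static void main(String[] args) {
        String sword = joinStopWords("the\nand\n a \n");
        System.out.println(sword);          // the,and,a,
        String[] fields = sword.split(","); // what the reducer's setup() does
        System.out.println(fields.length);  // 3 -- the trailing comma adds no extra field
    }
}
```

Note that String.split drops trailing empty strings, so the trailing comma left by the join is harmless when the reducer splits the string back apart.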
4. Reducer design:
After the combiner, the Reduce step only needs to merge the values that share a key into the format the inverted index file requires. It splits the frequency out of each value and sums them to get the total count. Note that if an incoming key appears in the stop-word list, it is not written to the context; everything else can be left to the MapReduce framework.
Example:
// Inverted index reducer class
public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {
    private static Text result = new Text();
    private static String[] fields;

    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        try {
            // Fetch the stop-word string from the global configuration
            Configuration conf = context.getConfiguration();
            String str = conf.get("swords");
            fields = str.split(","); // stop-word array
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Input:  <"mapreduce", {"file3,2", "file1,1", ...}>
    // Output: <"mapreduce", "<file3,2>;<file1,1>;...<total,n>.">
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Build the document list
        StringBuilder fileList = new StringBuilder();
        int totalNum = 0;
        for (Text value : values) {
            String va = value.toString();
            int index = va.indexOf(',');
            totalNum += Integer.parseInt(va.substring(index + 1));
            fileList.append("<").append(va).append(">;");
        }
        fileList.append("<total,").append(totalNum).append(">.");
        result.set(fileList.toString());
        // Drop stop words
        String k = key.toString().replaceAll("[^a-z0-9]", "");
        if (!k.isEmpty()) { // replaceAll never returns null; skip keys that became empty
            boolean keep = true;
            for (String tmp : fields) {
                if (tmp.equals(k)) {
                    keep = false;
                    break;
                }
            }
            if (keep) {
                context.write(key, result);
            }
        }
    }
}
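For completeness, a driver that wires the three classes together and ships the stop-word string through the job Configuration might look like the sketch below. This is a wiring sketch, not runnable on its own: it assumes an enclosing class named InvertedIndex, a Hadoop runtime, and made-up HDFS paths for the input, output, and stop-word file.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Read the stop-word file once, up front, and ship it to every reducer
    // (paths here are placeholders).
    String sword = catStopWords(conf, "/user/hadoop/input/stopwords.txt");
    conf.set("swords", sword); // read back in InvertedIndexReducer.setup()

    Job job = Job.getInstance(conf, "inverted index");
    job.setJarByClass(InvertedIndex.class);
    job.setMapperClass(InvertedIndexMapper.class);
    job.setCombinerClass(InvertedIndexCombiner.class);
    job.setReducerClass(InvertedIndexReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path("/user/hadoop/input"));
    FileOutputFormat.setOutputPath(job, new Path("/user/hadoop/output"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
```

conf.set("swords", ...) must happen before Job.getInstance(conf, ...), because the Job copies the Configuration at creation time; setting it afterwards would leave the reducers without the stop-word list.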