18.例:MapReduce 案例之倒排索引

1. 倒排索引

倒排索引是文档检索系统中最常用的数据结构,被广泛地应用于全文搜索引擎。 它主要是用来存储某个单词(或词组) 在一个文档或一组文档中的存储位置的映射,即提供了一种根据内容来查找文档的方式。由于不是根据文档来确定文档所包含的内容,而是进行相反的操作,因而称为倒排索引( Inverted Index)。

2. 实例描述

通常情况下,倒排索引由一个单词(或词组)以及相关的文档列表组成,文档列表中的文档或者是标识文档的 ID 号,或者是指文档所在位置的 URL。如下图所示:

image

从上图可以看出,单词 1 出现在{文档 1,文档 5,文档 13, ……}中,单词 2 出现在{文档 2,文档 3,文档 5, ……}中,而单词 3 出现在{文档 2,文档 10,文档 16, ……}中。在实际应用中,还需要给每个文档添加一个权值,用来指出每个文档与搜索内容的相关度,如下图所示:

image

最常用的是使用词频作为权重,即记录单词在文档中出现的次数。以英文为例,如下图所示,索引文件中的“ MapReduce”一行表示:“ MapReduce”这个单词在文本 T0 中 出现过 1 次,T1 中出现过 1 次,T2 中出现过 2 次。

image

3. 设计思路

3.1 Map过程

首先使用默认的 TextInputFormat 类对输入文件进行处理,得到文本中每行的偏移量及其内容。显然, Map 过程首先必须分析输入的key/value对,得到倒排索引中需要的三个信息:单词、文档 URL 和词频,如下图所示。

image

这里存在两个问题:第一, key/value对只能有两个值,需要根据情况将其中两个值合并成一个值,作为 key 或 value 值;
第二,通过一个 Reduce 过程无法同时完成词频统计和生成文档列表,所以必须增加一个 Combine 过程完成词频统计。
这里将单词和 URL 组成 key 值(如“ MapReduce: file1.txt”),将词频作为value,这样做的好处是可以利用 MapReduce 框架自带的Map 端排序,将同一文档的相同单词的词频组成列表,传递给 Combine 过程,实现类似于 WordCount 的功能。

3.2 Combine 过程

经过 map 方法处理后, Combine 过程将 key 值相同 value 值累加,得到一个单词在文档中的词频。 如果直接将图所示的输出作为 Reduce 过程的输入,在 Shuffle 过程时将面临一个问题:所有具有相同单词的记录(由单词、 URL 和词频组成)应该交由同一个Reducer 处理,但当前的 key 值无法保证这一点,所以必须修改 key 值和 value 值。这次将单词作为 key 值, URL 和词频组成 value 值(如“ file1.txt: 1”)。这样做的好处是可以利用 MapReduce 框架默认的 HashPartitioner 类完成 Shuffle 过程,将相同单词的所有记录发送给同一个 Reducer 进行处理。

image

3.3 Reduce 过程

经过上述两个过程后, Reduce 过程只需将相同 key 值的 value 值组合成倒排索引文件所需的格式即可,剩下的事情就可以直接交给 MapReduce 框架进行处理了。

image

3.4 程序代码

  • pom文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.itcast</groupId>
  <artifactId>invertedIndex</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>invertedIndex</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.6.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.6.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.6.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.6.4</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <classpathPrefix>lib/</classpathPrefix>
              <mainClass>cn.itcast.hadoop.mrwc.WordCountDriver</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
  </build>

</project>

  • Map程序

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text>{

    private static Text keyInfo = new Text();// 存储单词和 URL 组合  
    private static final Text valueInfo = new Text("1");// 存储词频,初始化为1  

    @Override  
    protected void map(LongWritable key, Text value, Context context)  
            throws IOException, InterruptedException {  

        String line = value.toString();  
        String[] fields = line.split(" ");// 得到字段数组  

        FileSplit fileSplit = (FileSplit) context.getInputSplit();// 得到这行数据所在的文件切片  
        String fileName = fileSplit.getPath().getName();// 根据文件切片得到文件名  

        for (String field : fields) {  
            // key值由单词和URL组成,如“MapReduce:file1”  
            keyInfo.set(field + ":" + fileName);  
            context.write(keyInfo, valueInfo);  
        }  
    }  

}

  • combine程序

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text>{

    private static Text info = new Text();  

    // 输入: <MapReduce:file3 {1,1,...}>  
    // 输出:<MapReduce file3:2>  
    @Override  
    protected void reduce(Text key, Iterable<Text> values, Context context)  
            throws IOException, InterruptedException {  
        int sum = 0;// 统计词频  
        for (Text value : values) {  
            sum += Integer.parseInt(value.toString());  
        }  

        int splitIndex = key.toString().indexOf(":");  
        // 重新设置 value 值由 URL 和词频组成  
        info.set(key.toString().substring(splitIndex + 1) + ":" + sum);  
        // 重新设置 key 值为单词  
        key.set(key.toString().substring(0, splitIndex));  

        context.write(key, info);  
    }  

}

  • reduce程序

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text>{
    private static Text result = new Text();  

    // 输入:<MapReduce file3:2>  
    // 输出:<MapReduce file1:1;file2:1;file3:2;>  
    @Override  
    protected void reduce(Text key, Iterable<Text> values, Context context)  
            throws IOException, InterruptedException {  
        // 生成文档列表  
        String fileList = new String();  
        for (Text value : values) {  
            fileList += value.toString() + ";";  
        }  

        result.set(fileList);  
        context.write(key, result);  
    }  

}

  • 主程序

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InvertedIndexRunner {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(InvertedIndexRunner.class);

        job.setMapperClass(InvertedIndexMapper.class);
        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(InvertedIndexReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\ziliao\\data\\InvertedIndex\\input"));
        // 指定处理完成之后的结果所保存的位置
        FileOutputFormat.setOutputPath(job, new Path("D:\\ziliao\\data\\InvertedIndex\\output"));

        // 向 yarn 集群提交这个 job
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }

}

按权重排序


/**
 * Created by Administrator on 2018/8/15.
 */
public class FileCount implements Comparable<FileCount> {

    private String filename;
    private long count;

    //按照总流量倒序排
    public int compareTo(FileCount bean) {
        return bean.count>this.count?1:-1;
    }

    public FileCount(String filename, long count) {
        this.filename = filename;
        this.count = count;
    }

    @Override
    public String toString() {
        return filename + ":" + count;
    }

}

新reduce程序

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text>{
    private static Text result = new Text();

    // 输入:<MapReduce file3:2>
    // 输出:<MapReduce file1:1;file2:1;file3:2;>
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // 生成文档列表
        String fileList = new String();
        List<FileCount> FileCountList = new ArrayList<FileCount>();

        for (Text value : values) {
            String[] arr = value.toString().split(":");
            FileCount FileCount = new FileCount(arr[0],Long.parseLong(arr[1]));
            FileCountList.add(FileCount);
        }

        Collections.sort(FileCountList);

        for(FileCount FileCount : FileCountList)
        {
            fileList += FileCount.toString() + ";";
        }
        result.set(fileList);
        context.write(key, result);
    }

}

谢谢

也可以在Reduce类里利用TreeMap直接排序

//Mapper类
public class WordcountMapper extends Mapper<LongWritable,Text,Text,Text>{
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        String name = inputSplit.getPath().getName();
        String[] split = value.toString().split(" ");
        for(String s:split){
            context.write(new Text(s+"#"+name),new Text("1"));
        }
    }
}
//Combiner类
public class WordcountCombiner extends Reducer<Text, Text, Text, Text>{
    //<mapreduce#a.txt,List(1,1)>
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        int sum=0;
        for(Text t:values){
            sum+=Integer.parseInt(t.toString());
        }
        String[] split = key.toString().split("#");
        String word=split[0];
        String path=split[1];
        context.write(new Text(word),new Text(path+":"+sum));
    }
}
//Reducer类
public class WordcountReducer extends Reducer<Text, Text, Text, Text>{
    //<mapreduce,List(a.txt:1,c.txt:2)>
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        String str="";
        Map<Integer,String> map=new TreeMap<>(new Comparator<Integer>() {
            //自定义TreeMap的排序规则(降序排列),并且如果Key一样我们不返回0,这样当key相同时候value不会被覆盖(HashMap默认以HashCode和equels方法验证key是否重复,但是treemap以Compare方法)
            @Override
            public int compare(Integer o1, Integer o2) {
                if((o1-o2)>0){
                    return -1;
                }else if((o1-o2)<0){
                    return 1;
                }else {
                    return 1;
                }
            }
        });
        for(Text t:values){
            String[] split = t.toString().split(":");
            map.put(Integer.parseInt(split[1]),split[0]);
        }
        for(Map.Entry<Integer,String> i:map.entrySet()){
            str+=i.getValue()+":"+i.getKey()+",";
        }
        String substring = str.substring(0, str.length()-1);

        context.write(key,new Text(substring));
    }
}
//Driver类
public class WordcountDriver {

    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root") ;
        System.setProperty("hadoop.home.dir", "e:/hadoop-2.8.3");
        if (args == null || args.length == 0) {
            return;
        }
        FileUtil.deleteDir(args[1]);
        //该对象会默认读取环境中的 hadoop 配置。当然,也可以通过 set 重新进行配置
        Configuration conf = new Configuration();

        //job 是 yarn 中任务的抽象。
        Job job = Job.getInstance(conf);

        /*job.setJar("/home/hadoop/wc.jar");*/
        //指定本程序的jar包所在的本地路径
        job.setJarByClass(WordcountDriver.class);

        //指定本业务job要使用的mapper/Reducer业务类
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        //指定mapper输出数据的kv类型。需要和 Mapper 中泛型的类型保持一致
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        //指定最终输出的数据的kv类型。这里也是 Reduce 的 key,value类型。
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setCombinerClass(WordcountCombiner.class);
        //指定job的输入原始文件所在目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //指定job的输出结果所在目录
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行
        /*job.submit();*/
        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,245评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,749评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,960评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,575评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,668评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,670评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,664评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,422评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,864评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,178评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,340评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,015评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,646评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,265评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,494评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,261评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,206评论 2 352

推荐阅读更多精彩内容