大数据学习之Hadoop——10MapReduce实现Reduce Join(多个文件联合查询)

欢迎关注我的CSDN: https://blog.csdn.net/bingque6535

一. MapReduce Join

对两份数据data1和data2进行关键词连接是一个很通用的问题,如果数据量比较小,可以在内存中完成连接。
如果数据量比较大,在内存进行连接操会发生OOM。mapreduce join可以用来解决大数据的连接。

1. 思路

1. reduce join

在map阶段, 把关键字作为key输出,并在value中标记出数据是来自data1还是data2。因为在shuffle阶段已经自然按key分组,reduce阶段,判断每一个value是来自data1还是data2,在内部分成2组,做集合的乘积。这种方法有2个问题:

  1. map阶段没有对数据瘦身,shuffle的网络传输和排序性能很低。
  2. reduce端对2个集合做乘积计算,很耗内存,容易导致OOM。
2. map join

两份数据中,如果有一份数据比较小,小数据全部加载到内存,按关键字建立索引。大数据文件作为map的输入文件,对map()函数每一对输入,都能够方便地和已加载到内存的小数据进行连接。把连接结果按key输出,经过shuffle阶段,reduce端得到的就是已经按key分组的,并且连接好了的数据。

这种方法,要使用hadoop中的DistributedCache把小数据分布到各个计算节点,每个map节点都要把小数据库加载到内存,按关键字建立索引。这种方法有明显的局限性:

  1. 有一份数据比较小,在map端,能够把它加载到内存,并进行join操作。

3. 使用内存服务器,扩大节点的内存空间

针对map join,可以把一份数据存放到专门的内存服务器,在map()方法中,对每一个<key,value>的输入对,根据key到内存服务器中取出数据,进行连接

4. 使用BloomFilter过滤空连接的数据

对其中一份数据在内存中建立BloomFilter,另外一份数据在连接之前,用BloomFilter判断它的key是否存在,如果不存在,那这个记录是空连接,可以忽略。

5. 使用mapreduce专为join设计的包

在mapreduce包里看到有专门为join设计的包,对这些包还没有学习,不知道怎么使用,只是在这里记录下来,作个提醒。

  1. jar: mapreduce-client-core.jar
  2. package: org.apache.hadoop.mapreduce.lib.join

二. 代码实现

1. 问题:

  1. 输入文件格式说明:

    1. movies.csv


      在这里插入图片描述
    2. ratting.csv


      在这里插入图片描述
  2. 输出文件格式说明:


    在这里插入图片描述

2. 实现reduce join

题目: 将两个文件(movies.csv 和 ratings.csv)中形同电影ID的数据合并成一条数据

  1. Driver端

    package com.hjf.mr.movie;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    /**
     * @author Jiang锋时刻
     * @create 2020-05-18 14:09
     */
    public class MovieDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(MovieDriver.class);
    
            job.setMapperClass(MovieMapper.class);
            job.setReducerClass(MovieReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
    
            Path inputPath1 = new Path("./Data/movie/movies.csv");
            Path inputPath2 = new Path("./Data/movie/ratings.csv");
            Path outputPath = new Path("./Data/result");
    
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }
    
            FileInputFormat.setInputPaths(job, inputPath1, inputPath2);
            FileOutputFormat.setOutputPath(job, outputPath);
    
            job.waitForCompletion(true);
            
        }
    }
    
    
  2. Mapper 端

    package com.hjf.mr.movie;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    import java.io.IOException;
    
    /**
     * @author Jiang锋时刻
     * @create 2020-05-18 14:12
     */
    public class MovieMapper extends Mapper<LongWritable, Text, Text, Text> {
        Text outKey = new Text();
        Text outValue = new Text();
        StringBuilder sb = new StringBuilder();
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 获取文件信息: 文件路径/文件名:0+XXX
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            // 获取文件名
            String fileName = inputSplit.getPath().getName();
            String[] split = value.toString().split(",");
            sb.setLength(0);
    
            /**
             * movies.csv文件中每条数据的格式:
             *  movieId     title               genres
             *  1           Toy Story (1995)    Adventure|Animation|Children|Comedy|Fantasy
             */
            if (fileName.equals("movies.csv")) {
                // movieId 作为key
                outKey.set(split[0]);
    
                // title    genres
                StringBuilder append = sb.append(split[1]).append("\t").append(split[2]);
                String str = "movies#" + append.toString();
                // "movie#title  genres" 作为value
                outValue.set(str);
    
                context.write(outKey, outValue);
    
            } else if(fileName.equals("ratings.csv")) {
                /**
                 * ratings.csv文件中每条数据的格式:
                 *  userId  movieId     rating  timestamp
                 *  1       1           4       964982703
                 */
                // movieId 作为key
                outKey.set(split[1]);
                // userId   rating  timestamp
                StringBuilder append = sb.append(split[0]).append("\t").append(split[2]).append("\t").append(split[3]);
                String str = "ratings#" + append.toString();
                // "ratings#userId  rating  timestamp" 作为value
                outValue.set(str);
    
                context.write(outKey, outValue);
            }           
        }
    }
    
    
  3. Reducer 端

    package com.hjf.mr.movie;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @author Jiang锋时刻
     * @create 2020-05-18 15:20
     */
    public class MovieReducer extends Reducer<Text, Text, Text, Text> {
        List<String> moviesList = new ArrayList<>();
        List<String> ratingsList = new ArrayList<>();
        Text outValue = new Text();
    
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text value: values){
                // 将value中以"movies#"开头的放在moviesList列表中
                if (value.toString().startsWith("movies#")) {
                    String string = value.toString().split("#")[1];
                    moviesList.add(string);
                } else if (value.toString().startsWith("ratings#")){
                    // 将value中以"ratings#"开头的放在ratingsList列表中
                    String string = value.toString().split("#")[1];
                    ratingsList.add(string);
                }
            }
    
            // 获取两个集合的长度
            int moviesSize = moviesList.size();
            int ratingsSize = ratingsList.size();
            // 将相同movieId 的两个表中的数据进行合并
            for (int i = 0; i < moviesSize; i++) {
                for (int j = 0; j < ratingsSize; j++) {
                    outValue.set(moviesList.get(i) + "\t" + ratingsList.get(j));
                    context.write(key, outValue);
                }
            }
            // 清空列表
            moviesList.clear();
            ratingsList.clear();
        }
    }
    
    

3. 实现map join

题目: 求被评分次数最多的10部电影,并给出评分次数(电影名,评分次数)

编写的代码运行不了, 等有时间了再好好分析

欢迎关注我的CSDN: https://blog.csdn.net/bingque6535

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,657评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,889评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,057评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,509评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,562评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,443评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,251评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,129评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,561评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,779评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,902评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,621评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,220评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,838评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,971评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,025评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,843评论 2 354