Hadoop6- MapReduce join

Hadoop MapReduce join

MapReduce提供了表连接操作其中包括Map端joinReduce端join还有半连接,Map端join是指数据到达map处理函数之前进行合并的,效率要远远高于Reduce端join,因为Reduce端join是把所有的数据都经过Shuffle,非常消耗资源。

1.map join

基本思路:

(1):需要join的两个文件,一个存储在HDFS中,一个使用DistributedCache.addCacheFile()将需要join的另外一个文件加入到所有Map缓存中。

(2):在Map函数里读取该文件,进行join

(3):将结果输出到reduce

(4):DistributedCache.addCacheFile()需要在作业提交前设置。

什么是DistributedCache?

DistributedCache是为了方便用户进行应用程序开发而设计的文件分发工具。它能够将只读的外部文件进行自动分发到各个节点上进行本地缓存,以便task运行时加载。

DistributedCache的使用步骤

(1):在HDFS中上传文件(文本文件、压缩文件、jar包等)

(2):调用相关API添加文件信息

(3):task运行前直接调用文件读写API获取文件。

常见API:

DistributedCache.addCacheFile();

DistributedCache.addCacheArchive();

下面我们通过一个示例来深入体会Map端join。

表一:tb_a数据如下

name    sex age depNo  
zhang   male    20  1  
li  female  25  2  
wang    female  30  3  
zhou    male    35  2  

表二:tb_b数据如下

depNo   depName  
1   sales  
2   Dev  
3   Mgt  

注意:在Map端join操作中,我们往往将较小的表添加到内存中,因为内存的资源是很宝贵的,这也说明了另外一个问题,那就是如果表的数据量都非常大则不适合使用Map端join。

public class MyMapJoin {  
    // 定义输入路径  
    private static String INPUT_PATH1 = "";  
    //加载到内存的表的路径  
    private static String INPUT_PATH2 = "";  
    // 定义输出路径  
    private static String OUT_PATH = "";  
  
    public static void main(String[] args) {  
  
        try {  
            // 创建配置信息  
            Configuration conf = new Configuration();  
            // 获取命令行的参数  
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
            // 当参数违法时,中断程序  
            if (otherArgs.length != 3) {  
                System.err.println("Usage:MyMapJoin<in1> <in2> <out>");  
                System.exit(1);  
            }  
  
            // 给路径赋值  
            INPUT_PATH1 = otherArgs[0];  
            INPUT_PATH2 = otherArgs[1];  
            OUT_PATH = otherArgs[2];  
            // 创建文件系统  
            FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);  
            // 如果输出目录存在,我们就删除  
            if (fileSystem.exists(new Path(OUT_PATH))) {  
                fileSystem.delete(new Path(OUT_PATH), true);  
            }  
            // 添加到内存中的文件(随便添加多少个文件)  
            DistributedCache.addCacheFile(new Path(INPUT_PATH2).toUri(), conf);  
  
            // 创建任务  
            Job job = new Job(conf, MyMapJoin.class.getName());  
            // 打成jar包运行,这句话是关键  
            job.setJarByClass(MyMapJoin.class);  
            //1.1 设置输入目录和设置输入数据格式化的类  
            FileInputFormat.setInputPaths(job, INPUT_PATH1);  
            job.setInputFormatClass(TextInputFormat.class);  
  
            //1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型  
            job.setMapperClass(MapJoinMapper.class);  
            job.setMapOutputKeyClass(NullWritable.class);  
            job.setMapOutputValueClass(Emp_Dep.class);  
  
            //1.3 设置分区和reduce数量  
            job.setPartitionerClass(HashPartitioner.class);  
            job.setNumReduceTasks(0);  
  
            FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));  
            // 提交作业 退出  
            System.exit(job.waitForCompletion(true) ? 0 : 1);  
  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
  
    public static class MapJoinMapper extends Mapper<LongWritable, Text, NullWritable, Emp_Dep> {  
  
        private Map<Integer, String> joinData = new HashMap<Integer, String>();  
  
        @Override  
        protected void setup(Mapper<LongWritable, Text, NullWritable, Emp_Dep>.Context context) throws IOException, InterruptedException {  
            // 预处理把要关联的文件加载到缓存中  
            Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());  
            // 我们这里只缓存了一个文件,所以取第一个即可,创建BufferReader去读取  
            BufferedReader reader = new BufferedReader(new FileReader(paths[0].toString()));  
  
            String str = null;  
            try {  
                // 一行一行读取  
                while ((str = reader.readLine()) != null) {  
                    // 对缓存中的表进行分割  
                    String[] splits = str.split("\t");  
                    // 把字符数组中有用的数据存在一个Map中  
                    joinData.put(Integer.parseInt(splits[0]), splits[1]);  
                }  
            } catch (Exception e) {  
                e.printStackTrace();  
            } finally{  
                reader.close();  
            }  
  
        }  
  
        @Override  
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, Emp_Dep>.Context context) throws IOException,  
                InterruptedException {  
            // 获取从HDFS中加载的表  
            String[] values = value.toString().split("\t");  
            // 创建Emp_Dep对象  
            Emp_Dep emp_Dep = new Emp_Dep();  
            // 设置属性  
            emp_Dep.setName(values[0]);  
            emp_Dep.setSex(values[1]);  
            emp_Dep.setAge(Integer.parseInt(values[2]));  
            // 获取关联字段depNo,这个字段是关键  
            int depNo = Integer.parseInt(values[3]);  
            // 根据depNo从内存中的关联表中获取要关联的属性depName  
            String depName = joinData.get(depNo);  
            // 设置depNo  
            emp_Dep.setDepNo(depNo);  
            // 设置depName  
            emp_Dep.setDepName(depName);  
  
            // 写出去  
            context.write(NullWritable.get(), emp_Dep);  
        }  
    }  
}  

2.reduce join

Reduce端连接比Map端连接更为普遍,因为输入的数据不需要特定的结构,但是效率比较低,因为所有数据都必须经过Shuffle过程。

基本思路

(1):Map端读取所有的文件,并在输出的内容里加上标示,代表数据是从哪个文件里来的。

(2):在reduce处理函数中,按照标识对数据进行处理。

(3):然后根据Key去join来求出结果直接输出。

数据准备

准备好下面两张表:

(1):tb_a(以下简称表A)

  1. id name
  2. 1 北京
  3. 2 天津
  4. 3 河北
  5. 4 山西
  6. 5 内蒙古
  7. 6 辽宁
  8. 7 吉林
  9. 8 黑龙江

(2):tb_b(以下简称表B)

  1. id statyear num
  2. 1 2010 1962
  3. 1 2011 2019
  4. 2 2010 1299
  5. 2 2011 1355
  6. 4 2011 3574
  7. 4 2011 3593
  8. 9 2010 2303
  9. 9 2011 2347

#需求就是以id为key做join操作(注:上面的数据都是以制表符“\t”分割)

计算模型

整个计算过程是:

(1):在Map阶段,把所有数据标记成<key,value>的形式,其中key是id,value则根据来源不同取不同的形式:来源于A的记录,value的值为"a#"+name;来源于B的记录,value的值为"b#"+score。

(2):在reduce阶段,先把每个key下的value列表拆分为分别来自表A和表B的两部分,分别放入两个向量中。然后遍历两个向量做笛卡尔积,形成一条条最终的结果。

如下图所示:

img

代码实现如下:

public class ReduceJoinTest {  
  
        // 定义输入路径  
        private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/table_join/tb_*";  
        // 定义输出路径  
        private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";  
  
        public static void main(String[] args) {  
  
            try {  
                // 创建配置信息  
                Configuration conf = new Configuration();  
                  
  
                // 创建文件系统  
                FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);  
                // 如果输出目录存在,我们就删除  
                if (fileSystem.exists(new Path(OUT_PATH))) {  
                    fileSystem.delete(new Path(OUT_PATH), true);  
                }  
  
                // 创建任务  
                Job job = new Job(conf, ReduceJoinTest.class.getName());  
  
                //1.1   设置输入目录和设置输入数据格式化的类  
                FileInputFormat.setInputPaths(job, INPUT_PATH);  
                job.setInputFormatClass(TextInputFormat.class);  
  
                //1.2   设置自定义Mapper类和设置map函数输出数据的key和value的类型  
                job.setMapperClass(ReduceJoinMapper.class);  
                job.setMapOutputKeyClass(Text.class);  
                job.setMapOutputValueClass(Text.class);  
  
                //1.3   设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)  
                job.setPartitionerClass(HashPartitioner.class);  
                job.setNumReduceTasks(1);  
  
                //1.4   排序  
                //1.5   归约  
                //2.1   Shuffle把数据从Map端拷贝到Reduce端。  
                //2.2   指定Reducer类和输出key和value的类型  
                job.setReducerClass(ReduceJoinReducer.class);  
                job.setOutputKeyClass(Text.class);  
                job.setOutputValueClass(Text.class);  
  
                //2.3   指定输出的路径和设置输出的格式化类  
                FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));  
                job.setOutputFormatClass(TextOutputFormat.class);  
  
  
                // 提交作业 退出  
                System.exit(job.waitForCompletion(true) ? 0 : 1);  
              
            } catch (Exception e) {  
                e.printStackTrace();  
            }  
        }  
          
    public static  class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Text>{  
        @Override  
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {  
              //获取输入文件的全路径和名称  
              FileSplit fileSplit = (FileSplit) context.getInputSplit();  
              String path = fileSplit.getPath().toString();  
                
              //获取输入记录的字符串  
              String line = value.toString();  
                
              //抛弃空记录  
              if (line == null || line.equals("")){  
                  return;  
              }  
                
              //处理来自tb_a表的记录  
              if (path.contains("tb_a")){  
                  //按制表符切割  
                  String[] values = line.split("\t");  
                  //当数组长度小于2时,视为无效记录  
                  if (values.length < 2){  
                      return;  
                  }  
                  //获取id和name  
                  String id = values[0];  
                  String name = values[1];  
                    
                  //把结果写出去  
                  context.write(new Text(id), new Text("a#" + name));  
              } else if (path.contains("tb_b")){  
                  //按制表符切割  
                  String[] values = line.split("\t");  
                  //当长度不为3时,视为无效记录  
                  if (values.length < 3){  
                      return;  
                  }  
                    
                  //获取属性  
                  String id = values[0];  
                  String statyear = values[1];  
                  String num = values[2];  
                    
                  //写出去  
                  context.write(new Text(id), new Text("b#" + statyear + "  " + num));  
              }  
              
        }  
          
        public static class ReduceJoinReducer extends Reducer<Text, Text, Text, Text>{  
              
            @Override  
            protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {  
                  
                //用来存放来自tb_a表的数据  
                Vector<String> vectorA = new Vector<String>();  
                //用来存放来自tb_b表的  
                Vector<String> vectorB = new Vector<String>();  
                  
                //迭代集合数据  
                for (Text val : values){  
                    //将集合中的数据对应添加到Vector中  
                    if (val.toString().startsWith("a#")){  
                        vectorA.add(val.toString().substring(2));  
                    } else if (val.toString().startsWith("b#")){  
                        vectorB.add(val.toString().substring(2));  
                    }  
                }  
                  
                //获取两个Vector集合的长度  
                int sizeA = vectorA.size();  
                int sizeB = vectorB.size();  
                  
                //遍历两个向量将结果写出去  
                for (int i=0; i<sizeA; i++){  
                    for (int j=0; j<sizeB; j++){  
                        context.write(key, new Text("   " + vectorA.get(i) + "  " + vectorB.get(j)));  
                    }  
                }  
                  
                  
            }  
        }  
    }  
}  

程序运行的结果:

img

semi join

SemiJoin,一般称为半连接,其原理是在Map端过滤掉一些不需要join的数据,从而大大减少了reduce和Shuffle的时间,因为我们知道,如果仅仅使用Reduce端连接,那么如果一份数据,存在大量的无效数据,而这些数据在join中并不需要,但是因为没有做过预处理,所以这些数据直到真正执行reduce函数时,才被定义为无效数据,但是这个时候已经执行过了Shuffle、merge还有sort操作,所以这部分无效的数据就浪费了大量的网络IO和磁盘IO,所以在整体来讲,这是一种降低性能的表现,如果存在的无效数据越多,那么这种趋势就越明显。之所以会出现半连接,这其实是reduce端连接的一个变种,只不过是我们在Map端过滤掉了一些无效的数据,所以减少了reduce过程的Shuffle时间,所以能获取一个性能的提升。

二:技术实现

(1):利用DistributedCache将小表分发到各个节点上,在Map过程的setup()函数里,读取缓存里的文件,只将小表的连接键存储在hashSet中。

(2):在map()函数执行时,对每一条数据进行判断,如果这条数据的连接键为空或者在hashSet里不存在,那么则认为这条数据无效,使条数据也不参与reduce的过程。

注:从以上步骤就可以发现,这种做法很明显可以提升join性能,但是要注意的是小表的key如果非常大的话,可能会出现OOM的情况,这时我们就需要考虑其他的连接方式了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,125评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,293评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,054评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,077评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,096评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,062评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,988评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,817评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,266评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,486评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,646评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,375评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,974评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,621评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,642评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,538评论 2 352

推荐阅读更多精彩内容