(一)单表关联--mapreduce关联性操作

package mr;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   

public class MyGL {
    private static class MyGLMapper  extends  Mapper<LongWritable, Text, Text, Text>{  
        /*输入类型是LongWritable,Text(上下文);
                               输出类型是Text,Text(也就是Reduce的输入类型)*/
         public void map(LongWritable k1, Text v1, Context context)   
                             throws java.io.IOException, java.lang.InterruptedException
                                 //map()函数是固定模式的,三个参数
         {
             // 1 2
            String[]  lines= v1.toString().split("\t");  
                                              // \t 在同一个缓冲区内横向跳8个空格(Tab键);split()方法用于把一个字符串分割
                                              //成字符串数组;v1指的是一行,把第一行的两个单词存进lines
            if(lines.length!=2 || lines[0].trim().equals("child"))      
                return;    //child parent
        
            String word1=lines[0].trim();   //tom       ->去掉 lines[0]里面的空格符
            String word2=lines[1].trim();   //lucy  
            
            context.write(new Text(word1), new Text("1"+","+word1+","+word2));
                                          //第一个Text对应Mapper的第三个Text;第二个Text对应Mapper的第四个Text
            context.write(new Text(word2), new Text("2"+","+word1+","+word2));
            
            //tom,1+tom+lucy
             System.out.println("map......"+word1+"-"+word2);
         }
        
    }
    
    private static class  MyGLReduce extends Reducer<Text, Text, Text, Text>{
         public void reduce(Text key, Iterable<Text> values, Context context) 
                                       throws java.io.IOException, java.lang.InterruptedException
                                  //context:上下文对象,在整个wordcount运算生命周期内存活
         {
              List<String> grandch=new ArrayList();   //泛型
              List<String> grandpa=new ArrayList();
             
                                        /*  lucy 2+tom+lucy
             lucy 1+lucy+mary
             
             2->split[1]    tom     2的话取1
             1->split[2]    mary    1的话取2
             
             k3=tom  v3=mary   把这两个放在上下文
             */

             Iterator<Text>  it=values.iterator();  
                                                // Iterator<Text>--输进来的第二个值
             while(it.hasNext()){ 
                String lines= it.next().toString();      
                                                                         //2,tom,lucy(对应MyGLMapper的context.write())
                String [] words=lines.split(",");    
                                                                          //劈开 string 数组 ["2","tom","lucy"]
                if(words[0].equals("1")){
                    grandpa.add(words[2]);
                }else if(words[0].equals("2")){
                    grandch.add(words[1]);
                }
                else
                    return;
                
             }
              for(String ch:grandch)
             for(String pa:grandpa)  
             context.write(new Text(ch), new Text(pa));    
             
             System.out.println("reduce......");
         }
         protected void cleanup(Context context) 
                                      throws java.io.IOException, java.lang.InterruptedException{
             
         }
            
    }

    private static String INPUT_PATH="hdfs://master:9000/input/gl.dat";
    private static String OUTPUT_PATH="hdfs://master:9000/output/c/";

    public static void main(String[] args) throws Exception {   
        
        Configuration  conf=new Configuration();
        FileSystem  fs=FileSystem.get(new URI(OUTPUT_PATH),conf);
     
        if(fs.exists(new Path(OUTPUT_PATH)))
                fs.delete(new Path(OUTPUT_PATH));
        
        Job  job=new Job(conf,"myjob");
        
        job.setJarByClass(MyGL.class);
        job.setMapperClass(MyGLMapper.class);
        job.setReducerClass(MyGLReduce.class);
        
         
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        //job.setCombinerClass(MyReduce.class);
         
        
        FileInputFormat.addInputPath(job,new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        
        job.waitForCompletion(true);

    }

}
image.png

当line[0]=1,line[1]=child;
当line[0]=2,line[2]=grandpa;
测试数据:

child   parent 
Tom Lucy
Tom Jack
Jone    Lucy
Jone    Jack
Lucy    Mary
Lucy    Ben
Jack    Alice
Jack    Jesse
Terry   Alice
Terry   Jesse
Philip  Terry
Philip  Alma
Mark    Terry
Mark    Alma
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 背景 一年多以前我在知乎上答了有关LeetCode的问题, 分享了一些自己做题目的经验。 张土汪:刷leetcod...
    土汪阅读 14,352评论 0 33
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,497评论 19 139
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,958评论 18 399
  • 当阳英雄阅读 838评论 0 0
  • 4月的太湖半程马拉松让我很享受跑步带来的愉悦感和成就感,赛后意犹未尽,感觉“半马不是马,全马才是马”,好像不跑...
    胡林海阅读 4,266评论 5 8