package mr;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MyGL {

    private static class MyGLMapper extends Mapper<LongWritable, Text, Text, Text> {
        /* Input types are LongWritable, Text (the byte offset within the file and
           one line of text); output types are Text, Text, which are also the
           Reducer's input types. */
        @Override
        public void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            // Each line holds two tab-separated columns: child \t parent;
            // split() breaks the line into a String array.
            String[] lines = v1.toString().split("\t");
            // Skip malformed lines and the "child parent" header row.
            if (lines.length != 2 || lines[0].trim().equals("child"))
                return;
            String word1 = lines[0].trim(); // e.g. Tom  (trim strips surrounding whitespace)
            String word2 = lines[1].trim(); // e.g. Lucy
            // Emit the pair twice, tagged by the role the key plays:
            // flag "1": the key is the child, e.g. key=Tom, value="1,Tom,Lucy".
            context.write(new Text(word1), new Text("1," + word1 + "," + word2));
            // flag "2": the key is the parent, e.g. key=Lucy, value="2,Tom,Lucy".
            context.write(new Text(word2), new Text("2," + word1 + "," + word2));
            System.out.println("map......" + word1 + "-" + word2);
        }
    }
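
    /* Shuffle illustration (derived from the mapper above): the input line
       "Tom\tLucy" yields two tagged records,
           Tom    1,Tom,Lucy   (Tom is the child)
           Lucy   2,Tom,Lucy   (Lucy is the parent)
       so after shuffling, each person's key groups both of that person's roles. */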
    private static class MyGLReduce extends Reducer<Text, Text, Text, Text> {
        // context: the context object; it stays alive for the entire job run.
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<String> grandch = new ArrayList<>(); // the key's children
            List<String> grandpa = new ArrayList<>(); // the key's parents
            /* Example for key = Lucy:
                   Lucy   2,Tom,Lucy   flag "2" -> take words[1] (Tom, Lucy's child)
                   Lucy   1,Lucy,Mary  flag "1" -> take words[2] (Mary, Lucy's parent)
               k3 = Tom, v3 = Mary are then written to the context. */
            Iterator<Text> it = values.iterator(); // the grouped values for this key
            while (it.hasNext()) {
                String lines = it.next().toString();
                // e.g. "2,Tom,Lucy" (matches MyGLMapper's context.write())
                String[] words = lines.split(","); // -> ["2","Tom","Lucy"]
                if (words[0].equals("1")) {
                    grandpa.add(words[2]);
                } else if (words[0].equals("2")) {
                    grandch.add(words[1]);
                }
                // Any other flag is ignored rather than aborting the whole call.
            }
            // Cross product: every child of the key with every parent of the key.
            for (String ch : grandch)
                for (String pa : grandpa)
                    context.write(new Text(ch), new Text(pa));
            System.out.println("reduce......");
        }
    }
    private static String INPUT_PATH = "hdfs://master:9000/input/gl.dat";
    private static String OUTPUT_PATH = "hdfs://master:9000/output/c/";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI(OUTPUT_PATH), conf);
        // Delete a pre-existing output directory, otherwise the job refuses to start.
        if (fs.exists(new Path(OUTPUT_PATH)))
            fs.delete(new Path(OUTPUT_PATH), true); // true = delete recursively
        Job job = Job.getInstance(conf, "myjob");
        job.setJarByClass(MyGL.class);
        job.setMapperClass(MyGLMapper.class);
        job.setReducerClass(MyGLReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // No combiner: all tagged values must reach the reducer unmerged.
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
When words[0] = "1", the key is the child in that record, so words[2] is a candidate grandparent;
when words[0] = "2", the key is the parent in that record, so words[1] is a candidate grandchild.
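
A minimal plain-Java sketch of the same tag-and-join logic, runnable without a Hadoop cluster to sanity-check the flag handling described above. The class name GLLocalCheck and the in-memory "shuffle" map are illustrative assumptions, not part of the job itself:

import java.util.*;

public class GLLocalCheck { // hypothetical local check, not part of the MR job
    public static void main(String[] args) {
        String[][] pairs = { {"Tom", "Lucy"}, {"Lucy", "Mary"} };
        // "map" + shuffle: tag each pair under both the child and the parent key
        Map<String, List<String[]>> grouped = new HashMap<>();
        for (String[] p : pairs) {
            grouped.computeIfAbsent(p[0], k -> new ArrayList<>())
                   .add(new String[]{"1", p[0], p[1]});
            grouped.computeIfAbsent(p[1], k -> new ArrayList<>())
                   .add(new String[]{"2", p[0], p[1]});
        }
        // "reduce": cross the key's children with the key's parents
        for (List<String[]> values : grouped.values()) {
            List<String> grandch = new ArrayList<>(), grandpa = new ArrayList<>();
            for (String[] w : values) {
                if (w[0].equals("1")) grandpa.add(w[2]);
                else if (w[0].equals("2")) grandch.add(w[1]);
            }
            for (String ch : grandch)
                for (String pa : grandpa)
                    System.out.println(ch + "\t" + pa); // prints: Tom  Mary
        }
    }
}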
Test data:
child parent
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
Terry Alice
Terry Jesse
Philip Terry
Philip Alma
Mark Terry
Mark Alma
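
Worked out by hand from the test data above (Lucy, Jack, and Terry each link a generation on both sides), the job should emit these twelve grandchild-grandparent pairs; the row order in the output file depends on key sorting and may differ:
Tom Alice
Tom Jesse
Jone Alice
Jone Jesse
Tom Mary
Tom Ben
Jone Mary
Jone Ben
Philip Alice
Philip Jesse
Mark Alice
Mark Jesse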