注:测试数据在最后
编写 MapReduce 程序,使其输出结果与下列 Hive 查询语句的执行结果相同
1. select * from customers where country="USA"
/**
 * Mapper for {@code select * from customers where country="USA"}.
 *
 * <p>Splits each comma-separated input line and forwards the whole line,
 * unchanged and under its original input key, when the fourth field is
 * exactly {@code "USA"}.
 */
public class OutMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    /** Zero-based index of the field compared against the target country. */
    private static final int COUNTRY_COLUMN = 3;

    /** Value the country field must equal for the row to be kept. */
    private static final String TARGET_COUNTRY = "USA";

    /**
     * Emits {@code (key, line)} for rows whose country field equals "USA".
     *
     * @param key     input key supplied by the input format
     * @param line    one comma-separated record
     * @param context task context used to emit the record
     * @throws java.io.IOException  if writing the record fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    public void map(LongWritable key, Text line, Context context)
            throws java.io.IOException, InterruptedException {
        String[] info = line.toString().split(",");

        // Blank or truncated lines have no country field; they cannot match
        // the filter, so skip them instead of failing on info[3].
        if (info.length <= COUNTRY_COLUMN) {
            return;
        }

        if (TARGET_COUNTRY.equals(info[COUNTRY_COLUMN])) {
            // Let write failures propagate so the framework can fail/retry the
            // task rather than silently dropping rows.
            context.write(key, line);
        }
    }
}
/**
 * Reducer for {@code select * from customers where country="USA"}.
 *
 * <p>Receives the rows selected by the mapper and writes each row as the
 * output key with a {@link NullWritable} value, so only the row text appears
 * in the result.
 */
public class OutReducer extends Reducer<LongWritable, Text, Text, NullWritable> {

    /**
     * Writes every row grouped under {@code key} to the output.
     *
     * <p>The first parameter must be {@code LongWritable} (the declared input
     * key type of this reducer). With any other parameter type this method is
     * a mere overload that the framework never calls, and the inherited
     * default {@code reduce} runs instead.
     *
     * @param key     map output key the rows were grouped under; not emitted
     * @param values  rows that share {@code key}
     * @param context task context used to emit the rows
     * @throws java.io.IOException  if writing a row fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    public void reduce(LongWritable key, Iterable<Text> values, Context context)
            throws java.io.IOException, InterruptedException {
        // Several rows can share one key (e.g. identical keys coming from
        // different input files), so emit every value, not just one per key.
        for (Text line : values) {
            context.write(line, NullWritable.get());
        }
    }
}
/**
 * Driver for the {@code select * from customers where country="USA"} job.
 *
 * <p>Usage: {@code OutRunner <input path> <output path>}
 */
public class OutRunner extends Configured implements Tool {

    /**
     * Runs the job through {@link ToolRunner} and exits with its status.
     *
     * @param args command-line arguments: input path, then output path
     */
    public static void main(String[] args) {
        int exitCode;
        try {
            exitCode = ToolRunner.run(new OutRunner(), args);
        } catch (Exception e) {
            e.printStackTrace();
            // A failed run must not be reported to the shell as success.
            exitCode = 1;
        }
        System.exit(exitCode);
    }

    /**
     * Configures and submits the job, then waits for it to finish.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job succeeded, 1 if it failed, 2 if arguments are missing
     * @throws Exception if job setup or submission fails
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: OutRunner <input path> <output path>");
            return 2;
        }

        Job job = Job.getInstance(getConf(), "HQL");
        job.setJarByClass(getClass());

        // Input and output locations and formats.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setInputFormatClass(TextInputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setOutputFormatClass(TextOutputFormat.class);

        // Map phase: (LongWritable, Text) pairs.
        job.setMapperClass(OutMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Reduce phase: (Text, NullWritable) pairs.
        job.setReducerClass(OutReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
2. select country, count(country) from customers group by country
/**
 * Mapper for {@code select country, count(country) from customers group by country}.
 *
 * <p>Emits {@code (country, 1)} for every comma-separated input line, where
 * country is the fourth field of the line.
 */
public class OutMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /** Zero-based index of the country field (the fourth column). */
    private static final int COUNTRY_COLUMN = 3;

    /** Constant count emitted for each row; shared to avoid per-record allocation. */
    private static final IntWritable ONE = new IntWritable(1);

    /** Reused output key; its content is replaced for every record. */
    private final Text country = new Text();

    /**
     * Emits the country of one record with a count of one.
     *
     * @param key     input key supplied by the input format; unused
     * @param line    one comma-separated record
     * @param context task context used to emit the pair
     * @throws IOException          if writing the pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    public void map(LongWritable key, Text line, Context context)
            throws IOException, InterruptedException {
        String[] info = line.toString().split(",");

        // Blank or truncated lines carry no country value; skip them instead
        // of failing on info[3].
        // NOTE(review): Hive would presumably list such rows as a NULL group
        // with count 0 — confirm whether that group is required in the output.
        if (info.length <= COUNTRY_COLUMN) {
            return;
        }

        country.set(info[COUNTRY_COLUMN]);
        context.write(country, ONE);
    }
}
/**
 * Reducer for {@code select country, count(country) from customers group by country}.
 *
 * <p>Adds up the per-row counts received for one country and writes the
 * country together with that total.
 */
public class OutReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Sums all counts grouped under {@code key} and emits {@code (key, sum)}.
     *
     * @param key     the country the counts belong to
     * @param values  partial counts for that country
     * @param context task context used to emit the total
     * @throws IOException          if writing the total fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable partial : values) {
            total = total + partial.get();
        }

        IntWritable result = new IntWritable(total);
        context.write(key, result);
    }
}
/**
 * Driver for the
 * {@code select country, count(country) from customers group by country} job.
 *
 * <p>Usage: {@code OutRunner <input path> <output path>}
 */
public class OutRunner extends Configured implements Tool {

    /**
     * Runs the job through {@link ToolRunner} and exits with its status.
     *
     * @param args command-line arguments: input path, then output path
     */
    public static void main(String[] args) {
        int exitCode;
        try {
            exitCode = ToolRunner.run(new OutRunner(), args);
        } catch (Exception e) {
            e.printStackTrace();
            // A failed run must not be reported to the shell as success.
            exitCode = 1;
        }
        System.exit(exitCode);
    }

    /**
     * Configures and submits the job, then waits for it to finish.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job succeeded, 1 if it failed, 2 if arguments are missing
     * @throws Exception if job setup or submission fails
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: OutRunner <input path> <output path>");
            return 2;
        }

        Job job = Job.getInstance(getConf(), "HQL");
        job.setJarByClass(getClass());

        // Input and output locations and formats.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setInputFormatClass(TextInputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setOutputFormatClass(TextOutputFormat.class);

        // Map phase: (Text, IntWritable) pairs.
        job.setMapperClass(OutMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Reduce phase: (Text, IntWritable) pairs.
        job.setReducerClass(OutReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}