WordCount因果图
MapReduce中 map和reduce函数格式
MapReduce中,map和reduce函数遵循如下常规格式:
map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)
Mapper的基类:
/**
 * Mapper hook, following the general shape map: (K1, V1) -> list(K2, V2).
 * Shown here with an empty body; concrete mappers override it.
 *
 * @param key     input key
 * @param value   input value
 * @param context context object handed to the method together with the key/value pair
 * @throws IOException          declared by the signature
 * @throws InterruptedException declared by the signature
 */
protected void map(KEY key, VALUE value,
Context context) throws IOException, InterruptedException {
}
Reducer的基类:
/**
 * Reducer hook, following the general shape reduce: (K2, list(V2)) -> list(K3, V3).
 * Shown here with an empty body; concrete reducers override it.
 *
 * @param key     key shared by all of the supplied values
 * @param values  the values grouped under {@code key}
 * @param context context object handed to the method together with the key and values
 * @throws IOException          declared by the signature
 * @throws InterruptedException declared by the signature
 */
protected void reduce(KEY key, Iterable<VALUE> values,
Context context) throws IOException, InterruptedException {
}
Context是上下文对象
代码模板
wordcount 代码
代码编写依据,也就是固定写法
input --> map --> reduce --> output
以下 Java 代码实现的功能等同于执行命令:bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0.jar wordcount input output
package com.lizh.hadoop.mapreduce;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Minimal word-count MapReduce job.
 *
 * <p>The mapper splits every input line into whitespace-separated tokens and
 * emits {@code <token, 1>}; the reducer adds up the counts it receives for each
 * token and emits {@code <token, sum>}.
 */
public class WordCount {

    private static final Log logger = LogFactory.getLog(WordCount.class);

    /** Step 1: mapper turning each line of text into {@code <token, 1>} pairs. */
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        // Single Text instance, overwritten with each token before it is written out.
        private final Text mapoutputKey = new Text();

        // Only one instance exists for the whole class: the constant count 1.
        private static final IntWritable mapOutputValues = new IntWritable(1);

        /**
         * Tokenizes one line and writes {@code <token, 1>} for every token found.
         *
         * @param key     key of the input record (not used by this mapper)
         * @param value   one line of input text
         * @param context sink the output pairs are written to
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if writing to the context is interrupted
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens()) {
                mapoutputKey.set(tokenizer.nextToken());
                context.write(mapoutputKey, mapOutputValues);
                // Per-token tracing is kept out of INFO so that it does not
                // produce one log line for every word of the input.
                if (logger.isDebugEnabled()) {
                    logger.debug("-----WordCountMapper-----" + mapOutputValues.get());
                }
            }
        }
    }

    /** Step 2: reducer summing all counts received for one token. */
    public static class WordCountReduces extends Reducer<Text, IntWritable, Text, IntWritable> {

        // Single IntWritable instance, overwritten with the sum before it is written out.
        private final IntWritable reduceOutputValues = new IntWritable();

        /**
         * Adds up every value supplied for {@code key} and writes {@code <key, sum>}.
         *
         * @param key     the token being counted
         * @param values  the counts grouped under {@code key}
         * @param context sink the output pair is written to
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if writing to the context is interrupted
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable count : values) {
                sum += count.get();
            }
            reduceOutputValues.set(sum);
            context.write(key, reduceOutputValues);
        }
    }

    /**
     * Step 3: driver. Creates, configures and submits the job, then waits for it.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job completed successfully, 1 if it failed,
     *         2 if fewer than two arguments were supplied
     * @throws Exception if job creation or submission fails
     */
    public int run(String[] args) throws Exception {
        // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 2) {
            System.err.println("Usage: " + getClass().getSimpleName()
                    + " <input path> <output path>");
            return 2;
        }

        // 1. get configuration
        Configuration configuration = new Configuration();

        // 2. create job, named after this class
        Job job = Job.getInstance(configuration, getClass().getSimpleName());

        // 3. identify the jar through this class
        job.setJarByClass(getClass());

        // 4. set job: input --> map --> reduce --> output
        // 4.1 input
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // 4.2 map
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 4.3 reduce
        job.setReducerClass(WordCountReduces.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 4.4 output
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 5. submit job and wait for the result
        boolean succeeded = job.waitForCompletion(true);
        return succeeded ? 0 : 1;
    }

    /**
     * Runs the job and exits the JVM with the status returned by {@link #run(String[])}.
     *
     * @param args input path followed by output path
     * @throws Exception if the job cannot be created or submitted
     */
    public static void main(String[] args) throws Exception {
        int rv = new WordCount().run(args);
        System.exit(rv);
    }
}
map类业务处理
map 业务处理逻辑
--------------input--------
<0,hadoop yarn>
--------------处理---------
hadoop yarn -->split->hadoop,yarn
-------------output-------
<hadoop,1>
<yarn,1>
/** Mapper turning each line of text into {@code <token, 1>} pairs. */
public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Single Text instance, overwritten with each token before it is written out.
    private final Text mapoutputKey = new Text();

    // Only one instance exists for the whole class: the constant count 1.
    private static final IntWritable mapOutputValues = new IntWritable(1);

    /**
     * Tokenizes one line and writes {@code <token, 1>} for every token found,
     * e.g. {@code <0, "hadoop yarn">} yields {@code <hadoop, 1>} and {@code <yarn, 1>}.
     *
     * @param key     key of the input record (not used by this mapper)
     * @param value   one line of input text
     * @param context sink the output pairs are written to
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            mapoutputKey.set(tokenizer.nextToken());
            context.write(mapoutputKey, mapOutputValues);
            // Per-token tracing is kept out of INFO so that it does not
            // produce one log line for every word of the input.
            if (logger.isDebugEnabled()) {
                logger.debug("-----WordCountMapper-----" + mapOutputValues.get());
            }
        }
    }
}
reduce类业务处理过程
reduce 业务处理过程 map-->shuffle-->reduce
------------input(map的输出结果)-----------------
<hadoop,1>
<hadoop,1>
<hadoop,1>
----------------分组----------------
将相同key的值合并到一起,放到一个集合
<hadoop,1>
<hadoop,1> -> <hadoop,list(1,1,1)>
<hadoop,1>
/**
 * Step 2: reducer. Receives one token together with all counts grouped under
 * it, e.g. {@code <hadoop, list(1,1,1)>}, and writes the token with their sum.
 */
public static class WordCountReduces extends Reducer<Text, IntWritable, Text, IntWritable> {

    // Holder for the sum; overwritten on every call before it is written out.
    private IntWritable total = new IntWritable();

    /**
     * Adds up every value supplied for {@code key} and writes {@code <key, sum>}.
     *
     * @param key     the token being counted
     * @param values  the counts grouped under {@code key}
     * @param context sink the output pair is written to
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int running = 0;
        for (IntWritable count : values) {
            running += count.get();
        }
        total.set(running);
        context.write(key, total);
    }
}
优化MapReduce写法
MapReduce 驱动类继承 Configured 类,并实现 Tool 接口。
重写 Tool 接口中的 run 方法。
Configured 类负责保存并提供 Configuration 对象(初始化工作)。
package com.lizh.hadoop.mapreduce;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Word-count MapReduce job written as a {@link Tool}: the configuration comes
 * from {@link Configured} and the job is launched through {@link ToolRunner}.
 *
 * <p>The mapper splits every input line into whitespace-separated tokens and
 * emits {@code <token, 1>}; the reducer adds up the counts it receives for each
 * token and emits {@code <token, sum>}.
 */
public class WordCountMapReduce extends Configured implements Tool {

    private static final Log logger = LogFactory.getLog(WordCountMapReduce.class);

    /** Step 1: mapper turning each line of text into {@code <token, 1>} pairs. */
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        // Single Text instance, overwritten with each token before it is written out.
        private final Text mapoutputKey = new Text();

        // Only one instance exists for the whole class: the constant count 1.
        private static final IntWritable mapOutputValues = new IntWritable(1);

        /**
         * Tokenizes one line and writes {@code <token, 1>} for every token found.
         *
         * @param key     key of the input record (not used by this mapper)
         * @param value   one line of input text
         * @param context sink the output pairs are written to
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if writing to the context is interrupted
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens()) {
                mapoutputKey.set(tokenizer.nextToken());
                context.write(mapoutputKey, mapOutputValues);
                // Per-token tracing is kept out of INFO so that it does not
                // produce one log line for every word of the input.
                if (logger.isDebugEnabled()) {
                    logger.debug("-----WordCountMapper-----" + mapOutputValues.get());
                }
            }
        }
    }

    /** Step 2: reducer summing all counts received for one token. */
    public static class WordCountReduces extends Reducer<Text, IntWritable, Text, IntWritable> {

        // Single IntWritable instance, overwritten with the sum before it is written out.
        private final IntWritable reduceOutputValues = new IntWritable();

        /**
         * Adds up every value supplied for {@code key} and writes {@code <key, sum>}.
         *
         * @param key     the token being counted
         * @param values  the counts grouped under {@code key}
         * @param context sink the output pair is written to
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if writing to the context is interrupted
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable count : values) {
                sum += count.get();
            }
            reduceOutputValues.set(sum);
            context.write(key, reduceOutputValues);
        }
    }

    /**
     * Step 3: driver. Creates, configures and submits the job, then waits for it.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job completed successfully, 1 if it failed,
     *         2 if fewer than two arguments were supplied
     * @throws Exception if job creation or submission fails
     */
    @Override
    public int run(String[] args) throws Exception {
        // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 2) {
            System.err.println("Usage: " + getClass().getSimpleName()
                    + " <input path> <output path>");
            return 2;
        }

        // 1. get the configuration held by Configured; fall back to a fresh one
        //    when run() is invoked directly and no configuration has been set.
        Configuration configuration = getConf();
        if (configuration == null) {
            configuration = new Configuration();
        }

        // 2. create job, named after this class
        Job job = Job.getInstance(configuration, getClass().getSimpleName());

        // 3. identify the jar through this class
        job.setJarByClass(getClass());

        // 4. set job: input --> map --> reduce --> output
        // 4.1 input
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // 4.2 map
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 4.3 reduce
        job.setReducerClass(WordCountReduces.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 4.4 output
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 5. submit job; passing true prints progress while waiting
        boolean succeeded = job.waitForCompletion(true);
        return succeeded ? 0 : 1;
    }

    /**
     * Launches the job through {@link ToolRunner} and exits the JVM with the
     * status it returns.
     *
     * @param args command-line arguments handed to {@link ToolRunner}
     * @throws Exception if the job cannot be created or submitted
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Run via the ToolRunner utility class rather than calling run() directly.
        int rv = ToolRunner.run(configuration, new WordCountMapReduce(), args);
        System.exit(rv);
    }
}
抽象出模板
package org.apache.hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * MapReduce job template: a {@link Tool} with an empty mapper and reducer to
 * be filled in, plus a complete driver that wires input, map, reduce and
 * output together and submits the job.
 */
public class WordCountMapReduce extends Configured implements Tool {

    /**
     * Mapper class: {@code public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>}.
     */
    public static class WordCountMapper extends //
            Mapper<LongWritable, Text, Text, LongWritable> {

        // Placeholders for the output key/value; not used until map() is implemented.
        private Text mapOutputKey = new Text();
        private LongWritable mapOutputValue = new LongWritable(1);

        /**
         * Template hook: intentionally left empty, so no output is written.
         *
         * @param key     key of the input record
         * @param value   value of the input record
         * @param context sink for output pairs
         * @throws IOException          declared by the signature
         * @throws InterruptedException declared by the signature
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
        }
    }

    /**
     * Reducer class: {@code public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>}.
     */
    public static class WordCountReducer extends //
            Reducer<Text, LongWritable, Text, LongWritable> {

        // Placeholder for the output value; not used until reduce() is implemented.
        private LongWritable outputValue = new LongWritable();

        /**
         * Template hook: intentionally left empty, so no output is written.
         *
         * @param key     key shared by all of the supplied values
         * @param values  the values grouped under {@code key}
         * @param context sink for output pairs
         * @throws IOException          declared by the signature
         * @throws InterruptedException declared by the signature
         */
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            // temp sum
        }
    }

    /**
     * Driver: creates, configures and submits the job, then waits for it.
     *
     * <p>An existing output path is deleted recursively before the job is
     * submitted.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job completed successfully, 1 if it failed,
     *         2 if fewer than two arguments were supplied
     * @throws Exception if job creation or submission fails
     */
    @Override
    public int run(String[] args) throws Exception {
        // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 2) {
            System.err.println("Usage: " + getClass().getSimpleName()
                    + " <input path> <output path>");
            return 2;
        }

        // 1. Get the configuration held by Configured; fall back to a fresh one
        //    when run() is invoked directly and no configuration has been set.
        Configuration conf = getConf();
        if (conf == null) {
            conf = new Configuration();
        }

        // 2. Create Job
        Job job = Job.getInstance(conf);
        job.setJarByClass(getClass());

        // 3. Set Job: Input --> map --> reduce --> output
        // 3.1 Input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);

        // 3.2 Map class
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 3.3 Reduce class
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 3.4 Output
        Path outPath = new Path(args[1]);
        // Resolve the filesystem from the path itself so that an output URI on
        // a filesystem other than the configured default is handled correctly.
        FileSystem fs = outPath.getFileSystem(conf);
        if (fs.exists(outPath)) {
            // Remove the previous output recursively.
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        // 4. Submit Job
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    /**
     * Launches the job through {@link ToolRunner} and exits the JVM with the
     * status it returns.
     *
     * @param args command-line arguments handed to {@link ToolRunner}
     * @throws Exception if the job cannot be created or submitted
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // run job
        int status = ToolRunner.run(conf, new WordCountMapReduce(), args);
        // exit program
        System.exit(status);
    }
}