wordCount程序是hadoop中自带的一个程序,能够进行词数的统计。它的位置在hadoop目录下的share/hadoop/mapreduce目录下:
在那个hadoop-mapreduce-examples-2.6.5.jar包中。
在命令模式下,我们进入那个目录然后可以这样运行:
hadoop jar hadoop-mapreduce-examples-2.6.5.jar wordcount /input/Input /output
来看看运行前后的结果:
接下来,我们来看看这个程序的源码实现,源码是在官网下的那个带源码的压缩文件中:
要看源码需要下载这个版本的压缩包。然后减压,进入:
一步一步找下去就可以看到了:
为了能够单独跑这个源码,我们将里面的内容复制到centOS上的eclipse里看看。在eclipse中新建一个Map/Reduce工程,将代码复制过去。
先复制内容,import进去的包不慌复制,这时能看到报很多错,全是包没引入导致的,这时一个一个的去看,我们就能看到哪些类是hadoop中提供的。导完后代码就不会报错了。
接下来再多做一件事,为了让eclipse能够跳转到hadoop提供的源码中,我们还是让eclipse关联一下hadoop的源码吧。
把解压的hadoop-2.6.5-src移动到/usr/local目录下,然后在eclipse中选择一个hadoop里的类右键查看源文件:
发现并没有,然后就开始点击Attach Source做关联:
点击OK后就一下子发现源码果然显示出来了:
这下就能愉快的看源码了。
那么下面就来看看wordCount程序里面有些啥东西。
首先我们可以看到wordCount类里面分成了三个部分:
第一部分:
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context ) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()){
word.set(itr.nextToken()); //word存储被切割出来的单词
context.write(word, one);
}
}
}
这是第一个映入眼帘的内部类,它继承了Mapper类,Mapper类有几个泛型,分别定义为了<Object,Text,Text,IntWritable>。
该类map方法调用默认的LineRecordReader,得到的value值是文本文件中的一行,key值为该行首字母相对于文本文件首地址的偏移量。之后,通过StringTokenizer类将value的值分割为一个一个单词,并将<word,1>作为map方法的键值对进行输出。其中Text类和IntWritable类是Hadoop对String和Int的封装。
第二部分:
public static class IntSumReducer extends Reducer
<Text,IntWritable,Text,IntWritable>{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException{
int sum = 0;
for (IntWritable val : values){
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
可以看到这是继承了Reducer类,对应的是Reduce过程。
Reduce函数从Map端获得形如<word,{1,1,1....}>的输出,根据这些value值累加得到该单词的出现次数并输出。
接着是第三部分main函数:
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
//这个是自己加的
//conf.set("fs.defaultFS", "hdfs://localhost:9000");
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
@SuppressWarnings("deprecation")
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
main函数中主要是作业配置过程。
整个wordCount程序就是由map和reduce还有main函数组成。也就是说,要写一个mapreduce程序,一般都需要有map和reduce过程,这两个过程的实现就是继承Mapper类和Reducer类,实现其map和reduce函数。
下面再在eclipse中运行看看,因为要输入参数:
用的Input还是前面的一个文件:
运行输出(在上面指定的output文件夹下):
下一篇准备看看hadoop中用到的数据库:HBase。