安装类型
- 单机模式
- 伪分布式集群模式
- 多节点集群安装模式
一个MapReduce程序的组成
- Java程序客户机:一个Java程序,由集群中的一个客户端节点提交运行,这个客户端节点可以访问Hadoop集群,它经常由集群中的一个数据节点来充当,该节点仅是集群中的一台机器,并且有权限访问Hadoop
- 自定义Mapper类:除非在最简单的应用场景下,MapReduce程序中的这个Mapper类通常是一个用户自定义类,如果不是在伪集群模式下运行MapReduce程序,这个类的实例会在远程任务节点上执行,这些任务节点往往与用来提交作业程序的客户端节点不同。
- 自定义Reducer类:除非在最简单的应用场景下,MapReduce程序中的这个Reducer类通常是一个用户自定义类,与Mapper类一样,如果不是在伪集群模式下运行MapReduce程序作业,这个类的实例会在远程任务节点上执行,这些任务节点往往与用来提交作业程序的客户端节点不同。
- 客户端函数库:客户端函数库不同于Hadoop系统的标准函数库,这个函数库是在客户端运行期间使用的。客户端需要使用的Hadoop系统标准库已经安装,并且使用通过Hadoop的Clienet命令(这与客户端程序不同)配置到CLASSPATH中。我们在文件夹$HADOOP_HOME/bin中可以找到它,其名称为hadoop。就像Java命令用来执行一个Java程序,hadoop命令用来执行客户端程序,该程序会启动一个Hadoop作业,这些函数库都被配置到了环境变量HADOOP_CLASSPATH中,这个变量是一个冒号分隔的函数列表。
- 远程函数库:这个函数库是用户自定义Mapper类和Reducer类所需要的,这个远程函数库不包括Hadoop系统自带的函数库,因为Hadoop系统自带的函数库已经在每个数据节点都配置好了。举个例子,如果Mapper类需要用到一个特殊的XML解析器,包含这个解析器的函数库就必须要传输到执行这个Mapper类实例的远程数据节点。
- Java程序档案文件:Java程序以JAR文件德尔形式打包,这个JAR文件中包含了客户端Java类,以及用户自定义Mapper和Reducer类,还包括客户端Java类,Mapper类和Reducer类用到的其他自定义依赖类。
旧API编写wordcount
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
public class wordcount{
public static class MyMapper extends MapReduceBase implements Mapper<LongWritable,Text,Text,IntWritable>{
public void map(LongWritable key,Text value,OutputCollector<Text,IntWritable>output,Reporter reporter) throws IOException{
output.collect(new Text(value.toString()),new IntWritable(1));
}
}
public static class MyReducer extends MapReduceBase implements Reducer<Text,IntWritable,Text,IntWritable>{
public void reduce(Text key,Iterator<IntWritable>values,OutputCollector<Text,IntWritable>output,Reporter reporter) throws IOException{
int sum=0;
while(values.hasNext())
sum+=values.next().get();
output.collect(key,new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception{
JobConf conf=new JobConf(wordcount.class);
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(MyMapper.class);
conf.setCombinerClass(MyReducer.class);
conf.setReducerClass(MyReducer.class);
conf.setNumReduceTasks(1);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf,new Path(args[0]));
FileOutputFormat.setOutputPath(conf,new Path(args[1]));
JobClient.runJob(conf);
}
}
JobConf时Hadoop框架中配置MapReduce任务的主要接口,框架会按照JobConf对象中的配置信息来执行作业
TextInputFormat向Hadoop框架声明其输入文件为文本格式,它是InputFormat的子类,现在我们知道TextInputFormat类会读取输入文件的每行作为一个记录
TextOutputFormat指定了该MapReduce作业的输出情况,举个例子,他会检测其输出目录是否存在,如果输出目录已经存在,Hadoop系统会拒绝该作业的执行。
FileInputFormat.setInputPaths(conf,new Path(args[0])向Hadoop框架向Hadoop框架声明了尧都区的文件的所在的目录,该输入目录可以包含一个或者多个文件,并且每个文件中每行含有一个单词,注意的是setInputPaths中使用的是复数,这个方法可以给Hadoop程序配置一个文件目录数组作为其输入数据路劲,这些输入目录的所有文件构成了该作业的输入数据源
FileOutputFormat.setOutputPath(conf,new Path(args[1])指定了作业的输出目录,作业的最终结果会输出保存在该目录下
conf.setOutputKeyClass和conf.setOutputValueClass指定了输出的键类和值类,他们与Reducer类相匹配,这里似乎有些多余,确实是这样的,键和值类必须要同指定的Reducer类一直,否则在作业执行的时候讲跑出RuntimeException异常
Reducer实例的执行数量默认为1,该值是可以改变的,并且常常可以用来提高程序的运行效率,调用JobConf.class的实例中setNumReduceTask(int n)方法就可以配置该参数。
当程序开始时候,Mapper函数会从输入文件夹的输入文件夹中读取数据块,一个文件的字节流会被转换成一个记录(键/值对格式),然后作为Mapper的输入,键是当前字节偏移量,值是文件中读取的一行文件数据。
Mapper发送的每行单词和整数1,要注意的是,Hadoop作业程序需要使用与Integer.class对应的系统自带的IntWritable.class类,其原型是Hadoop 系统的底层I/O传输设计。
紧接着是Hadoop的Shuffle阶段,这个过程在上面的列表中并不明显,所有的键在Shuffer/Sort阶段被排序,然后发送给Reducer。逻辑上Reducer接收到的是一个IntWritable.class的实例的迭代器,实际上Reducer使用的是相同的IntWritable.class类实例。当Reducer迭代去除键对应的一系列值,相同IntWritable.class类实例被复用,从内部运行看,Hadoop系统框架在其迭代中使用values.next()之后,调用了IntWirtable.set方法。最后当Reducer输出结果的时候,写入到指定的输出目录中,输出文件中的键和值的实例是TAB为默认分隔符,这个分隔符可以通过配置参数来修改
conf.set("mapreduce.textoutputformat.separator",",")
新APi编写wordcount
import java.io.IOException;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
public class wordcountnew
{
public static class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
String w=value.toString();
context.write(new Text(w),new IntWritable(1));
}
}
public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
public void reduce(Text key,Iterable<IntWritable>values,Context context) throws IOException,InterruptedException{
int sum=0;
for(IntWritable val:values){
sum+=val.get();
}
context.write(key,new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception{
Job job=Job.getInstance(new Configuration());
job.setJarByClass(wordcountnew.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
boolean status=job.waitForCompletion(true);
if(status){
System.exit(0);
}else{
System.exit(1);
}
}
}