Environment Preparation
1. Local environment: Windows 7 (64-bit), IntelliJ IDEA 2017.1.4
2. Cluster environment: an Ambari-managed cluster on CentOS 6.5 running Hadoop 2.7.3, seven nodes in total, as shown in the figure below:
3. This article uses the simplest example, WordCount.
Configuration and Code
1. First, download the Hadoop environment from the cluster to the local machine. In this article it was downloaded to
2. Local environment variable configuration
- Set HADOOP_HOME to the Hadoop directory downloaded in step 1; the paths below refer to it as $HADOOP_HOME. (A code-level alternative to the environment variable is sketched right after this list.)
- Download hadoop.dll and winutils.exe matching your Hadoop version, and place them in "C:\Windows\System32" and "$HADOOP_HOME\bin" respectively.
Note: hadoop.dll mainly keeps the native layer from throwing all sorts of puzzling errors, such as null object references. I originally assumed IntelliJ IDEA would not need it and lost a lot of time to NullPointerExceptions.
- Modify the local hosts file under "C:\Windows\System32\drivers\etc": add the IP-to-hostname mappings of the cluster nodes so that names such as rsct0 resolve from the local machine.
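If you prefer not to set a system-wide HADOOP_HOME variable, a hedged alternative (not used in the original article; the path below is only an example) is to point Hadoop's Windows shell utilities at the local installation from code, at the very start of main():
// Hypothetical alternative to the HADOOP_HOME environment variable: tell Hadoop
// where the local installation (the directory containing bin\winutils.exe) lives.
// "D:\\hadoop-2.7.3" is an example path; use the directory downloaded in step 1.
System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.3");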
3. Create a new Maven project in IntelliJ IDEA.
Here gkd.xgs.yxm is my package, which contains the WordCount.java program.
4. resources is a folder I created; it holds the configuration files copied from the Hadoop cluster, core-site.xml, mapred-site.xml, and yarn-site.xml, plus a log4j.properties file.
The resources directory structure is shown in the figure above.
5. core-site.xml, mapred-site.xml, and yarn-site.xml are copied straight from the cluster, but there is one Ambari-specific issue to watch out for.
The copied files reference an ${hdp.version} placeholder that is not defined when the job is submitted from IDEA, so you need to supply an hdp.version value yourself; otherwise the job fails.
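As a hedged sketch of one common workaround (the version string below is a made-up placeholder; substitute your cluster's actual HDP build number), hdp.version can be defined as a configuration property in the driver, so that Hadoop's variable expansion can resolve the ${hdp.version} references when the client reads those settings:
// Sketch, not from the original article: define hdp.version so that
// ${hdp.version} placeholders in the copied mapred-site.xml / yarn-site.xml
// can be expanded on the client side. "2.5.0.0-1245" is only an example value.
Configuration conf = new Configuration();
conf.set("hdp.version", "2.5.0.0-1245");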
6. pom.xml configuration
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>gkd.xgs.yxm</groupId>
<artifactId>WordC</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>WordC</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<repositories>
<repository>
<id>alimaven</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>oschina</id>
<name>oschina maven</name>
<url>http://maven.oschina.net/content/groups/public/</url>
</repository>
<repository>
<id>central</id>
<name>central maven</name>
<url>http://repo1.maven.org/maven2/</url>
</repository>
<repository>
<id>Akka repository</id>
<url>http://repo.akka.io/releases</url>
</repository>
<repository>
<id>hadoop-pcap</id>
<url>http://dl.bintray.com/hadoop-pcap/hadoop-pcap</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.5.5</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>gkd.xgs.yxm.WordCount</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
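With the maven-assembly-plugin configured above, running mvn package produces target/WordC-1.0-SNAPSHOT-jar-with-dependencies.jar; this is the jar that the driver code in the next step points to through the mapreduce.job.jar property.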
7. Write the main MapReduce code.
package gkd.xgs.yxm;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable> { // the four type parameters are: input key type, input value type, output key type, output value type
// The MapReduce framework passes each line it reads to map() as a key/value pair:
// by default the key is the starting byte offset of the line in the file (a Long) and the value is the content of the line (a String).
// The types differ from the usual Java ones because MapReduce output has to be shipped between machines and must therefore be serializable;
// Hadoop defines its own serializable types: LongWritable for Long, Text for String, IntWritable for int.
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
//The Reducer's four type parameters are: input key type, input value type, output key type, output value type.
//Note that reduce() receives a Text key plus an iterable collection of values, because the map output
//it reads looks like: (good,1)(good,1)(good,1)(good,1)
//By the time it is handed to reduce(), it has been grouped into:
//key: good
//value: (1,1,1,1)
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
//Create the configuration object
Configuration conf = new Configuration();
//Allow a job built on Windows to be submitted to the Linux cluster
conf.set("mapreduce.app-submission.cross-platform", "true");
//Run on YARN instead of the local job runner
conf.set("mapreduce.framework.name", "yarn");
//The jar shipped to the cluster: the jar-with-dependencies built by mvn package
conf.set("mapreduce.job.jar","D:\\workspace\\WordC\\target\\WordC-1.0-SNAPSHOT-jar-with-dependencies.jar");
//Hard-coded HDFS input/output paths; pass ioArgs instead of args to GenericOptionsParser
//below if no program arguments are configured in the IDEA run configuration
String[] ioArgs = new String[] { "hdfs://rsct0:8020/input/test1.txt", "hdfs://rsct0:8020/output_test" };
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
//Create the Job object
Job job = Job.getInstance(conf, "word count");
//Set the class used to locate the job's jar
job.setJarByClass(WordCount.class);
//Set the mapper, combiner, and reducer classes
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
//Set the output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//Set the input and output paths
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
//Submit the job and wait for it to finish
boolean isSuccess = job.waitForCompletion(true);
System.exit(isSuccess ? 0 : 1);
}
}
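One practical note: FileOutputFormat refuses to start a job whose output directory already exists, so /output_test has to be removed between runs. A hedged convenience sketch (not part of the original code) deletes it from the driver before job.waitForCompletion(); it needs the additional import org.apache.hadoop.fs.FileSystem:
// Optional sketch, not in the original article: delete a pre-existing output
// directory before submitting, since FileOutputFormat fails fast if the
// output path already exists.
FileSystem fs = FileSystem.get(conf);
Path outputPath = new Path(otherArgs[otherArgs.length - 1]);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true); // recursive delete
}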