实验目的
- 通过实验掌握基本的MapReduce编程方法。
- 掌握用MapReduce解决一些常见的数据处理问题,包括数据去重、数据排序和数据挖掘等。
- 通过操作MapReduce的实验,模仿实验内容,深入理解MapReduce的过程,和shuffle的具体意义。
实验平台
- 操作系统:Ubuntu-16.04
- Hadoop版本:2.6.0
- JDK版本:1.8
- IDE:Eclipse
实验内容和要求
一,编程实现文件二次排序:
-
现有一个输入文件,包含两列数据,要求先按照第一列整数大小排序,如果第一列相同,按照第二列整数大小排序。下面是输入文件和输出文件的一个样例供参考。
- 输入文件secondarysort.txt的样例如下:
20 21
50 51
50 52
50 53
50 54
60 51
60 53
60 52
60 56
60 57
70 58
60 61
70 54
70 55
70 56
70 57
70 58
1 2
3 4
5 6
7 82
203 21
50 512
50 522
50 53
530 54
40 511
20 53
20 522
60 56
60 57
740 58
63 61
730 54
71 55
71 56
73 57
74 58
12 211
31 42
50 62
7 8
-
输出文件的样例如下:
实验过程:
-
创建文件secondarysort.txt
-
在HDFS建立secondarysort_input文件夹(执行这步之前要开启hadoop相关进程)
-
上传输入文件到HDFS中的secondarysort_input文件夹
- 接着打开eclipse
Eclipse的使用-
点开上次实验的项目,找到 src 文件夹,右键选择 New -> Class
-
输入 Package 和 Name,然后Finish
-
写好Java代码(给的代码里要修改HDFS和本地路径),右键选择 Run As -> Run on Hadoop,结果在HDFS系统中查看
-
实验代码:
package cn.edu.zucc.mapreduce2;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class LoggerJob {
public static class LoggerMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {
private IntWritable counter = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String result = handleLog(line);
if (result != null && result.length() > 0) {
context.write(new Text(result), counter);
}
}
private String handleLog(String line) {
StringBuffer sBuffer = new StringBuffer();
try {
if (line.length() > 0) {
if (line.indexOf("GET") > 0) {
String tmp = line.substring(line.indexOf("GET"),
line.indexOf("HTTP/1.0"));
sBuffer.append(tmp.trim());
} else if (line.indexOf("POST") > 0) {
String tmp = line.substring(line.indexOf("POST"),
line.indexOf("HTTP/1.0"));
sBuffer.append(tmp.trim());
}
} else {
return null;
}
} catch (Exception e) {
e.printStackTrace();
}
return sBuffer.toString();
}
}
public static class LoggerReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
String[] otherArgs = new String[]{"loggerjob_input",
"loggerjob_output"};
if (otherArgs.length != 2) {
System.err.println("Usage: loggerjob <in> <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "loggerjob");
job.setJarByClass(LoggerJob.class);
job.setMapperClass(LoggerMapper.class);
job.setReducerClass(LoggerReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
模仿上题完成以下内容:对于输入文件,要求依次按照顺序对字段进行排序,如果第一个字段排好后再根据第二个字段的升序排序最后在根据第三个字段进行排序,对下面是输入文件和输出文件的一个样例供参考。
- 输入文件的样例如下:
a1,b2,c5
a4,b1,c3
a1,b2,c4
a2,b2,c4
a2,b1,c4
a4,b1,c2
- 输出文件的结果为:
a1 b2,c4
a1 b2,c5
a2 b1,c4
a2 b2,c4
a4 b1,c2
a4 b1,c3
二,编写程序实现对非结构化日志文件处理:
-
现有一个输入文件,要求区分不同的HTTP请求对文件进行输出。下面是输入文件和输出文件的一个样例供参考。
- 输入文件的样例如下:
127.0.0.1 - - [03/Jul/2014:23:53:32 +0800] "POST /service/addViewTimes_544.htm HTTP/1.0" 200 2 0.004
127.0.0.1 - - [03/Jul/2014:23:54:53 +0800] "GET /html/20140620/900.html HTTP/1.0" 200 151770 0.054
127.0.0.1 - - [03/Jul/2014:23:57:42 +0800] "GET /html/20140620/872.html HTTP/1.0" 200 52373 0.034
127.0.0.1 - - [03/Jul/2014:23:58:17 +0800] "POST /service/addViewTimes_900.htm HTTP/1.0" 200 2 0.003
127.0.0.1 - - [03/Jul/2014:23:58:51 +0800] "GET / HTTP/1.0" 200 70044 0.057
127.0.0.1 - - [03/Jul/2014:23:58:51 +0800] "GET / HTTP/1.0" 200 70044 0.057
- 输出文件的结果为:
GET / 2
GET /html/20140620/872.html 1
GET /html/20140620/900.html 1
POST /service/addViewTimes_544.htm 1
POST /service/addViewTimes_900.htm 1
实验过程:
-
创建文件
-
在HDFS建立loggerjob_input文件夹
-
上传样例到HDFS中的loggerjob_input文件夹
- 到eclipse上执行代码
实验代码:
package cn.edu.zucc.mapreduce2;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class LoggerJob {
public static class LoggerMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {
private IntWritable counter = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String result = handleLog(line);
if (result != null && result.length() > 0) {
context.write(new Text(result), counter);
}
}
private String handleLog(String line) {
StringBuffer sBuffer = new StringBuffer();
try {
if (line.length() > 0) {
if (line.indexOf("GET") > 0) {
String tmp = line.substring(line.indexOf("GET"),
line.indexOf("HTTP/1.0"));
sBuffer.append(tmp.trim());
} else if (line.indexOf("POST") > 0) {
String tmp = line.substring(line.indexOf("POST"),
line.indexOf("HTTP/1.0"));
sBuffer.append(tmp.trim());
}
} else {
return null;
}
} catch (Exception e) {
e.printStackTrace();
System.out.println(line);
}
return sBuffer.toString();
}
}
public static class LoggerReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
String[] otherArgs = new String[] { "loggerjob_input",
"loggerjob_output" };
if (otherArgs.length != 2) {
System.err.println("Usage: loggerjob <in> <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "loggerjob");
job.setJarByClass(LoggerJob.class);
job.setMapperClass(LoggerMapper.class);
job.setReducerClass(LoggerReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
模仿上题完成以下内容:对于气象数据,要求获取数据中的最高温度和最低温度。下面是输入文件和输出文件的一个样例供参考。
- 输入文件的样例如下:
0067011990999991950051507004888888889999999N9+00001+9999999999999999999999
0067011990999991950051512004888888889999999N9+00221+9999999999999999999999
0067011990999991950051518004888888889999999N9-00111+9999999999999999999999
0067011990999991949032412004888888889999999N9+01111+9999999999999999999999
数据说明:
第15-19个字符是year
第45-50位是温度表示,+表示零上 -表示零下,且温度的值不能是9999,9999表示异常数据。
第50位值只能是0、1、4、5、9几个数字
- 输出文件结果为:
1949 111
1950 3
三,编写程序实现对输入文件进行倒排索引:
-
现有三个文本文档,需要根据单词查找文档,并且还要考虑权重问题。下面是输入文件和输出文件的一个样例供参考。
- 输入文件的d1.txt的样例如下:
Mapreduce is simple is easy
- 输入文件的d2.txt的样例如下:
Mapreduce is powerful is userful
- 输入文件的d3.txt的样例如下:
Hello Mapreduce Bye Mapreduce
- 输出文件的结果为:
Bye hdfs://localhost:9000/user/tiny/invertedindex_input/d3.txt:1;
Hello hdfs://localhost:9000/user/tiny/invertedindex_input/d3.txt:1;
Mapreduce hdfs://localhost:9000/user/tiny/invertedindex_input/d3.txt:2;hdfs://localhost:9000/user/tiny/invertedindex_input/d2.txt:1;hdfs://localhost:9000/user/tiny/invertedindex_input/d1.txt:1;
easy hdfs://localhost:9000/user/tiny/invertedindex_input/d1.txt:1;
is hdfs://localhost:9000/user/tiny/invertedindex_input/d1.txt:2;hdfs://localhost:9000/user/tiny/invertedindex_input/d2.txt:2;
powerful hdfs://localhost:9000/user/tiny/invertedindex_input/d2.txt:1;
simple hdfs://localhost:9000/user/tiny/invertedindex_input/d1.txt:1;
userful hdfs://localhost:9000/user/tiny/invertedindex_input/d2.txt:1;
-
创建文件
-
在HDFS建立invertedindex_input文件夹
-
上传样例到HDFS中的invertedindex_input文件夹
- 到eclipse上执行代码
实验代码:
package cn.edu.zucc.mapreduce2;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class InvertedIndex {
public static class InvertedIndexMap extends Mapper<Object, Text, Text, Text> {
private static final Text one = new Text("1");
private Text keyInfo = new Text();
private FileSplit split;
@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
split = (FileSplit) context.getInputSplit();
String line = value.toString();
String[] strings = line.split(" ");
for (String s : strings) {
keyInfo.set(s + ":" + this.split.getPath().toString());
context.write(keyInfo, one);
}
}
}
public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
Text info = new Text();
@Override
public void reduce(Text key, Iterable<Text> values, Context contex) throws IOException, InterruptedException {
Integer sum = 0;
for (Text value : values) {
sum += Integer.parseInt(value.toString());
}
int splitIndex = key.toString().indexOf(":");
info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
key.set(key.toString().substring(0, splitIndex));
contex.write(key, info);
}
}
public static class InvertedIndexReduce extends Reducer<Text, Text, Text, Text> {
private Text result = new Text();
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
String fileList = "";
for (Text value : values) {
fileList += value.toString() + ";";
}
result.set(fileList);
context.write(key, result);
}
}
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
String[] otherArgs = new String[]{"invertedindex_input",
"invertedindex_output"};
if (otherArgs.length != 2) {
System.err.println("Usage: InvertedIndex <in> <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "InvertedIndex");
job.setJarByClass(InvertedIndex.class);
job.setMapperClass(InvertedIndexMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setCombinerClass(InvertedIndexCombiner.class);
job.setReducerClass(InvertedIndexReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
模仿上题完成以下内容:对于输入文件,请编写MapReduce程序,统计文件中的通话记录情况。下面是输入文件和输出文件的一个样例供参考。
- 输入文件 A 的样例如下:
13588888888 112
13678987879 13509098987
18987655436 110
2543789 112
15699807656 110
011-678987 112
- 输出文件 B 的结果为:
110 15699807656|18987655436|
112 011-678987|2543789|13588888888|
13509098987 13678987879|