欢迎关注我的CSDN: https://blog.csdn.net/bingque6535
1. 编写WordCount
-
Driver端
package com.hjf.mr.wordcount; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * @author Jiang锋时刻 * @create 2020-05-17 9:49 */ public class WordCountJob { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 创建Configuration和Job对象 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 指定Master对应的类, 就是当前所在的类 job.setJarByClass(WordCountJob.class); // 指定Mapper和Reducer对应的类 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 指定Mapper端输出的key 和value 的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 指定Reducer端输出的key 和 value的类型 // 如果Mapper端和Reducer端输出的key, value类型一样, 可省略其中一组的指定 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); Path inputPath = new Path("./Data/words.txt"); Path outputPath = new Path("./Data/result"); // 如果输出路径存在, 则删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(outputPath)){ fs.delete(outputPath, true); } // 设置文件的输入路径 和 结果的返回路径 FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); // 提交 job.waitForCompletion(true); // job.submit(); } } -
Mapper端
package com.hjf.mr.wordcount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * @author Jiang锋时刻 * @create 2020-05-17 9:54 * * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> * KEYIN: Mapper端输入key的类型, 一般都是LongWritable * VALUEIN: Mapper端输入value的类型 * KEYOUT: Mapper端输出key的类型 * VALUEOUT: Mapper端输出value的类型 */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { IntWritable one = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 将Text型转换为String 型 String lines = value.toString(); String[] words = lines.split(" "); // word --> (word, 1) for (String word: words) { context.write(new Text(word), one); } } }
- Reducer端
package com.hjf.mr.wordcount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * @author Jiang锋时刻 * @create 2020-05-17 9:53 * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> * KEYIN: reducer端输入key的类型 * VALUEIN: reducer端输入value的类型 * KEYOUT: reducer端输出key的类型 * VALUEOUT: reducer端输出value的类型 */ public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; // 相同key的value值进行累加 for (IntWritable value: values) { sum += value.get(); } IntWritable count = new IntWritable(sum); context.write(key, count); } }
2. TopN
- 题目
1、统计每门课程的平均分
2、统计每门课程参考学生的平均分,并且按课程存入不同的结果文件,要求一门课程一个结果文件,并且按平均分从高到低排序,分数保留一位小数
3、求出每门课程参考学生成绩最高的学生的信息：课程、姓名和平均分
- 数据集
english,zhouqi,85,86,41,75,93,42,85,75,55,47,22
computer,huangxiaoming,85,86,41,75,93,42,85
computer,xuzheng,54,52,86,91,42
computer,huangbo,85,42,96,38
english,zhaobenshan,54,52,86,91,42,85,75
english,liuyifei,85,41,75,21,85,96,14
algorithm,liuyifei,75,85,62,48,54,96,15
computer,huangjiaju,85,75,86,85,85
english,liuyifei,76,95,86,74,68,74,48
english,huangdatou,48,58,67,86,15,33,85
algorithm,huanglei,76,95,86,74,68,74,48
algorithm,huangjiaju,85,75,86,85,85,74,86
computer,huangdatou,48,58,67,86,15,33,85
english,huangbo,85,42,96,38,55,47,22
algorithm,liutao,85,75,85,99,66
computer,huangzitao,85,86,41,75,93,42,85
math,wangbaoqiang,85,86,41,75,93,42,85
computer,liujialing,85,41,75,21,85,96,14,74,86
computer,liuyifei,75,85,62,48,54,96,15
computer,liutao,85,75,85,99,66,88,75,91
computer,huanglei,76,95,86,74,68,74,48
english,liujialing,75,85,62,48,54,96,15
math,huanglei,76,95,86,74,68,74,48
math,huangjiaju,85,75,86,85,85,74,86
math,liutao,48,58,67,86,15,33,85
english,huanglei,85,75,85,99,66,88,75,91
math,xuzheng,54,52,86,91,42,85,75
math,huangxiaoming,85,75,85,99,66,88,75,91
math,liujialing,85,86,41,75,93,42,85,75
english,huangxiaoming,85,86,41,75,93,42,85
algorithm,huangdatou,48,58,67,86,15,33,85
algorithm,huangzitao,85,86,41,75,93,42,85,75
1. 问题1
问题: 统计每门课程平均分
思路:
1. 先计算每名学生的每门课程的平均分
2. 再用计算得到的平均分累加计算课程平均分
-
Driver 端
package com.hjf.mr.top_n.question1; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * @author Jiang锋时刻 * @create 2020-05-17 11:06 * */ public class TopNJob1 { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(TopNJob1.class); job.setMapperClass(TopNMapper1.class); job.setReducerClass(TopNReducer1.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(DoubleWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DoubleWritable.class); Path inputPath = new Path("./Data/score.txt"); Path outputPath = new Path("./Data/result"); FileSystem fs = FileSystem.get(conf); if (fs.exists(outputPath)) { fs.delete(outputPath, true); } FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); job.waitForCompletion(true); } } -
Mapper 端
package com.hjf.mr.top_n.question1; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * @author Jiang锋时刻 * @create 2020-05-17 11:12 */ public class TopNMapper1 extends Mapper<LongWritable, Text, Text, DoubleWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String lines = value.toString(); String[] split = lines.split(","); Text courseName = new Text(split[0]); double sum = 0.0; int count = split.length - 2; for (int i = 2; i < split.length; i++) { sum += Integer.parseInt(split[i]); } DoubleWritable avg = new DoubleWritable(sum / count); context.write(courseName, avg); } } -
Reducer 端
package com.hjf.mr.top_n.question1; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * @author Jiang锋时刻 * @create 2020-05-17 11:13 */ public class TopNReducer1 extends Reducer<Text, DoubleWritable, Text, DoubleWritable> { @Override protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException { double sum = 0.0; int count = 0; for (DoubleWritable value: values){ sum += value.get(); count += 1; } DoubleWritable result = new DoubleWritable(sum / count); context.write(key, result); } }
2. 问题2
-
Driver
package com.hjf.mr.top_n.question2; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * @author Jiang锋时刻 * @create 2020-05-17 11:06 * */ public class TopNJob2 { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 指定自定义分区类 job.setPartitionerClass(TopNPartition.class); // 指定分区数 job.setNumReduceTasks(4); job.setJarByClass(TopNJob2.class); job.setMapperClass(TopNMapper2.class); job.setReducerClass(TopNReducer2.class); job.setMapOutputKeyClass(CourseScore.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(CourseScore.class); job.setOutputValueClass(NullWritable.class); Path inputPath = new Path("./Data/score.txt"); Path outputPath = new Path("./Data/result"); FileSystem fs = FileSystem.get(conf); if (fs.exists(outputPath)) { fs.delete(outputPath, true); } FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); job.waitForCompletion(true); } } -
Mapper
package com.hjf.mr.top_n.question2; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * @author Jiang锋时刻 * @create 2020-05-17 11:12 */ public class TopNMapper2 extends Mapper<LongWritable, Text, CourseScore, NullWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String lines = value.toString(); String[] split = lines.split(","); String courseName = split[0]; String name = split[1]; double sum = 0.0; int count = split.length - 2; for (int i = 2; i < split.length; i++) { sum += Integer.parseInt(split[i]); } CourseScore courseScore = new CourseScore(courseName, name, sum / count); context.write(courseScore, NullWritable.get()); } } -
Reducer
package com.hjf.mr.top_n.question2; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.text.DecimalFormat; /** * @author Jiang锋时刻 * @create 2020-05-17 11:13 */ public class TopNReducer2 extends Reducer<CourseScore, NullWritable, CourseScore, NullWritable> { @Override protected void reduce(CourseScore key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { double sum = 0.0; for (NullWritable value: values) { // 获取 CourseScore对象中的score值, 并将其保留一位小数, // 然后再重新封装成CourseScore对象 double score = key.getScore(); DecimalFormat df = new DecimalFormat("0.0"); double format = Double.parseDouble(df.format(score)); CourseScore courseScore = new CourseScore(key.getCourse(), key.getName(), format); context.write(courseScore, value); } } } -
Partition
package com.hjf.mr.top_n.question2; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Partitioner; /** * @author Jiang锋时刻 * @create 2020-05-17 15:42 * * Partitioner<KEY, VALUE> * pattition对应的key, value类型应该和Mapper端输出类型保持一致 */ public class TopNPartition extends Partitioner<CourseScore, NullWritable> { @Override public int getPartition(CourseScore courseScore, NullWritable nullWritable, int numPartitions) { String course = courseScore.getCourse(); // 将不同课程名的信息保存到不同的分区中 switch (course) { case "algorithm": return 0; case "computer": return 1; case "english": return 2; default: return 3; } } } -
自定义类型
package com.hjf.mr.top_n.question2; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * @author Jiang锋时刻 * @create 2020-05-17 15:48 */ public class CourseScore implements WritableComparable<CourseScore> { private String course; private String name; private double score; // 反序列化时, 需要反射调用空参构造函数, 所以必须要有该空参构造函数 public CourseScore() { } public CourseScore(String course, String name, double score) { this.course = course; this.name = name; this.score = score; } public String getCourse() { return course; } public double getScore() { return score; } public String getName() { return name; } @Override public int compareTo(CourseScore that) { if (this.score > that.score){ return -1; } else if (this.score < that.score) { return 1; } else { return 0; } } // 重写序列化方法 @Override public void write(DataOutput out) throws IOException { out.writeUTF(course); out.writeUTF(name); out.writeDouble(score); } // 重写反序列化方法 // 注意: 反序列化的顺序和序列化的顺序必须完全一致 @Override public void readFields(DataInput in) throws IOException { course = in.readUTF(); name = in.readUTF(); score = in.readDouble(); } @Override public String toString() { return "课程: " + course + ", 姓名: " + name + ", 分数: " + score; } }
3. 问题三
问题3只需在问题2代码的基础上修改Reducer的输出条件即可：每个分区只包含一门课程，且记录已按平均分从高到低排序，因此每个Reduce任务只输出前top条记录即可（top设为1时，即为每门课程成绩最高的学生）。
/**
 * Question 3 reducer.
 *
 * <p>Emits only the first {@code top} records this reduce task receives, with each score rounded
 * to one decimal place. Keys arrive sorted by score descending, so these are the highest
 * averages.
 *
 * <p>This gives "top N per course" only when each reduce task receives a single course, as
 * TopNPartition arranges. {@code top} defaults to 3. It can be overridden with the
 * {@value #TOP_KEY} job configuration property; setting it to 1 yields just the highest-scoring
 * student.
 */
public class TopNReducer3 extends Reducer<CourseScore, NullWritable, CourseScore, NullWritable> {

    /** Job configuration property that overrides the number of records emitted per reducer. */
    public static final String TOP_KEY = "topn.top";

    /**
     * Number of records emitted so far by this reducer.
     *
     * <p>This must be a field rather than a local inside reduce(). reduce() runs once per key
     * group, so a local would be reset on every call.
     */
    int count = 0;

    /** How many records to emit; 3 unless overridden via {@value #TOP_KEY}. */
    private int top = 3;

    /**
     * One-decimal formatter pinned to Locale.ROOT.
     *
     * <p>With the default locale, a comma-decimal locale would format 85.3 as "85,3", which
     * Double.parseDouble rejects. The formatter is created once per reducer instead of once per
     * record.
     */
    private final DecimalFormat oneDecimal =
            new DecimalFormat("0.0", java.text.DecimalFormatSymbols.getInstance(java.util.Locale.ROOT));

    @Override
    protected void setup(Context context) {
        top = context.getConfiguration().getInt(TOP_KEY, top);
    }

    @Override
    protected void reduce(CourseScore key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        for (NullWritable value : values) {
            if (count >= top) {
                // Quota already reached for this reducer; nothing further will be written.
                return;
            }
            count++;
            // Read the key once per value: keys that compare equal share one reduce call.
            double rounded = Double.parseDouble(oneDecimal.format(key.getScore()));
            context.write(new CourseScore(key.getCourse(), key.getName(), rounded), value);
        }
    }
}
欢迎关注我的CSDN: https://blog.csdn.net/bingque6535