背景:学校的学生的是一个非常大的生成数据的集体,比如每次考试的成绩
现有一个班级的学生一个月的考试成绩数据。
科目 姓名 分数
需求:求出每门成绩中属于甲级的学生人数和总人数
乙级的学生人数和总人数
丙级的学生人数和总人数
甲级(90及以上)乙级(80到89)丙级(0到79)
处理数据结果:
课程\t甲级\t学生1,学生2,...\t总人数
课程\t乙级\t学生1,学生2,...\t总人数
课程\t丙级\t学生1,学生2,...\t总人数
示例数据:
English,liudehua,80
English,lijing,79
English,nezha,85
English,jinzha,60
English,muzha,71
English,houzi,99
English,libai,88
English,hanxin,66
English,zhugeliang,95
Math,liudehua,74
Math,lijing,72
Math,nezha,95
Math,jinzha,61
Math,muzha,37
Math,houzi,37
Math,libai,84
Math,hanxin,89
Math,zhugeliang,93
Computer,liudehua,54
Computer,lijing,73
Computer,nezha,86
Computer,jinzha,96
Computer,muzha,76
Computer,houzi,92
Computer,libai,73
Computer,hanxin,82
Computer,zhugeliang,100
代码如下:
StudentMapper.java
package top.gujm.student;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class StudentMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
//判断学生等级
int score = Integer.parseInt(fields[2]);
//获取等级
String level = parseLevel(score);
//使用学科\t成绩等级作为key,学生姓名作为value输出
context.write(new Text(fields[0]+"\t"+level),new Text(fields[1]));
}
/**
* 判断学生成绩对应的等级
* @param score
* @return
*/
private String parseLevel(int score) {
// 如果这里直接输出等级名称,则排序后丙在前,甲在后
// 所以为了让Mapper排序后甲在前,丙在后,这里输出{index},在reduce中再替换成等级名称
if(score < 80){
return "{2}";
}else if (score < 90){
return "{1}";
}else {
return "{0}";
}
}
}
StudentReduce.java
package top.gujm.student;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class StudentReduce extends Reducer<Text, Text, Text, Text> {
/**
* 统计对应学科对应等级的学生
* @param key 学科\t学生等级
* @param values 对应学科对应等级的学生集合
* @param context 剩下文
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//替换等级
String level = key.toString()
.replace("{0}", "甲级")
.replace("{1}", "乙级")
.replace("{2}", "丙级");
//保存所有学生
StringBuffer sb = new StringBuffer();
//计数学生数量
int count = 0;
for (Text student : values){
sb.append(student+",");
count += 1;
}
//去除最后一个逗号,将最后一个字符替换为空字符串
sb.replace(sb.length() - 1, sb.length(), "");
context.write(new Text(level), new Text(sb.toString() + "\t" + count));
}
}
运行结果
运行结果.png