最近参加了一个大数据开发的培训,整理一下在培训过程中,老师一直说的一个案例。
案例比较简单,使用MapReduce、hive、Spark等框架进行计算,对框架有一个简单的了解。
现在对这个案例进行一个简单的整理,方便后期学习。
数据
原始数据
数据说明
数据按照逗号(,)分割,每列数据说明如下:
序号 | 说明 | 备注 |
---|---|---|
1 | 学号 | |
2 | 姓名 | |
3 | 性别 | |
4 | 年龄 | |
5 | 上学期成绩 | 学科成绩以&分割,学科=成绩 |
6 | 下学期成绩 | 学科成绩以&分割,学科=成绩 |
需求
统计第一学期数学成绩及格和不及格的人数
解决方案
数据结构相对比较简单,分析每行数据中的第一学期数学成绩,
判断其中数据成绩是否及格,如果及格,则统计及格的人数,不及格统计不及格的人数。
0.准备工作
在正式开始这项工作时,首先要保证自己的环境可以正常运行,hadoop集群、mysql、hive、spark集群可以正常运行。
0.1环境准备
进入hadoop的sbin目录:
1.启动DFS
./start-dfs.sh
2.启动yarn
./start-yarn.sh
3.启动hive,进入hive的bin目录
#启动mysql
service mysql start
# 启动hive的metastore服务
hive --service metastore &
# 启动hive的hiveserver2服务
hive --service hiveserver2 &
4.启动spark:进入spark的sbin目录
start-all.sh
0.2 将需要分析的数据上传到服务器和HDFS中
将数据存放到student文件,进入student文件所在目录,将文件上传到HDFS的/data/目录下
hdfs dfs -put student /data/
查看HDFS文件是否上传成功
1.使用MapReduce解决
MapReduce的思路就是将所需要进行计算的数据,拆分到不同的机器上,
然后再不同的机器上计算,将不同机器上的结果汇总到一起,然后再进行计算。
在不同机器上进行计算的过程,通常称为Map阶段,汇总结果进行计算的过程,通常称为Reduce阶段。
结合MapReduce计算的基本思路,MapReduce实现一个任务主要分为Map和reduce两部分。
再实现的过程中,主要完成Map和Reduce的实现过程即可。
当然,一个MapReduce的任务,必须要有一个驱动主类将Map和Reduce调起方能执行。
1.1 Map
新建StudentScoreMapper,需要继承org.apache.hadoop.mapreduce.Mapper,并指定Map的输入输出类型,共有四个参数,前两个为输入数据类型,后两个为输出数据类型
public class StudentScoreMapper extends Mapper<Object,Text, Text, IntWritable>
Map阶段具体的执行逻辑时再map方法中实现的,故需要重写父类的Map方法,具体思路:
- 获取第一学期的成绩
- 获取该学期的数据成绩
- 判断数学成绩是否及格,如果及格,则使用输出<"Math_Score_Pass",1>,否则输出<"Math_Score_Not_Pass",1>
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// 1. valueIn为文件中每行的数据,使用split方法进行分割
String[] vals = value.toString().split(",",-1);
// 2.每行数据中,倒数第二列为第一学期成绩
if(vals.length !=6 ){return;}
String scoreStr = vals[5];
// 3.每门成绩按照&分割
String[] scores = scoreStr.split("&");
if(scores.length != 4){return;}
// 4.获取数学成绩,学科成绩按照=分割
String mathScoreStr = scores[0];
String[] mathScore = mathScoreStr.split("=");
if(mathScore.length != 2){return;}
String score = mathScore[1];
// 5.判断成绩是否及格 int temp = Integer.parseInt(score);
if(temp >= 60){
context.write(outKeyPass,new IntWritable(1));
}else{
context.write(outKeyNotPass,new IntWritable(1));
}
}
1.2 Reduce
新建StudentScoreReducer类,继承org.apache.hadoop.mapreduce.Reducer,同时指定Reduce的输入和输出类型,其中输入类型与Map的输出类型保持一致。
public class StudentScoreReducer extends Reducer<Text, IntWritable,Text, LongWritable>
Reduce阶段具体的处理逻辑需要重写reduce方法,具体思路:就是把Map的输出结果,累加即可。
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
InterruptedException {
//1.经过shuffle之后 相同key的结果value 组成一个Iterable,作为reduce的输入参数进行计算
//2.遍历Iterable,将期所有的val进行累加
long sum = 0L;
for(IntWritable val : values){
sum += val.get();
}
//3.返回计算结果
context.write(key,new LongWritable(sum));
}
1.3 驱动类
针对MapReduce的任务进行一些配置,设置Map类、Reduce类,任务输入输出路径等相关信息。
public static void main(String[] args) throws
IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//设置任务的名称
job.setJobName("StudentScoreCount");
//设置任务执行的主类
job.setJarByClass(StudentScoreMain.class);
//设置reduce任务的个数
job.setNumReduceTasks(1);
//设置Map类
job.setMapperClass(StudentScoreMapper.class);
//设置combiner,在Map端执行reduce任务
//job.setCombinerClass(StudentScoreReducer.class);
//设置Reduce类
job.setReducerClass(StudentScoreReducer.class);
//设置输出key的类
job.setOutputKeyClass(Text.class);
//设置输出value的类
job.setOutputValueClass(LongWritable.class);
//如果Map和Reduce的输出类型不一致,需要单独对map设置输出key和value的类型
//同时注释setCombinerClass方法
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置Map输入路径
FileInputFormat.addInputPath(job, new Path(args[0]));
//设置Reduce输处路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
1.4 执行
将编写的代码进行打包,发送到hadoop的集群,提交MapReduce任务
hadoop jar hadoop-training-1.0-SNAPSHOT.jar \
com.hadoop.mapreduce.StudentScoreMain /data/student \
/data/studentscore/
其中,/data/student为任务所需要处理的路径,/data/studentscore/ 为最终结果输出路径,需要保证这个文件在HDFS上不存在。
查看输出文件夹下的文件,其中,以part开始的文件为最终的数据结果
查看任务执行结果
查看任务的基本信息,执行实践为20sec
1.5 源码
具体源码可以通过Github进行下载:hadoop-training
2.使用hive解决
将数据导入到hive中,使用类SQL语句进行统计分析
2.1 创建数据库
create database hive_2020;
查看创建的数据库
show databases;
2.2 创建表结构
根据文本文件的内容格式,每行数据根据逗号(,)分割,每个字段使用string类型进行存储,建立hive对应的表,建表语句如下:
create table hive_2020.student(
sno string,
name string,
gender string,
age int,
class string,
semester1 String,
semester2 String
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
stored as textfile;
建表语句中hive_2020为刚创建的数据库名称,这样可以直接看到表所在的数据库,也可以使用use hive_2020,进入到指定的数据库中,然后再执行建表语句;建表语句后面的2句分别制定了文件内容分割符号和存储文件的格式。
需要特别说明一下 :hive的分割符目前只能支持单字符,对于多字符需要特殊处理。目前通用的方式主要有2种:
- 使用单字符分割后,然后再使用数据时,将数据在进行额外的处理
- 重新实现hive的InputFormat方法,使其支持多字符分割
2.3 导入数据文件
可以将本地文件或者hdfs上的文件导入到hive,命令执行方式如下:
#将本地文件导入hive
load data local inpath '本地路径' into table student;
#将HDFS文件导入hive
load data inpath 'HDFS路径' into table student;
为了方便,选择将本地文件直接导入hive的表中
- 将本地文件导入到hive中
load data local inpath '/root/2020/01/student' into table student;
导入成功后,显示“OK”信息
-
查看表中数据
2.4 准备分析
hive的语句基本上与SQL语句保持一直,如何解决上述问题就转换成:如果使用关系型数据库我们怎么解决?
- 将每个学生第一学期的数据成绩拆分出来,使用split函数,以&进行拆分
- 获取每个学生第一学期的数据成绩,使用split函数,以=进行拆分
- 如果数学成绩大于等于60,则赋值为1,否则赋值为0
- 统计1的个数作为及格的人数,使用总数减去及格的人数,作为不合格的人数
首先,放上最终Hive 执行的语句
select
sum(if(split(split(semester1,'&')[0],'=')[1]>=60,1,0)) as math_score_pass ,
count(if(split(split(semester1,'&')[0],'=')[1]>=60,1,0)) - sum(if(split(split(semester1,'&')[0],'=')[1]>=60,1,0)) as math_score_not_pass
from student;
然后,放上Hive的执行结果
最后,一步一步的分析一下Hive的执行语句:
- 从整体上看,该语句从student表中查询两个属性;
- 第一个属性是count,第二个属性是sum值减去第一个属性值,从而可以简单的任务,一个是计算count值,另外一个是计算sum值;
- count和sum函数的计算内容相同,分析他们共同内容,if(split(split(semester1,'&')[0],'=')[1]>=60,1,0);
- 使用if函数和两个split函数,最内层的split函数用于拆分第一个学期学科成绩,第二个split用于获取数学成绩,if函数用于判断数学成绩是否大于等于60,如果大于等于60,则赋值为1,否则赋值为0;
- 则count用于计算1的个数,sum则是用于计算总数。
2.5 执行
一些hive的命令会转化成haooop的MapRecude任务去执行,查看该任务的基本执行情况
PS:一些常用的命令:
查看hive中的所有指定函数:show functions;
查看hive中的指定函数使用方法:describe function 方法名
3.使用Spark解决
spark源码是由scala语言进行编码的,Spark任务是使用scala语言进行编写,没有scala语言基础的同学,需要对scala有一定的了解,才能更好的完成。
3.1 编写
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("Spark Student Score")
.master("local[2]")
.getOrCreate()
val outKeyPass = "Math_Score_Pass";
val outKeyNotPass = "Math_Score_Not_Pass";
val sc :SparkContext = spark.sparkContext val lines = sc.textFile(args(0))
// 数据格式: 200412169,gavin,male,30,0401,math=24&en=60&c=85&os=78,math=22&en=20&c=85&os=78
val studentScoreCount = lines
.map(_.split(","))
.filter(_.length == 7)
.map(t => t(5))
.map(_.split("&"))
.filter(_.length == 4)
.map(t => t(0))
.map(_.split("="))
.filter(_.length ==2)
.map(t => if (t(1).toInt >= 60) (outKeyPass,1) else (outKeyNotPass,1) )
.reduceByKey(_+_)
.saveAsTextFile(args(1))
}
使用RDD的方式,执行思路与Hadoop的思路基本一致。使用逗号分割获取第一学期的成绩,使用&分割,获取数学成绩,判断数据成绩是否大于等于60,如果满足,则生成("Math_Score_Pass",1),否则,生成("Math_Score_Not_Pass",1),然后使用reduceByKey将所有的结果进行累加,最后保存到HDFS的指定目录。
3.2 执行
spark-submit \
--master local \
--class com.spark.StudentScore \
original-spark-training-1.0-SNAPSHOT.jar \
/data/student /data/spark/studentscore/
3.3 查看执行结果
查看 hdfs结果结果目录::
hdfs dfs -ls /data/spark/studentscore/
查看HDFS结果内容:
hdfs dfs -cat /data/spark/studentscore/par*
在结果目录中,发现产生了2个part-0000* 的文件,我们要看一下 具体是什么原因产生的:
查阅相关资料,发现从hdfs中读取文件,源码默认的分区数是2,分区数决定最终的结果
在默认的textfile中,如果从hdfs中读取文件,源码中默认的分区数是2,如果想改变分区数,可以在textfile中设置第二个参数“分区数”
查看textFile源码
查看hadoopFile源码
其中defaultMinPartitions为定义的默认分区数:
3.4 源码
spark的实现代码可以在github上进行下载spark-training