一个大数据的例子

最近参加了一个大数据开发的培训,整理一下在培训过程中,老师一直说的一个案例。
案例比较简单,使用MapReduce、hive、Spark等框架进行计算,对框架有一个简单的了解。
现在对这个案例进行一个简单的整理,方便后期学习。

数据

原始数据

原始数据

数据说明

数据按照逗号(,)分割,每列数据说明如下:
序号 说明 备注
1 学号
2 姓名
3 性别
4 年龄
5 上学期成绩 学科成绩以&分割,学科=成绩
6 下学期成绩 学科成绩以&分割,学科=成绩

需求

统计第一学期数学成绩及格和不及格的人数

解决方案

数据结构相对比较简单,分析每行数据中的第一学期数学成绩,
判断其中数据成绩是否及格,如果及格,则统计及格的人数,不及格统计不及格的人数。

0.准备工作

在正式开始这项工作时,首先要保证自己的环境可以正常运行,hadoop集群、mysql、hive、spark集群可以正常运行。

0.1环境准备

进入hadoop的sbin目录:
1.启动DFS

./start-dfs.sh

2.启动yarn

 ./start-yarn.sh

3.启动hive,进入hive的bin目录

#启动mysql
service mysql start
# 启动hive的metastore服务
hive --service metastore &
# 启动hive的hiveserver2服务
hive --service hiveserver2 &

4.启动spark:进入spark的sbin目录

start-all.sh

0.2 将需要分析的数据上传到服务器和HDFS中

将数据存放到student文件,进入student文件所在目录,将文件上传到HDFS的/data/目录下

    hdfs dfs -put student /data/

查看HDFS文件是否上传成功


数据上传成功

1.使用MapReduce解决

MapReduce的思路就是将所需要进行计算的数据,拆分到不同的机器上,
然后再不同的机器上计算,将不同机器上的结果汇总到一起,然后再进行计算。
在不同机器上进行计算的过程,通常称为Map阶段,汇总结果进行计算的过程,通常称为Reduce阶段。

结合MapReduce计算的基本思路,MapReduce实现一个任务主要分为Map和reduce两部分。
再实现的过程中,主要完成Map和Reduce的实现过程即可。
当然,一个MapReduce的任务,必须要有一个驱动主类将Map和Reduce调起方能执行。

1.1 Map

新建StudentScoreMapper,需要继承org.apache.hadoop.mapreduce.Mapper,并指定Map的输入输出类型,共有四个参数,前两个为输入数据类型,后两个为输出数据类型

    public class StudentScoreMapper extends Mapper<Object,Text, Text, IntWritable>

Map阶段具体的执行逻辑时再map方法中实现的,故需要重写父类的Map方法,具体思路:

  1. 获取第一学期的成绩
  2. 获取该学期的数据成绩
  3. 判断数学成绩是否及格,如果及格,则使用输出<"Math_Score_Pass",1>,否则输出<"Math_Score_Not_Pass",1>
@Override
protected void map(Object key, Text value, Context context) 
    throws IOException, InterruptedException {   
    // 1. valueIn为文件中每行的数据,使用split方法进行分割    
    String[] vals = value.toString().split(",",-1);    
    // 2.每行数据中,倒数第二列为第一学期成绩    
    if(vals.length !=6 ){return;}   
    String scoreStr = vals[5];    
    // 3.每门成绩按照&分割    
    String[] scores = scoreStr.split("&");    
    if(scores.length != 4){return;}    
    // 4.获取数学成绩,学科成绩按照=分割    
    String mathScoreStr = scores[0];    
    String[] mathScore = mathScoreStr.split("=");    
    if(mathScore.length != 2){return;}    
    String score = mathScore[1];    
    // 5.判断成绩是否及格    int temp = Integer.parseInt(score);    
    if(temp >= 60){        
        context.write(outKeyPass,new IntWritable(1));    
    }else{        
        context.write(outKeyNotPass,new IntWritable(1));    
    }
}

1.2 Reduce

新建StudentScoreReducer类,继承org.apache.hadoop.mapreduce.Reducer,同时指定Reduce的输入和输出类型,其中输入类型与Map的输出类型保持一致。

public class StudentScoreReducer extends Reducer<Text, IntWritable,Text, LongWritable>

Reduce阶段具体的处理逻辑需要重写reduce方法,具体思路:就是把Map的输出结果,累加即可。

protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, 
InterruptedException {    
    //1.经过shuffle之后 相同key的结果value 组成一个Iterable,作为reduce的输入参数进行计算    
    //2.遍历Iterable,将期所有的val进行累加    
    long sum = 0L;    
    for(IntWritable val : values){
        sum += val.get();    
    }   
    //3.返回计算结果    
    context.write(key,new LongWritable(sum));
}

1.3 驱动类

针对MapReduce的任务进行一些配置,设置Map类、Reduce类,任务输入输出路径等相关信息。

public static void main(String[] args) throws 
IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    //设置任务的名称
    job.setJobName("StudentScoreCount");
    //设置任务执行的主类
    job.setJarByClass(StudentScoreMain.class);
    //设置reduce任务的个数
    job.setNumReduceTasks(1);
    //设置Map类
    job.setMapperClass(StudentScoreMapper.class);
    //设置combiner,在Map端执行reduce任务
    //job.setCombinerClass(StudentScoreReducer.class);
    //设置Reduce类
    job.setReducerClass(StudentScoreReducer.class);
    //设置输出key的类
    job.setOutputKeyClass(Text.class);
    //设置输出value的类
    job.setOutputValueClass(LongWritable.class);
    //如果Map和Reduce的输出类型不一致,需要单独对map设置输出key和value的类型
    //同时注释setCombinerClass方法
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    //设置Map输入路径
    FileInputFormat.addInputPath(job, new Path(args[0]));
    //设置Reduce输处路径
    FileOutputFormat.setOutputPath(job, new Path(args[1]));    
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

1.4 执行

将编写的代码进行打包,发送到hadoop的集群,提交MapReduce任务

hadoop jar hadoop-training-1.0-SNAPSHOT.jar \
com.hadoop.mapreduce.StudentScoreMain /data/student \
/data/studentscore/

其中,/data/student为任务所需要处理的路径,/data/studentscore/ 为最终结果输出路径,需要保证这个文件在HDFS上不存在。

查看输出文件夹下的文件,其中,以part开始的文件为最终的数据结果


结果文件目录

查看任务执行结果


任务执行结果

查看任务的基本信息,执行实践为20sec


任务的基本信息

1.5 源码

具体源码可以通过Github进行下载:hadoop-training

2.使用hive解决

将数据导入到hive中,使用类SQL语句进行统计分析

2.1 创建数据库

    create database hive_2020; 

查看创建的数据库

    show databases;
数据库创建成功

2.2 创建表结构

根据文本文件的内容格式,每行数据根据逗号(,)分割,每个字段使用string类型进行存储,建立hive对应的表,建表语句如下:

create  table hive_2020.student(
    sno string,
    name string,
    gender string,
    age int,
    class string,
    semester1 String,
    semester2 String
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
stored as textfile;

建表语句中hive_2020为刚创建的数据库名称,这样可以直接看到表所在的数据库,也可以使用use hive_2020,进入到指定的数据库中,然后再执行建表语句;建表语句后面的2句分别制定了文件内容分割符号和存储文件的格式。

需要特别说明一下 :hive的分割符目前只能支持单字符,对于多字符需要特殊处理。目前通用的方式主要有2种:

  1. 使用单字符分割后,然后再使用数据时,将数据在进行额外的处理
  2. 重新实现hive的InputFormat方法,使其支持多字符分割

2.3 导入数据文件

可以将本地文件或者hdfs上的文件导入到hive,命令执行方式如下:

#将本地文件导入hive
load data local inpath '本地路径' into table student;
#将HDFS文件导入hive
load data  inpath 'HDFS路径' into table student;

为了方便,选择将本地文件直接导入hive的表中

  1. 将本地文件导入到hive中
    load data local inpath '/root/2020/01/student' into table student;

导入成功后,显示“OK”信息


hive数据导入成功
  1. 查看表中数据


    hive表中数据

2.4 准备分析

hive的语句基本上与SQL语句保持一直,如何解决上述问题就转换成:如果使用关系型数据库我们怎么解决?

  1. 将每个学生第一学期的数据成绩拆分出来,使用split函数,以&进行拆分
  2. 获取每个学生第一学期的数据成绩,使用split函数,以=进行拆分
  3. 如果数学成绩大于等于60,则赋值为1,否则赋值为0
  4. 统计1的个数作为及格的人数,使用总数减去及格的人数,作为不合格的人数

首先,放上最终Hive 执行的语句

    select 
    sum(if(split(split(semester1,'&')[0],'=')[1]>=60,1,0)) as math_score_pass , 
    count(if(split(split(semester1,'&')[0],'=')[1]>=60,1,0)) - sum(if(split(split(semester1,'&')[0],'=')[1]>=60,1,0)) as math_score_not_pass
    from student;

然后,放上Hive的执行结果


hive执行结果

最后,一步一步的分析一下Hive的执行语句:

  1. 从整体上看,该语句从student表中查询两个属性;
  2. 第一个属性是count,第二个属性是sum值减去第一个属性值,从而可以简单的任务,一个是计算count值,另外一个是计算sum值;
  3. count和sum函数的计算内容相同,分析他们共同内容,if(split(split(semester1,'&')[0],'=')[1]>=60,1,0);
  4. 使用if函数和两个split函数,最内层的split函数用于拆分第一个学期学科成绩,第二个split用于获取数学成绩,if函数用于判断数学成绩是否大于等于60,如果大于等于60,则赋值为1,否则赋值为0;
  5. 则count用于计算1的个数,sum则是用于计算总数。

2.5 执行

一些hive的命令会转化成haooop的MapRecude任务去执行,查看该任务的基本执行情况


hive 转换成MR任务

PS:一些常用的命令:

查看hive中的所有指定函数:show functions;
查看hive中的指定函数使用方法:describe function 方法名

3.使用Spark解决

spark源码是由scala语言进行编码的,Spark任务是使用scala语言进行编写,没有scala语言基础的同学,需要对scala有一定的了解,才能更好的完成。

3.1 编写

def main(args: Array[String]): Unit = { 
    val spark = SparkSession
    .builder()    
    .appName("Spark Student Score")    
    .master("local[2]")    
    .getOrCreate()  
    
    val outKeyPass = "Math_Score_Pass";  
    val outKeyNotPass = "Math_Score_Not_Pass";  
    
    val sc :SparkContext = spark.sparkContext  val lines = sc.textFile(args(0))  
    // 数据格式: 200412169,gavin,male,30,0401,math=24&en=60&c=85&os=78,math=22&en=20&c=85&os=78   
    val studentScoreCount = lines    
        .map(_.split(","))    
        .filter(_.length == 7)    
        .map(t => t(5))    
        .map(_.split("&"))    
        .filter(_.length == 4)    
        .map(t => t(0))    
        .map(_.split("="))    
        .filter(_.length ==2)    
        .map(t => if (t(1).toInt >= 60) (outKeyPass,1) else (outKeyNotPass,1) )    
        .reduceByKey(_+_)    
        .saveAsTextFile(args(1))
 }

使用RDD的方式,执行思路与Hadoop的思路基本一致。使用逗号分割获取第一学期的成绩,使用&分割,获取数学成绩,判断数据成绩是否大于等于60,如果满足,则生成("Math_Score_Pass",1),否则,生成("Math_Score_Not_Pass",1),然后使用reduceByKey将所有的结果进行累加,最后保存到HDFS的指定目录。

3.2 执行

    spark-submit \
    --master local \
    --class com.spark.StudentScore \
    original-spark-training-1.0-SNAPSHOT.jar \
    /data/student /data/spark/studentscore/

3.3 查看执行结果

查看 hdfs结果结果目录::

    hdfs dfs -ls /data/spark/studentscore/
HDFS结果

查看HDFS结果内容:

    hdfs dfs -cat /data/spark/studentscore/par*
HDFS结果内容

在结果目录中,发现产生了2个part-0000* 的文件,我们要看一下 具体是什么原因产生的:
查阅相关资料,发现从hdfs中读取文件,源码默认的分区数是2,分区数决定最终的结果

在默认的textfile中,如果从hdfs中读取文件,源码中默认的分区数是2,如果想改变分区数,可以在textfile中设置第二个参数“分区数”

查看textFile源码


textFile源码

查看hadoopFile源码


hadoopFile源码

其中defaultMinPartitions为定义的默认分区数:


defaultMinPartitions 参数

3.4 源码

spark的实现代码可以在github上进行下载spark-training

参考文献

[1]. spark中saveAsTextFile如何最终生成一个文件

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352