开始着手分析数据,数据是导出成csv格式的文本文件,每一行格式如下:
1701170830490656,"2014-01-06 22:31:18.608000",8.64,1
各个列的含义依次是:地点,发生时间戳,发生重量,发生类型。我们需要按地点对发生重量进行统计排序输出,按发生类型对发生重量进行统计排序输出。
由于集群上运行时需要每个工作节点都要能访问到文件,所以在此次实践中,hdfs还没有搭建成功之前,读取本地文件分析只能用spark的local模式,重点时spark rdd的初步入门。
首先运行
./bin/spark-shell --master=local
建议先在ide中写好代码,再粘贴到shell中运行
type TeamWeightEntity = (String, Float)
type TypeWeightEntity = (Int,Float)
以上代码:定义了两个元组,分别用来存放(地点,发生重量)和(类型,发生重量)。
def parseLine(line:String):TeamWeightEntity= {
val a = line.split(",")
(a(0), a(2).toFloat)
}
def parseLineByType(line:String):TypeWeightEntity= {
val a = line.split(",")
(a(3).toInt, a(2).toFloat)
}
以上代码:定义了两个每行记录的解析函数。
val filePath ="/home/zhujianfeng/team_mw.csv"
val linesRDD = sc.textFile(filePath)
以上代码:指明了数据文件路径,并加载到spark中形成RDD。
我在mac上实践时,路径稍有不同,执行结果如同这样:
接下来,我们对这个rdd进行处理:
val weightRDD = linesRDD.map(parseLine).cache()
以上代码,我们得到一个存储了地点、发生重量的RDD,并将其缓存起来。
val sumWeight = weightRDD.reduceByKey((a,b)=>a+b).sortBy(e=>e._2,false)
以上代码做了两个操作,首先是根据key(也就是地点)进行reduce,相同的key的操作是值相加。接着对产生的中间结果进行排序,sortBy两个参数,一个是排序依据的函数,一个是表示倒序的布尔值。
最后组织输出:
sumWeight.collect()
同样的方式我们可以再做一次,形成类型统计的结果:
val typeWeight = linesRDD.map(parseLineByType).cache()
val sumWeight2 = typeWeight.reduceByKey((a,b)=>a+b).sortBy(e=>e._2,false)
sumWeight2.collect()
接下来使用spark sql方式分析数据,在此之前先将文件放到hdfs中。首先要定义读取存储数据记录的类。
case class TeamWeight(tid:String,recTs:String,mwWeight:Float,mwType:Int)
接着读取文件,处理并形成DataFrame。
val filePath ="hdfs://Y40/medical_waste/team_mw.csv"
import spark.sqlContext.implicits._
val teamDF = sc.textFile(filePath).map(_.split(",")).map(attributes =>TeamWeight(attributes(0), attributes(1), attributes(2).toFloat, attributes(3).toInt)).toDF()
teamDF.createOrReplaceTempView("t_team_weight")
以上代码import是必须的,否则RDD会没有toDF()方法。逻辑很简单,分析每行,创建TeamWeight
对象数组的RDD,然后转成DataFrame。
val df1 = spark.sql("SELECT tid,SUM(mwWeight) AS sumWeight FROM t_team_weight GROUP BY tid ORDER BY sumWeight DESC")
df1.show()
以上代码就是执行sql语句,并展示查询结果。
同样,类型的汇总排行查代码如下:
val df2 = spark.sql("SELECT mwType,SUM(mwWeight) AS sumWeight FROM t_team_weight GROUP BY mwType ORDER BY sumWeight DESC")
df2.show()
最后,我们实践一下部署过程,将代码编译后大成jar包,注意因为没用到spark之外的库,所以只要把代码打包就行了,IDEA的设置如下图:
将打包好的jar文件上传到集群中的任何一台机器,然后运行spark-commit:
./bin/spark-submit --master spark://Y40:7077 /home/zhujianfeng/medical_waste_spark.jar
由于这个jar打包时已经指定了main class,所以不需要再加--class参数,如果需要执行另一个类,就要加参数指定类名了。