Spark 开发初步

开始着手分析数据,数据是导出成csv格式的文本文件,每一行格式如下:

1701170830490656,"2014-01-06 22:31:18.608000",8.64,1

各个列的含义依次是:地点,发生时间戳,发生重量,发生类型。我们需要按地点对发生重量进行统计排序输出,按发生类型对发生重量进行统计排序输出。

由于集群上运行时需要每个工作节点都要能访问到文件,所以在此次实践中,hdfs还没有搭建成功之前,读取本地文件分析只能用spark的local模式,重点时spark rdd的初步入门。

首先运行

./bin/spark-shell --master=local

建议先在ide中写好代码,再粘贴到shell中运行

type TeamWeightEntity =  (String, Float)

type TypeWeightEntity =  (Int,Float)

以上代码:定义了两个元组,分别用来存放(地点,发生重量)和(类型,发生重量)。

def parseLine(line:String):TeamWeightEntity= {

  val a = line.split(",")

  (a(0), a(2).toFloat)

}

def  parseLineByType(line:String):TypeWeightEntity= {

  val a = line.split(",")

  (a(3).toInt, a(2).toFloat)

}

以上代码:定义了两个每行记录的解析函数。

val filePath ="/home/zhujianfeng/team_mw.csv"

val linesRDD = sc.textFile(filePath)

以上代码:指明了数据文件路径,并加载到spark中形成RDD。

我在mac上实践时,路径稍有不同,执行结果如同这样:

接下来,我们对这个rdd进行处理:

val weightRDD = linesRDD.map(parseLine).cache()

以上代码,我们得到一个存储了地点、发生重量的RDD,并将其缓存起来。

val sumWeight = weightRDD.reduceByKey((a,b)=>a+b).sortBy(e=>e._2,false)

以上代码做了两个操作,首先是根据key(也就是地点)进行reduce,相同的key的操作是值相加。接着对产生的中间结果进行排序,sortBy两个参数,一个是排序依据的函数,一个是表示倒序的布尔值。

最后组织输出:

sumWeight.collect()

同样的方式我们可以再做一次,形成类型统计的结果:

val typeWeight = linesRDD.map(parseLineByType).cache()

val sumWeight2 = typeWeight.reduceByKey((a,b)=>a+b).sortBy(e=>e._2,false)

sumWeight2.collect()



接下来使用spark sql方式分析数据,在此之前先将文件放到hdfs中。首先要定义读取存储数据记录的类。

case class TeamWeight(tid:String,recTs:String,mwWeight:Float,mwType:Int)

接着读取文件,处理并形成DataFrame。

val filePath ="hdfs://Y40/medical_waste/team_mw.csv"

import spark.sqlContext.implicits._

val teamDF = sc.textFile(filePath).map(_.split(",")).map(attributes =>TeamWeight(attributes(0), attributes(1), attributes(2).toFloat, attributes(3).toInt)).toDF()

teamDF.createOrReplaceTempView("t_team_weight")

以上代码import是必须的,否则RDD会没有toDF()方法。逻辑很简单,分析每行,创建TeamWeight
对象数组的RDD,然后转成DataFrame。

val df1 = spark.sql("SELECT tid,SUM(mwWeight) AS sumWeight FROM t_team_weight GROUP BY tid ORDER BY sumWeight DESC")

df1.show()

以上代码就是执行sql语句,并展示查询结果。

同样,类型的汇总排行查代码如下:

val df2 = spark.sql("SELECT mwType,SUM(mwWeight) AS sumWeight FROM t_team_weight GROUP BY mwType ORDER BY sumWeight DESC")

df2.show()


最后,我们实践一下部署过程,将代码编译后大成jar包,注意因为没用到spark之外的库,所以只要把代码打包就行了,IDEA的设置如下图:


将打包好的jar文件上传到集群中的任何一台机器,然后运行spark-commit:

./bin/spark-submit --master spark://Y40:7077 /home/zhujianfeng/medical_waste_spark.jar

由于这个jar打包时已经指定了main class,所以不需要再加--class参数,如果需要执行另一个类,就要加参数指定类名了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容