之前从数据库中取出数据,都是dataset/dataframe进行处理的,与之相较rdd操作起来更加丝滑顺手,思路也更加清晰了。
0x01数据梳理&处理方式:
-
筛选和过滤
val textFile = sc.textFile ( "hdfs://xxxxxxxxx.net/xxx/xxx/xxx" ) var linesWithSpark = textFile.filter ( line => line.contains ( "[需要筛选包含的字符串]" ) )
-
对rdd中的数据做初步处理:
//此处处理为: //1.先对数据进行分割,根据数据组成形式,这里根据\t分割; //2.分割完的数据,对其中的数据做去无效字符处理; //3.尝试对各个字段字符做反序列化处理; val results = linesWithSpark.map ( line => line.split ( "\t" )( 5 ).substring(1,line.split ( "\t" )(5).length-1) ).map ( log => try { PhpUnserializer.parse ( log ).asInstanceOf [Map[String, Any]] } catch { case ex: Exception => println ( log ) } )
-
数据选择性重新聚合:
//此处处理为: //把重新处理好的rdd,最后过滤筛选入一个新的rdd; val array_spark = strSN_.map ( log => log.split ( "," ) ) val new_all = array_spark.filter ( _.length == 5 ).map ( x => (x ( 0 ), x ( 1 ), x ( 2 ), x ( 3 ),x(4) )).filter(_._2!="None").filter(_._3!="None")
-
数据统计&计算处理:
//此处处理为: //1.按照rdd中第五个元素进行统计数量,并按照大小排序; //2.结果设置为tuple形式展示; val Countbyxx = Count_all.groupBy ( _._5 ).sortBy ( _._2.toArray.length, false ).map ( line => (line._1, line._2.toArray.length) ) //此处处理为: //1.对rdd中某个tuple求方差并将结果加入新的rdd var stdev = CountList.collect().map(line=>(line._1->getStdev(line._2,ss)))
0x02提交集群
网上太多集群提交的方法,由数据打包提交到集群遇到的问题说,过程中共遇到三个问题:
- 找不到第三方Jar包:
spark-submit时 --jars /path/to/jar/xxx.jar可以直接将本地的jar包带入集群中提交。
- setmaster
如果是提交到公有集群的jar包,不再设置setmaster("local"),而是采用命令行中设置yarn-client的方式:
--master YARN-client
- spark集群读取本地文件:
方案1:将本地文件上传到hdfs读取,但是遇到第三方包无法直接对hdfs读取的文件进行操作的状况;
方案2:把本地文件同步到集群每台机器,如若是公司集群,此方案不可行;
方案3:(最佳方案)
在spark提交任务前文件读取到driver内存,即
在创建SparkContext或SparkSession之前,先读取文件到driver内存。
故最终方法为:
spark-submit --jars /path/to/jars/xxx.jar --class com.App.loliCount /path/to/jar/your-spark-subject.jar --master YARN-client
//注意有时--jars需要在第一个参数时才能提交时不报错。
Done.
#######spark就像一本好书,每次使用都有更深一次的理解。########