# 数据分析最佳实践 - spark Dataset/DataFrame数据存取及处理

0x01前言

官网上的spark with scala 的文档比较难理解,内容也特别少。初学遇到很多实际情况,会很迷茫怎么处理数据。
在此把自己踩的坑列一列,供初学者参考。
大牛请轻拍,有问题欢迎指教。

0x02 理解

其实spark不算难理解,但是加上一门不太熟悉的scala语言,整个人刚学的时候都是懵逼的。

Spark:

实际上它本身还是一个Map Reduce操作,Map负责对数据块操作,Reduce对结果汇总。只是Spark提出了RDD的概念,让我们不再需要关注底层的计算,而是更专注于数据集的操作。

它的操作后端会有一个自动的调度系统,帮我们完成计算的过程。我们不需要关心这个任务被分配给了多少台机器,怎样的计算过程,我们只需要得到计算完毕的结果。

Scala:

scala是一门神奇的语言,老司机的代码你可能根本看不懂,对,就是那种一行搞定一个算法的那种。它的面向对象和函数式并不只是表面的意思。目前理解不够深入,希望有缘能再研究研究..

Spark with scala:

scala作为spark的编程语言,相较python此类扩展性更强,更少出现某个问题卡住没办法解决的问题。

0x03 吐槽结束,正文来了!

两周的研究中,发现从示例程序入手最好理解!如有不用再去查询相关文档!

用它做了一个小功能,以下为本次实践中需求的场景,
读写存储部分:

  • 从mongodb中读数据,spark处理后入mongodb

数据处理部分:

  • 处理获取的mongodb数据及json数据。

读写存储数据部分:

首先,初上手的我,参考了一下mongodb针对spark的官网文档:
https://docs.mongodb.com/spark-connector/master/scala/write-to-mongodb/

这里有详细的方法去对mongodb数据进行读写。
对后续数据处理,这类方法在文档中最清晰明了,于是我使用的这类方法(参见上方链接datasets and SQL部分):

var sparkSession = SparkSession.builder().master("local[2]").appName("conn").config("spark.mongodb.input.uri", "mongodb://[username]:[password]@10.10.10.10:28018/[dbname].[collections]").config("spark.mongodb.output.uri", "mongodb://[username]:[password]@10.10.10.10:28018/[dbname].[collections]").getOrCreate()

因为习惯性将每个功能点放进每个函数里分别调用,这里遇到了第一个问题,

!SparkSession在一个任务中只能存在一个,多个会报冲突

如果你多个函数中均要使用这个sparkSession,可以放入公共区域或声明为共享变量以供调用。

共享变量了解一下:

    rdd.foreach会在集群里的不同机器上创建spark工作线程, 而connection对象则不会在集群里的各个机器之间传递, 所以有些spark工作线程就会产生connection对象没有被初始化的执行错误。 

    解决的办法可以是在spark worker里为每一个worker创建一个connection对象, 但是如果你这么做, 程序要为每一条record创建一次connection,显然效率和性能都非常差。

    另一种改进方法是为每个spark分区创建一个connection对象,同时维护一个全局的静态的连接迟对象, 这样就可以最好的复用connection。 另外需要注意: 虽然有多个connection对象,但在同一时间只有一个connection.send(record)执行,因为在同一个时间里,只有一个微批次的rdd.

实际情况中,在读出数据处理完毕后,并不是所有情况都如官网示例一样,需要存储的是简单的数据,更多时候是需要糅合的数据,此时我们可能要从多种类型转换成dataframe/rdd类型数据进行存储。

!多种类型数据转换成df/rdd类型
import sparkSession.implicits._

case class DealData(user:String,date:String,array_acc:ListBuffer[Map[String,String]],array_err:ListBuffer[Map[String,String]],array_fro:ListBuffer[Map[String,String]])

val seq_df = Seq(DealData(user,NowDate(),array_acc,array_err,array_fro)).toDF()
#这是一个将多类数据糅合进一个rdd的例子,DealData不是一个函数,是指明其类型,使用Seq必须在object中指明它包含的所有元素的类型。

val rdd = seq_df.rdd
#Dataframe to rdd

val testDF = rdd.map {line=>
      (line._1,line._2)
    }.toDF("col1","col2")
#rdd to dataframe,rdd是没有sql方法的,df和rdd的区别为:rdd是无字段名的。

MongoSpark.save(seq_df.write.option("collection","[dbname]").mode("Append"))
#以Dataframe格式保存进mongodb



看官网demo中读取出来数据,会发现,可以将读出的数据具象为一个数据库形式,去展示,筛选等操作。

但是实际上,除了show()函数检查数据是否具象成功,并不知道如何处理数据。下例或许会给你一点启示:


val df = MongoSpark.load(sparkSession) 

df.createOrReplaceTempView("[dbname]") 
#这里是你给这个虚拟的db起一个名字,需与下句中表名保持一致。

val df1 = sparkSession.sql("select user from [dbname] as l where cast(l.actTime as String) >= '2018/05/28 18:00:00' ")
#这是一个依据时间筛选的例子,df1的数据类型为DataFrame

val df_count0 = df1.groupBy("user").count()
#这里是按照user与每个user对应的数量做一个新的表。表中只有user和count两个字段,df_count0依旧是DataFrame类型。

val df_count = df_count0.orderBy("count")
#按照count排序,此时df_count是DataSet类型。

val users = df_count.rdd.map(x=>x(0)).collect()
#取df_count所有数据集中的第一列元素,将其转换成Array类型-数组类型

实际上还有一点google了很久都没有结果的数据处理需求,由spark的例子我们知道如何读取文件,但是如何处理有规律的文件,如,将json内容,转换成rdd或df的格式进行数据分析:

!json内容转换成RDD/dataframe类型。
import com.alibaba.fastjson.{JSON, JSONArray}

val json = JSON.parseObject(content)
val data = json.getJSONObject("DATA").get("data")
#处理两层包含的json数据。


row.replace("{","").replace("}","").split(",").map(row0=>try{
    val row0_array = row0.toString.split(":")
    array_map += (row0_array(0)->row0_array(1))
})
#实际上我们要做的事是把json数据转换成可处理的数据形式,
#我们需要先把数据规律性的处理成key:value形式,
#然后使用map函数对其进行指向,做成一个字典的形式,
#之后再使用Seq方法将其转换成dataframe/rdd类型就可以啦~

另外在环境搭建初期,遇到一些的环境错误Q&A分享一下~:

Q:
IDEA里运行代码时出现Error:scalac: error while loading JUnit4, Scala signature JUnit4 has wrong version expected: 5.0 found: 4.1 in JUnit4.class错误的解决办法
A:
删除test/scala/下的文件

Q:
Exception in thread "main" java.lang.NoClassDefFoundError: scala/Product$class
A:
切换scala2.11.0

Q:
Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;
A:
scala版本和pom.xml版本不一致。

Q:
A master URL must be set in your configuration?
A:
没配置materURL,可在代码中加setMaster
val conf = new SparkConf().setAppName("WordCount").setMaster("local[]")*

Q:
res.withPipeline(Seq(Document.parse("{ $match: { 'errno' : '220' } }"))) Document未定义问题
A:
import org.bson.Document

Q:
ERROR Executor:91 - Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.analysis.TypeCoercion$.findTightestCommonTypeOfTwo()Lscala/Function2;
A:
切换spark和spark-mongo版本为2.2版本。

欢迎大佬指出不足和认知错误,scala这个萝卜坑,有缘再蹲。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,711评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,079评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,194评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,089评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,197评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,306评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,338评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,119评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,541评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,846评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,014评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,694评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,322评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,026评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,257评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,863评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,895评论 2 351

推荐阅读更多精彩内容