185、Spark 2.0之SparkSession、Dataframe、Dataset开发入门

Spark SQL介绍

Spark SQL是Spark的一个模块,主要用于处理结构化的数据。与基础的Spark RDD API不同的是,Spark SQL的接口会向提供更多的信息,包括数据结构以及要执行的计算操作等。在Spark SQL内部,会使用这些信息执行一些额外的优化。使用Spark SQL有两种方式,包括SQL语句以及Dataset API。但是在计算的时候,无论你是用哪种接口去进行计算,它们使用的底层执行引擎是完全一模一样的。这种底层执行机制的统一,就意味着我们可以在不同的方式之间任意来回切换,只要我们可以灵活地运用不同的方式来最自然地表达我们要执行的计算操作就可以了。

Spark SQL之SQL介绍

Spark SQL的一个主要的功能就是执行SQL查询语句。Spark 2.0开始,最大的一个改变,就是支持了SQL 2003标准语法,还有就是支持子查询。Spark SQL也可以用来从Hive中查询数据。当我们使用某种编程语言开发的Spark作业来执行SQL时,返回的结果是Dataframe/Dataset类型的。当然,我们也可以通过Spark SQL的shell命令行工具,或者是JDBC/ODBC接口来访问。

Spark SQL之Dataframe/Dataset介绍

Dataset是一个分布式的数据集。Dataset是Spark 1.6开始新引入的一个接口,它结合了RDD API的很多优点(包括强类型,支持lambda表达式等),以及Spark SQL的优点(优化后的执行引擎)。Dataset可以通过JVM对象来构造,然后通过transformation类算子(map,flatMap,filter等)来进行操作。Scala和Java的API中支持Dataset,但是Python不支持Dataset API。不过因为Python语言本身的天然动态特性,Dataset API的不少feature本身就已经具备了(比如可以通过row.columnName来直接获取某一行的某个字段)。R语言的情况跟Python也很类似。

Dataframe就是按列组织的Dataset。在逻辑概念上,可以大概认为Dataframe等同于关系型数据库中的表,或者是Python/R语言中的data frame,但是在底层做了大量的优化。Dataframe可以通过很多方式来构造:比如结构化的数据文件,Hive表,数据库,已有的RDD。Scala,Java,Python,R等语言都支持Dataframe。在Scala API中,Dataframe就是Dataset[Row]的类型别名。在Java中,需要使用Dataset<Row>来代表一个Dataframe。

新的入口,SparkSession

从Spark 2.0开始,一个最大的改变就是,Spark SQL的统一入口就是SparkSession,SQLContext和HiveContext未来会被淘汰。可以通过SparkSession.builder()来创建一个SparkSession,如下代码所示。SparkSession内置就支持Hive,包括使用HiveQL语句查询Hive中的数据,使用Hive的UDF函数,以及从Hive表中读取数据等。

val spark = SparkSession
  .builder()
  .appName("Spark SQL Example")
  .master("local") 
  .config("spark.sql.warehouse.dir", "C:\\Users\\Administrator\\Desktop\\spark-warehouse")  
  .getOrCreate()

import spark.implicits._

Dataframe的untyped操作

有了SparkSession之后,就可以通过已有的RDD,Hive表,或者其他数据源来创建Dataframe,比如说通过json文件来创建。Dataframe提供了一种domain-specific language来进行结构化数据的操作,这种操作也被称之为untyped操作,与之相反的是基于强类型的typed操作。

val df = spark.read.json("people.json")
df.show()
df.printSchema()
df.select("name").show()
df.select($"name", $"age" + 1).show()
df.filter($"age" > 21).show()
df.groupBy("age").count().show()

SparkSession:运行SQL查询

SparkSession的sql()函数允许我们执行SQL语句,得到的结果是一个Dataframe。
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

Dataset的typed操作

Dataset与RDD比较类似,但是非常重要的一点不同是,RDD的序列化机制是基于Java序列化机制或者是Kryo的,而Dataset的序列化机制基于一种特殊的Encoder,来将对象进行高效序列化,以进行高性能处理或者是通过网络进行传输。Dataset除了Encoder,也同时支持Java序列化机制,但是encoder的特点在于动态的代码生成,同时提供一种特殊的数据格式,来让spark不将对象进行反序列化,即可直接基于二进制数据执行一些常见的操作,比如filter、sort、hash等。

case class Person(name: String, age: Long)
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()

val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect()

val path = "people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()

Hive操作

在Spark 2.0中,是支持读写hive中存储的数据的。但是,因为hive有较多的依赖,所以默认情况下,这些依赖没有包含在spark的发布包中。如果hive依赖可以在classpath路径中,那么spark会自动加载这些依赖。这些hive依赖必须在所有的worker node上都放一份,因为worker node上运行的作业都需要使用hive依赖的序列化与反序列化包来访问hive中的数据。

只要将hive-site.xml、hdfs-site.xml和core-site.xml都放入spark/conf目录下即可。

如果要操作Hive,那么构建SparkSession的时候,就必须启用Hive支持,包括连接到hive的元数据库,支持使用hive序列化与反序列化包,以及支持hive udf函数。如果我们没有安装hive,也是可以启用hive支持的。如果我们没有放置hive-site.xml到spark/conf目录下,SparkSession就会自动在当前目录创建元数据库,同时创建一个spark.sql.warehouse.dir参数设置的目录,该参数的值默认是当前目录下的spark-warehouse目录。在spark 2.0中,hive.metastore.warehouse.dir属性已经过时了,现在使用 spark.sql.warehouse.dir属性来指定hive元数据库的位置。

case class Record(key: Int, value: String)
val warehouseLocation = "file:${system:user.dir}/spark-warehouse"

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql


sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'kv1.txt' INTO TABLE src")
sql("SELECT * FROM src").show()
sql("SELECT COUNT(*) FROM src").show()

val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()

val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()

其中,上述用到的数据如下

people.json

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

kv1.txt

238 val_238
86 val_86
311 val_311
27 val_27
165 val_165
409 val_409
255 val_255
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,186评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,858评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,620评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,888评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,009评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,149评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,204评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,956评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,385评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,698评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,863评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,544评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,185评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,899评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,141评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,684评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,750评论 2 351

推荐阅读更多精彩内容