Spark SQL

简介

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame(底层也是RDD)并且作为分布式SQL查询引擎的作用。

引入

Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成特殊的RDD(DataFrame),然后提交到集群执行,执行效率非常快!



数据分析的2种方式:

①:命令式:操作面向过程和算法性的处理,(可以解决非结构化数据)

②:SQL擅长数据分析和通过简单的语法查询

总结:SparSQL是一个为了支持SQL而设计的工具,同时也支持命令式的API

数据处理选型(应用场景)

Spark 的RDD主要用于处理 非结构和数据 和半结构化数据

Spark中的SQL主要用于处理 结构化数据(较为规范的半结构化数据也可以处理)

SparkSQL的数据抽象

DataFrame

DataFrame是特殊得RDD

DataFrame是一个分布式的表

DataFrame ==> RDD -(泛型) + Schema(列约束信息)+方便的SQL操作+优化

DataSet

DataSet是特殊得DataFrame,DataFrame是特殊得RDD

DataSet是一个分布式的表

DataSet ==> DataFrame+泛型

DataSet == > RDD + Schema +方便的SQL操作 +优化

区别

相互间的转换

DataFrame = RDD - 泛型 +Schema + SQL + 优化

DataSet = RDD +Schema + SQL +优化

DataSet = DataFrame +泛型 +优化

版本

RDD(Spark1.0) -> DataFrame(Saprk1.3) -> DataSet(Spark1.6)

Spark SQL 入门

●在spark2.0之后

这些都统一于SparkSession,SparkSession 封装了SqlContext及HiveContext,实现了SQLContext及HiveContext所有功能

通过SparkSession还可以获取到SparkConetxt

创建DataFrame

1.在本地创建一个文件,有三列,分别是id、name、age,用空格分隔,然后上传到hdfs上。vim /root/person.txt

上传数据文件到HDFS上:

hadoop fs -put /root/person.txt  /

2.在spark shell执行下面命令,读取数据,将每一行的数据使用列分隔符分割

先执行

/opt/soft/spark/bin/spark-shell

val lineRDD= sc.textFile("hdfs://node01:8020/person.txt").map(_.split(" ")) 

3.定义case class(相当于表的schema)

case class Person(id:Int, name:String, age:Int)

4.将RDD和case class关联

val personRDD = lineRDD.map(x =>Person(x(0).toInt, x(1), x(2).toInt))        //RDD[Person]

5.将RDD转换成DataFrame

val personDF = personRDD.toDF

6.查看数据和schema

personDF.show

personDF.printSchema

7.注册表

personDF.createOrReplaceTempView("t_person")

8.执行SQL

spark.sql("select id,name from t_person

where id > 3").show

9.也可以通过SparkSession构建DataFrame

val dataFrame=spark.read.text("hdfs://node-01:9000/person.txt") //

dataFrame.show //直接读取的文本文件没有schema信息

创建DataSet

1.通过spark.createDataset创建

val fileRdd = sc.textFile("hdfs://node-01:9000/person.txt"

val ds1 = spark.createDataset(fileRdd)  //DataSet[String]读取普通文本,需要添加schema

ds1.show

2.通RDD.toDS方法生成DataSet

case class Person(name:String, age:Int)

val data = List(Person("zhangsan",20),Person("lisi",30)) 

val dataRDD = sc.makeRDD(data)

val ds2 = dataRDD.toDS 

ds2.show

3.通过DataFrame.as[泛型]转化生成DataSet

case class Person(name:String, age:Long)

val jsonDF= spark.read.json("file:///opt/soft/spark/examples/src/main/resources/people.json")

val jsonDS = jsonDF.as[Person] 

jsonDS.show

4.DataSet也可以注册成表进行查询

jsonDS.createOrReplaceTempView("t_person")

spark.sql("select * fromt_person").show

总结

不管是DataFrame还是DataSet都可以注册成表,之后就可以使用SQL进行查询了

DEA开发Spark SQL

第1种:指定列名添加Schema

第2种:通过StructType指定Schema

第3种:编写样例类,利用反射机制推断Schema

RDD、DF、DS之间的相互转换(6种)

SQL风格

DSL风格

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容