I. Definitions:
A Dataset is a distributed collection of data.
A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.
Before Spark 1.3 there was no DataFrame; the corresponding abstraction was called SchemaRDD.
Dataset was introduced in Spark 1.6.
II. Similarities and differences between RDD and DataFrame:
1. Both RDD and DataFrame are distributed collections of data and support parallel computation.
2. Data structure: compared with an RDD, a DataFrame holds both the data and a schema (an RDD has no schema). You can think of it as a table in a relational database; because more structural information is exposed, Spark SQL can apply more optimizations.
3. API: the DataFrame API is richer than the RDD API.
4. With the DataFrame API, execution performance is the same no matter which language you program in; with RDDs, performance differs greatly between languages, because each language relies on its own runtime, e.g. Java/Scala run on the JVM while Python runs in a completely different Python runtime. (A minimal sketch contrasting the two APIs follows this list.)
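A minimal sketch of this contrast, assuming a small text file people.txt with lines like "Andy,30" (the path and file format are assumptions for illustration, not part of the Spark distribution): with an RDD, Spark only sees opaque tuples; with a DataFrame, the named, typed columns give Spark SQL something to optimize against.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("RddVsDf").master("local[2]").getOrCreate()
import spark.implicits._

// RDD: no schema, Spark only sees (String, Int) tuples
val rdd = spark.sparkContext
  .textFile("file:///tmp/people.txt")            // assumed path
  .map(_.split(","))
  .map(a => (a(0), a(1).trim.toInt))
val adultsRdd = rdd.filter(_._2 > 21)

// DataFrame: same data plus named, typed columns (a schema) that Spark SQL can optimize
val df = rdd.toDF("name", "age")
df.filter($"age" > 21).show()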
III. Similarities and differences between DataFrame and Dataset:
1. The Dataset API is available in Scala and Java; the DataFrame API is available in Scala, Java, Python and R.
2. In the Scala API, DataFrame is simply a type alias of Dataset[Row].
A DataFrame can be viewed as a special case of a Dataset; a Dataset is strongly typed.
Suppose that, because of a typo, you write "seletc * from XXX" in SQL: the error only shows up at runtime. With a DataFrame or a Dataset, the same kind of typo, "df.seletc("name")" or "ds.seletc("name")", is already caught at compile time.
Now suppose another typo: you write "select nname from XXX", but table XXX has no column nname (the column is actually called "name"); again SQL only reports the error at runtime. With a DataFrame, "df.select("nname")" compiles fine and only fails at runtime; with a Dataset, the typed API catches it at compile time, e.g. "ds.map(_.nname)" does not compile, because a Dataset is strongly typed and nname is not a field of the element type.
Analysis errors should be exposed as early as possible, before a distributed job starts: if resources have already been allocated and the error only surfaces at runtime, all of the preparation work is wasted. This is exactly the benefit of strong typing. (A minimal sketch follows this paragraph.)
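A minimal sketch of the compile-time checking, assuming a case class Person whose fields match people.json from the Spark examples (the class name is an assumption used only for illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("DfVsDs").master("local[2]").getOrCreate()
import spark.implicits._

case class Person(name: String, age: Option[Long])   // assumed to match people.json

val df = spark.read.format("json")
  .load("file:///home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json")
val ds = df.as[Person]                                // Dataset[Person]: strongly typed

// DataFrame: a wrong column name compiles and only fails at runtime (AnalysisException)
// df.select("nname").show()

// Dataset: the typed API catches the wrong field name at compile time
ds.map(p => p.name).show()
// ds.map(p => p.nname)   // does not compile: value nname is not a member of Person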
IV. Spark SQL entry point
Old versions (before 2.x): SQLContext (a sketch of this older style follows the builder example below)
New versions (2.x and later): SparkSession
So how do you build a SparkSession?
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("SparkSQLApp")
  .master("local[2]")
  // .enableHiveSupport()  // uncomment to access tables in Hive
  .getOrCreate()
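For comparison, the pre-2.x entry point was built from a SparkContext. A minimal sketch of the old style (SQLContext still exists in 2.x but is superseded by SparkSession):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Old 1.x-style entry point: build a SparkContext first, then wrap it in a SQLContext
val conf = new SparkConf().setAppName("SparkSQLApp").setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)   // superseded by SparkSession.builder() since 2.0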
How do you create a DataFrame?
[hadoop@hadoop001 bin]$ ./spark-shell --master local[2] --jars ~/software/mysql-connector-java-5.1.27.jar
scala> val df = spark.read.format("json").load("file:///home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
scala> df.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> df.select("name").show
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
scala> df.select('name).show
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
scala> df.select($"name",$"age"+1).show
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
scala> df.filter($"age">21).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
scala> df.groupBy("age").count().show
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
scala> df.createOrReplaceTempView("people")
scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> sqlDF.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> df.createGlobalTempView("people")
scala> spark.sql("SELECT * FROM global_temp.people").show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
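A short note on the difference between the two kinds of views: a view registered with createOrReplaceTempView is scoped to the current SparkSession, while a global temp view lives in the reserved global_temp database and is visible to other sessions of the same application. A minimal sketch:

// Session-scoped temp view: only visible in the SparkSession that created it
// spark.newSession().sql("SELECT * FROM people")   // would fail: Table or view not found

// Global temp view: stored in the reserved global_temp database, shared across sessions
spark.newSession().sql("SELECT * FROM global_temp.people").show()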