目录
一.Spark SQL简介
二.Spark SQL的特点
三.基本概念:表:(Datasets或DataFrames)
1.表 = 表结构 + 数据
2.DataFrame
3.Datasets
四.创建DataFrames
1.第一种方式:使用case class样本类创建DataFrames
2.第二种方式:使用SparkSession
3.方式三,直接读取一个带格式的文件:Json
五.操作DataFrame
1.DSL语句
2.SQL语句
3.多表查询
六.视图
1.视图是一个虚表,不存储数据
2.两种类型视图:
七.创建Datasets
1.方式一:使用序列
2.方式二:使用JSON数据
3.方式三:使用其他数据(RDD的操作和DataFrame操作结合)
八.Datasets的操作案例
1.使用emp.json 生成DataFrame
2.多表查询
一.Spark SQL简介
Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。
为什么要学习Spark SQL?Hive,它将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所以Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!同时Spark SQL也支持从Hive中读取数据。
二.Spark SQL的特点:
1.容易整合(集成):
安装Spark的时候,已经集成好了。不需要单独安装
2.统一的数据访问方式
JDBC、JSON、Hive、parquet文件(一种列式存储文件,是SparkSQL默认的数据源)
3.兼容Hive:
可以将Hive中的数据,直接读取到Spark SQL中处理。
4.标准的数据连接:JDBC
三.基本概念:表:(Datasets或DataFrames)
1.表 = 表结构 + 数据
DataFrame = Schema(表结构) + RDD(代表数据)
2.DataFrame
DataFrame是组织成命名列的数据集。它在概念上等同于关系数据库中的表,但在底层具有更丰富的优化。DataFrames可以从各种来源构建,
例如:
- 结构化数据文件
- hive中的表
- 外部数据库或现有RDDs
DataFrame API支持的语言有Scala,Java,Python和R
从上图可以看出,DataFrame多了数据的结构信息,即schema。RDD是分布式的 Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化
3.Datasets
Dataset是数据的分布式集合。Dataset是在Spark 1.6中添加的一个新接口,是DataFrame之上更高一级的抽象。它提供了RDD的优点(强类型化,使用强大的lambda函数的能力)以及Spark SQL优化后的执行引擎的优点。一个Dataset 可以从JVM对象构造,然后使用函数转换(map, flatMap,filter等)去操作。 Dataset API 支持Scala和Java。 Python不支持Dataset API。
四.创建DataFrames
1.第一种方式:使用case class样本类创建DataFrames
(1)定义表的Schema
注意:由于mgr和comm列中包含null值,简单起见,将对应的case class类型定义为String
scala> case class Emp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,depno:Int)
(2)读入数据
emp.csv
7369,SMITH,CLERK,7902,1980-12-17,800,0,20
7499,MARIB,SALESMAN,7901,1980-11-17,900,300,30
7450,CLARK,SALESMAN,7900,1980-10-17,900,500,30
7379,JACKE,MANAGER,7903,1980-11-11,400,0,20
7343,LARRL,CLERK,7902,1980-12-13,500,1400,30
7312,NHGJJ,SALESMAN,7904,1980-12-14,100,0,30
7343,SHJOI,MANAGER,7905,1980-12-15,200,0,10
7390,WJKLJ,ANALYST,7906,1980-12-16,300,0,20
7377,UIHKL,SALESMAN,7907,1980-12-18,400,0,10
7388,VHKJK,CLERK,7908,1980-12-19,400,0,20
//从hdfs中读入
scala> val lines = sc.textFile("hdfs://hadoop1:9000/emp.csv").map(_.split(","))
//从本地读入
scala> val lines = sc.textFile("/opt/module/datas/TestFile/emp.csv").map(_.split(","))
/opt/module/datas/TestFile
(3)把每行数据映射到Emp中。把表结构和数据,关联。
scala> val allEmp = lines.map(x => Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))
(4)生成DataFrame
scala> val allEmpDF = allEmp.toDF
//展示
scala> allEmpDF.show
2.第二种方式:使用SparkSession
(1)什么是SparkSession
Apache Spark 2.0引入了SparkSession,其为用户提供了一个统一的切入点来使用Spark的各项功能,并且允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序。最重要的是,它减少了用户需要了解的一些概念,使得我们可以很容易地与Spark交互。
在2.0版本之前,与Spark交互之前必须先创建SparkConf和SparkContext。然而在Spark 2.0中,我们可以通过SparkSession来实现同样的功能,而不需要显式地创建SparkConf, SparkContext 以及 SQLContext,因为这些对象已经封装在SparkSession中。
(2)使用StructType,来创建Schema
import org.apache.spark.sql.types._
val myschema = StructType(
List(
StructField("empno", DataTypes.IntegerType),
StructField("ename", DataTypes.StringType),
StructField("job", DataTypes.StringType),
StructField("mgr", DataTypes.IntegerType),
StructField("hiredate", DataTypes.StringType),
StructField("sal", DataTypes.IntegerType),
StructField("comm", DataTypes.IntegerType),
StructField("deptno", DataTypes.IntegerType)))
注意,需要:import org.apache.spark.sql.types._
(3)读取文件:
val lines= sc.textFile("/opt/module/datas/TestFile/emp.csv").map(_.split(","))
(4)数据与表结构匹配
import org.apache.spark.sql.Row
val allEmp = lines.map(x => Row(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))
注意,需要:import org.apache.spark.sql.Row
(5)创建DataFrames
val df2 = spark.createDataFrame(allEmp,myschema)
df2.show
3.方式三,直接读取一个带格式的文件:Json
(1)读取文件people.json:
people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
val df3 = spark.read.json("/opt/TestFolder/people.json")
df3.show
(2)另一种方式
val df4 = spark.read.format("json").load("/opt/TestFolder/people.json")
df4.show
五.操作DataFrame
DataFrame操作也称为无类型的Dataset操作
1.DSL语句
(1)展示表与展示表结构
//展示表
df1.show
//展示表结构
df1.printSchema
(2)查询
df1.select("ename","sal").show
df1.select($"ename",$"sal",$"sal"+100).show
(3)$代表 取出来以后,再做一些操作
df1.filter($"sal">2000).show
df1.groupBy($"depno").count.show
完整的例子,请参考:
http://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.Dataset
2.SQL语句
注意:不能直接执行sql。需要生成一个视图,再执行SQL。
(1)将DataFrame注册成表(视图):
df1.createOrReplaceTempView("emp")
(2)执行查询:
spark.sql("select * from emp").show
spark.sql("select * from emp where sal > 2000").show
spark.sql("select * from emp where depno=10").show
spark.sql("select depno,count(1) from emp group by depno").show
spark.sql("select depno,sum(sal) from emp group by depno").show
df1.createOrReplaceTempView("emp12345")
spark.sql("select e.depno from emp12345 e").show
3.多表查询
dept.csv文件内容
10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON
命令
case class Dept(deptno:Int,dname:String,loc:String)
val lines = sc.textFile("/opt/module/datas/TestFile/dept.csv").map(_.split(","))
val allDept = lines.map(x=>Dept(x(0).toInt,x(1),x(2)))
val df2 = allDept.toDF
df2.create
df2.createOrReplaceTempView("dept")
spark.sql("select dname,ename from emp12345,dept where emp12345.depno=dept.deptno").show
六.视图
1.视图是一个虚表,不存储数据
2.两种类型视图:
(1)普通视图(本地视图):只在当前Session有效
(2)全局视图:在不同Session中都有用。全局视图创建在命名空间中:global_temp 类似于一个库。
上面使用的是一个在Session生命周期中的临时views。在Spark SQL中,如果你想拥有一个临时的view,并想在不同的Session中共享,而且在application的运行周期内可用,那么就需要创建一个全局的临时view。并记得使用的时候加上global_temp作为前缀来引用它,因为全局的临时view是绑定到系统保留的数据库global_temp上。
(a)创建一个普通的view和一个全局的view
df1.createOrReplaceTempView("emp1")
df1.createGlobalTempView("emp2")
(b)在当前会话中执行查询,均可查询出结果。
spark.sql("select * from emp1").show
spark.sql("select * from global_temp.emp2").show
(c)开启一个新的会话,执行同样的查询
spark.newSession.sql("select * from emp1").show //(运行出错)
spark.newSession.sql("select * from global_temp.emp2").show
七.创建Datasets
DataFrame的引入,可以让Spark更好的处理结构数据的计算,但其中一个主要的问题是:缺乏编译时类型安全。为了解决这个问题,Spark采用新的Dataset API (DataFrame API的类型扩展)。
Dataset是一个分布式的数据收集器。这是在Spark1.6之后新加的一个接口,兼顾了RDD的优点(强类型,可以使用功能强大的lambda)以及Spark SQL的执行器高效性的优点。所以可以把DataFrames看成是一种特殊的Datasets,即:Dataset(Row)
1.方式一:使用序列
(1)定义case class
scala >case class MyData(a:Int,b:String)
(2).生成序列,并创建DataSet
scala >val ds = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS
(3).查看结果
scala >ds.show
ds.collect
2.方式二:使用JSON数据
(1)定义case class
case class Person(name: String, age: BigInt)
(2)通过JSON数据生成DataFrame
val df = spark.read.format("json").load("/opt/module/datas/TestFile/people.json")
(3)将DataFrame转成DataSet
df.as[Person].show
df.as[Person].collect
3.方式三:使用其他数据(RDD的操作和DataFrame操作结合)
(1)需求:分词;查询出长度大于3的单词
(a)读取数据,并创建DataSet
val linesDS = spark.read.text("/opt/module/datas/TestFile/test_WordCount.txt").as[String]
(b)对DataSet进行操作:分词后,查询长度大于3的单词
val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)
words.show
words.collect
(2)需求:执行WordCount程序
val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey(x => x._1).count
result.show
排序:
result.orderBy($"value").show
result.orderBy($"count(1)").show
八.Datasets的操作案例
1.使用emp.json 生成DataFrame
(1)数据:emp.json
(2)使用emp.json 生成DataFrame
val empDF = spark.read.json("/opt/module/datas/TestFile/emp.json")
emp.show
查询工资大于3000的员工
empDF.where($"sal" >= 3000).show
(3)创建case class,生成DataSets
case class Emp(empno:Long,ename:String,job:String,hiredate:String,mgr:String,sal:Long,comm:String,deptno:Long)
val empDS = empDF.as[Emp]
(4)查询数据
//查询工资大于3000的员工
empDS.filter(_.sal > 3000).show
//查看10号部门的员工
empDS.filter(_.deptno == 10).show
2.多表查询
(1)创建部门表
val deptRDD=sc.textFile("/opt/module/datas/TestFile/dept.csv").map(_.split(","))
case class Dept(deptno:Int,dname:String,loc:String)
val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS
deptDS.show
(2)创建员工表
case class Emp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)
val empRDD = sc.textFile("/opt/module/datas/TestFile/emp.csv").map(_.split(","))
val empDS = empRDD.map(x => Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt)).toDS
empDS.show
(3)执行多表查询:等值链接
val result = deptDS.join(empDS,"deptno")
result.show
(4)另一种写法:注意有三个等号
val result1 = deptDS.joinWith(empDS,deptDS("deptno")=== empDS("deptno"))
result1.show
joinWith和join的区别是连接后的新Dataset的schema会不一样
(5)多表条件查询:
val result = deptDS.join(empDS,"deptno").where("deptno==10")
result.show