Spark SQL:基础

目录
一.Spark SQL简介
二.Spark SQL的特点
三.基本概念:表:(Datasets或DataFrames)
    1.表 = 表结构 + 数据
    2.DataFrame
    3.Datasets
四.创建DataFrames
    1.第一种方式:使用case class样本类创建DataFrames
    2.第二种方式:使用SparkSession
    3.方式三,直接读取一个带格式的文件:Json
五.操作DataFrame
    1.DSL语句
    2.SQL语句
    3.多表查询
六.视图
    1.视图是一个虚表,不存储数据
    2.两种类型视图:
七.创建Datasets
    1.方式一:使用序列
    2.方式二:使用JSON数据
    3.方式三:使用其他数据(RDD的操作和DataFrame操作结合)
八.Datasets的操作案例
    1.使用emp.json 生成DataFrame
    2.多表查询

一.Spark SQL简介

    Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。
    为什么要学习Spark SQL?Hive,它将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所以Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!同时Spark SQL也支持从Hive中读取数据。

二.Spark SQL的特点:

1.容易整合(集成):

安装Spark的时候,已经集成好了。不需要单独安装

2.统一的数据访问方式

JDBC、JSON、Hive、parquet文件(一种列式存储文件,是SparkSQL默认的数据源)

3.兼容Hive:

可以将Hive中的数据,直接读取到Spark SQL中处理。

4.标准的数据连接:JDBC

三.基本概念:表:(Datasets或DataFrames)

1.表 = 表结构 + 数据

    DataFrame = Schema(表结构) + RDD(代表数据)

2.DataFrame

    DataFrame是组织成命名列的数据集。它在概念上等同于关系数据库中的表,但在底层具有更丰富的优化。DataFrames可以从各种来源构建,

例如:

  • 结构化数据文件
  • hive中的表
  • 外部数据库或现有RDDs

DataFrame API支持的语言有Scala,Java,Python和R

    从上图可以看出,DataFrame多了数据的结构信息,即schema。RDD是分布式的 Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化

3.Datasets

    Dataset是数据的分布式集合。Dataset是在Spark 1.6中添加的一个新接口,是DataFrame之上更高一级的抽象。它提供了RDD的优点(强类型化,使用强大的lambda函数的能力)以及Spark SQL优化后的执行引擎的优点。一个Dataset 可以从JVM对象构造,然后使用函数转换(map, flatMap,filter等)去操作。 Dataset API 支持Scala和Java。 Python不支持Dataset API。

四.创建DataFrames

1.第一种方式:使用case class样本类创建DataFrames

(1)定义表的Schema
注意:由于mgr和comm列中包含null值,简单起见,将对应的case class类型定义为String

scala> case class Emp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,depno:Int)

(2)读入数据
emp.csv

7369,SMITH,CLERK,7902,1980-12-17,800,0,20
7499,MARIB,SALESMAN,7901,1980-11-17,900,300,30
7450,CLARK,SALESMAN,7900,1980-10-17,900,500,30
7379,JACKE,MANAGER,7903,1980-11-11,400,0,20
7343,LARRL,CLERK,7902,1980-12-13,500,1400,30
7312,NHGJJ,SALESMAN,7904,1980-12-14,100,0,30
7343,SHJOI,MANAGER,7905,1980-12-15,200,0,10
7390,WJKLJ,ANALYST,7906,1980-12-16,300,0,20
7377,UIHKL,SALESMAN,7907,1980-12-18,400,0,10
7388,VHKJK,CLERK,7908,1980-12-19,400,0,20
//从hdfs中读入
scala> val lines = sc.textFile("hdfs://hadoop1:9000/emp.csv").map(_.split(","))

//从本地读入
scala> val lines = sc.textFile("/opt/module/datas/TestFile/emp.csv").map(_.split(","))

/opt/module/datas/TestFile

(3)把每行数据映射到Emp中。把表结构和数据,关联。

scala> val allEmp = lines.map(x => Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))

(4)生成DataFrame

scala> val allEmpDF = allEmp.toDF

//展示 
scala> allEmpDF.show
2.第二种方式:使用SparkSession

(1)什么是SparkSession
    Apache Spark 2.0引入了SparkSession,其为用户提供了一个统一的切入点来使用Spark的各项功能,并且允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序。最重要的是,它减少了用户需要了解的一些概念,使得我们可以很容易地与Spark交互。
    在2.0版本之前,与Spark交互之前必须先创建SparkConf和SparkContext。然而在Spark 2.0中,我们可以通过SparkSession来实现同样的功能,而不需要显式地创建SparkConf, SparkContext 以及 SQLContext,因为这些对象已经封装在SparkSession中。

(2)使用StructType,来创建Schema

import org.apache.spark.sql.types._

val myschema = StructType(
                List(
                StructField("empno", DataTypes.IntegerType), 
                StructField("ename", DataTypes.StringType),
                StructField("job", DataTypes.StringType),
                StructField("mgr", DataTypes.IntegerType),
                StructField("hiredate", DataTypes.StringType),
                StructField("sal", DataTypes.IntegerType),
                StructField("comm", DataTypes.IntegerType),
                StructField("deptno", DataTypes.IntegerType)))

注意,需要:import org.apache.spark.sql.types._

(3)读取文件:

val lines= sc.textFile("/opt/module/datas/TestFile/emp.csv").map(_.split(","))

(4)数据与表结构匹配

import org.apache.spark.sql.Row

val allEmp = lines.map(x => Row(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))

注意,需要:import org.apache.spark.sql.Row
(5)创建DataFrames

val df2 = spark.createDataFrame(allEmp,myschema)

df2.show
3.方式三,直接读取一个带格式的文件:Json

(1)读取文件people.json:
people.json

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
val df3 = spark.read.json("/opt/TestFolder/people.json")

df3.show
image.png

(2)另一种方式

val df4 = spark.read.format("json").load("/opt/TestFolder/people.json")

df4.show

五.操作DataFrame

DataFrame操作也称为无类型的Dataset操作

1.DSL语句

(1)展示表与展示表结构

//展示表
df1.show
//展示表结构
df1.printSchema

(2)查询

df1.select("ename","sal").show

df1.select($"ename",$"sal",$"sal"+100).show

(3)$代表 取出来以后,再做一些操作

df1.filter($"sal">2000).show

df1.groupBy($"depno").count.show

完整的例子,请参考:
http://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.Dataset

2.SQL语句

注意:不能直接执行sql。需要生成一个视图,再执行SQL。

(1)将DataFrame注册成表(视图):

df1.createOrReplaceTempView("emp")

(2)执行查询:

spark.sql("select * from emp").show

spark.sql("select * from emp where sal > 2000").show
spark.sql("select * from emp where depno=10").show

spark.sql("select depno,count(1) from emp group by depno").show 

spark.sql("select depno,sum(sal) from emp group by depno").show
image.png
df1.createOrReplaceTempView("emp12345")

spark.sql("select e.depno from emp12345 e").show
3.多表查询

dept.csv文件内容

10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON

命令

case class Dept(deptno:Int,dname:String,loc:String)

val lines = sc.textFile("/opt/module/datas/TestFile/dept.csv").map(_.split(","))

val allDept = lines.map(x=>Dept(x(0).toInt,x(1),x(2)))

val df2 = allDept.toDF

df2.create

df2.createOrReplaceTempView("dept")

spark.sql("select dname,ename from emp12345,dept where emp12345.depno=dept.deptno").show

六.视图

1.视图是一个虚表,不存储数据
2.两种类型视图:

(1)普通视图(本地视图):只在当前Session有效

(2)全局视图:在不同Session中都有用。全局视图创建在命名空间中:global_temp 类似于一个库。
    上面使用的是一个在Session生命周期中的临时views。在Spark SQL中,如果你想拥有一个临时的view,并想在不同的Session中共享,而且在application的运行周期内可用,那么就需要创建一个全局的临时view。并记得使用的时候加上global_temp作为前缀来引用它,因为全局的临时view是绑定到系统保留的数据库global_temp上。
(a)创建一个普通的view和一个全局的view

df1.createOrReplaceTempView("emp1")

df1.createGlobalTempView("emp2")

(b)在当前会话中执行查询,均可查询出结果。

spark.sql("select * from emp1").show
spark.sql("select * from global_temp.emp2").show

(c)开启一个新的会话,执行同样的查询

spark.newSession.sql("select * from emp1").show     //(运行出错)
spark.newSession.sql("select * from global_temp.emp2").show

七.创建Datasets

    DataFrame的引入,可以让Spark更好的处理结构数据的计算,但其中一个主要的问题是:缺乏编译时类型安全。为了解决这个问题,Spark采用新的Dataset API (DataFrame API的类型扩展)。

    Dataset是一个分布式的数据收集器。这是在Spark1.6之后新加的一个接口,兼顾了RDD的优点(强类型,可以使用功能强大的lambda)以及Spark SQL的执行器高效性的优点。所以可以把DataFrames看成是一种特殊的Datasets,即:Dataset(Row)

1.方式一:使用序列

(1)定义case class

scala >case class MyData(a:Int,b:String)

(2).生成序列,并创建DataSet

scala >val ds = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS

(3).查看结果

scala >ds.show

ds.collect
2.方式二:使用JSON数据

(1)定义case class

case class Person(name: String, age: BigInt)

(2)通过JSON数据生成DataFrame

val df = spark.read.format("json").load("/opt/module/datas/TestFile/people.json")

(3)将DataFrame转成DataSet

df.as[Person].show
df.as[Person].collect
3.方式三:使用其他数据(RDD的操作和DataFrame操作结合)

(1)需求:分词;查询出长度大于3的单词
(a)读取数据,并创建DataSet

val linesDS = spark.read.text("/opt/module/datas/TestFile/test_WordCount.txt").as[String]

(b)对DataSet进行操作:分词后,查询长度大于3的单词

val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)

words.show

words.collect

(2)需求:执行WordCount程序

val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey(x => x._1).count

result.show

排序:

result.orderBy($"value").show

result.orderBy($"count(1)").show

八.Datasets的操作案例

1.使用emp.json 生成DataFrame

(1)数据:emp.json

(2)使用emp.json 生成DataFrame

val empDF = spark.read.json("/opt/module/datas/TestFile/emp.json")

emp.show

查询工资大于3000的员工

empDF.where($"sal" >= 3000).show

(3)创建case class,生成DataSets

case class Emp(empno:Long,ename:String,job:String,hiredate:String,mgr:String,sal:Long,comm:String,deptno:Long)

val empDS = empDF.as[Emp]

(4)查询数据

//查询工资大于3000的员工
empDS.filter(_.sal > 3000).show

//查看10号部门的员工 
empDS.filter(_.deptno == 10).show
2.多表查询

(1)创建部门表

val deptRDD=sc.textFile("/opt/module/datas/TestFile/dept.csv").map(_.split(","))

case class Dept(deptno:Int,dname:String,loc:String)

val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS

deptDS.show

(2)创建员工表

case class Emp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)

val empRDD = sc.textFile("/opt/module/datas/TestFile/emp.csv").map(_.split(","))

val empDS = empRDD.map(x => Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt)).toDS

empDS.show

(3)执行多表查询:等值链接

val result = deptDS.join(empDS,"deptno")

result.show

(4)另一种写法:注意有三个等号

val result1 = deptDS.joinWith(empDS,deptDS("deptno")=== empDS("deptno"))

result1.show

joinWith和join的区别是连接后的新Dataset的schema会不一样

(5)多表条件查询:

val result = deptDS.join(empDS,"deptno").where("deptno==10") 

result.show
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,245评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,749评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,960评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,575评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,668评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,670评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,664评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,422评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,864评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,178评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,340评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,015评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,646评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,265评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,494评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,261评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,206评论 2 352