技术背景
- 优缺点分析:
SQL: 擅长数据分析和通过简单语法表示查询,表达清晰,适用于结构化数据
API: 擅长过程式处理和算法性处理,操作粒度细,适用于非结构化数据和半结构化数据 - 在spark出现之前,一个工具一向只支持SQL或者API处理,很难统一化
- 在sparksql出现之前,大数据结构化数据分析一般只能使用hive,hive的缺点导致越来越不适合业务场景
- Databricks公司采用上层使用hive的sql语法,下层将hive的物理执行计划mr替代为sparkcore来实现的一款名为shark的工具
- shark的执行计划生成严重依赖于hive,想要增加新的优化非常困难,hive是进程级别的并行,spark是线程级别的的并行,hive中很多线程不安全的代码不适用于spark,随着hive的维护升级,shark必须也做一次相应的维护升级,维护和优化困难
- 2014年databricks公司宣布停止shark的开发,将重心放在sparksql之上
设计目的
解决基于spark使用sql分析海量结构化数据的业务场景
设计思想
sparkSql将执行计划和优化交给优化器catalyst,替代之前shark的执行计划和优化是hive的
hive底层将sql转换为mr程序,sparksql底层将sql转换为RDD
技术本质
sparkSQl内置了一套简单的SQL解析器,可以不使用HQL
还引入了一套dataframe这样的DSL api,完全可以不依赖任何hive的组件
spark1.6之后,又使用dataset api来统一和结合sql的访问和命令式api的使用,
开发者可以使用sql来进行查询并筛选数据,然后使用API来进行探索式分析
核心特性
- 易整合:
sparksql无缝的将sql和api整合在一起,意味着你可以在程序中使用你所熟悉的sql或者api来进行操作 - 统一的数据访问格式:
sparksql统一了数据访问格式,意味着你可以用同一种方式访问任意数据源 - 兼容hive:
sparksql与hivesql语法兼容,以及sparksql可以访问hive仓库 - 标准的数据连接:
可以通过jdbc或odbc来连接
重点概念
DataFrame
- 背景:
DataFrame的前身是SchemaRDD,从Spark 1.3.0开始SchemaRDD更名为DataFrame,并不再直接继承自RDD,而是自己实现了RDD的绝大多数功能 - 概念:
一种带有Schema元信息的以RDD为基础的分布式数据集,类似于传统的二维表格 - 意义:
- 记录了数据集的Schema元信息,可将非结构化或半结构化数据变为结构化数据,供于sql操作
- DataFrame也是懒执行的。性能上比RDD要高,主要原因:优化的执行计划:查询计划通过Spark catalyst optimiser进行优化,逻辑查询计划优化就是一个利用基于关系代数的等价变换,将高成本的操作替换为低成本操作的过程。
- 使用
- 从结构化或半结构化数据源创建
- 从RDD进行转换
- 从HiveTable进行查询返回
DataSet
- 背景:
DataSet是在Spark1.6中添加的新的接口。 - 概念:
一种带有Schema元信息的以RDD为基础的具有强类型的分布式数据集 - 意义:
- 用户友好的API风格,既具有类型安全检查也具有Dataframe的查询优化特性。
- Dataset支持编解码器,当需要访问非堆上的数据时可以避免反序列化整个对象,提高了效率。
- 与DataFrame相比,保存了类型信息,是强类型的,提供了编译时类型检查,调用Dataset的方法先会生成逻辑计划,然后被spark的优化器进行优化,最终生成物理计划,然后提交到集群中运行!
- 使用:
Row
- 概念:
可以理解为一种javaBean,可以随意的更改属性 - 使用:
- 通过构造器构建
- 调用静态方法Row.fromSeq构建
RDD/DataFrame/DataSet对比
联系
- 都是spark平台下的弹性分布式数据集,为处理的超大型数据提供便利
- 都是lazy模式,只有遇到action算子时才会触发执行
- 都会根据spark的内存情况自动缓存运算,即使数据量很大,也不用担心发生OOM
- 都有partition概念
- 有共同的函数,如filter,排序等
- 可以进行相互转化
区别
- RDD: 有泛型,没有Schema
- RDD一般和sparkMLlib同时使用
- RDD不支持SparkSQL操作
- DataFrame: 没有泛型,有Schema
- DataFrame每一行的类型固定为Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值
- DataFrame和DataSet一般不与sparkMLlib同时使用
- DataFrame与Dataset均支持sparksql的操作,还能注册临时表/视图,进行sql语句操作
- DataFrame与Dataset支持一些特别方便的保存方式
- DataSet:有泛型有Schema
- DataSet和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同
- DataFrame也可以叫做DataSet[Row],每一行的类型是Row,不解析,
关系
- RDD - T + Schema = DataFrame
- RDD + Schema = DataSet
- DataSet[Row] = DataFrame
转化
- 将DS或DF转换为RDD,只需要调用ds或df的rdd方法即可
- 将RDD转换为DS或DF
- 使用反射: 将RDD的字段拆分封装成为一个RDD[对象]返回,然后调用tods或todf方法
- 构建schema: 将RDD的字段拆分封装成为一个RDD[Row]类型,然后利用structType构建一个schema,最后在使用spark.createDataFrame方法进行拼接
- 将DF转为DS,需要调用df的as方法,添加泛型,或者调用tods方法,导入隐式转换
- 将DS转为DF,需要调用ds的todf方法,导入隐式转换
查询方式
- 两种风格
- SQL风格:
如果想使用SQL风格的语法,需要将DataFrame注册成表,然后调用调用sql方法 - DSL风格:
DSL是由spark提供的一个领域特定语言用来方便操作结构化数据,将sql中的字段映射成为对应的方法,不需要注册成表
自定义函数
-
UDF:
- SQL风格: sparkSession.udf.register()
- DSL风格: val toUpString = udf(
(name:String) => name.toUpperCase //将传进来的名字转为大写
)
-
UDAF:
- 继承UserDefinedAggregateFunction方法
- 重写inputSchema:输入数据的类型,bufferSchema:产生中间结果的数据类型,dataType:最终返回的结果类型,deterministic:确保一致性,一般用true,initialize:指定初始值,update:每有一条数据参与运算就更新一下中间结果(update相当于在每一个分区中的运算),merge:全局聚合(将每个分区的结果进行聚合),evaluate:计算最终的结果
-
UDTF:
- 目的:
既显示聚集前的数据,又显示聚集后的数据,在每一行的最后一列添加聚合函数的结果。 - 意义:
聚合函数是将多行变成一行;开窗函数是将一行变成多行; - 使用:
- OVER 关键字表示把聚合函数当成聚合开窗函数而不是聚合函数。SQL标准允许将所有聚合函数用做聚合开窗函数。
- 排序开窗函数,同hive
- 目的:
Spark on Hive
SparkSQL整合Hive其实就是让SparkSQL去加载Hive 的元数据库,然后通过SparkSQL执行引擎去操作Hive表
数据源
普通文本、json、parquet、csv、jdbc,odbc
应用场景
- RDD主要用于处理非结构化数据、半结构化数据、结构化
- SparkSQL主要用于处理结构化数据(较为规范的半结构化数据也可以处理)
- SparkSQL 相较于 RDD 的优势在哪?
- SparkSQL 提供了更好的外部数据源读写支持
- SparkSQL 提供了直接访问列的能力
- SQL替代API的意义在于哪?
- sql处理结构化数据的能力比api处理性能更好,基于列访问,这是主要的应用意义
- sql可以简化开发,学习成本低,适用于数据分析人员,api适用于开发者