一、SparkSQL基础知识
1.SparkSQL介绍
Hive是Shark的前身,Shark是SparkSQL的前身,SparkSQL产生的根本原因是其完全脱离了Hive的限制。
- SparkSQL支持查询原生的RDD。 RDD是Spark平台的核心概念,是Spark能够高效的处理大数据的各种场景的基础。
- 能够在Scala中写SQL语句。支持简单的SQL语法检查,能够在Scala中写Hive语句访问Hive数据,并将结果取回作为RDD使用。
2.Spark on Hive 和 Hive on Spark
Spark on Hive: Hive只作为储存角色,Spark负责sql解析优化,执行。
Hive on Spark:Hive即作为存储又负责sql的解析优化,Spark负责执行。(基本不用)
3.DataFrame
DataFrame也是一个分布式数据容器。与RDD类似,然而DataFrame更像传统数据库的二维表格,除了数据以外,还掌握数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上 看, DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。
DataFrame的底层封装的是RDD,只不过RDD的泛型是Row类型。
4.SparkSQL的数据源
SparkSQL的数据源可以是JSON类型的字符串,JDBC,Parquent,Hive,HDFS等。
5.SparkSQL底层架构
首先拿到sql后解析一批未被解决的逻辑计划,再经过分析得到分析后的逻辑计划,再经过一批优化规则转换成一批最佳优化的逻辑计划,再经过SparkPlanner的策略转化成一批物理计划,随后经过消费模型转换成一个个的Spark任务执行。
6.谓词下推(predicate Pushdown)
二、创建DataFrame的几种方式
1.读取json格式的文件创建DataFrame
- 读取json文件
CreateDataFrameFromJsonFile.scala - 读取嵌套的json格式文件
readNestJsonFile.scala - 读取嵌套的jsonArray数组格式文件
readJsonArray.scala
2.通过json格式的RDD创建DataFrame
- Spark1.6中读取json格式的RDD,Spark2.0+只有读取json格式的DataSet
CreateDataSet.scala
DataSetWordCount.scala
CreateDataFrameFromJsonDataSet.scala
3.非json格式的RDD创建DataFrame
- 通过反射的方式将非json格式的RDD转换成DataFrame(不建议使用)
CreateDataFrameFromRDDWithReflection.scala - 动态创建Schema将非json格式的RDD转换成DataFrame
CreateDataFrameFromRDDWithSchema.sca
4.读取parquet文件创建DataFrame
CreateDataFrameFromParquet.scala
5.读取JDBC中的数据创建DataFrame(MySql为例)
CreateDataFrameFromMySQL.scala
6.读取Hive中的数据加载成DataFrame
CreateDataFrameFromHive.scala
三、Spark On Hive的配置
1.在Spark客户端配置Hive On Spark
在Spark客户端安装包下spark-1.6.0/conf中创建文件hive-site.xml:
配置hive的metastore路径
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://node1:9083</value>
</property>
</configuration>
2.启动Hive的metastore服务
hive --service metastore
3.启动zookeeper集群,启动HDFS集群。
4.启动SparkShell 读取Hive中的表总数,对比hive中查询同一表查询总数测试时间。
./spark-shell
--master spark://node1:7077,node2:7077
--executor-cores 1
--executor-memory 1g
--total-executor-cores 1
import org.apache.spark.sql.hive.HiveContext
val hc = new HiveContext(sc)
hc.sql("show databases").show
hc.sql("user default").show
hc.sql("select count(*) from jizhan").show
如果使用Spark on Hive 查询数据时,出现错误:
Cause by: java.net.UnknownHostException:XXX
找不到HDFS集群路径,要在客户端机器conf/spark-env.sh中设置HDFS的路径:
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
四、UDF和UDAF
1.UDF:用户自定义函数
可以自定义类实现UDFX接口
UDF.scala
2.UDAF:用户自定义聚合函数
实现UDAF函数如果要自定义类要继承UserDefinedAggregateFunction类,实现8个方法,最重要三个方法initialize update merge
UDAF.scala
五、开窗函数
格式:
row_number() over (partitin by XXX order by XXX)
row_number() 开窗函数是按照某个字段分组,然后取另一字段的前几个的值,相当于 分组取topN
如果SQL语句里面使用到了开窗函数,那么这个SQL语句必须使用HiveContext来执行,HiveContext默认情况下在本地无法创建。在MySql8之后也增加了开窗函数。
OverFunctioOnMySQL.scala
OverFunctionOnHive.scala