1. 开始使用spark SQL
1.1 Spark SQL CLI
可以使用本地命令行的方式使用Spark SQL查询
./bin/spark-sql
【注意】Spark SQL CLI无法与Thrift JDBC服务器通信。
1.2 Spark的可编程SQL接口
可以通过任何Spark支持语言的API执行SQL。使用SparkSession的sql方法实现,返回值为一个DF,可以接着被后续处理。
spark.sql("SELECT 1+1").show //python相同
sql也是惰性操作,有些转换操作通过SQL实现要比操作DF方便。
多行SQL示例:
spark.sql(“""SELECT user_id, department, first_name FROM professors
WHERE department IN
(SELECT name FROM department WHERE created_date >= '2016-01-01')""") //python相同
更强大的是可以在SQL和DF之间实现完全的互操作。
spark.read.json("/data/flight-data/json/2015-summary.json")
.createOrReplaceTempView("some_sql_view") // DataFrame转换为SQL
spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count)
FROM some_sql_view GROUP BY DEST_COUNTRY_NAME
""")
.where("DEST_COUNTRY_NAME like 'S%'").where("`sum(count)` > 10")
.count() //SQL转换为DataFrame
1.3 SparkSQL Thrift JDBC/ODBC服务器
Spark提供了一个JDBC接口,可以使用BI工具远程连接到Spark驱动器进行SQL查询。实现对应于hive中的HiveServer2,可以使用Spark后Hive的beeline脚本来测试JDBC服务器。
- 启用JDBC服务:
./sbin/start-thriftserver.sh
此服务默认监听端口:10000.
beeline测试示例:
./bin/beeline
beeline> !connect jdbc:hive2://localhost:10000
beeline将会询问用户名和密码。非安全模式下,只需要输入计算机的用户名和空白密码。安全模式下使用请查看beeline文档说明。
2. Catalog
Catalog是Spark SQL中最高级别的抽象。用于存储用户数据中的元数据和其他有用的东西,如数据库、数据表、函数、视图。它位于org.apache.spark.SQL.catalog.Catalog包中,其中还包括很多有用的函数。以下内容若要在程序中使用,请在spark.sql中执行。
2.1 数据表
数据表在逻辑上等同于DF。数据表是始终都有数据的,没有临时表或视图的概念,删除一张表可能导致数据丢失。
- Spark托管表
- 非托管表(unmanaged table):spark只管理元数的表,例如定义磁盘上若干文件为一个数据表。
- 托管表(managed table):spark及管理元数据又管理实际数据的表为托管表。例如在DF上使用saveAsTable函数创建的表,spark将跟踪托管所有信息。
- 创建表
spark允许直接从数据源创建表,不需要先定义表再加载数据。
- USING和STORED AS建表
CREATE TABLE flights_csv (
DEST_COUNTRY_NAME STRING,
ORIGIN_COUNTRY_NAME STRING COMMENT “remember, the US will be most prevalent",
count LONG)
USING csv OPTIONS (header true, path '/data/flight-data/csv/2015-summary.csv')
- CSAS建表
CREATE TABLE flights_from_select USING parquet AS SELECT * FROM flights
--只有表不存在才能创建,可加限定防止报错
CREATE TABLE IF NOT EXISTS flights_from_select AS SELECT * FROM flights
--指定分区,优化数据分布
CREATE TABLE partitioned_flights USING parquet PARTITIONED BY (DEST_COUNTRY_NAME)
AS SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights LIMIT 5
【注意】创建的表可以在整个spark会话中使用,而临时表需要创建视图才能使用。
- 创建外部表
spark SQL与HSQL完全兼容。例如创建非托管表时就可以使用hive创建外部表的方式使用external关键字。因此HSQL大部分可以直接粘贴到Spark中使用。
- 创建spark外部表
CREATE EXTERNAL TABLE hive_flights (
DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count LONG)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/data/flight-data-hive/'
- 使用select子句创建外部表
CREATE EXTERNAL TABLE hive_flights_2
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/data/flight-data-hive/' AS SELECT * FROM flights
- 插入表
支持标准SQL语法:
INSERT INTO flights_from_select
SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights LIMIT 20
指定分区插入数据:
INSERT INTO partitioned_flights
PARTITION (DEST_COUNTRY_NAME="UNITED STATES")
SELECT count, ORIGIN_COUNTRY_NAME FROM flights
WHERE DEST_COUNTRY_NAME='UNITED STATES' LIMIT 12
- 表的元数据操作
--查看表的元数据信息
DESCRIBE TABLE flights_csv
--查看分区信息
SHOW PARTITIONS partitioned_flights
--刷新元数据
REFRESH table partitioned_flights
--手动修改后,修复元数据
MSCK REPAIR TABLE partitioned_flights
- 删除表
托管表删除时,会将元数据和表数据全部删除;非托管表删除时只删除元数据,实际数据并不会删除。
DROP TABLE IF EXISTS flights_csv;
- 缓存表
可以像DF一样将表缓存后查询,以提升查询速度。
--缓存表
CACHE TABLE flights
--取消缓存
UNCACHE TABLE FLIGHTS
2.2 视图
视图是基于现有表的一组转换操作,基本上只保存了查询计划,可以方便地组织或重用逻辑。视图可以是全局的、针对某个数据库的或是针对某个会话的。
- 创建视图
几种视图的创建示例:
--创建普通视图
CREATE VIEW just_usa_view AS
SELECT * FROM flights WHERE dest_country_name = 'United States'
--创建临时视图,只在当前会话有效
CREATE TEMP VIEW just_usa_view_temp AS
SELECT * FROM flights WHERE dest_country_name = 'United States'
--创建全局视图
CREATE GLOBAL TEMP VIEW just_usa_global_view_temp AS
SELECT * FROM flights WHERE dest_country_name = 'United States'
--覆盖已有视图
CREATE OR REPLACE TEMP VIEW just_usa_view_temp AS
SELECT * FROM flights WHERE dest_country_name = 'United States'
--执行视图
SELECT * FROM just_usa_view_temp
视图本质上就是一个转换操作,只在查询的时候使用它。
- 删除视图
删除视图本质上只是删除了操作逻辑,并不涉及基础数据。
DROP VIEW IF EXISTS just_usa_view;
2.3 数据库
数据库操作
--创建数据库
CREATE DATABASE some_db
--查看数据库
SHOW DATABASES
--选择数据库
USE some_db
SHOW tables
SELECT * FROM default.flights --指定库名
SELECT current_database() --查看当前使用的库
--删除数据库
DROP DATABASE IF EXISTS some_db
2.4 查询操作
查询语句符合标准的SQL语法:
SELECT [ALL|DISTINCT] named_expression[, named_expression, ...]
FROM relation[, relation, ...]
[lateral_view[, lateral_view, ...]]
[WHERE boolean_expression]
[aggregation [HAVING boolean_expression]]
[ORDER BY sort_expressions]
[CLUSTER BY expressions]
[DISTRIBUTE BY expressions]
[SORT BY sort_expressions]
[WINDOW named_window[, WINDOW named_window, ...]]
[LIMIT num_rows]
named_expression:
: expression [AS alias]
relation:
| join_relation
| (table_name|query|relation) [sample] [AS alias]
: VALUES (expressions)[, (expressions), ...]
[AS (column_name[, column_name, ...])]
expressions:
: expression[, expression, ...]
sort_expressions:
: expression [ASC|DESC][, expression [ASC|DESC], ...]
case...when...then...end查询判断
SELECT
CASE WHEN DEST_COUNTRY_NAME = 'UNITED STATES' THEN 1
WHEN DEST_COUNTRY_NAME = 'Egypt' THEN 0
ELSE -1 END
FROM partitioned_flights
2.5 高级主题
- 复杂类型
spark SQL中支持的复杂类型:结构体(STRUCT)、列表(list)、映射(map)。
- STRUCT
spark中结构体的使用:
--创建结构体的列,使用括号包含一组列
CREATE VIEW IF NOT EXISTS nested_data AS
SELECT (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME) as country, count FROM flights
--使用结构体
SELECT country.DEST_COUNTRY_NAME, count FROM nested_data
- list
spark SQL中使用聚合函数创建列表,collect_list创建一个包含重复值的list;collect_set创建不含重复值的list。
--在聚合操作sql中生成列表
SELECT DEST_COUNTRY_NAME as new_name, collect_list(count) as flight_counts,
collect_set(ORIGIN_COUNTRY_NAME) as origin_set
FROM flights GROUP BY DEST_COUNTRY_NAME
--指定生成数组
SELECT DEST_COUNTRY_NAME, ARRAY(1, 2, 3) FROM flights
--以数据下标的方式访问列表
SELECT DEST_COUNTRY_NAME as new_name, collect_list(count)[0] FROM flights GROUP BY
DEST_COUNTRY_NAME
--创建测试数据
CREATE OR REPLACE TEMP VIEW flights_agg AS
SELECT DEST_COUNTRY_NAME, collect_list(count) as collected_counts
FROM flights GROUP BY DEST_COUNTRY_NAME
--将列表转回列
SELECT explode(collected_counts), DEST_COUNTRY_NAME FROM flights_agg
- 函数
spark中内置了许多高级函数。
--查看函数列表
SHOW FUNCTIONS
--查看系统函数列表
SHOW SYSTEM FUNCTIONS
--查看用户函数
SHOW USER FUNCTIONS
--使用通配符和LIKE关键字查找函数
SHOW FUNCTIONS "s*"
SHOW FUNCTIONS LIKE "collect*"
--DESCRIBE关键字可返回函数文档
- 用户定义函数(UDF)
spark允许用户使用所支持的语言编写函数,注册之后可以以分布式的方式使用。
def power3(number:Double):Double = number * number * number //定义函数
spark.udf.register("power3", power3(_:Double):Double) //注册函数
SELECT count, power3(count) FROM flflights
- 子查询
spark中支持在查询中指定子查询,包括相关的子查询和不相关的子查询。
- 不相关谓词子查询
SELECT * FROM flights
WHERE origin_country_name IN (SELECT dest_country_name FROM flights
GROUP BY dest_country_name ORDER BY sum(count) DESC LIMIT 5)
此查询是不相关的,因为它不包含来自查询外部的信息,这是一个你可以自行运行的查询。
- 相关谓词子查询
相关谓词子查询允许你在内部查询中使用外部作用域的信息。
SELECT * FROM flights f1
WHERE EXISTS (SELECT 1 FROM flights f2
WHERE f1.dest_country_name = f2.origin_country_name)
AND EXISTS (SELECT 1 FROM flights f2
WHERE f2.dest_country_name = f1.origin_country_name)
- 不相关标量查询
使用不相关的标量查询scalar query, 可以引入一些以前可能没有的补充信息。
SELECT *, (SELECT max(count) FROM flights) AS maximum FROM flights
3. Spark SQL的配置
部分配置参考《权威指南》194-195页