Spark-SQL

1. 开始使用spark SQL

1.1 Spark SQL CLI

可以使用本地命令行的方式使用Spark SQL查询

./bin/spark-sql

【注意】Spark SQL CLI无法与Thrift JDBC服务器通信。

1.2 Spark的可编程SQL接口

可以通过任何Spark支持语言的API执行SQL。使用SparkSession的sql方法实现,返回值为一个DF,可以接着被后续处理。

spark.sql("SELECT 1+1").show //python相同

sql也是惰性操作,有些转换操作通过SQL实现要比操作DF方便。
多行SQL示例:

spark.sql(“""SELECT user_id, department, first_name FROM professors
 WHERE department IN
 (SELECT name FROM department WHERE created_date >= '2016-01-01')""") //python相同

更强大的是可以在SQL和DF之间实现完全的互操作。

spark.read.json("/data/flight-data/json/2015-summary.json")
 .createOrReplaceTempView("some_sql_view") // DataFrame转换为SQL
spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count)
FROM some_sql_view GROUP BY DEST_COUNTRY_NAME
""")
 .where("DEST_COUNTRY_NAME like 'S%'").where("`sum(count)` > 10")
 .count() //SQL转换为DataFrame

1.3 SparkSQL Thrift JDBC/ODBC服务器

Spark提供了一个JDBC接口,可以使用BI工具远程连接到Spark驱动器进行SQL查询。实现对应于hive中的HiveServer2,可以使用Spark后Hive的beeline脚本来测试JDBC服务器。

  • 启用JDBC服务:
./sbin/start-thriftserver.sh

此服务默认监听端口:10000.
beeline测试示例:

./bin/beeline
beeline> !connect jdbc:hive2://localhost:10000

beeline将会询问用户名和密码。非安全模式下,只需要输入计算机的用户名和空白密码。安全模式下使用请查看beeline文档说明。

2. Catalog

Catalog是Spark SQL中最高级别的抽象。用于存储用户数据中的元数据和其他有用的东西,如数据库、数据表、函数、视图。它位于org.apache.spark.SQL.catalog.Catalog包中,其中还包括很多有用的函数。以下内容若要在程序中使用,请在spark.sql中执行。

2.1 数据表

数据表在逻辑上等同于DF。数据表是始终都有数据的,没有临时表或视图的概念,删除一张表可能导致数据丢失。

  1. Spark托管表
  • 非托管表(unmanaged table):spark只管理元数的表,例如定义磁盘上若干文件为一个数据表。
  • 托管表(managed table):spark及管理元数据又管理实际数据的表为托管表。例如在DF上使用saveAsTable函数创建的表,spark将跟踪托管所有信息。
  1. 创建表
    spark允许直接从数据源创建表,不需要先定义表再加载数据。
  • USING和STORED AS建表
CREATE TABLE flights_csv (
 DEST_COUNTRY_NAME STRING,
 ORIGIN_COUNTRY_NAME STRING COMMENT “remember, the US will be most prevalent",
 count LONG)
USING csv OPTIONS (header true, path '/data/flight-data/csv/2015-summary.csv')
  • CSAS建表
CREATE TABLE flights_from_select USING parquet AS SELECT * FROM flights
--只有表不存在才能创建,可加限定防止报错
CREATE TABLE IF NOT EXISTS flights_from_select AS SELECT * FROM flights
--指定分区,优化数据分布
CREATE TABLE partitioned_flights USING parquet PARTITIONED BY (DEST_COUNTRY_NAME)
AS SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights LIMIT 5

【注意】创建的表可以在整个spark会话中使用,而临时表需要创建视图才能使用。

  1. 创建外部表
    spark SQL与HSQL完全兼容。例如创建非托管表时就可以使用hive创建外部表的方式使用external关键字。因此HSQL大部分可以直接粘贴到Spark中使用。
  • 创建spark外部表
CREATE EXTERNAL TABLE hive_flights (
 DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count LONG)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/data/flight-data-hive/'
  • 使用select子句创建外部表
CREATE EXTERNAL TABLE hive_flights_2
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/data/flight-data-hive/' AS SELECT * FROM flights
  1. 插入表
    支持标准SQL语法:
INSERT INTO flights_from_select
 SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights LIMIT 20

指定分区插入数据:

INSERT INTO partitioned_flights
 PARTITION (DEST_COUNTRY_NAME="UNITED STATES")
 SELECT count, ORIGIN_COUNTRY_NAME FROM flights
 WHERE DEST_COUNTRY_NAME='UNITED STATES' LIMIT 12
  1. 表的元数据操作
--查看表的元数据信息
DESCRIBE TABLE flights_csv
--查看分区信息
SHOW PARTITIONS partitioned_flights
--刷新元数据
REFRESH table partitioned_flights
--手动修改后,修复元数据
MSCK REPAIR TABLE partitioned_flights
  1. 删除表
    托管表删除时,会将元数据和表数据全部删除;非托管表删除时只删除元数据,实际数据并不会删除。
DROP TABLE IF EXISTS flights_csv;
  1. 缓存表
    可以像DF一样将表缓存后查询,以提升查询速度。
--缓存表
CACHE TABLE flights
--取消缓存
UNCACHE TABLE FLIGHTS

2.2 视图

视图是基于现有表的一组转换操作,基本上只保存了查询计划,可以方便地组织或重用逻辑。视图可以是全局的、针对某个数据库的或是针对某个会话的。

  1. 创建视图
    几种视图的创建示例:
--创建普通视图
CREATE VIEW just_usa_view AS
 SELECT * FROM flights WHERE dest_country_name = 'United States'
--创建临时视图,只在当前会话有效
CREATE TEMP VIEW just_usa_view_temp AS
 SELECT * FROM flights WHERE dest_country_name = 'United States'
 --创建全局视图
CREATE GLOBAL TEMP VIEW just_usa_global_view_temp AS
 SELECT * FROM flights WHERE dest_country_name = 'United States'
--覆盖已有视图
CREATE OR REPLACE TEMP VIEW just_usa_view_temp AS
 SELECT * FROM flights WHERE dest_country_name = 'United States'
--执行视图
SELECT * FROM just_usa_view_temp

视图本质上就是一个转换操作,只在查询的时候使用它。

  1. 删除视图
    删除视图本质上只是删除了操作逻辑,并不涉及基础数据。
DROP VIEW IF EXISTS just_usa_view;

2.3 数据库

数据库操作

--创建数据库
CREATE DATABASE some_db
--查看数据库
SHOW DATABASES
--选择数据库
USE some_db
SHOW tables
SELECT * FROM default.flights --指定库名
SELECT current_database() --查看当前使用的库
--删除数据库
DROP DATABASE IF EXISTS some_db

2.4 查询操作

查询语句符合标准的SQL语法:

SELECT [ALL|DISTINCT] named_expression[, named_expression, ...]
 FROM relation[, relation, ...]
 [lateral_view[, lateral_view, ...]]
 [WHERE boolean_expression]
 [aggregation [HAVING boolean_expression]]
 [ORDER BY sort_expressions]
 [CLUSTER BY expressions]
 [DISTRIBUTE BY expressions]
 [SORT BY sort_expressions]
 [WINDOW named_window[, WINDOW named_window, ...]]
 [LIMIT num_rows]

named_expression:
 : expression [AS alias]
relation:
 | join_relation
 | (table_name|query|relation) [sample] [AS alias]
 : VALUES (expressions)[, (expressions), ...]
 [AS (column_name[, column_name, ...])]
expressions:
 : expression[, expression, ...]
sort_expressions:
 : expression [ASC|DESC][, expression [ASC|DESC], ...]

case...when...then...end查询判断

SELECT
 CASE WHEN DEST_COUNTRY_NAME = 'UNITED STATES' THEN 1
 WHEN DEST_COUNTRY_NAME = 'Egypt' THEN 0
 ELSE -1 END
FROM partitioned_flights

2.5 高级主题

  1. 复杂类型
    spark SQL中支持的复杂类型:结构体(STRUCT)、列表(list)、映射(map)。
  • STRUCT
    spark中结构体的使用:
--创建结构体的列,使用括号包含一组列
CREATE VIEW IF NOT EXISTS nested_data AS
 SELECT (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME) as country, count FROM flights
--使用结构体
SELECT country.DEST_COUNTRY_NAME, count FROM nested_data
  • list
    spark SQL中使用聚合函数创建列表,collect_list创建一个包含重复值的list;collect_set创建不含重复值的list。
--在聚合操作sql中生成列表
SELECT DEST_COUNTRY_NAME as new_name, collect_list(count) as flight_counts,
 collect_set(ORIGIN_COUNTRY_NAME) as origin_set
FROM flights GROUP BY DEST_COUNTRY_NAME
--指定生成数组
SELECT DEST_COUNTRY_NAME, ARRAY(1, 2, 3) FROM flights
--以数据下标的方式访问列表
SELECT DEST_COUNTRY_NAME as new_name, collect_list(count)[0] FROM flights GROUP BY 
DEST_COUNTRY_NAME
--创建测试数据
CREATE OR REPLACE TEMP VIEW flights_agg AS
 SELECT DEST_COUNTRY_NAME, collect_list(count) as collected_counts
 FROM flights GROUP BY DEST_COUNTRY_NAME
--将列表转回列
SELECT explode(collected_counts), DEST_COUNTRY_NAME FROM flights_agg
  1. 函数
    spark中内置了许多高级函数。
--查看函数列表
SHOW FUNCTIONS
--查看系统函数列表
SHOW SYSTEM FUNCTIONS
--查看用户函数
SHOW USER FUNCTIONS
--使用通配符和LIKE关键字查找函数 
SHOW FUNCTIONS "s*"
SHOW FUNCTIONS LIKE "collect*"
--DESCRIBE关键字可返回函数文档
  • 用户定义函数(UDF)
    spark允许用户使用所支持的语言编写函数,注册之后可以以分布式的方式使用。
def power3(number:Double):Double = number * number * number //定义函数
spark.udf.register("power3", power3(_:Double):Double) //注册函数
SELECT count, power3(count) FROM flflights
  1. 子查询
    spark中支持在查询中指定子查询,包括相关的子查询和不相关的子查询。
  • 不相关谓词子查询
SELECT * FROM flights
WHERE origin_country_name IN (SELECT dest_country_name FROM flights
 GROUP BY dest_country_name ORDER BY sum(count) DESC LIMIT 5)

此查询是不相关的,因为它不包含来自查询外部的信息,这是一个你可以自行运行的查询。

  • 相关谓词子查询
    相关谓词子查询允许你在内部查询中使用外部作用域的信息。
SELECT * FROM flights f1
WHERE EXISTS (SELECT 1 FROM flights f2
 WHERE f1.dest_country_name = f2.origin_country_name)
AND EXISTS (SELECT 1 FROM flights f2
 WHERE f2.dest_country_name = f1.origin_country_name)
  • 不相关标量查询
    使用不相关的标量查询scalar query, 可以引入一些以前可能没有的补充信息。
SELECT *, (SELECT max(count) FROM flights) AS maximum FROM flights

3. Spark SQL的配置

部分配置参考《权威指南》194-195页

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容