1 概述
Spark SQL 是Spark的一个组件,用于结构化数据的计算。Spark SQL 提供一个称为DataFrames的编程对象,DataFrams 可以充当分布式sql查询引擎。
2 DataFrames
DataFrame 是一个分布式的数据集合,该数据结合以命名列的方式进行整合,DataFrame 可以理解为关系数据库中的一张表。也可以理解为R/Python中的一个data frame .DataFrames 可以通过多种数据构造,例如 结构化的数据文件。hive 中的表,外部数据库,Spark 计算过程中生成的RDD等.
2.1 入口:SQLContext(Starting point: SQLContext)
Spark SQL 程序的主入口是SQLContext 类或它的子类。创建一个基本的SQLContext 只需要SparkContext Scala 代码如下:
val sc:SparkContext
val sqlConext=new org.apache.spark.sql.SQLConetext(sc)
除了基本的SQLContext 也可以创建HiveContext.SQLContext和HiveContext 区别联系为
1.SQLContext现在只支持SQL语法解析器(SQL-92语法)
2.Hive Context现在只支持SQL语法解析器和HiveSQL语法解析器。默认为HiveSQL语法解析器,用户可以通过配置切换成SQL语法解析器,来运行HiveSQL不支持的语法
3. 使用HiveContext可以使用Hive 的UDF ,读写Hive表数据等Hive操作。SQLContext不可以对hive操作
HiveContext 包装了HIVE的依赖包。把HiveContext单独拿出来,可以在部署基本的Spark的时候就不需要Hive的依赖包,需要使用HiveContext 时再把Hive的各种依赖包加进来
2.2创建DataFrames
使用SQLContext ,Spark 应用程序。可以通过RDD,Hive 表,JSON 格式数据等数据源创建DataFrames .代码如下 Scala编写
val sc:SparkConetxt
val sqlContext=new org.apache.spark.SQLContext(sc)
val df=sqlContext.read.json("file:///path")
df.show()
2.3DataFrame操作 Scala 代码
val sc:SparkContext
val sqlContext=new org.apache.spark.sql.SQLContext(sc)
val df=sqlContext.read.json("path")
df.show()
df.printSchema()
df.select("name").show()
df.select(df("name"),df("age")+1).show()
2.4 运行SQL查询程序
Spark Application可以使用SQLContext的sql()方法执行SQL查询操作,sql()方法返回的查询结果为DataFrame格式。scala 代码如下:
val sqlContext = ...// An existing SQLContext
val df =sqlContext.sql("SELECT * FROM table")
2.5 DataFrames与RDDs的相互转换
Spark SQL支持两种RDDs转换为DataFrames的方式:
1.使用反射获取RDD内的Schema
当已知类的Schema的时候,使用这种基于反射的方法会让代码更加简洁而且效果也很好。
2.通过编程接口指定Schema
通过Spark SQL的接口创建RDD的Schema,这种方式会让代码比较冗长。
这种方法的好处是,在运行时才知道数据的列以及列的类型的情况下,可以动态生成Schema
2.5.1 使用反射获取Schema
Spark SQL支持将JavaBean的RDD自动转换成DataFrame。通过反射获取Bean的基本信息,依据Bean的信息定义Schema。当前Spark
SQL不支持嵌套的JavaBeans和复杂数据类型(如:List、Array)。创建一个实现Serializable接口包含所有属性getters和setters的类来创建一个JavaBean。通过调用createDataFrame并提供JavaBean的Class
object,指定一个Schema给一个RDD。示例如下:
3 数据源
Spark SQL的默认数据源为Parquet格式。数据源为Parquet文件时,Spark SQL可以方便的执行所有的操作。修改配置项spark.sql.sources.default,可修改默认数据源格式。读取Parquet文件示例如下:
val df=sqlContext.read.load("path")
df.select("name",favorite_color).write.save("name.parquet")
3.1.1 手动指定选项
当数据源格式不是parquet格式文件时,需要手动指定数据源的格式。数据源格式需要指定全名(例如:org.apache.spark.sql.parquet),如果数据源格式为内置格式,则只需要指定简称(json,parquet,jdbc)。通过指定的数据源格式名,可以对DataFrames进行类型转换操作。示例如下:
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name","age").write.format("parquet").save("namesAndAges.parquet")
3.1.2 存储模式
可以采用SaveMode执行存储操作,SaveMode定义了对数据的处理模式。需要注意的是,这些保存模式不使用任何锁定,不是原子操作。此外,当使用Overwrite方式执行时,在输出新数据之前原数据就已经被删除。SaveMode详细介绍如下表:
3.1.3 持久化到表
当使用HiveContext时,可以通过saveAsTable方法将DataFrames存储到表中。与registerTempTable方法不同的是,saveAsTable将DataFrame中的内容持久化到表中,并在HiveMetastore中存储元数据。存储一个DataFrame,可以使用SQLContext的table方法。table先创建一个表,方法参数为要创建的表的表名,然后将DataFrame持久化到这个表中。
默认的saveAsTable方法将创建一个“managed table”,表示数据的位置可以通过metastore获得。当存储数据的表被删除时,managed table也将自动删除。
3.2 Parquet文件
Parquet是一种支持多种数据处理系统的柱状的数据格式,Parquet文件中保留了原始数据的模式。Spark SQL提供了Parquet文件的读写功能。
3.2.1 读取Parquet文件
读取Parquet文件示例如下:
3.2.2 解析分区信息
对表进行分区是对数据进行优化的方式之一。在分区的表内,数据通过分区列将数据存储在不同的目录下。Parquet数据源现在能够自动发现并解析分区信息。例如,对人口数据进行分区存储,分区列为gender和country,使用下面的目录结构:
3.3 JSON数据集
Spark SQL能自动解析JSON数据集的Schema,读取JSON数据集为DataFrame格式。读取JSON数据集方法为SQLContext.read().json()。该方法将String格式的RDD或JSON文件转换为DataFrame。
需要注意的是,这里的JSON文件不是常规的JSON格式。JSON文件每一行必须包含一个独立的、自满足有效的JSON对象。如果用多行描述一个JSON对象,会导致读取出错。读取JSON数据集示例如下:
3.4 Hive表
Spark SQL 支持对hive的读写操作,需要注意的是HIVE所依赖的包,没包含在Spark assembly 包中,增加Hive时候。需要在需要在Spark的build中添加 -Phive 和
-Phivethriftserver配置。这两个配置将build一个新的assembly包,这个assembly包含了Hive的依赖包。注意,必须上这个心的assembly包到所有的worker节点上。因为worker节点在访问Hive中数据时,会调用Hive的serialization and deserialization libraries(SerDes),此时将用到Hive的依赖包
Hive的配置文件为conf/目录下的hive-site.xml文件。在YARN上执行查询命令之前,lib_managed/jars目录下的datanucleus包和conf/目录下的hive-site.xml必须可以被driverhe和所有的executors所访问。确保被访问,最方便的方式就是在spark-submit命令中通过--jars选项和--file选项指定。
操作Hive时,必须创建一个HiveContext对象,HiveContext继承了SQLContext,并增加了对MetaStore和HiveQL的支持。除了sql方法,HiveContext还提供了一个hql方法,hql方法可以执行HiveQL语法的查询语句。示例如下: