Spark系列3 - Spark SQL

1 从Shark到Spark SQL

Spark SQL的前生是Shark,即Hive on Spark。Shark本质是通过Hive的HQL进行解析,将HiveQL翻译成Spark上对应的RDD操作,然后通过Hive的Metadata获取数据数据库里的元数据,并根据元数据从HDFS上读取文件,最后由Shark将获取的数据放到Spark上运算。

Shark提供了类似Hive的功能,区别是Hive将输入的HiveQL转换成MapReduce作业,而Shark将HiveQL转换成Spark作业。Shark复用了Hive中的HiveQL解析、逻辑执行计划翻译、执行计划优化等,可以近似的认为,Shark仅将物理计划从MapReduce作业替换成Spark作业。

Shark继承了大量的Hive代码,给优化和维护带来了很大的不变,特别是基于MapReduce设计的部分,成为整个项目的瓶颈,因此2014年7月,Databricks宣布终止对Shark的开发,将重点放到Spark SQL上。Spark SQL允许开发人员可以直接操作RDD,同时也可查询在Hive上存放的外部数据,因此Spark SQL在使用SQL进行外部查询的同时,也能进行更复杂的数据分析。Hive、Shark和Spark SQL的架构对比下图,

Hive、Shark和Spark SQL架构对比

Spark SQL在Shark原有架构的基础上,重写了逻辑执行计划的优化部分,解决了Shark存在的问题。Spark SQL引入了DataFrame,用户可以在Spark SQL中执行SQL语句,数据既可以来自RDD,也可以来自Hive、HDFS、Cassandra等。

2 RDD和DataFrame

RDD是分布式的Java对象的集合,对象的内部结构对于RDD而言却是不可知的。DataFrame是一种以RDD为基础的分布式数据集,提供了详细的结构信息,DataFrame是带有schema的RDD。二者的对比如下图,

RDD和DataFrame

Spark2.0之后,Spark使用了全新的SparkSession,替换了Spark1.6中的SQLContext和HiveContext。

3 DataFrame编程

DataFrame的创建

import org.apache.spark.sql.SparkSession // spark-shell中默认已导入

读取json格式的数据,创建DataFrame

import spark.implicits._
val df = spark.read.json("hdfs://master:8020/data/people.json")
df.show()
val df1 = spark.read.format("json").load("hdfs://master:8020/data/people.json")
df1.show()

DataFrame的保存

val df = spark.read.json("hdfs://master:8020/data/people.json")
df.write.json("hdfs://master:8020/data/people_new.json")
df.write.parquet("hdfs://master:8020/data/people_new.parquet")
df.write.csv("hdfs://master:8020/data/people_new.csv")

df.write.format("json").save("hdfs://master:8020/data/people_new1.json")
df.write.format("parquet").save("hdfs://master:8020/data/people_new1.parquet")
df.write.format("csv").save("hdfs://master:8020/data/people_new1.csv")

注意:save保存文件时对应的参数是一个路径而不是文件名。

DataFrame的常用操作

df.printSchema() 打印DataFrame的模式
df.select() 从DataFrame中选择部分字段

df.select("name","Age").show()
df.select(df("name"), df("Age")+1).show() // 修改字段值
df.select(df("name").as("username"), df("Age").as("userage")).show() // 给字段重命名

df.filter() 实现条件查询,获取满足条件的记录

df.filter(df("age") > 20).show()

df.groupBy() 对记录进行分组

df.groupBy("age").count().show()

df.sort() 对记录进行排序

df.sort(df("name").asc).show
df.sort(df("age").desc).show
df.sort(df("age").desc, df("name")).show

df.map(func) 对记录安装func进行转换

从RDD转换到DataFrame

注意:这里的转换主要指无格式的文本文件、MySQL、Hive等无法直接生成DataFrame的数据进行转换,对于json、csv、parquet格式的文件,会自动的进行隐式转换,不需要进行这样的转换。从RDD转换到DataFrame有两种转换方法:
方法1:利用反射机制推断RDD模式,适用于对已知数据结构的转换;
方法2:使用编程方式定义RDD模式,使用编程结构构造一个schema,并将其应用的已知的RDD上;

利用反射机制推断RDD

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._

case class Person(name: String, age: Long)
val people = spark.sparkContext.textFile("hdfs://master:8020/data/people.txt")
val peopleDf = people.map(_.split(",")).map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()
peopleDf.createOrReplaceTempView("people") //必须转成临时表才能供查询使用
val peopleRDD = spark.sql("select name,age from people where age > 20")
peopleRDD.map(t => {"Name: "+t(0) +", age: "+t(1)}).show()
  • 上述代码中定义了一个case class,只有case class才能被Spark隐式转换成DataFrame,因此定义了一个cass class;
  • 生成DataFrame之后,Spark要求必须把DataFrame注册为临时表,才能供后面的查询使用,上述临时表的名称为people;

采用编程方式定义RDD模式

当无法定义case class时,就需要采用编程方式定义RDD模式,具体需要3步:

  • 第一步:制作表头,表头也就是表的模式,包括字段名,类型和是否允许为空等信息
  • 第二步:制作表中记录;
  • 第三步:把表头和表中记录拼装在一起;
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// generate Field
val fields = Array(StructField("name", StringType, true), StructField("age", IntegerType, true)) 
val schema = StructType(fields)

// generate Record
val peopleRDD = spark.sparkContext.textFile("hdfs://master:8020/data/people.txt")
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim.toInt))

// combine
val peopleDF = spark.createDataFrame(rowRDD, schema)

peopleDf.createOrReplaceTempView("people") //必须转成临时表才能供查询使用
val peopleRecord = spark.sql("select name,age from people")
peopleRecord.show()

4 Spark SQL读写MySQL

通过JDBC连接数据库

① 在mysql库中创建表,并插入数据
② 拷贝mysql-connector-java-5.1.40-bin.jar 到各个spark节点($SPAK_HOME/jars)
③ 启动spark-shell
spark-shell --jars /root/software/spark-2.2.0-bin-hadoop2.6/jars/mysql-connector-java-5.1.40-bin.jar
--driver-class-path /root/software/spark-2.2.0-bin-hadoop2.6/jars/mysql-connector-java-5.1.40-bin.jar

说明:若环境变量配置完整,启动spark-shell不带上述参数,也是可以直接使用JDBC的。本文在实验的过程中,启动spark-shell未带任何参数。

从MySQL中读取数据

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object ReadMySql {
    def main(args: Array[String]) = {
        val spark = SparkSession.builder().getOrCreate()
        val jdbcDF = spark.read.format("jdbc").
            option("url", "jdbc:mysql://192.168.2.180:3306/spark").
            option("driver", "com.mysql.jdbc.Driver").
            option("dbtable","student").
            option("user","root").
            option("password", "hunter").
            load()
        jdbcDF.show()
    }
}

向MySQL中写数据(在spark-shell中交互操作)

import java.util.Properties
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

object InsertMySql {
    def main(args: Array[String]) = {
        val spark = SparkSession.builder().getOrCreate()
        val studentRDD = spark.sparkContext.parallelize(Array("7,TianQiQi,M,30","8,GouDan,M,18")).map(_.split(","))
        val fields = Array(
            StructField("id", IntegerType, true),
            StructField("name", StringType, true), 
            StructField("gender", StringType, true), 
            StructField("age", IntegerType, true)) 
        val schema = StructType(fields)
        
        val rowRDD = studentRDD.map(p => {Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt)})
        val studentDF = spark.createDataFrame(rowRDD, schema)
        
        val prop = new Properties()
        prop.put("user","root")
        prop.put("password", "hunter")
        prop.put("driver", "com.mysql.jdbc.Driver")
        
        studentDF.write.mode("append").jdbc("jdbc:mysql://192.168.2.180:3306/spark", "spark.student", prop)
    }
}

5 SparkSQL 读写 Hive

Hive的部署和安装参考:apache-hive-1.2.2安装

初始配置

检查当前的Spark版本是否包含Hive支持,在spark-shell交互界面输入:

import org.apache.spark.sql.hive.HiveContext

若引入不报错,则说明当前的spark已经支持Hive。
在Hive中创建库和表并插入记录

create database if not exists sparkhive;
show databases;

use sparkhive;

create table if not exists sparkhive.student(
    id int, name string, gender string, age int
);

insert into student values(1, "ZhangSan", "F", 23);
insert into student values(2, "LiSi", "M", 24);

select * from student;

设置Spark的环境变量。为了让Spark能够顺利的访问Hive,需要在spark-env.sh中添加如下变量:

export HIVE_HOME=/root/software/apache-hive-1.2.1-bin
export HIVE_CONF_DIR=$HIVE_HOME/conf
export SPARK_CLASSPATH=$HBASE_HOME/lib:$HIVE_HOME/lib/mysql-connector-java-5.1.40-bin.jar

其他节点分发:

cd $HBASE_HOME/conf 
scp hbase-env.sh slave1:/root/software/hbase-1.2.1/conf
scp hbase-env.sh slave2:/root/software/hbase-1.2.1/conf
scp hbase-env.sh slave3:/root/software/hbase-1.2.1/conf
cd $HIVE_HOME
scp -r lib/mysql-connector-java-5.1.40-bin.jar conf/ slave1:$HIVE_HOME
scp -r lib/mysql-connector-java-5.1.40-bin.jar conf/ slave2:$HIVE_HOME
scp -r lib/mysql-connector-java-5.1.40-bin.jar conf/ slave3:$HIVE_HOME

拷贝hive-site.xml到$SPARK_HOME/conf目录下

cp $HIVE_HOME/conf/hive-site.xml   $SPARK_HOME/conf

注意:如果不将hive-site.xml拷贝到spark的conf目录下,会出现找不到表的异常。

从hive中读数据

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object ReadHive {
    def main(args: Array[String]) = {
        val warehouseLocation = "spark-warehouse"
        val spark = SparkSession.builder().
            appName("Spark Hive Example").
            config("spark.sql.warehouse.dir", warehouseLocation).
            enableHiveSupport().getOrCreate()
            
        import spark.implicits._
        import spark.sql
        sql("select * from sparkhive.student").show()
    }
}

向hive中写数据

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

object InsertHive {
    def main(args: Array[String]) = {
        val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
        import spark.sql
        import spark.implicits._
        
        val studentRDD = spark.sparkContext.parallelize(Array("30,TianQi,M,30","40,ErGouZi,F,18")).map(_.split(","))
        val fields = Array(
            StructField("id", IntegerType, true),
            StructField("name", StringType, true), 
            StructField("gender", StringType, true), 
            StructField("age", IntegerType, true)) 
        val schema = StructType(fields)
        
        val rowRDD = studentRDD.map(p => {Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt)})
        
        val studentDF = spark.createDataFrame(rowRDD, schema)
        
        studentDF.registerTempTable("tempTable")
        
        sql("insert into sparkhive.student select * from tempTable")
    }
}

错误和异常的解决方法

yarn cluster模式使用SQL找不到表 报错:
org.apache.spark.sql.AnalysisException: Table or view not found:
at org.apache.spark.sql.catalyst.analysis.packageAnalysisErrorAt.failAnalysis(package.scala:42) at org.apache.spark.sql.catalyst.analysis.AnalyzerResolveRelations$.getTable(Analyzer.scala:306)

解决方法:
1 Using SparkSession.enableHiveSupport() instead of deprecated SQLContext or HiveContext.
2 copy hive-site.xml into SPARK CONF (/usr/lib/spark/conf) directory
3 Adding the same directory to the classpath while executing the jar

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,919评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,567评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,316评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,294评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,318评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,245评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,120评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,964评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,376评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,592评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,764评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,460评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,070评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,697评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,846评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,819评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,665评论 2 354

推荐阅读更多精彩内容