加载数据
- 创建
SaparkSession
val sparkSession =SparkSession.builder().master("local[4]").appName("test").getOrCreate()
- 加载数据方式
*
表示加载的方式
sparkSession.read.*
format指定加载数据类型
spark.read.format("…")[.option("…")].load("…")
format("…"):指定加载的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"
load("…"):在"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"格式下需要传入加载数据路径
option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable
spark.read.format("json").load ("input/user.json").show
示例一:
sparkSession.read.format("text")
示例二:
sparkSession.read.format("json")
直接读取数据
spark.read直接读取数据:csv format jdbc json load option
options orc parquet schema table text textFile
注意:加载数据的相关参数需写到上述方法中,
如:textFile需传入加载数据的路径,jdbc需传入JDBC相关参数。
spark.read.json("input/user.json").show()
示例一:
sparkSession.read.text()
示例二:
sparkSession.read.textFile()
读取文本文件
方式一:format("text")
val df: DataFrame = sparkSession.read.format("text").load("path")
文本文件无法进行切分,按行读取,若要对一行的数据进行切分,需要可以使用map
或其他算子。
// 导入隐式转换,这一步很重要
import sparkSession.implicits._
// 我这里按照`_`分隔,会得到一个`Dataset`
val value: Dataset[List[String]] = df.map(_.mkString.split("_").toList)
完整代码
@Test
def readTextFile(): Unit ={
// 加载文件,sparkSession 上面有定义
val df: DataFrame = sparkSession.read.format("text").load("./src/main/resources/data/user_visit_action.txt")
// 导入隐式转换
import sparkSession.implicits._
// 对每行数据进行切分
val value: Dataset[List[String]] = df.map(_.mkString.split("_").toList)
// 输出
value.foreach(println(_))
}
若还想把文本文件转换成表
,使用sql
来操作;还需要将List
转换成'元组'
value :上面的 Dataset[List[String]]
value1 :转换出来的元组
_1-_13:类名(偷懒,也可以根据该列的定义来命名,如 name, address 等符合规范的命名方式)
// 转换成元组
val value1: Dataset[(String, String, String, String, String, String, String, String, String, String, String, String, String)] = value.map(e => {
e match {
case List(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13) => (_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13)
}
})
//注册成表
value1.createOrReplaceTempView("log")
// 编写sql
sparkSession.sql("select * from log").show()
方式二:read.text("path")
@Test
def readTextFile(): Unit ={
// 加载文件,sparkSession 上面有定义
val df: DataFrame = sparkSession.read.text("./src/main/resources/data/user_visit_action.txt")
// 导入隐式转换
import sparkSession.implicits._
// 对每行数据进行切分
val value: Dataset[List[String]] = df.map(_.mkString.split("_").toList)
// 转换成元组
val value1: Dataset[(String, String, String, String, String, String, String, String, String, String, String, String, String)] = value.map(e => {
e match {
case List(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13) => (_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13)
}
})
//注册成表
value1.createOrReplaceTempView("log")
// 编写sql
sparkSession.sql("select * from log").show()
}
结果就不展示了,(真是数据)
text 与 textFile 的区别
- 都是用于读取文本文件
- 返回的类型不同
text 返回的是DataFrame
val df: DataFrame = sparkSession.read.text("./src/main/resources/data/user_visit_action.txt")
textFile 返回的是Dataset
val df: Dataset[String] = sparkSession.read.textFile("./src/main/resources/data/user_visit_action.txt")
读取json文件
注意:读取json文件读取其实json行文件。
如像这种
{"id":1,"name":"张三","age":18,"sex":"男","class_id":1}
{"id":1,"name":"张三","age":18,"sex":"男","class_id":1}
{"id":2,"name":"绣花","age":16,"sex":"女","class_id":1}
{"id":3,"name":"李四","age":18,"sex":"男","class_id":1}
{"id":4,"name":"王五","age":18,"sex":"男","class_id":1}
{"id":5,"name":"翠花","age":19,"sex":"女","class_id":1}
{"id":6,"name":"张鹏","age":17,"sex":"男","class_id":1}
方式一:format("json")
@Test
def readJSONFile(): Unit ={
val df: DataFrame = sparkSession.read.format("json").load("C:\\Users\\123456\\Desktop\\student.json")
df.foreach(e=>{
println(e)
})
}
结果
[18,1,1,张三,男]
[18,1,1,张三,男]
[16,1,2,绣花,女]
[18,1,3,李四,男]
[18,1,4,王五,男]
[19,1,5,翠花,女]
[17,1,6,张鹏,男]
方式二:read.json("path")
@Test
def readJSONFile(): Unit ={
val df: DataFrame = sparkSession.read.json("C:\\Users\\123456\\Desktop\\student.json")
df.foreach(e=>{
println(e)
})
}
若要使用sql
方式来操作需要转换成表
@Test
def readJSONFile(): Unit ={
val df: DataFrame = sparkSession.read.json("C:\\Users\\123456\\Desktop\\student.json")
df.createOrReplaceTempView("json")
sparkSession.sql("select * from json").show()
}
+---+--------+---+----+---+
|age|class_id| id|name|sex|
+---+--------+---+----+---+
| 18| 1| 1|张三| 男|
| 18| 1| 1|张三| 男|
| 16| 1| 2|绣花| 女|
| 18| 1| 3|李四| 男|
| 18| 1| 4|王五| 男|
| 19| 1| 5|翠花| 女|
| 17| 1| 6|张鹏| 男|
+---+--------+---+----+---+
统计男女人数
@Test
def readJSONFile(): Unit ={
val df: DataFrame = sparkSession.read.json("C:\\Users\\123456\\Desktop\\student.json")
// 注册成表
df.createOrReplaceTempView("json")
// 编写sql
sparkSession.sql("select sex,count(1) from json group by sex").show()
}
+---+--------+
|sex|count(1)|
+---+--------+
| 男| 5|
| 女| 2|
+---+--------+
与 text
相比,无需指定列字段,也不用导入隐式转换。
读取JDBC数据
导入mysql依赖包
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.25</version>
</dependency>
方式一:format("jdbc")
连接数据库,不能像加载文件的方式使用load("path")
的形式了,而是需要使用option()
的方式。
option()
采用k-v
的形式进行配置
@Test
def readJDBC(): Unit ={
val reader: DataFrameReader = sparkSession.read.format("jdbc")
// url
reader.option(JDBCOptions.JDBC_URL, "jdbc:mysql://hadoop102:3306/gmall")
// 指定表
reader.option(JDBCOptions.JDBC_TABLE_NAME,"user_info")
// 账号
reader.option("user","root")
// 密码
reader.option("password","123321")
// 连接
val df: DataFrame = reader.load()
df.show()
}
位于org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
项目中。
object JDBCOptions {
private val curId = new java.util.concurrent.atomic.AtomicLong(0L)
private val jdbcOptionNames = collection.mutable.Set[String]()
private def newOption(name: String): String = {
jdbcOptionNames += name.toLowerCase(Locale.ROOT)
name
}
val JDBC_URL = newOption("url")
val JDBC_TABLE_NAME = newOption("dbtable")
val JDBC_QUERY_STRING = newOption("query")
val JDBC_DRIVER_CLASS = newOption("driver")
val JDBC_PARTITION_COLUMN = newOption("partitionColumn")
val JDBC_LOWER_BOUND = newOption("lowerBound")
val JDBC_UPPER_BOUND = newOption("upperBound")
val JDBC_NUM_PARTITIONS = newOption("numPartitions")
val JDBC_QUERY_TIMEOUT = newOption("queryTimeout")
val JDBC_BATCH_FETCH_SIZE = newOption("fetchsize")
val JDBC_TRUNCATE = newOption("truncate")
val JDBC_CASCADE_TRUNCATE = newOption("cascadeTruncate")
val JDBC_CREATE_TABLE_OPTIONS = newOption("createTableOptions")
val JDBC_CREATE_TABLE_COLUMN_TYPES = newOption("createTableColumnTypes")
val JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES = newOption("customSchema")
val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement")
val JDBC_PUSHDOWN_PREDICATE = newOption("pushDownPredicate")
}
此种方式一直有一个分区,不适用数据量大的表
println(df.rdd.getNumPartitions) // 1
方式二:read.jdbc()
jdbc()有三个重载方法
- def jdbc(
url
: String,table
: String, properties: Properties) - def jdbc(
url
: String,table
: String,predicates
: Array[String],connectionProperties
: Properties) - def jdbc(
url
: String,table
: String,columnName
: String,lowerBound
: Long,upperBound
: Long,numPartitions
: Int,connectionProperties
: Properties)
url
:jdbc连接地址
table
:指定表名
predicates
:每个分区的 where 子句中的条件。
connectionProperties
、properties
:java.util.Properties;通常配置一些账号
及密码
等连接信息
方式一:此种方式读取mysql数据的时候只有一个分区 <TODO 只适用于数据量特别小的时候>
@Test
def readJDBC(): Unit ={
// url
val url="jdbc:mysql://hadoop102:3306/gmall"
// 表名
val table_name="user_info"
// 配置
val prop=new Properties()
prop.put("user","root")
prop.put("password","123321")
val reader: DataFrame= sparkSession.read.jdbc(url,table_name,prop)
reader.show()
// 分区数
println(reader.rdd.getNumPartitions) // 结果为 1
}
查询结果
+---+------------+---------+------+--------+-----------+--------------------+--------+----------+----------+------+-------------------+------------+------+
| id| login_name|nick_name|passwd| name| phone_num| email|head_img|user_level| birthday|gender| create_time|operate_time|status|
+---+------------+---------+------+--------+-----------+--------------------+--------+----------+----------+------+-------------------+------------+------+
| 1| 9i5x8jm| 阿河| null| 孟河|13414217211| 9i5x8jm@0355.net| null| 1|1967-06-14| M|2020-06-14 16:38:32| null| null|
| 2|613oemp403w5| 岚艺| null|令狐岚艺|13617884815|613oemp403w5@263.net| null| 1|1982-06-14| F|2020-06-14 16:38:32| null| null|
| 3| vxipycm| 羽羽| null| 邬羽|13348276946| vxipycm@yeah.net| null| 1|1975-06-14| F|2020-06-14 16:38:32| null| null|
| 4| tvl37zh3| 昭昭| null| 公孙昭|13872281627| tvl37zh3@263.net| null| 2|1992-06-14| F|2020-06-14 16:38:32| null| null|
| 5| hlh52rbbot| 超浩| null| 许超浩|13645559786| hlh52rbbot@yeah.net| null| 1|1971-06-14| M|2020-06-14 16:38:32| null| null|
| 6| k7ydt3| 馥馥| null| 茅馥|13621194624| k7ydt3@qq.com| null| 1|2005-06-14| F|2020-06-14 16:38:32| null| null|
| 7| w748bj1wb| 卿聪| null| 汤卿聪|13132783352| w748bj1wb@sohu.com| null| 1|1976-06-14| F|2020-06-14 16:38:32| null| null|
| 8| sn5bybujp9h| 阿固| null| 钱固|13842783258| sn5bybujp9h@263.net| null| 1|2003-06-14| M|2020-06-14 16:38:32| null| null|
| 9| s5iamgaydk| 云莲| null| 汪云莲|13268453518| s5iamgaydk@ask.com| null| 3|1994-06-14| F|2020-06-14 16:38:32| null| null|
| 10| ad5for2z27| 艺艺| null| 司徒艺|13513942543| ad5for2z27@163.net| null| 1|1983-06-14| F|2020-06-14 16:38:32| null| null|
| 11| zqvdkg| 璐娅| null|宇文璐娅|13226187467| zqvdkg@ask.com| null| 1|2000-06-14| F|2020-06-14 16:38:32| null| null|
| 12| 8y4v4em| 聪聪| null| 安聪|13882977875| 8y4v4em@163.net| null| 1|1987-06-14| F|2020-06-14 16:38:32| null| null|
| 13| kj47k5| 晓晓| null| 祁晓|13984996852| kj47k5@live.com| null| 1|1990-06-14| F|2020-06-14 16:38:32| null| null|
| 14| 49ungc4vxw| 昌成| null| 方昌成|13136526735|49ungc4vxw@hotmai...| null| 3|1987-06-14| M|2020-06-14 16:38:32| null| null|
| 15| w71ygayab8h| 荣爱| null| 孙荣爱|13161527545|w71ygayab8h@0355.net| null| 2|1977-06-14| F|2020-06-14 16:38:32| null| null|
| 16| ttyeksqza| 卿卿| null| 公孙卿|13221949889| ttyeksqza@163.net| null| 2|1966-06-14| F|2020-06-14 16:38:32| null| null|
| 17| 558vfl| 华慧| null| 陶华慧|13825216547| 558vfl@163.com| null| 2|1992-06-14| F|2020-06-14 16:38:32| null| null|
| 18| ws81it| 坚和| null| 余坚和|13741734718| ws81it@yahoo.com.cn| null| 1|2003-06-14| M|2020-06-14 16:38:32| null| null|
| 19| hpsvztha| 希希| null| 元希|13172412153| hpsvztha@sohu.com| null| 1|2005-06-14| F|2020-06-14 16:38:32| null| null|
| 20|i0jht7qvv1t9| 茗茗| null| 安茗|13842784759|i0jht7qvv1t9@0355...| null| 2|1977-06-14| F|2020-06-14 16:38:32| null| null|
+---+------------+---------+------+--------+-----------+--------------------+--------+----------+----------+------+-------------------+------------+------+
only showing top 20 rows
第一种方式和使用format("jdbc")
一样,只有一个分区。
方式二:此种方式,读取mysql数据的时候分区数 = predicates
元素个数 <TODO 一般不用>
@Test
def readJDBC(): Unit ={
// url
val url="jdbc:mysql://hadoop102:3306/gmall"
// 表名
val table_name="user_info"
// 配置
val prop=new Properties()
prop.put("user","root")
prop.put("password","123321")
// 指定每个分区的条件
val predicates=Array(
"LENGTH(name)=2", //将name长度为2的数据化为一个分区
"email like '%.com'", //将email 以.com结尾的划分为一个分区
"gender='M'" // 将gender为M的化为一个分区
)
val reader: DataFrame= sparkSession.read.jdbc(url,table_name,predicates,prop)
reader.show()
println(reader.rdd.getNumPartitions) // 结果为 3
}
这种方式虽然可以指定分区,但是需要通过where
条件来指定分区,如数据量比较大,需要分100个分区,难道要写100个where条件
吗?所以一般清空下不用。数据量小的清空下用方式一
,数据量大的情况下用方式三
。
方式三:此种方式,读取mysql数据的分区数 = (upperBound-lowerBound) > numPartitions? upperBound-lowerBound : numPartitions
columnName
:指定按照什么方式进行分区;字段类型必须为数字
、日期
或时间戳
类型列的名称。
lowerBound
:用于决定分区步幅的 columnName
的最小值。
upperBound
:用于决定分区步幅的 columnName
的最大值。
numPartitions
: 分区数。这与“lowerBound”(包含)、“upperBound”(不包含)一起形成分区步幅,用于生成用于均匀拆分列“columnName”的 WHERE 子句表达式。当输入小于 1 时,数字设置为 1。
@Test
def readJDBC(): Unit ={
// url
val url="jdbc:mysql://hadoop102:3306/gmall"
// 表名
val table_name="user_info"
// 分区字段
val columnName="id"
// 分区步幅的最小值
val lowerBound=1
// 分区步幅的最大值
val upperBound=10
// 分区数
val numPartitions=99
// 配置
val prop=new Properties()
prop.put("user","root")
prop.put("password","123321")
val reader: DataFrame= sparkSession.read.jdbc(url,table_name,columnName,lowerBound,upperBound,numPartitions,prop)
reader.show()
println(reader.rdd.getNumPartitions)
}
分区数为9
为什么分区数是9呢?是怎么的来了,通过公式计算
(upperBound-lowerBound) >= numPartitions? numPartitions : upperBound-lowerBound
=(10-1)>=99?99:10-1
计算步长
val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
=10/99-1/99=0.09090909090909091 (取整为0)
保存数据
语法:
df.write
.mode(SaveMode
.XX) // 数据写入模式
.写入格式
(csv,json,jdbc)
SaveMode模式:
-
Append
: 追加数据<TODO 常用,一般数据写入mysql会用> -
Overwrite
: 如果目录存在则覆盖数据<TODO 常用,一般用于数据写入HDFS> -
ErrorIfExists
: 如果目录已经存在则报错 -
Ignore
: 如果目录存在则不写入
示例:将数据写入成格式
{"id":1,"name":"张三","age":18,"sex":"男","class_id":1}
{"id":1,"name":"张三","age":18,"sex":"男","class_id":1}
{"id":2,"name":"绣花","age":16,"sex":"女","class_id":1}
{"id":3,"name":"李四","age":18,"sex":"男","class_id":1}
{"id":4,"name":"王五","age":18,"sex":"男","class_id":1}
{"id":5,"name":"翠花","age":19,"sex":"女","class_id":1}
{"id":6,"name":"张鹏","age":17,"sex":"男","class_id":1}
案例:读取json文件,将性别为男的信息写入到csv格式文件中
@Test
def jsonWriteCsv(): Unit ={
val df: DataFrame = sparkSession.read.json("C:\\Users\\123456\\Desktop\\student.json")
//注册成表
df.createOrReplaceTempView("json")
//编写sql 查询所有性别为男的数据
val frame: DataFrame = sparkSession.sql(
"""
|select * from json where sex='男'
|""".stripMargin)
// 将结果写入到csv格式文件中
frame.write.mode(SaveMode.Overwrite).csv("output/10001.csv")
}
文件结果
18,1,1,张三,男
18,1,1,张三,男
18,1,3,李四,男
18,1,4,王五,男
17,1,6,张鹏,男
默认分隔符号为,
号,也可以指定其他分隔符号
使用sep
指定分隔形式
frame.write.mode(SaveMode.Overwrite).option("sep","\t").csv("output/10001.csv")
18 1 1 张三 男
18 1 1 张三 男
18 1 3 李四 男
18 1 4 王五 男
17 1 6 张鹏 男
除了保存到文件中,也可以保存到数据库
中
@Test
def jsonWriteMysql(): Unit ={
// 读取json文件
val df: DataFrame = sparkSession.read.json("C:\\Users\\123456\\Desktop\\student.json")
// url
val url="jdbc:mysql://hadoop102:3306/gmall"
// 表名
val table_name="json_to_student"
// 配置
val prop=new Properties()
prop.put("user","root")
prop.put("password","123321")
//将数据写入到mysql数据库中
df.write.mode(SaveMode.Overwrite).jdbc(url,table_name,prop)
}
使用mysql客户端,查询json_to_student
select * from json_to_student;
注意:
- 写入到
jdbc
中中文数据乱码
解决方案,url
后面加?useUnicode=true&characterEncoding=UTF-8
val url="jdbc:mysql://hadoop102:3306/gmall?useUnicode=true&characterEncoding=UTF-8"
- 数据写入mysql采用追加的形式,有可能出现主键冲突,解决方式使用
foreachPartitions
df.rdd.foreachPartition()
案例演示
@Test
def jsonWriteMysql2(): Unit ={
// 读取json文件
val df: DataFrame = sparkSession.read.json("C:\\Users\\123456\\Desktop\\student.json")
//将数据写入到mysql数据库中
//df.write.mode(SaveMode.Append).jdbc(url,table_name,prop)
df.rdd.foreachPartition(it=>{
// 注册驱动
Class.forName("com.mysql.jdbc.Driver")
// url
val url="jdbc:mysql://hadoop102:3306/gmall?useUnicode=true&characterEncoding=UTF-8"
// 账号
val username="root"
// m密码
val password="123321"
// 获取连接
import java.sql.DriverManager
val conn = DriverManager.getConnection(url, username, password)
// 编写sql
val sql=
"""
|insert into json_to_student(age,class_id,id,name,sex) values(?,?,?,?,?) ON DUPLICATE KEY UPDATE
|age=values(age),class_id=values(class_id),name=values(name),sex=values(sex), id=values(id)
|""".stripMargin
val statement: PreparedStatement = conn.prepareStatement(sql)
it.foreach(e=>{
val age=e(0).toString.toInt
val class_id=e(1).toString.toInt
val id=e(2).toString.toInt
val name=e(3).toString
val sex=e(4).toString
statement.setInt(1,age)
statement.setInt(2,class_id)
statement.setInt(3,id)
statement.setString(4,name)
statement.setString(5,sex)
//添加到批处理中
statement.addBatch()
})
// 执行批处理
statement.executeBatch()
// 关闭资源
statement.close()
conn.close()
})
}
注意:
- 必须指定主键
“ALTER TABLE 数据表名 ADD PRIMARY KEY(字段名/列名);”
- 使用
ON DUPLICATE KEY UPDATE
可以完成主键存在则修改,不存在则插入
功能。
与Hive 交互
shell命令的方式
在spark中默认有一个hive
- 启动Hadoop
需要先进行Hadoop与spark配置 - 进入 spark 的
bin/spark-shell
终端中。
spark-yarn]$ bin/spark-shell
- 直接执行sql
创建表(ddl不用加show
)
scala> spark.sql("create table student2 (id int,name string,age int, sex string)")
插入一条数据(必须加show
)
scala> spark.sql("insert into student2 values(1,'tom',18,'M')").show
查询表
scala> spark.sql("select * from student2").show
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
| 1| tom| 18| M|
+---+----+---+---+
此时 spark 根目录就有两个文件(自带的hive
数据会存在这两张表中)
-
metastore_db
:存放元数据信息 -
spark-warehouse
:存放 hive 表数据
使用的自带的
hive
默认使用的是derby
数据库
derby
有个特点:不支持同时开启两个./hive的命令终端,普遍操作,将hive数据库更换成mysql
数据库或其他。
若要使用自己的hive
需要将hive-site.xml
复制 spark 的conf
目录下
spark-yarn]$ cp /opt/module/hive/conf/hive-site.xml ./conf/
除此之外还需要 mysql的驱动包,拷贝到 spark的jars
目录下。
spark-yarn]$ cp /opt/software/mysql-connector-java-5.1.27-bin.jar ./jars/
重新进入spark-shell
中,这样就可以使用我们安装的hive
了。
spark-yarn]$ bin/spark-shell
idea方式
实际开发中,我们肯定通过代码的方式去操作hive
,所以我们需要将hive整合到项目中。
- 第一步拷贝
hive-site.xml
到resources
中 - 创建SparkSession时开启
Hive
支持(.enableHiveSupport()
)
// 创建 SparkSession 时 需要开启hive支持
val sparkSession =SparkSession.builder().master("local[4]").appName("test").enableHiveSupport().getOrCreate()
- 导入hive 依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.0.0</version>
</dependency>
否则会出现如下错误
java.lang.IllegalArgumentException: Unable to instantiate SparkSession with Hive support because Hive classes are not found.
- 编写sql
@Test
def demo01: Unit ={
sparkSession.sql("select * from t_person").show()
}
-
若遇到权限问题,解决方式
在 VM options 中配置 -DHADOOP_USER_NAME=账号名称