一、创建DataFrame的几种方式
1. 通过Seq生成
// Seq生成DataFrame
val df = spark.createDataFrame(Seq(
("数学", "张三", 88), ("语文", "张三", 92),
("英语", "张三", 77), ("数学", "王五", 65),
("语文", "王五", 87), ("英语", "王五", 90),
("数学", "李雷", 67), ("语文", "李雷", 33),
("英语", "李雷", 24), ("数学", "宫九", 77),
("语文", "宫九", 87), ("英语", "宫九", 90))).toDF("科目", "姓名", "分数")
// 显示数据
df.orderBy("科目").show()
+----+----+----+
|科目|姓名|分数|
+----+----+----+
|数学|张三| 88|
|数学|王五| 65|
|数学|宫九| 77|
|数学|李雷| 67|
|英语|张三| 77|
|英语|王五| 90|
|英语|李雷| 24|
|英语|宫九| 90|
|语文|张三| 92|
|语文|宫九| 87|
|语文|王五| 87|
|语文|李雷| 33|
+----+----+----+
2. DataSet生成
Dataset与DataFrame的区别是DataFrame的一行记录中没有指定特定的数据类型,而 Dataset 的一行中的数据都是明确类型的。
import org.apache.spark.sql.Encoders
// 指定类型为Encoders.STRING
val dataSet = spark.createDataset(Array(
"李明,20,15552211521", "王红,19,13287994007", "刘三,21,15552211523"
))(Encoders.STRING)
spark.read.csv(dataSet).toDF("name", "age", "phone").show()
+----+---+-----------+
|name|age| phone|
+----+---+-----------+
|李明| 20|15552211521|
|王红| 19|13287994007|
|刘三| 21|15552211523|
+----+---+-----------+
3. 动态创建schema
import java.util
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
val schema = StructType(List(
StructField("name", StringType, true),
StructField("age", IntegerType, true),
StructField("phone", LongType, true)
))
val dataList = new util.ArrayList[Row]()
dataList.add(Row("李明",20,15552211521L))
dataList.add(Row("王红",19,13287994007L))
dataList.add(Row("刘三",21,15552211523L))
spark.createDataFrame(dataList,schema).show()
+----+---+-----------+
|name|age| phone|
+----+---+-----------+
|李明| 20|15552211521|
|王红| 19|13287994007|
|刘三| 21|15552211523|
+----+---+-----------+
二、DataFrame API基本操作
1. 数据
people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
2. 基本操作
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
object MySpark {
def main(args: Array[String]) {
// 定义应用名称
val conf = new SparkConf().setAppName("mySpark0")
conf.setMaster("spark://master:7077")
conf.setJars(Seq("/root/SparkTest.jar"))
// 创建SparkSession对象
val spark = SparkSession.builder()
.appName("DataFrameAPP")
.config(conf)
.getOrCreate()
// 将json文件加载成一个DataFrame
val peopleDF = spark.read.format("json")
.load("hdfs://192.168.71.130:9000/people.json") // 添加hdfs路径
// 1、输出DataFrame对应的schema信息
peopleDF.printSchema()
// 2、输出DataFrame里面的数据,不加参数默认输出前20条
peopleDF.show(100)
// 3、查询DataFrame里面某一列数据:select name from table;
peopleDF.select("name").show()
// 4、查询某几列所有数据,并对列进行计算:select name,age+10 from table;
peopleDF.select(peopleDF.col("name"), peopleDF.col("age") + 10).show
// 5、起个别名:age+10 ==> age2
peopleDF.select(peopleDF.col("name"),(peopleDF.col("age") + 10)
.as("age2")).show()
// 6、过滤,输出年龄大于19岁的
peopleDF.filter(peopleDF.col("age") > 19).show()
// 7、根据某一列进行分组,然后再进行聚合操作:select age,count(1) from table group by age
peopleDF.groupBy("age").count().show()
//关闭资源
spark.stop()
}
}
3. show方法
org.apache.spark.sql.Dataset类的方法:
功能:展示数据
1)show()
show方法有四种调用方式,分别为:
默认,只显示前20条记录
df.show()
2)show(numRows: Int)
显示指定行数:
df.show(1)
3)show(numRows: Int)
显示指定行数:
df.show(1)
4)show(truncate: Boolean)
是否截断长字符串。如果为true,则超过20个字符的字符串将被截断,并且所有单元格都将右对齐。
scala> df.show()
+----+---+------+---+----------------------+-----+
|addr|age|deptid| id| name|score|
+----+---+------+---+----------------------+-----+
|天津| 20| 10| 1|赵伟123456789098765...|80.92|
scala> df.show(false)
+----+---+------+---+-----------------------+-----+
|addr|age|deptid|id |name |score|
+----+---+------+---+-----------------------+-----+
|天津|20 |10 |1 |赵伟1234567890987654321|80.92|
4)show(numRows: Int, truncate: Int, vertical: Boolean)
scala> df.show(1,3,true)
-RECORD 0-------
addr | 天津
age | 20
deptid | 10
id | 1
name | 赵伟1
score | 80.
only showing top 1 row
二、 Spark Scala Sql
数据:
{"id":1,"name":"赵伟","age":20,"addr":"天津","score":80.92,"deptid":10}
{"id":2,"name":"钱枫","age":30,"addr":"北京","score":90,"deptid":20}
{"id":3,"name":"孙斌","age":19,"addr":"上海","score":88,"deptid":30}
{"id":4,"name":"李浩楠","age":19,"addr":"上海","score":87,"deptid":40}
{"id":5,"name":"刘文娟","age":19,"addr":null,"score":96,"deptid":20}
1、查询
// 按id 与 姓名查询
df.select("id", "name").show
+---+------+
| id| name|
+---+------+
| 1| 赵伟|
| 2| 钱枫|
| 3| 孙斌|
| 4|李浩楠|
+---+------+
// 选择字段,并进行计算,必须指定字段别名 as
df.select(df("name"), (df("age")+ 1).as("age1")).show()
+------+----+
| name|age1|
+------+----+
| 赵伟| 21|
| 钱枫| 31|
| 孙斌| 20|
|李浩楠| 20|
+------+----+
// selectExpr 可以对指定字段进行特殊处理 round 进行四舍五入
df.selectExpr("id", "name as username", "round(score,1)").show(false)
+---+--------+---------------+
|id |username|round(score, 1)|
+---+--------+---------------+
|1 |赵伟 |80.9 |
|2 |钱枫 |90.0 |
|3 |孙斌 |88.0 |
|4 |李浩楠 |88.0 |
+---+--------+---------------+
// 获取指定字段(只能一个,返回为Column)对象
val id1 = df.apply("id")
val id2 = df("id")
2、查询指定字段
col:获取指定字段
只能获取一个字段,返回对象为Column类型。
val idCol = df.col("id")
apply:获取指定字段
只能获取一个字段,返回对象为Column类型
// 获取指定字段(只能一个,返回为Column)
val id1 = df.apply("id")
val id2 = df("id")
// 选出两列,把其中一列加1
df.select(id1, id2 + 1).show
+---+--------+
| id|(id + 1)|
+---+--------+
| 1| 2|
| 2| 3|
| 3| 4|
| 4| 5|
+---+--------+
3、选择指定行
// first获取第一行
val row = df.first()
print(row.get(0) + "," + row.getAs("name"))
# 执行结果
天津,赵伟
// head获取第一行,head(n: Int)获取前n行记录
val rows = df.head(2)
for(row<-rows){
print(row.get(0)+"\t")
print(row.get(1)+"\t")
print(row.get(2))
println()
}
# 执行结果
天津 20 10
北京 30 20
// take(n: Int)获取前n行数据
val rows = df.take(2)
for(row<-rows){
print(row.get(0)+"\t")
print(row.get(1)+"\t")
print(row.get(2))
println()
}
# 执行结果
天津 20 10
北京 30 20
// takeAsList(n: Int)获取前n行数据,并以List的形式展现
val rows = df.takeAsList(2)
// 查看输出数据
println(rows)
// 读取0行 0列数据
println(rows.get(0).get(0))
val iterator = rows.iterator()
while(iterator.hasNext){
val row=iterator.next()
print(row.get(0)+" "+row.get(1)+" "+row.get(2))
println()
}
# 执行结果
[[天津,20,10,1,赵伟,80.92], [北京,30,20,2,钱枫,90.0]]
天津
天津 20 10
北京 30 20
以Row或者Array[Row]的形式返回一行或多行数据。first和head功能相同。
take和takeAsList方法会将获得到的数据返回到Driver端,所以,使用这两个方法时需要注意数据量,以免Driver发生OutOfMemoryError
4、条件查询
where(conditionExpr: String):SQL语言中where关键字后的条件,传入筛选条件表达式,可以用and和or。得到DataFrame类型的返回结果。
// 条件为id等于1,姓名为刘文娟
df.where("id = 1 or name = '刘文娟'" ).show()
+----+---+------+---+------+-----+
|addr|age|deptid| id| name|score|
+----+---+------+---+------+-----+
|天津| 20| 10| 1| 赵伟|80.92|
|null| 19| 20| 5|刘文娟| 96.0|
+----+---+------+---+------+-----+
filter:根据字段进行筛选,传入筛选条件表达式,得到DataFrame类型的返回结果。和where使用条件相同
// 条件为id等于1,姓名为刘文娟
df.filter("id = 1 or name = '刘文娟'" ).show()
+----+---+------+---+------+-----+
|addr|age|deptid| id| name|score|
+----+---+------+---+------+-----+
|天津| 20| 10| 1| 赵伟|80.92|
|null| 19| 20| 5|刘文娟| 96.0|
+----+---+------+---+------+-----+
5、 limit
limit方法获取指定DataFrame的前n行记录,得到一个新的DataFrame对象。和take与head不同的是,limit方法不是Action操作。
df.limit(3).show
+----+---+------+---+----+-----+
|addr|age|deptid| id|name|score|
+----+---+------+---+----+-----+
|天津| 20| 10| 1|赵伟|80.92|
|北京| 30| 20| 2|钱枫| 90.0|
|上海| 19| 30| 3|孙斌| 88.0|
+----+---+------+---+----+-----+
6、删除操作
删除指定字段(列),保留其他字段
// 删除id字段
df.drop("id").show()
+----+---+------+------+-----+
|addr|age|deptid| name|score|
+----+---+------+------+-----+
|天津| 20| 10| 赵伟|80.92|
|北京| 30| 20| 钱枫| 90.0|
|上海| 19| 30| 孙斌| 88.0|
|上海| 19| 40|李浩楠| 88.0|
+----+---+------+------+-----+
// 删除name字段
df.drop(df("name")).show()
+----+---+------+---+-----+
|addr|age|deptid| id|score|
+----+---+------+---+-----+
|天津| 20| 10| 1|80.92|
|北京| 30| 20| 2| 90.0|
|上海| 19| 30| 3| 88.0|
|上海| 19| 40| 4| 88.0|
+----+---+------+---+-----+
// 删除带有空值的行
df.na.drop().show()
+----+---+------+---+------+-----+
|addr|age|deptid| id| name|score|
+----+---+------+---+------+-----+
|天津| 20| 10| 1| 赵伟|80.92|
|北京| 30| 20| 2| 钱枫| 90.0|
|上海| 19| 30| 3| 孙斌| 88.0|
|上海| 19| 40| 4|李浩楠| 87.0|
+----+---+------+---+------+-----+
7、排序查询
orderBy|sort($"列名")
升序排列
orderBy|sort($"列名".desc)
降序排列
orderBy|sort($"列1" , $"列2".desc)
按两列排序
// 升序排列,只对数字类型和日期类型生效
df.select("id","name").orderBy(df("id")).show
+---+----+
| id|name|
+---+----+
| 1|赵伟|
| 2|钱枫|
| 3|孙斌|
+---+----+
// 降序排列,只对数字类型和日期类型生效
df.select("id","name").orderBy(df("id").desc).show
df.select("id","name").orderBy(-df("id")).show
+---+------+
| id| name|
+---+------+
| 4|李浩楠|
| 3| 孙斌|
| 2| 钱枫|
| 1| 赵伟|
+---+------+
// sort降序,sort用法和orderBy相同,可以直接替换
df.select("id","name").sort(df("id").desc).show
// 一列升序,一列降序,同时调整两列
df.select("age","deptid").sort(df("age").desc,df("deptid").asc).show
+---+------+
|age|deptid|
+---+------+
| 30| 20|
| 20| 10|
| 19| 30|
| 19| 40|
+---+------+
// sortWithinPartitions 和sort类似,但是是使用Partition来对其他字段排序
df.sortWithinPartitions("deptid").show
+----+---+------+---+------+-----+
|addr|age|deptid| id| name|score|
+----+---+------+---+------+-----+
|天津| 20| 10| 1| 赵伟|80.92|
|北京| 30| 20| 2| 钱枫| 90.0|
|null| 19| 20| 5|刘文娟| 96.0|
|上海| 19| 30| 3| 孙斌| 88.0|
|上海| 19| 40| 4|李浩楠| 87.0|
+----+---+------+---+------+-----+
8、 去重
(1)distinct:返回一个不包含重复记录的Dataframe(整体去重),结果和dropDuplicates不传入指定字段的结果相同。
返回当前不重复的row(行)记录df.distinct()。
(2)dropDuplicates:根据指定字段去重
根据指定字段去重。
加入重复数据:
{"id":4,"name":"李浩楠","age":19,"addr":"上海","score":87,"deptid":40}
去重:
df.distinct().show
+----+---+------+---+------+-----+
|addr|age|deptid| id| name|score|
+----+---+------+---+------+-----+
|null| 19| 20| 5|刘文娟| 96.0|
|北京| 30| 20| 2| 钱枫| 90.0|
|上海| 19| 40| 4|李浩楠| 87.0|
|上海| 19| 30| 3| 孙斌| 88.0|
|天津| 20| 10| 1| 赵伟|80.92|
+----+---+------+---+------+-----+
指定字段去重
df.dropDuplicates(Seq("age")).show
+----+---+------+---+----+-----+
|addr|age|deptid| id|name|score|
+----+---+------+---+----+-----+
|上海| 19| 30| 3|孙斌| 88.0|
|北京| 30| 20| 2|钱枫| 90.0|
|天津| 20| 10| 1|赵伟|80.92|
+----+---+------+---+----+-----+
9、分组查询--聚合(groupBy&agg)
groupBy方法有两种调用方式,可以传入String类型的字段名,也可传入Column类型的对象。
groupBy("列名", ...).max(列名) 求最大值
groupBy("列名", ...).min(列名) 求最小值
groupBy("列名", ...).avg(列名) 求平均值
groupBy("列名", ...).sum(列名) 求和
groupBy("列名", ...).count() 求个数
groupBy("列名", ...).agg 可以将多个方法进行聚合
// 按年龄分组
df.groupBy("age").count().show()
+---+-----+
|age|count|
+---+-----+
| 19| 2|
| 30| 1|
| 20| 1|
+---+-----+
// 分组计数(各个年龄的人出现了多少次)
df.groupBy("age").count().show()
+---+-----+
|age|count|
+---+-----+
| 19| 4|
| 30| 1|
| 20| 1|
+---+-----+
GroupedData对象:
该方法得到的是GroupedData类型对象,在GroupedData的API中提供了group by之后的操作,比如,
max(colNames: String)方法,获取分组中指定字段或者所有的数字类型字段的最大值,只能作用于数字型字段
min(colNames: String)方法,获取分组中指定字段或者所有的数字类型字段的最小值,只能作用于数字型字段
mean(colNames: String)方法,获取分组中指定字段或者所有的数字类型字段的平均值,只能作用于数字型字段(avg也可)
sum(colNames: String)方法,获取分组中指定字段或者所有的数字类型字段的和值,只能作用于数字型字段
count()方法,获取分组中的元素个数
agg方法,可以对指定字段进行聚合操作。
// 按年龄分组,找出第组最高分、
df.groupBy("age").max("score").show
+---+----------+
|age|max(score)|
+---+----------+
| 19| 96.0|
| 30| 90.0|
| 20| 80.92|
+---+----------+
// 按年龄分组,找出每组平均分、人数
df.groupBy("age").agg(Map("score" -> "max", "score" -> "mean","*" -> "count")).show()
// 如果有相同的列,使用别名将全显示
scala> df.groupBy("age").agg(max("score").alias("max_score"),avg("score").alias("avg_score")).show()
+---+---------+-----------------+
|age|max_score| avg_score|
+---+---------+-----------------+
| 19| 96.0|90.33333333333333|
| 30| 90.0| 90.0|
| 20| 80.92| 80.92|
+---+---------+-----------------+
注意:
如果聚合中出现相同的字段,只显示最后一个字段。
10、union 合并查询
unionAll方法(已过时,使用union相同):对两个DataFrame进行组合,类似于SQL中的UNION ALL操作。
// 合并不去重
df.union(df.limit(1)).show()
+----+---+------+---+------+-----+
|addr|age|deptid| id| name|score|
+----+---+------+---+------+-----+
|天津| 20| 10| 1| 赵伟|80.92|
|北京| 30| 20| 2| 钱枫| 90.0|
|上海| 19| 30| 3| 孙斌| 88.0|
|上海| 19| 40| 4|李浩楠| 87.0|
|上海| 19| 40| 4|李浩楠| 87.0|
|null| 19| 20| 5|刘文娟| 96.0|
|天津| 20| 10| 1| 赵伟|80.92|
+----+---+------+---+------+-----+
11、join 连接查询
添加部门数据:
{"deptid":10,"dname":"财务部"}
{"deptid":20,"dname":"研发部"}
{"deptid":30,"dname":"事业部"}
{"deptid":40,"dname":"人力部"}
{"deptid":60,"dname":"项目部"}
1)笛卡尔积
在SQL语言中用得很多的就是join操作,DataFrame中同样也提供了join的功能。
// 将json文件加载成一个DataFrame
val df = spark.read.format("json")
.load("hdfs://192.168.71.130:9000/people.json") // 添加hdfs路径
val df1 = spark.read.format("json")
.load("hdfs://192.168.71.130:9000/depart.json") // 添加hdfs路径
// 笛卡尔积
df.join(df1).show()
+----+---+------+---+------+-----+--------------------+------+------+
|addr|age|deptid| id| name|score| _corrupt_record|deptid| dname|
+----+---+------+---+------+-----+--------------------+------+------+
|天津| 20| 10| 1| 赵伟|80.92| null| 10|财务部|
|天津| 20| 10| 1| 赵伟|80.92|{"deptid":20"dnam...| null| null|
|天津| 20| 10| 1| 赵伟|80.92|{"deptid":30"dnam...| null| null|
|天津| 20| 10| 1| 赵伟|80.92|{"deptid":40"dnam...| null| null|
|北京| 30| 20| 2| 钱枫| 90.0| null| 10|财务部|
|北京| 30| 20| 2| 钱枫| 90.0|{"deptid":20"dnam...| null| null|
|北京| 30| 20| 2| 钱枫| 90.0|{"deptid":30"dnam...| null| null|
|北京| 30| 20| 2| 钱枫| 90.0|{"deptid":40"dnam...| null| null|
|上海| 19| 30| 3| 孙斌| 88.0| null| 10|财务部|
|上海| 19| 30| 3| 孙斌| 88.0|{"deptid":20"dnam...| null| null|
|上海| 19| 30| 3| 孙斌| 88.0|{"deptid":30"dnam...| null| null|
|上海| 19| 30| 3| 孙斌| 88.0|{"deptid":40"dnam...| null| null|
|上海| 19| 40| 4|李浩楠| 87.0| null| 10|财务部|
|上海| 19| 40| 4|李浩楠| 87.0|{"deptid":20"dnam...| null| null|
|上海| 19| 40| 4|李浩楠| 87.0|{"deptid":30"dnam...| null| null|
|上海| 19| 40| 4|李浩楠| 87.0|{"deptid":40"dnam...| null| null|
|上海| 19| 40| 4|李浩楠| 87.0| null| 10|财务部|
|上海| 19| 40| 4|李浩楠| 87.0|{"deptid":20"dnam...| null| null|
|上海| 19| 40| 4|李浩楠| 87.0|{"deptid":30"dnam...| null| null|
|上海| 19| 40| 4|李浩楠| 87.0|{"deptid":40"dnam...| null| null|
+----+---+------+---+------+-----+--------------------+------+------+
only showing top 20 rows
常见问题:
A. Either: use the CROSS JOIN syntax to allow cartesian products between these
解决方案:设置spark.sql.crossJoin.enabled=true
在配置中添加参数。
// 定义应用名称
val conf = new SparkConf().setAppName("mySpark0")
conf.setMaster("spark://master:7077")
conf.setJars(Seq("/root/SparkTest.jar"))
conf.set("spark.sql.crossJoin.enabled", "true")
B. Spark-Shell中设置
spark.conf.set("spark.sql.crossJoin.enabled", "true")
2)using一个字段形式
join类似于table-a join table-b using column1的形式,需要两个DataFrame中有相同的一个列名
// 将json文件加载成一个DataFrame
val df = spark.read.format("json")
.load("hdfs://192.168.71.130:9000/people.json") // 添加hdfs路径
val df1 = spark.read.format("json")
.load("hdfs://192.168.71.130:9000/depart.json") // 添加hdfs路径
// 使用deptid进行关联,using一个字段形式
df.join(df1, "deptid").show()
+------+----+---+---+------+-----+------+
|deptid|addr|age| id| name|score| dname|
+------+----+---+---+------+-----+------+
| 10|天津| 20| 1| 赵伟|80.92|财务部|
| 20|北京| 30| 2| 钱枫| 90.0|研发部|
| 30|上海| 19| 3| 孙斌| 88.0|事业部|
| 40|上海| 19| 4|李浩楠| 87.0|人力部|
| 40|上海| 19| 4|李浩楠| 87.0|人力部|
| 20|null| 19| 5|刘文娟| 96.0|研发部|
+------+----+---+---+------+-----+------+
using多个字段形式:
除了上面这种using一个字段的情况外,还可以using多个字段
df.join(df1, Seq("deptid","name")).show()
3) 指定join类型
两个DataFrame的join操作有inner, outer, left_outer, right_outer, leftsemi类型。在上面的using多个字段的join情况下,可以写第三个String类型参数,指定join的类型。
// 内连接
df.join(df1, Seq("deptid"),"inner").show()
inner join是一定要找到左右表中满足join条件的记录,我们在写sql语句或者使用DataFrmae时,可以不用关心哪个是左表,哪个是右表,在spark sql查询优化阶段,spark会自动将大表设为左表,即streamIter,将小表设为右表,即buildIter。这样对小表的查找相对更优。
// 左外连接:
df.join(df1, Seq("deptid"),"left_outer").show()
left outer join是以左表为准,在右表中查找匹配的记录,如果查找失败,则返回一个所有字段都为null的记录。我们在写sql语句或者使用DataFrmae时,一般让大表在左边,小表在右边。
// 右外连接
df.join(df1, Seq("deptid"),"right_outer").show()
right outer join是以右表为准,在左表中查找匹配的记录,如果查找失败,则返回一个所有字段都为null的记录。所以说,右表是streamIter,左表是buildIter,我们在写sql语句或者使用DataFrmae时,一般让大表在右边,小表在左边。
// 全连接
df.join(df1, Seq("deptid"),"outer").show()
full outer join相对来说要复杂一点,总体上来看既要做left outer join,又要做right outer join,但是又不能简单地先left outer join,再right outer join,最后union得到最终结果,因为这样最终结果中就存在两份inner join的结果了。因为既然完成left outer join又要完成right outer join,所以full outer join仅采用sort merge join实现,左边和右表既要作为streamIter,又要作为buildIter。
// 半连接
df.join(df1, Seq("deptid"),"leftsemi").show()
// 半连接
df.join(df1, Seq("deptid"),"leftanti").show()
说明:
left semi join是以左表为准,在右表中查找匹配的记录,如果查找成功,则仅返回左边的记录,否则返回null。
left anti join与left semi join相反,是以左表为准,在右表中查找匹配的记录,如果查找成功,则返回null,否则仅返回左边的记录。
4)使用Column类型来join
如果不用using模式,灵活指定join字段的话,可以使用如下形式:
df.join(df1, df("deptid")=== df1( "deptid")).show()
+----+---+------+---+------+-----+------+------+
|addr|age|deptid| id| name|score|deptid| dname|
+----+---+------+---+------+-----+------+------+
|天津| 20| 10| 1| 赵伟|80.92| 10|财务部|
|北京| 30| 20| 2| 钱枫| 90.0| 20|研发部|
|上海| 19| 30| 3| 孙斌| 88.0| 30|事业部|
|上海| 19| 40| 4|李浩楠| 87.0| 40|人力部|
|上海| 19| 40| 4|李浩楠| 87.0| 40|人力部|
|null| 19| 20| 5|刘文娟| 96.0| 20|研发部|
+----+---+------+---+------+-----+------+------+
// 在指定join字段同时指定join类型
df.join(df1, df("deptid")=== df1( "deptid"),"inner").show()
12、获取指定字段统计信息
stat方法可以用于计算指定字段或指定字段之间的统计信息,比如方差,协方差等。这个方法返回一个DataFramesStatFunctions类型对象。
根据score字段,统计该字段值出现频率在30%以上的内容。在df中字段score的出现频率,大于0.3
df.stat.freqItems(Seq ("score") , 0.3).show()
+---------------+
|score_freqItems|
+---------------+
| [96.0, 87.0]|
+---------------+
freqItems用于计算一列或几列中出现频繁的的值的集合。
13、获取两个DataFrame中共有的记录(交集)
数据:
student1
{"id":2,"name":"钱枫","age":30,"addr":"北京","score":90,"deptid":20}
{"id":3,"name":"孙斌","age":19,"addr":"上海","score":88,"deptid":30}
{"id":4,"name":"李浩楠","age":19,"addr":"上海","score":87,"deptid":40}
{"id":5,"name":"刘文娟","age":19,"addr":null,"score":96,"deptid":20}
intersect方法可以计算出两个DataFrame中相同的记录,
df.intersect(df1.limit(1)).show(false)
14、获取一个DataFrame中有另一个DataFrame中没有的记录(差集)
数据同:13
df.except(df1.limit(1)).show(false)
15、操作字段名
(1)withColumnRenamed:重命名DataFrame中的指定字段名
如果指定的字段名不存在,不进行任何操作。下面示例中将df中的id字段重命名为idx。
scala> df.withColumnRenamed( "id" , "idx" ).show()
+----+---+------+---+------+-----+
|addr|age|deptid|idx| name|score|
+----+---+------+---+------+-----+
|天津| 20| 10| 1| 赵伟|80.92|
+----+---+------+---+------+-----+
(2)withColumn:往当前DataFrame中新增一列
whtiColumn(colName: String , col: Column)方法根据指定colName往DataFrame中新增一列,如果colName已存在,则会覆盖当前列。
以下代码往df中新增一个名为id2的列:
scala> df.withColumn("id2", df("id")).show( false)
+----+---+------+---+------+-----+---+
|addr|age|deptid|id |name |score|id2|
+----+---+------+---+------+-----+---+
|天津|20 |10 |1 |赵伟 |80.92|1 |
+----+---+------+---+------+-----+---+
16、行、列转换
数据:
val df = spark.createDataFrame(Seq(
("数学", "张三", 88), ("语文", "张三", 92),
("英语", "张三", 77), ("数学", "王五", 65),
("语文", "王五", 87), ("英语", "王五", 90),
("数学", "李雷", 67), ("语文", "李雷", 33),
("英语", "李雷", 24), ("数学", "宫九", 77),
("语文", "宫九", 87), ("英语", "宫九", 90))).toDF("科目", "姓名", "分数")
1)行转列:
用PIVOT透视函数即可实现行转列(重塑数据)
// 创建临时视图
df.createOrReplaceTempView("scores")
// 注意:分数的符号为`,而不是单引号
// 说明:按分数进行分组(内容) 显示表头为姓名
val sql ="select * from scores pivot (sum(`分数`) for `姓名` in ('张三','王五','李雷','宫九'))"
// 数据查询显示
spark.sql(sql).show()
// 执行结果
+----+----+----+----+----+
|科目|张三|王五|李雷|宫九|
+----+----+----+----+----+
|英语| 77| 90| 24| 90|
|语文| 92| 87| 33| 87|
|数学| 88| 65| 67| 77|
+----+----+----+----+----+
2)列转行:(逆透视Unpivot)
Spark没有提供内置函数来实现unpivot操作,不过我们可以使用Spark SQL提供的stack函数来间接实现需求。
stack()
stack(n, expr1, ..., exprk) - 会将expr1, ..., exprk 分割为n行.
// stack函数
// 创建临时视图
df.createOrReplaceTempView("scores")
// 注意:分数的符号为`,而不是单引号
// 说明:按分数进行分组(内容) 显示表头为姓名
val sql ="select * from scores pivot (sum(`分数`) for `姓名` in ('张三','王五','李雷','宫九'))"
// 数据查询显示
val pivot=spark.sql(sql)
// 创建视图
pivot.createOrReplaceTempView("v_pivot")
// 4 为分割行数 '张三'为姓名显示内容 `张三`为引用值即为分数, as 内容为显示表头
val stack_sql="select `科目`,stack(4, '张三', `张三`, '王五', `王五`, '李雷', `李雷`, '宫九', `宫九`) as (`姓名`, `分数` )" +
" from v_pivot "
// 执行SQL
val df_stack = spark.sql(stack_sql)
df_stack.show()
// 执行结果
+----+----+----+
|科目|姓名|分数|
+----+----+----+
|英语|张三| 77|
|英语|王五| 90|
|英语|李雷| 24|
|英语|宫九| 90|
|语文|张三| 92|
|语文|王五| 87|
|语文|李雷| 33|
|语文|宫九| 87|
|数学|张三| 88|
|数学|王五| 65|
|数学|李雷| 67|
|数学|宫九| 77|
+----+----+----+
17、数据拆分与合并
1) 数据合并
concat_ws() 函数
import org.apache.spark.sql.functions._
val df = spark.createDataFrame(Seq(
("张三", 20, "篮球"), ("王五", 22, "游泳")))
.toDF("姓名", "年龄", "爱好")
// 数据合并,三列合一列
val ds = df.select(concat_ws(",", df("姓名"), df("年龄"), df("爱好"))
.cast(StringType).as("value"))
ds.show()
// 执行结果
+------------+
| value|
+------------+
|张三,20,篮球|
|王五,22,游泳|
+------------+
2) 数据拆分
explode函数可以把数组分割为多行
import org.apache.spark.sql.functions._
val df = spark.createDataFrame(Seq(
("张三", 20, "篮球,羽毛球"), ("王五", 22, "游泳,慢跑,编程")))
.toDF("姓名", "年龄", "爱好")
// 数据分割
val dataset = df.withColumn("爱好",
explode(split(df.col("爱好"), ",")))
dataset.show()
// 执行结果
+----+----+------+
|姓名|年龄| 爱好|
+----+----+------+
|张三| 20| 篮球|
|张三| 20|羽毛球|
|王五| 22| 游泳|
|王五| 22| 慢跑|
|王五| 22| 编程|
+----+----+------+