DataFrame
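A DataFrame is a distributed dataset built on top of RDDs, similar to a two-dimensional table in a traditional database.
A DataFrame knows the type of each column but not the type of each row: every row is a generic Row object, whatever it holds.
Creating a SparkSession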
val sparkSession =SparkSession.builder().master("local[4]").appName("test").getOrCreate()
Defining a case class
case class Student(id:Int,name:String,age:Int,sex:String)
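Using toDF
toDF requires Spark's implicit conversions to be imported:
import sparkSession.implicits._
To make testing easier, sparkSession is declared on its own outside the test methods, and each example runs as a JUnit test.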
@Test
def demo01(): Unit = {
  // Prepare the data
  val list = List(
    Student(1, "张三", 18, "男"),
    Student(2, "绣花", 16, "女"),
    Student(3, "李四", 18, "男"),
    Student(4, "王五", 18, "男"),
    Student(5, "翠花", 19, "女"),
    Student(6, "张鹏", 17, "男")
  )
  // toDF requires the implicit conversions
  import sparkSession.implicits._
  val df: DataFrame = list.toDF()
  // Run it; similar to select * from table;
  df.show()
}
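The snippets in this article leave out their imports and the surrounding class. As a rough, self-contained sketch, the same toDF example can be run from a main method. The object name ToDfDemo, the build helper and the local[2] master are assumptions chosen for illustration; they are not part of the original test class.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical standalone version of demo01; all names here are illustrative.
object ToDfDemo {
  // Declared outside the method so that Spark can derive an encoder for it.
  case class Student(id: Int, name: String, age: Int, sex: String)

  def build(spark: SparkSession): DataFrame = {
    import spark.implicits._ // brings toDF into scope
    List(
      Student(1, "张三", 18, "男"),
      Student(2, "绣花", 16, "女")
    ).toDF()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("toDF-demo").getOrCreate()
    val df = build(spark)
    df.printSchema() // column names and types come from the case class fields
    df.show()
    spark.stop()
  }
}
```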
API reference: https://blog.csdn.net/dabokele/article/details/52802150
Action operations on a DataFrame
show: display data
- show() displays the first 20 rows (here, all of them)
val df: DataFrame = list.toDF()
df.show()
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
| 1|张三| 18| 男|
| 2|绣花| 16| 女|
| 3|李四| 18| 男|
| 4|王五| 18| 男|
| 5|翠花| 19| 女|
| 6|张鹏| 17| 男|
+---+----+---+---+
- show(numRows: Int) displays the first numRows rows
val df: DataFrame = list.toDF()
df.show(2)
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
| 1|张三| 18| 男|
| 2|绣花| 16| 女|
+---+----+---+---+
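- show(truncate: Boolean): whether to truncate long strings. If true, strings longer than 20 characters are truncated and all cells are right-aligned.
- show(numRows: Int, truncate: Boolean): displays the first numRows rows and sets whether long strings are truncated.
- show(numRows: Int, truncate: Int): numRows is the number of rows to display. If truncate > 0, strings longer than truncate characters are cut to that length and cells are right-aligned; if truncate <= 0, nothing is truncated and cells are left-aligned.
- show(numRows: Int, truncate: Int, vertical: Boolean): if vertical is true, rows are printed vertically (one line per column value).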
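Only the first two overloads are demonstrated above. The following sketch exercises the others on a small, made-up DataFrame with one long string column. The object name ShowDemo, the column names and the sample strings are assumptions for illustration, and the three-argument overload is assumed to be available (it was added in Spark 2.3).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical demo object; the data is invented to make truncation visible.
object ShowDemo {
  def build(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      (1, "a fairly long comment that exceeds twenty characters"),
      (2, "short")
    ).toDF("id", "comment")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("show-demo").getOrCreate()
    val df = build(spark)
    df.show()           // default: strings longer than 20 characters are truncated
    df.show(false)      // show(truncate: Boolean): no truncation
    df.show(1, 10)      // show(numRows, truncate: Int): one row, strings cut to 10 characters
    df.show(2, 0, true) // vertical layout, one line per column value, no truncation
    spark.stop()
  }
}
```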
collect: fetch all rows into an array
Unlike show, collect fetches every row of the DataFrame to the driver and returns them as an Array[Row].
val df: DataFrame = list.toDF()
val rows: Array[Row] = df.collect()
rows.foreach(println(_))
[1,张三,18,男]
[2,绣花,16,女]
[3,李四,18,男]
[4,王五,18,男]
[5,翠花,19,女]
[6,张鹏,17,男]
collectAsList: fetch all rows into a List
Works like collect, except that the result is a java.util.List[Row]. Usage:
val df: DataFrame = list.toDF()
val rows: util.List[Row] = df.collectAsList()
rows.forEach(println(_))
[1,张三,18,男]
[2,绣花,16,女]
[3,李四,18,男]
[4,王五,18,男]
[5,翠花,19,女]
[6,张鹏,17,男]
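describe(cols: String*): statistics for the given columns
This method takes one or more column names as Strings and returns a new DataFrame with summary statistics for them: count, mean, stddev, min and max. mean and stddev are only computed for numeric columns; for a string column such as name they are null.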
val df: DataFrame = list.toDF()
val frame: DataFrame = df.describe("name", "age")
frame.show()
+-------+----+------------------+
|summary|name| age|
+-------+----+------------------+
| count| 6| 6|
| mean|null|17.666666666666668|
| stddev|null|1.0327955589886446|
| min|张三| 16|
| max|翠花| 19|
+-------+----+------------------+
first, head, take, takeAsList: fetch a few rows
These four methods are similar:
- first returns the first row
val df: DataFrame = list.toDF()
val row: Row = df.first()
println(row)
[1,张三,18,男]
- head returns the first row; head(n: Int) returns the first n rows
val df: DataFrame = list.toDF()
val row: Row = df.head()
println(row)
[1,张三,18,男]
Getting the first n rows:
val df: DataFrame = list.toDF()
val rows: Array[Row] = df.head(3)
rows.foreach(println(_))
[1,张三,18,男]
[2,绣花,16,女]
[3,李四,18,男]
- take(n: Int) returns the first n rows
val df: DataFrame = list.toDF()
val rows: Array[Row] = df.take(3)
rows.foreach(println(_))
[1,张三,18,男]
[2,绣花,16,女]
[3,李四,18,男]
- takeAsList(n: Int) returns the first n rows as a List
val df: DataFrame = list.toDF()
val rows: util.List[Row] = df.takeAsList(3)
rows.forEach(println(_))
[1,张三,18,男]
[2,绣花,16,女]
[3,李四,18,男]
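These methods return one or more rows as a Row, an Array[Row] or a java.util.List[Row]. first and head() do the same thing.
take and takeAsList return the fetched rows to the driver, so keep n small enough that the driver does not run into an OutOfMemoryError.
Conditional queries, joins and other operations on a DataFrame
The examples below use a slightly larger data set: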
val list=List(
Student(1,"张三",18,"男"),
Student(2,"绣花",16,"女"),
Student(3,"李四",18,"男"),
Student(4,"王五",18,"男"),
Student(5,"翠花",19,"女"),
Student(7,"张鹏",14,"男"),
Student(8,"刘秀",13,"男"),
Student(9,"王菲菲",20,"女"),
Student(10,"乐乐",21,"男"),
Student(11,"小惠",23,"女"),
Student(12,"梦雅",25,"女"),
)
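where conditions
where(conditionExpr: String): takes the condition that would follow the WHERE keyword in SQL
The filter expression may combine conditions with and and or. The result is a DataFrame.
Querying the students whose sex is 男: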
import sparkSession.implicits._
val df: DataFrame = list.toDF()
df.where("sex='男'").show()
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
| 1|张三| 18| 男|
| 3|李四| 18| 男|
| 4|王五| 18| 男|
| 7|张鹏| 14| 男|
| 8|刘秀| 13| 男|
| 10|乐乐| 21| 男|
+---+----+---+---+
and
Querying the students younger than 18 whose sex is 女:
val df: DataFrame = list.toDF()
df.where("age<18 and sex='女'").show()
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
| 2|绣花| 16| 女|
+---+----+---+---+
or
Querying the students who are older than 18 or whose sex is 女:
val df: DataFrame = list.toDF()
df.where("age>18 or sex='女'").show()
+---+------+---+---+
| id| name|age|sex|
+---+------+---+---+
| 2| 绣花| 16| 女|
| 5| 翠花| 19| 女|
| 9|王菲菲| 20| 女|
| 10| 乐乐| 21| 男|
| 11| 小惠| 23| 女|
| 12| 梦雅| 25| 女|
+---+------+---+---+
filter: filter rows by column values
Takes a filter expression and returns a DataFrame. It accepts the same conditions as where.
Querying the students whose sex is not 男:
val df: DataFrame = list.toDF()
df.filter("sex!='男'").show()
+---+------+---+---+
| id| name|age|sex|
+---+------+---+---+
| 2| 绣花| 16| 女|
| 5| 翠花| 19| 女|
| 9|王菲菲| 20| 女|
| 11| 小惠| 23| 女|
| 12| 梦雅| 25| 女|
+---+------+---+---+
filter also accepts or and and.
val df: DataFrame = list.toDF()
df.filter("sex!='男' and age >20").show()
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
| 11|小惠| 23| 女|
| 12|梦雅| 25| 女|
+---+----+---+---+
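The where and filter calls above take SQL strings. Both methods also accept a Column expression, which lets the compiler catch some mistakes that a string would hide. The sketch below repeats two of the queries in that style on a four-row subset of the data; the object and method names are assumptions for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical demo object; the rows are a subset of the article's student list.
object ColumnFilterDemo {
  def students(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      (1, "张三", 18, "男"), (2, "绣花", 16, "女"),
      (5, "翠花", 19, "女"), (8, "刘秀", 13, "男")
    ).toDF("id", "name", "age", "sex")
  }

  // Same condition as where("age<18 and sex='女'")
  def youngGirls(df: DataFrame): DataFrame =
    df.where(col("age") < 18 && col("sex") === "女")

  // Same condition as filter("sex!='男'")
  def notBoys(df: DataFrame): DataFrame =
    df.filter(col("sex") =!= "男")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("column-filter").getOrCreate()
    val df = students(spark)
    youngGirls(df).show()
    notBoys(df).show()
    spark.stop()
  }
}
```

Note that equality on a Column is written === and inequality =!=, because == and != compare the Column objects themselves.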
Selecting columns
select: get the values of the given columns
- select(col: String, cols: String*): takes column names as Strings and returns those columns as a DataFrame
val df: DataFrame = list.toDF()
df.select("name","age").show()
+------+---+
| name|age|
+------+---+
| 张三| 18|
| 绣花| 16|
| 李四| 18|
| 王五| 18|
| 翠花| 19|
| 张鹏| 14|
| 刘秀| 13|
|王菲菲| 20|
| 乐乐| 21|
| 小惠| 23|
| 梦雅| 25|
+------+---+
An overloaded select(cols: Column*) takes Column arguments instead of Strings. It can express logic such as select id, id+1 from test.
val df: DataFrame = list.toDF()
df.select(df("id"),df("id")+1).show()
+---+--------+
| id|(id + 1)|
+---+--------+
| 1| 2|
| 2| 3|
| 3| 4|
| 4| 5|
| 5| 6|
| 7| 8|
| 8| 9|
| 9| 10|
| 10| 11|
| 11| 12|
| 12| 13|
+---+--------+
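selectExpr: apply expressions to the selected columns
Each argument is a SQL expression given as a String, so it can call a function (including a UDF) on a column, assign an alias, and so on. The result is a DataFrame.
Getting the maximum age: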
val df: DataFrame = list.toDF()
df.selectExpr(
"""
|max(age) max_age
|""".stripMargin).show()
+-------+
|max_age|
+-------+
| 25|
+-------+
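max(age) yields only the maximum value, not the student it belongs to. One possible way to get the matching record, sketched here under the assumption that a second pass over the data is acceptable, is to compute the maximum first and then filter on it. The object and method names are illustrative, and the three rows are a subset of the article's data.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, max}

// Hypothetical demo object, not part of the original examples.
object MaxAgeDemo {
  def students(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      (1, "张三", 18, "男"), (11, "小惠", 23, "女"), (12, "梦雅", 25, "女")
    ).toDF("id", "name", "age", "sex")
  }

  // Rows whose age equals the overall maximum (several rows if there is a tie).
  def oldest(df: DataFrame): DataFrame = {
    val maxAge = df.agg(max("age")).head().getInt(0)
    df.where(col("age") === maxAge)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("max-age").getOrCreate()
    val df = students(spark)
    // selectExpr with a plain column, a computed column and an alias
    df.selectExpr("name", "age + 1 as next_age").show()
    oldest(df).show()
    spark.stop()
  }
}
```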
col: get a single column
Returns exactly one column, as a Column object.
val df: DataFrame = list.toDF()
val ageColumn: Column = df.col("age")
val nameColumn: Column = df.col("name")
println(ageColumn)
println(nameColumn)
age
name
apply: get a single column
Returns exactly one column, as a Column object. df("age") is shorthand for df.apply("age").
val df: DataFrame = list.toDF()
val ageColumn: Column = df.apply("age")
val nameColumn: Column = df.apply("name")
println(ageColumn)
println(nameColumn)
age
name
drop: remove the given columns and keep the rest
Returns a new DataFrame without the dropped columns. drop(colName: String) removes a single column.
val df: DataFrame = list.toDF()
val newDf: DataFrame = df.drop("sex")
newDf.show()
+---+------+---+
| id| name|age|
+---+------+---+
| 1| 张三| 18|
| 2| 绣花| 16|
| 3| 李四| 18|
| 4| 王五| 18|
| 5| 翠花| 19|
| 7| 张鹏| 14|
| 8| 刘秀| 13|
| 9|王菲菲| 20|
| 10| 乐乐| 21|
| 11| 小惠| 23|
| 12| 梦雅| 25|
+---+------+---+
drop(colNames: String*) removes several columns at once:
val df: DataFrame = list.toDF()
val newDf: DataFrame = df.drop("sex","id")
newDf.show()
+------+---+
| name|age|
+------+---+
| 张三| 18|
| 绣花| 16|
| 李四| 18|
| 王五| 18|
| 翠花| 19|
| 张鹏| 14|
| 刘秀| 13|
|王菲菲| 20|
| 乐乐| 21|
| 小惠| 23|
| 梦雅| 25|
+------+---+
limit
limit returns a new DataFrame holding the first n rows of the given DataFrame. Unlike take and head, limit is a transformation, not an action.
val df: DataFrame = list.toDF()
val newDf: DataFrame = df.limit(3)
newDf.show()
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
| 1|张三| 18| 男|
| 2|绣花| 16| 女|
| 3|李四| 18| 男|
+---+----+---+---+
order by
orderBy and sort: sort by the given columns, ascending by default
orderBy, sorting by age (asc):
val df: DataFrame = list.toDF()
df.orderBy("age").show()
+---+------+---+---+
| id| name|age|sex|
+---+------+---+---+
| 8| 刘秀| 13| 男|
| 7| 张鹏| 14| 男|
| 2| 绣花| 16| 女|
| 4| 王五| 18| 男|
| 3| 李四| 18| 男|
| 1| 张三| 18| 男|
| 5| 翠花| 19| 女|
| 9|王菲菲| 20| 女|
| 10| 乐乐| 21| 男|
| 11| 小惠| 23| 女|
| 12| 梦雅| 25| 女|
+---+------+---+---+
orderBy, sorting by age (desc):
val df: DataFrame = list.toDF()
df.orderBy(df("age").desc).show()
+---+------+---+---+
| id| name|age|sex|
+---+------+---+---+
| 12| 梦雅| 25| 女|
| 11| 小惠| 23| 女|
| 10| 乐乐| 21| 男|
| 9|王菲菲| 20| 女|
| 5| 翠花| 19| 女|
| 4| 王五| 18| 男|
| 1| 张三| 18| 男|
| 3| 李四| 18| 男|
| 2| 绣花| 16| 女|
| 7| 张鹏| 14| 男|
| 8| 刘秀| 13| 男|
+---+------+---+---+
sort, sorting by age (asc):
val df: DataFrame = list.toDF()
df.sort(df("age").asc).show()
sort, sorting by age (desc):
val df: DataFrame = list.toDF()
df.sort(df("age").desc).show()
sortWithinPartitions
Works like sort, except that sortWithinPartitions sorts the rows inside each partition separately; the DataFrame as a whole is not globally ordered.
sortWithinPartitions, sorting by age (asc):
val df: DataFrame = list.toDF()
df.sortWithinPartitions(df("age").asc).show()
+---+------+---+---+
| id| name|age|sex|
+---+------+---+---+
| 2| 绣花| 16| 女|
| 1| 张三| 18| 男|
| 3| 李四| 18| 男|
| 4| 王五| 18| 男|
| 5| 翠花| 19| 女|
------------------------------ (two partitions, each sorted on its own)
| 8| 刘秀| 13| 男|
| 7| 张鹏| 14| 男|
| 9|王菲菲| 20| 女|
| 10| 乐乐| 21| 男|
| 11| 小惠| 23| 女|
| 12| 梦雅| 25| 女|
+---+------+---+---+
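The output above depends on how the rows happen to be partitioned. To make the partition boundaries visible, the sketch below repartitions a small DataFrame explicitly and tags each row with its partition id via spark_partition_id. The object name, the two-column sample data and the choice of two partitions are assumptions for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, spark_partition_id}

// Hypothetical demo object; ids and ages are taken from the article's data.
object SortWithinPartitionsDemo {
  def sortedPerPartition(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq((1, 18), (2, 16), (5, 19), (7, 14), (8, 13), (9, 20))
      .toDF("id", "age")
      .repartition(2)                          // spread the rows over two partitions
      .sortWithinPartitions(col("age").asc)    // sort inside each partition only
      .withColumn("pid", spark_partition_id()) // record which partition a row is in
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("sort-within").getOrCreate()
    // Ages increase within each pid, but not necessarily across the whole output.
    sortedPerPartition(spark).show()
    spark.stop()
  }
}
```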
group by
groupBy: group rows by column
groupBy can be called in two ways: with column names as Strings, or with Column objects.
Example: group by sex and count the students of each sex (see count() below).
cube and rollup: extensions of group by
They work like GROUP BY CUBE and GROUP BY ROLLUP in SQL.
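No cube or rollup example is given here, so the following is a minimal sketch of both. It uses only the sex and classId values of the eleven students from the union section further down; the object and method names are assumptions for illustration. rollup adds subtotals per sex and a grand total, while cube adds subtotals for every combination of the grouping columns.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical demo object; (sex, classId) pairs mirror the article's later data.
object CubeRollupDemo {
  def students(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      ("女", 1), ("女", 2), ("女", 1), ("女", 1), ("女", 3),
      ("男", 3), ("男", 2), ("男", 2), ("男", 1), ("男", 2), ("男", 1)
    ).toDF("sex", "classId")
  }

  // (sex, classId), (sex, null) and (null, null) groups
  def rolled(df: DataFrame): DataFrame = df.rollup("sex", "classId").count()

  // the rollup groups plus (null, classId)
  def cubed(df: DataFrame): DataFrame = df.cube("sex", "classId").count()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("cube-rollup").getOrCreate()
    val df = students(spark)
    rolled(df).orderBy("sex", "classId").show()
    cubed(df).orderBy("sex", "classId").show()
    spark.stop()
  }
}
```

A null in a grouping column of the result marks a subtotal row for that column.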
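The GroupedData object
groupBy, cube and rollup return a RelationalGroupedDataset (called GroupedData in Spark 1.x), whose API provides the operations that follow a group by, for example:
- max(colNames: String*) returns, for each group, the maximum of the given columns, or of all numeric columns if none are given; it only works on numeric columns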
val df: DataFrame = list.toDF()
val newDF: DataFrame = df.groupBy("sex").max("age")
newDF.show()
+---+--------+
|sex|max(age)|
+---+--------+
| 男| 21|
| 女| 25|
+---+--------+
- min(colNames: String*) returns, for each group, the minimum of the given columns, or of all numeric columns if none are given; it only works on numeric columns
val df: DataFrame = list.toDF()
val newDF: DataFrame = df.groupBy("sex").min("age")
newDF.show()
+---+--------+
|sex|min(age)|
+---+--------+
| 男| 13|
| 女| 16|
+---+--------+
- mean(colNames: String*) returns, for each group, the average of the given columns, or of all numeric columns if none are given; it only works on numeric columns
val df: DataFrame = list.toDF()
val newDF: DataFrame = df.groupBy("sex").mean("age")
newDF.show()
+---+--------+
|sex|avg(age)|
+---+--------+
| 男| 17.0|
| 女| 20.6|
+---+--------+
- sum(colNames: String*) returns, for each group, the sum of the given columns, or of all numeric columns if none are given; it only works on numeric columns
val df: DataFrame = list.toDF()
val newDF: DataFrame = df.groupBy("sex").sum("age")
newDF.show()
+---+--------+
|sex|sum(age)|
+---+--------+
| 男| 102|
| 女| 103|
+---+--------+
- count() returns the number of rows in each group
val df: DataFrame = list.toDF()
val newDF: DataFrame = df.groupBy("sex").count()
newDF.show()
+---+-----+
|sex|count|
+---+-----+
| 男| 6|
| 女| 5|
+---+-----+
distinct
distinct: returns a DataFrame without duplicate rows
Returns the distinct Rows of the current DataFrame. It gives the same result as dropDuplicates() called without column names.
val df: DataFrame = list.toDF()
val newDF: DataFrame = df.distinct()
newDF.show()
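No rows in this data set are duplicated, so the output is omitted.
dropDuplicates: deduplicate by the given columns
Keeps one row for each distinct combination of the given columns. This resembles select distinct a, b, except that the result keeps every column of the retained rows.
Deduplicating by age: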
val df: DataFrame = list.toDF()
val newDF: DataFrame = df.dropDuplicates("age")
newDF.show()
+---+------+---+---+
| id| name|age|sex|
+---+------+---+---+
| 8| 刘秀| 13| 男|
| 2| 绣花| 16| 女|
| 9|王菲菲| 20| 女|
| 5| 翠花| 19| 女|
| 11| 小惠| 23| 女|
| 12| 梦雅| 25| 女|
| 10| 乐乐| 21| 男|
| 7| 张鹏| 14| 男|
| 1| 张三| 18| 男|
+---+------+---+---+
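The difference between dropDuplicates and a select followed by distinct shows up in the columns of the result. The sketch below compares the two on a four-row subset of the data in which two students share an age; the object and method names are assumptions for illustration. Which of the tied rows dropDuplicates keeps is not something to rely on.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical demo object; rows are a subset of the article's student list.
object DedupDemo {
  def students(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      (1, "张三", 18, "男"), (3, "李四", 18, "男"),
      (2, "绣花", 16, "女"), (5, "翠花", 19, "女")
    ).toDF("id", "name", "age", "sex")
  }

  // One full row per distinct age: all four columns survive.
  def byAge(df: DataFrame): DataFrame = df.dropDuplicates("age")

  // The SQL-style "select distinct age": only the age column survives.
  def distinctAges(df: DataFrame): DataFrame = df.select("age").distinct()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("dedup").getOrCreate()
    val df = students(spark)
    byAge(df).show()
    distinctAges(df).show()
    df.dropDuplicates(Seq("age", "sex")).show() // several columns passed as a Seq
    spark.stop()
  }
}
```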
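Aggregation
Aggregations are done with the agg method, which can be called in several ways. It is usually combined with groupBy; called directly on a DataFrame, as below, it aggregates over all rows.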
val df: DataFrame = list.toDF()
val newDF: DataFrame = df.agg(
"age" -> "max",
"age" -> "avg",
"age" -> "min",
"age" -> "sum",
"age" -> "count"
)
newDF.show()
+--------+------------------+--------+--------+----------+
|max(age)| avg(age)|min(age)|sum(age)|count(age)|
+--------+------------------+--------+--------+----------+
| 25|18.636363636363637| 13| 205| 11|
+--------+------------------+--------+--------+----------+
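The example above calls agg on the whole DataFrame with column-to-function pairs. Since agg is usually paired with groupBy, here is a small sketch of that combination using the Column-based form with the helpers from org.apache.spark.sql.functions, which also allows aliases. The object name, method names and aliases are assumptions for illustration; the (sex, age) pairs are those of the eleven students.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{avg, count, lit, max}

// Hypothetical demo object; only sex and age are kept from the student data.
object GroupAggDemo {
  def students(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      ("男", 18), ("男", 18), ("男", 18), ("男", 14), ("男", 13), ("男", 21),
      ("女", 16), ("女", 19), ("女", 20), ("女", 23), ("女", 25)
    ).toDF("sex", "age")
  }

  // Several aggregates per group, each with a readable alias.
  def statsBySex(df: DataFrame): DataFrame =
    df.groupBy("sex").agg(
      max("age").as("max_age"),
      avg("age").as("avg_age"),
      count(lit(1)).as("n")
    )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("group-agg").getOrCreate()
    statsBySex(students(spark)).show()
    spark.stop()
  }
}
```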
union
The data is reorganized into two lists:
val female=List(
Student(2,"绣花",16,"女",1),
Student(5,"翠花",19,"女",2),
Student(9,"王菲菲",20,"女",1),
Student(11,"小惠",23,"女",1),
Student(12,"梦雅",25,"女",3)
)
val boys=List(
Student(1,"张三",18,"男",3),
Student(3,"李四",18,"男",2),
Student(4,"王五",18,"男",2),
Student(7,"张鹏",14,"男",1),
Student(8,"刘秀",13,"男",2),
Student(10,"乐乐",21,"男",1)
)
Student gains a classId field:
case class Student(id:Int,name:String,age:Int,sex:String,classId:Int)
Example
val femaleDF: DataFrame = female.toDF()
val boysDF: DataFrame = boys.toDF()
val value: Dataset[Row] = femaleDF.union(boysDF)
value.foreach(println(_))
[2,绣花,16,女,1]
[5,翠花,19,女,2]
[11,小惠,23,女,1]
[9,王菲菲,20,女,1]
[12,梦雅,25,女,3]
[1,张三,18,男,3]
[3,李四,18,男,2]
[4,王五,18,男,2]
[8,刘秀,13,男,2]
[7,张鹏,14,男,1]
[10,乐乐,21,男,1]
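unionAll: combine two DataFrames
Similar to UNION ALL in SQL: duplicates are kept. union behaves the same way, and Spark 2.x marks unionAll as deprecated in favor of union.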
val femaleDF: DataFrame = female.toDF()
val boysDF: DataFrame = boys.toDF()
val value: Dataset[Row] = femaleDF.unionAll(boysDF)
value.foreach(println(_))
[5,翠花,19,女,2]
[2,绣花,16,女,1]
[11,小惠,23,女,1]
[9,王菲菲,20,女,1]
[12,梦雅,25,女,3]
[1,张三,18,男,3]
[3,李四,18,男,2]
[4,王五,18,男,2]
[7,张鹏,14,男,1]
[8,刘秀,13,男,2]
[10,乐乐,21,男,1]
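union and unionAll match columns by position and keep duplicates. When the two DataFrames list their columns in a different order, unionByName (assumed to be available; it was added in Spark 2.3) matches them by name instead, and a following distinct gives the deduplicating behavior of SQL UNION. The sketch below uses two tiny made-up frames; the object and method names are assumptions for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical demo object with two frames whose columns are in different order.
object UnionByNameDemo {
  def frames(spark: SparkSession): (DataFrame, DataFrame) = {
    import spark.implicits._
    val a = Seq((1, "张三"), (2, "绣花")).toDF("id", "name")
    val b = Seq(("绣花", 2), ("李四", 3)).toDF("name", "id")
    (a, b)
  }

  // Columns are matched by name; duplicates are kept.
  def all(a: DataFrame, b: DataFrame): DataFrame = a.unionByName(b)

  // Same, with duplicate rows removed afterwards.
  def deduplicated(a: DataFrame, b: DataFrame): DataFrame = a.unionByName(b).distinct()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("union-by-name").getOrCreate()
    val (a, b) = frames(spark)
    all(a, b).show()
    deduplicated(a, b).show()
    spark.stop()
  }
}
```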
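join
Now for the important part. join is used heavily in SQL, and DataFrame provides it as well.
DataFrame offers six overloaded join methods.
Cartesian product
Depending on the Spark version and the spark.sql.crossJoin.enabled setting, a join without a condition may be rejected; crossJoin states the intent explicitly.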
val femaleDF: DataFrame = female.toDF()
val boysDF: DataFrame = boys.toDF()
val value: Dataset[Row] = femaleDF.join(boysDF)
value.foreach(println(_))
[2,绣花,16,女,1,3,李四,18,男,2]
[2,绣花,16,女,1,7,张鹏,14,男,1]
[2,绣花,16,女,1,8,刘秀,13,男,2]
[2,绣花,16,女,1,1,张三,18,男,3]
[5,翠花,19,女,2,3,李四,18,男,2]
[5,翠花,19,女,2,7,张鹏,14,男,1]
[5,翠花,19,女,2,8,刘秀,13,男,2]
[5,翠花,19,女,2,1,张三,18,男,3]
[9,王菲菲,20,女,1,3,李四,18,男,2]
[9,王菲菲,20,女,1,7,张鹏,14,男,1]
[9,王菲菲,20,女,1,8,刘秀,13,男,2]
[11,小惠,23,女,1,3,李四,18,男,2]
[9,王菲菲,20,女,1,1,张三,18,男,3]
[11,小惠,23,女,1,7,张鹏,14,男,1]
[11,小惠,23,女,1,8,刘秀,13,男,2]
[12,梦雅,25,女,3,3,李四,18,男,2]
[12,梦雅,25,女,3,7,张鹏,14,男,1]
[11,小惠,23,女,1,1,张三,18,男,3]
[12,梦雅,25,女,3,8,刘秀,13,男,2]
[2,绣花,16,女,1,4,王五,18,男,2]
[12,梦雅,25,女,3,1,张三,18,男,3]
[2,绣花,16,女,1,10,乐乐,21,男,1]
[5,翠花,19,女,2,4,王五,18,男,2]
[5,翠花,19,女,2,10,乐乐,21,男,1]
[9,王菲菲,20,女,1,4,王五,18,男,2]
[9,王菲菲,20,女,1,10,乐乐,21,男,1]
[11,小惠,23,女,1,4,王五,18,男,2]
[11,小惠,23,女,1,10,乐乐,21,男,1]
[12,梦雅,25,女,3,4,王五,18,男,2]
[12,梦雅,25,女,3,10,乐乐,21,男,1]
using: a single column
This form is like a join b using column1; both DataFrames must have a column with that name.
import sparkSession.implicits._
val femaleDF: DataFrame = female.toDF()
val boysDF: DataFrame = boys.toDF()
val value: Dataset[Row] = femaleDF.join(boysDF,"classId")
value.foreach(println(_))
[2,5,翠花,19,女,8,刘秀,13,男]
[1,11,小惠,23,女,7,张鹏,14,男]
[2,5,翠花,19,女,3,李四,18,男]
[3,12,梦雅,25,女,1,张三,18,男]
[1,11,小惠,23,女,10,乐乐,21,男]
[1,9,王菲菲,20,女,7,张鹏,14,男]
[2,5,翠花,19,女,4,王五,18,男]
[1,2,绣花,16,女,7,张鹏,14,男]
[1,9,王菲菲,20,女,10,乐乐,21,男]
[1,2,绣花,16,女,10,乐乐,21,男]
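using: several columns
Besides a single column, using also accepts several columns. No id occurs in both lists, so this particular join returns no rows: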
val femaleDF: DataFrame = female.toDF()
val boysDF: DataFrame = boys.toDF()
val value: Dataset[Row] = femaleDF.join(boysDF,Seq("classId","id"))
value.foreach(println(_))
Specifying the join type
A join of two DataFrames can be of type inner, outer, left_outer, right_outer or leftsemi. The using form that takes a Seq of columns accepts a third String argument for the join type:
left_outer
val femaleDF: DataFrame = female.toDF()
val boysDF: DataFrame = boys.toDF()
val value: Dataset[Row] = femaleDF.join(boysDF,Seq("classId"),"left_outer")
value.foreach(println(_))
[1,11,小惠,23,女,10,乐乐,21,男]
[2,5,翠花,19,女,8,刘秀,13,男]
[1,9,王菲菲,20,女,10,乐乐,21,男]
[1,2,绣花,16,女,10,乐乐,21,男]
[2,5,翠花,19,女,4,王五,18,男]
[1,2,绣花,16,女,7,张鹏,14,男]
[1,11,小惠,23,女,7,张鹏,14,男]
[2,5,翠花,19,女,3,李四,18,男]
[1,9,王菲菲,20,女,7,张鹏,14,男]
[3,12,梦雅,25,女,1,张三,18,男]
right_outer
val femaleDF: DataFrame = female.toDF()
val boysDF: DataFrame = boys.toDF()
val value: Dataset[Row] = femaleDF.join(boysDF,Seq("classId"),"right_outer")
value.foreach(println(_))
[2,5,翠花,19,女,8,刘秀,13,男]
[2,5,翠花,19,女,3,李四,18,男]
[3,12,梦雅,25,女,1,张三,18,男]
[1,11,小惠,23,女,7,张鹏,14,男]
[1,11,小惠,23,女,10,乐乐,21,男]
[1,9,王菲菲,20,女,7,张鹏,14,男]
[1,9,王菲菲,20,女,10,乐乐,21,男]
[2,5,翠花,19,女,4,王五,18,男]
[1,2,绣花,16,女,10,乐乐,21,男]
[1,2,绣花,16,女,7,张鹏,14,男]
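In the data above every girl has at least one boy in her class and vice versa, so left_outer and right_outer print the same rows as the inner join. The sketch below uses a smaller data set with one unmatched girl to make the join types differ, and it uses the overload that takes a Column join expression instead of using columns. The object name, the reduced columns and the extra student 小红 in class 4 are assumptions made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical demo object; 小红 (classId 4) is invented so that one row has no match.
object JoinTypeDemo {
  def frames(spark: SparkSession): (DataFrame, DataFrame) = {
    import spark.implicits._
    val girls = Seq((2, "绣花", 1), (5, "翠花", 2), (13, "小红", 4)).toDF("id", "name", "classId")
    val boys = Seq((1, "张三", 3), (3, "李四", 2), (7, "张鹏", 1), (10, "乐乐", 1)).toDF("id", "name", "classId")
    (girls, boys)
  }

  // join(right, joinExprs, joinType): the condition is an explicit Column expression.
  def joined(girls: DataFrame, boys: DataFrame, joinType: String): DataFrame =
    girls.join(boys, girls("classId") === boys("classId"), joinType)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("join-types").getOrCreate()
    val (girls, boys) = frames(spark)
    joined(girls, boys, "left_outer").show() // 小红 appears with nulls on the right side
    joined(girls, boys, "left_semi").show()  // girls that have a match, left columns only
    joined(girls, boys, "left_anti").show()  // girls without any match
    spark.stop()
  }
}
```

Unlike the using form, this overload keeps both classId columns in the result, so later references to classId need to be qualified, for example girls("classId").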
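The other join types are not demonstrated here.
The examples above are adapted from: https://blog.csdn.net/dabokele/article/details/52802150
For more of the API, see the official Spark documentation.
The examples above use a case class, so field names and field types are mapped to the table's columns automatically.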
case class Student(id:Int,name:String,age:Int,sex:String,classId:Int)
@Test
def demo02: Unit ={
val female=List(
Student(2,"绣花",16,"女",1),
Student(5,"翠花",19,"女",2),
Student(9,"王菲菲",20,"女",1),
Student(11,"小惠",23,"女",1),
Student(12,"梦雅",25,"女",3)
)
val boys=List(
Student(1,"张三",18,"男",3),
Student(3,"李四",18,"男",2),
Student(4,"王五",18,"男",2),
Student(7,"张鹏",14,"男",1),
Student(8,"刘秀",13,"男",2),
Student(10,"乐乐",21,"男",1)
)
import sparkSession.implicits._
//val femaleDF: DataFrame = female.toDF()
val boysDF: DataFrame = boys.toDF()
boysDF.show()
}
+---+----+---+---+-------+
| id|name|age|sex|classId|
+---+----+---+---+-------+
| 1|张三| 18| 男| 3|
| 3|李四| 18| 男| 2|
| 4|王五| 18| 男| 2|
| 7|张鹏| 14| 男| 1|
| 8|刘秀| 13| 男| 2|
| 10|乐乐| 21| 男| 1|
+---+----+---+---+-------+
Use printSchema to inspect the column types:
root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- age: integer (nullable = false)
|-- sex: string (nullable = true)
|-- classId: integer (nullable = false)
Besides case classes, tuples can be used as well:
@Test
def demo03(): Unit ={
val list=List(
(1,"张三",18,"男",3),
(3,"李四",18,"男",2),
(4,"王五",18,"男",2),
(7,"张鹏",14,"男",1),
(8,"刘秀",13,"男",2),
(10,"乐乐",21,"男",1)
)
import sparkSession.implicits._
val boysDF: DataFrame = list.toDF()
boysDF.show()
}
The columns are named after the tuple fields (_1, _2, ...):
+---+----+---+---+---+
| _1| _2| _3| _4| _5|
+---+----+---+---+---+
| 1|张三| 18| 男| 3|
| 3|李四| 18| 男| 2|
| 4|王五| 18| 男| 2|
| 7|张鹏| 14| 男| 1|
| 8|刘秀| 13| 男| 2|
| 10|乐乐| 21| 男| 1|
+---+----+---+---+---+
Use printSchema to inspect the column types:
root
|-- _1: integer (nullable = false)
|-- _2: string (nullable = true)
|-- _3: integer (nullable = false)
|-- _4: string (nullable = true)
|-- _5: integer (nullable = false)
toDF(colNames: String*): rename the columns
Column names of the form _N are not very readable; they can be set explicitly:
@Test
def demo03(): Unit ={
val list=List(
(1,"张三",18,"男",3),
(3,"李四",18,"男",2),
(4,"王五",18,"男",2),
(7,"张鹏",14,"男",1),
(8,"刘秀",13,"男",2),
(10,"乐乐",21,"男",1)
)
import sparkSession.implicits._
val boysDF: DataFrame = list.toDF("id","name","age","sex","classId")
boysDF.show()
}
+---+----+---+---+-------+
| id|name|age|sex|classId|
+---+----+---+---+-------+
| 1|张三| 18| 男| 3|
| 3|李四| 18| 男| 2|
| 4|王五| 18| 男| 2|
| 7|张鹏| 14| 男| 1|
| 8|刘秀| 13| 男| 2|
| 10|乐乐| 21| 男| 1|
+---+----+---+---+-------+
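To summarize:
- When the data are tuples, the columns are named after the tuple fields (_1, _2, ...).
- Each column must have the same type in every row.
For example, id is an Int in the first row but a String in the second:
(1,"张三",18,"男",3),
("3","李四",18,"男",2),
- Every row must have the same number of fields.
For example, five fields in the first row but three in the second does not work:
(1,"张三",18,"男",3),
(3,"李四",18),
- The toDF(colNames: String*) overload sets the column names; the number of names must match the number of tuple fields.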
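The first rules in this summary are enforced by the compiler, because rows of mixed type or length no longer form a list of one tuple type that Spark can encode. The last rule is only checked at run time. The sketch below, with illustrative object and method names, shows a toDF call with too few column names failing when it is executed, next to the correct call.

```scala
import scala.util.Try
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical demo object for the name-count rule of toDF(colNames: String*).
object ToDfNamesDemo {
  // Three tuple fields but only two names: the call throws when executed.
  def tooFewNames(spark: SparkSession): Try[DataFrame] = {
    import spark.implicits._
    Try(Seq((1, "张三", 18)).toDF("id", "name"))
  }

  // One name per tuple field: succeeds.
  def matchingNames(spark: SparkSession): Try[DataFrame] = {
    import spark.implicits._
    Try(Seq((1, "张三", 18)).toDF("id", "name", "age"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("toDF-names").getOrCreate()
    println(tooFewNames(spark).isFailure)   // true
    println(matchingNames(spark).isSuccess) // true
    spark.stop()
  }
}
```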
RDD to DataFrame
Besides calling toDF on a collection, an RDD can be converted to a DataFrame with rdd.toDF:
@Test
def demo04(): Unit ={
val list=List(
(1,"张三",18,"男",3),
(3,"李四",18,"男",2),
(4,"王五",18,"男",2),
(7,"张鹏",14,"男",1),
(8,"刘秀",13,"男",2),
(10,"乐乐",21,"男",1)
)
// Get the SparkContext
val sc: SparkContext = sparkSession.sparkContext
val rdd: RDD[(Int, String, Int, String, Int)] = sc.parallelize(list, 2)
// toDF requires the implicit conversions
import sparkSession.implicits._
// Convert the RDD to a DataFrame
val df: DataFrame = rdd.toDF
df.show()
}
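toDF requires the implicit conversions to be imported.
Ways to create a DataFrame [for reference]
All the examples above create the DataFrame with toDF. In total there are four ways to create one.
- With the toDF method
toDF requires the implicit conversions:
import sparkSession.implicits._
- With createDataFrame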
createDataFrame[A <: Product : TypeTag](rdd: RDD[A])
val list=List(
(1,"张三",18,"男",3),
(3,"李四",18,"男",2),
(4,"王五",18,"男",2),
(7,"张鹏",14,"男",1),
(8,"刘秀",13,"男",2),
(10,"乐乐",21,"男",1)
)
val sc: SparkContext = sparkSession.sparkContext
val rdd: RDD[(Int, String, Int, String, Int)] = sc.parallelize(list, 2)
val df: DataFrame = sparkSession.createDataFrame(rdd)
df.show()
+---+----+---+---+---+
| _1| _2| _3| _4| _5|
+---+----+---+---+---+
| 1|张三| 18| 男| 3|
| 3|李四| 18| 男| 2|
| 4|王五| 18| 男| 2|
| 7|张鹏| 14| 男| 1|
| 8|刘秀| 13| 男| 2|
| 10|乐乐| 21| 男| 1|
+---+----+---+---+---+
createDataFrame(rowRDD: RDD[Row], schema: StructType)
@Test
def demo06(): Unit ={
val list=List(
Row(1,"张三",18,"男",3),
Row(3,"李四",18,"男",2),
Row(4,"王五",18,"男",2),
Row(7,"张鹏",14,"男",1),
Row(8,"刘秀",13,"男",2),
Row(10,"乐乐",21,"男",1)
)
val sc: SparkContext = sparkSession.sparkContext
val rdd: RDD[Row] = sc.parallelize(list)
// Define the StructType
val fields=Array(
StructField("id",IntegerType),
StructField("name",StringType),
StructField("age",IntegerType),
StructField("sex",StringType),
StructField("classId",IntegerType)
)
val schema =StructType(fields)
val df = sparkSession.createDataFrame(rdd, schema)
df.show()
}
Required imports
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.Row
The remaining overloads are not needed for now:
createDataFrame[A <: Product : TypeTag](data: Seq[A])
createDataFrame(rowRDD: JavaRDD[Row], schema: StructType)
createDataFrame(rows: java.util.List[Row], schema: StructType)
createDataFrame(rdd: RDD[_], beanClass: Class[_])
createDataFrame(rdd: JavaRDD[_], beanClass: Class[_])
createDataFrame(data: java.util.List[_], beanClass: Class[_])
Example 1:
val list=List(
(1,"张三",18,"男",3),
(3,"李四",18,"男",2),
(4,"王五",18,"男",2),
(7,"张鹏",14,"男",1),
(8,"刘秀",13,"男",2),
(10,"乐乐",21,"男",1)
)
val sc: SparkContext = sparkSession.sparkContext
val rdd: RDD[(Int, String, Int, String, Int)] = sc.parallelize(list, 2)
val df: DataFrame = sparkSession.createDataFrame(rdd)
df.show()
- By reading a file
- By deriving from another DataFrame
Many of the examples above already show this: an operation on an existing DataFrame returns a new DataFrame.
@Test
def demo08(): Unit ={
val female=List(
Student(2,"绣花",16,"女",1),
Student(5,"翠花",19,"女",2),
Student(9,"王菲菲",20,"女",1),
Student(11,"小惠",23,"女",1),
Student(12,"梦雅",25,"女",3)
)
val boys=List(
Student(1,"张三",18,"男",3),
Student(3,"李四",18,"男",2),
Student(4,"王五",18,"男",2),
Student(7,"张鹏",14,"男",1),
Student(8,"刘秀",13,"男",2),
Student(10,"乐乐",21,"男",1)
)
// Import the implicit conversions
import sparkSession.implicits._
val femaleDf: DataFrame = female.toDF()
val boysDf: DataFrame = boys.toDF()
val unionAllDf: DataFrame = femaleDf.unionAll(boysDf)
val group: RelationalGroupedDataset = unionAllDf.groupBy("sex")
val resultDf: DataFrame = group.max("age")
resultDf.show()
}
+---+--------+
|sex|max(age)|
+---+--------+
| 男| 21|
| 女| 25|
+---+--------+
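Of the four creation routes, reading from a file is the only one listed without an example. As a minimal sketch, the code below writes two JSON lines to a temporary file and loads them with spark.read.json. The object name, the helper names and the sample records are assumptions for illustration; a real job would point at an existing path instead of a temporary file.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import java.util.Arrays
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical demo object; the JSON records are made up from the article's data.
object ReadFileDemo {
  // Writes one JSON object per line, the layout spark.read.json expects by default.
  def writeSample(): Path = {
    val path = Files.createTempFile("students", ".json")
    Files.write(path, Arrays.asList(
      """{"id":1,"name":"张三","age":18}""",
      """{"id":3,"name":"李四","age":18}"""
    ), StandardCharsets.UTF_8)
    path
  }

  // The schema is inferred from the file contents.
  def load(spark: SparkSession, path: Path): DataFrame =
    spark.read.json(path.toString)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("read-file").getOrCreate()
    val df = load(spark, writeSample())
    df.printSchema()
    df.show()
    spark.stop()
  }
}
```

spark.read offers similar methods for other formats, such as csv and parquet.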