SparkSql之DataFrame

DataFrame

DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。
DataFrame只知道每一列的类型是什么,每一行的类型是不知道的,不管每一行

创建SparkSession

  val sparkSession =SparkSession.builder().master("local[4]").appName("test").getOrCreate()

创建样例类

case class Student(id:Int,name:String,age:Int,sex:String)

使用toDF必须进行隐式转换

import sparkSession.implicits._

为了方便测试,单独把sparkSession 提出去,使用它 Junit的方式进行测试运行。

  @Test
  def demo01: Unit ={
    // 数据准备
    val list=List(
      Student(1,"张三",18,"男"),
      Student(2,"绣花",16,"女"),
      Student(3,"李四",18,"男"),
      Student(4,"王五",18,"男"),
      Student(5,"翠花",19,"女"),
      Student(6,"张鹏",17,"男")
    )
    // 使用`toDF`必须进行隐式转换
    import sparkSession.implicits._

    val df: DataFrame = list.toDF()
    //  执行,类似于 select * from  table;
    df.show()

  }

API参考:https://blog.csdn.net/dabokele/article/details/52802150

DataFrame对象上Action操作

show:展示数据

  1. show() 展示所有数据
val df: DataFrame = list.toDF()
df.show()
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
|  1|张三| 18| 男|
|  2|绣花| 16| 女|
|  3|李四| 18| 男|
|  4|王五| 18| 男|
|  5|翠花| 19| 女|
|  6|张鹏| 17| 男|
+---+----+---+---+
  1. show(numRows: Int) 展示指定条数数据
val df: DataFrame = list.toDF()
df.show(2)
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
|  1|张三| 18| 男|
|  2|绣花| 16| 女|
+---+----+---+---+
  1. show(truncate: Boolean):是否截断长字符串。如果为 true,超过 20 个字符的字符串将被截断,所有单元格将右对齐
  2. show(numRows: Int, truncate: Boolean):展示指定条数数据并指定是否截断长字符串。
  3. show(numRows: Int, truncate: Int):numRows展示条数,truncat<=0 左对齐,truncate>0 右对齐
  4. show(numRows: Int, truncate: Int, vertical: Boolean):vertical 如果设置为 true,则垂直打印输出行(每列值一行)。

collect:获取所有数据到数组
不同于前面的show方法,这里的collect方法会将jdbcDF中的所有数据都获取到,并返回一个Array对象。

    val df: DataFrame = list.toDF()
    val rows: Array[Row] = df.collect()
    rows.foreach(println(_))
[1,张三,18,男]
[2,绣花,16,女]
[3,李四,18,男]
[4,王五,18,男]
[5,翠花,19,女]
[6,张鹏,17,男]

collectAsList:获取所有数据到List
功能和collect类似,只不过将返回结构变成了List对象,使用方法如下

    val df: DataFrame = list.toDF()
    val rows: util.List[Row] = df.collectAsList()
    rows.forEach(println(_))
[1,张三,18,男]
[2,绣花,16,女]
[3,李四,18,男]
[4,王五,18,男]
[5,翠花,19,女]
[6,张鹏,17,男]

describe(cols: String*):获取指定字段的统计信息
这个方法可以动态的传入一个或多个String类型的字段名,结果仍然为DataFrame对象,用于统计数值类型字段的统计值,比如count, mean, stddev, min, max等。

    val df: DataFrame = list.toDF()
    val frame: DataFrame = df.describe("name", "age")
    frame.show()
+-------+----+------------------+
|summary|name|               age|
+-------+----+------------------+
|  count|   6|                 6|
|   mean|null|17.666666666666668|
| stddev|null|1.0327955589886446|
|    min|张三|                16|
|    max|翠花|                19|
+-------+----+------------------+

first, head, take, takeAsList:获取若干行记录
这里列出的四个方法比较类似,其中

  1. first获取第一行记录
    val df: DataFrame = list.toDF()
    val row: Row = df.first()
    println(row)
[1,张三,18,男]
  1. head获取第一行记录,head(n: Int)获取前n行记录
    val df: DataFrame = list.toDF()
    val row: Row = df.head()
    println(row)
[1,张三,18,男]

取前n行记录

    val df: DataFrame = list.toDF()
    val rows: Array[Row] = df.head(3)
    rows.foreach(println(_))
[1,张三,18,男]
[2,绣花,16,女]
[3,李四,18,男]
  1. take(n: Int)获取前n行数据
    val df: DataFrame = list.toDF()
    val rows: Array[Row] = df.take(3)
    rows.foreach(println(_))
[1,张三,18,男]
[2,绣花,16,女]
[3,李四,18,男]
  1. takeAsList(n: Int)获取前n行数据,并以List的形式展现
    val df: DataFrame = list.toDF()
    val rows: util.List[Row] = df.takeAsList(3)
    rows.forEach(println(_))
[1,张三,18,男]
[2,绣花,16,女]
[3,李四,18,男]

以Row或者Array[Row]的形式返回一行或多行数据。first和head功能相同。

take和takeAsList方法会将获得到的数据返回到Driver端,所以,使用这两个方法时需要注意数据量,以免Driver发生OutOfMemoryError

DataFrame对象上的条件查询和join等操作

新增一些数据

val list=List(
      Student(1,"张三",18,"男"),
      Student(2,"绣花",16,"女"),
      Student(3,"李四",18,"男"),
      Student(4,"王五",18,"男"),
      Student(5,"翠花",19,"女"),
      Student(7,"张鹏",14,"男"),
      Student(8,"刘秀",13,"男"),
      Student(9,"王菲菲",20,"女"),
      Student(10,"乐乐",21,"男"),
      Student(11,"小惠",23,"女"),
      Student(12,"梦雅",25,"女"),
)

where条件相关

where(conditionExpr: String):SQL语言中where关键字后的条件
传入筛选条件表达式,可以用andor。得到DataFrame类型的返回结果,

查询性别为的学生信息

    import sparkSession.implicits._

    val df: DataFrame = list.toDF()
    df.where("sex='男'").show()
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
|  1|张三| 18| 男|
|  3|李四| 18| 男|
|  4|王五| 18| 男|
|  7|张鹏| 14| 男|
|  8|刘秀| 13| 男|
| 10|乐乐| 21| 男|
+---+----+---+---+

and
查询年龄小于18岁,并且性别为的学生信息

    val df: DataFrame = list.toDF()
    df.where("age<18 and sex='女'").show()
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
|  2|绣花| 16| 女|
+---+----+---+---+

or
查询年龄>18 或者性别为的学生信息

    val df: DataFrame = list.toDF()
    df.where("age>18 or sex='女'").show()
+---+------+---+---+
| id|  name|age|sex|
+---+------+---+---+
|  2|  绣花| 16| 女|
|  5|  翠花| 19| 女|
|  9|王菲菲| 20| 女|
| 10|  乐乐| 21| 男|
| 11|  小惠| 23| 女|
| 12|  梦雅| 25| 女|
+---+------+---+---+

filter:根据字段进行筛选
传入筛选条件表达式,得到DataFrame类型的返回结果。和where使用条件相同

查询性别不为的学生信息

    val df: DataFrame = list.toDF()
    df.filter("sex!='男'").show()
+---+------+---+---+
| id|  name|age|sex|
+---+------+---+---+
|  2|  绣花| 16| 女|
|  5|  翠花| 19| 女|
|  9|王菲菲| 20| 女|
| 11|  小惠| 23| 女|
| 12|  梦雅| 25| 女|
+---+------+---+---+

filter中也可以使用orand

    val df: DataFrame = list.toDF()
    df.filter("sex!='男' and age >20").show()
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
| 11|小惠| 23| 女|
| 12|梦雅| 25| 女|
+---+----+---+---+

查询指定字段

select:获取指定字段值

  1. select(cols: Column*) :根据传入的String类型字段名,获取指定字段的值,以DataFrame类型返回
    val df: DataFrame = list.toDF()
    df.select("name","age").show()
+------+---+
|  name|age|
+------+---+
|  张三| 18|
|  绣花| 16|
|  李四| 18|
|  王五| 18|
|  翠花| 19|
|  张鹏| 14|
|  刘秀| 13|
|王菲菲| 20|
|  乐乐| 21|
|  小惠| 23|
|  梦雅| 25|
+------+---+

还有一个重载的select方法,不是传入String类型参数,而是传入Column类型参数。可以实现select id, id+1 from test这种逻辑。

    val df: DataFrame = list.toDF()
    df.select(df("id"),df("id")+1).show()
+---+--------+
| id|(id + 1)|
+---+--------+
|  1|       2|
|  2|       3|
|  3|       4|
|  4|       5|
|  5|       6|
|  7|       8|
|  8|       9|
|  9|      10|
| 10|      11|
| 11|      12|
| 12|      13|
+---+--------+

selectExpr:可以对指定字段进行特殊处理
可以直接对指定字段调用UDF函数,或者指定别名等。传入String类型参数,得到DataFrame对象。

获取年龄最大的学生信息

    val df: DataFrame = list.toDF()
    df.selectExpr(
      """
        |max(age) max_age
        |""".stripMargin).show()
+-------+
|max_age|
+-------+
|     25|
+-------+

col:获取指定字段
  只能获取一个字段,返回对象为Column类型。

    val df: DataFrame = list.toDF()
    val ageColumn: Column = df.col("age")
    val nameColumn: Column = df.col("name")

    println(ageColumn)
    println(nameColumn)
age
name

apply:获取指定字段
  只能获取一个字段,返回对象为Column类型

    val df: DataFrame = list.toDF()
    val ageColumn: Column = df.apply("age")
    val nameColumn: Column = df.apply("name")

    println(ageColumn)
    println(nameColumn)
age
name

drop:去除指定字段,保留其他字段
  返回一个新的DataFrame对象,其中不包含去除的字段,一次只能去除一个字段。

    val df: DataFrame = list.toDF()
    val newDf: DataFrame = df.drop("sex")
    newDf.show()
+---+------+---+
| id|  name|age|
+---+------+---+
|  1|  张三| 18|
|  2|  绣花| 16|
|  3|  李四| 18|
|  4|  王五| 18|
|  5|  翠花| 19|
|  7|  张鹏| 14|
|  8|  刘秀| 13|
|  9|王菲菲| 20|
| 10|  乐乐| 21|
| 11|  小惠| 23|
| 12|  梦雅| 25|
+---+------+---+

也可以去除多个字段

    val df: DataFrame = list.toDF()
    val newDf: DataFrame = df.drop("sex","id")
    newDf.show()
+------+---+
|  name|age|
+------+---+
|  张三| 18|
|  绣花| 16|
|  李四| 18|
|  王五| 18|
|  翠花| 19|
|  张鹏| 14|
|  刘秀| 13|
|王菲菲| 20|
|  乐乐| 21|
|  小惠| 23|
|  梦雅| 25|
+------+---+

limit

limit
方法获取指定DataFrame的前n行记录,得到一个新的DataFrame对象。和take与head不同的是,limit方法不是Action操作

    val df: DataFrame = list.toDF()
    val newDf: DataFrame = df.limit(3)
    newDf.show()
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
|  1|张三| 18| 男|
|  2|绣花| 16| 女|
|  3|李四| 18| 男|
+---+----+---+---+

order by

orderBysort:按指定字段排序,默认为升序

orderBy按照年龄排序(asc)

    val df: DataFrame = list.toDF()
    df.orderBy("age").show()
+---+------+---+---+
| id|  name|age|sex|
+---+------+---+---+
|  8|  刘秀| 13| 男|
|  7|  张鹏| 14| 男|
|  2|  绣花| 16| 女|
|  4|  王五| 18| 男|
|  3|  李四| 18| 男|
|  1|  张三| 18| 男|
|  5|  翠花| 19| 女|
|  9|王菲菲| 20| 女|
| 10|  乐乐| 21| 男|
| 11|  小惠| 23| 女|
| 12|  梦雅| 25| 女|
+---+------+---+---+

orderBy按照年龄排序(desc)

    val df: DataFrame = list.toDF()
    df.orderBy(df("age").desc).show()
+---+------+---+---+
| id|  name|age|sex|
+---+------+---+---+
| 12|  梦雅| 25| 女|
| 11|  小惠| 23| 女|
| 10|  乐乐| 21| 男|
|  9|王菲菲| 20| 女|
|  5|  翠花| 19| 女|
|  4|  王五| 18| 男|
|  1|  张三| 18| 男|
|  3|  李四| 18| 男|
|  2|  绣花| 16| 女|
|  7|  张鹏| 14| 男|
|  8|  刘秀| 13| 男|
+---+------+---+---+

sort按照年龄排序(asc)

    val df: DataFrame = list.toDF()
    df.sort(df("age").asc).show()

sort按照年龄排序(desc)

    val df: DataFrame = list.toDF()
    df.sort(df("age").desc).show()

sortWithinPartitions
  和上面的sort方法功能类似,区别在于sortWithinPartitions方法返回的是按Partition排好序的DataFrame对象。

sortWithinPartitions按照年龄排序(asc)

    val df: DataFrame = list.toDF()
    df.sortWithinPartitions(df("age").asc).show()
+---+------+---+---+
| id|  name|age|sex|
+---+------+---+---+
|  2|  绣花| 16| 女|
|  1|  张三| 18| 男|
|  3|  李四| 18| 男|
|  4|  王五| 18| 男|
|  5|  翠花| 19| 女|
------------------------------ 分为两个区,
|  8|  刘秀| 13| 男|
|  7|  张鹏| 14| 男|
|  9|王菲菲| 20| 女|
| 10|  乐乐| 21| 男|
| 11|  小惠| 23| 女|
| 12|  梦雅| 25| 女|
+---+------+---+---+

group by

groupBy:根据字段进行group by操作
  groupBy方法有两种调用方式,可以传入String类型的字段名,也可传入Column类型的对象。

案例:按照性别分组,统计各个性别的总人数

cuberollupgroup by的扩展
  功能类似于SQL中的group by cube/rollup

GroupedData对象
  该方法得到的是GroupedData类型对象,在GroupedData的API中提供了group by之后的操作,比如,

  • max(colNames: String*)方法,获取分组中指定字段或者所有的数字类型字段的最大值,只能作用于数字型字段
    val df: DataFrame = list.toDF()
    val newDF: DataFrame = df.groupBy("sex").max("age")
    newDF.show()
+---+--------+
|sex|max(age)|
+---+--------+
| 男|      21|
| 女|      25|
+---+--------+
  • min(colNames: String*)方法,获取分组中指定字段或者所有的数字类型字段的最小值,只能作用于数字型字段
    val df: DataFrame = list.toDF()
    val newDF: DataFrame = df.groupBy("sex").min("age")
    newDF.show()
+---+--------+
|sex|min(age)|
+---+--------+
| 男|      13|
| 女|      16|
+---+--------+
  • mean(colNames: String*)方法,获取分组中指定字段或者所有的数字类型字段的平均值,只能作用于数字型字段
    val df: DataFrame = list.toDF()
    val newDF: DataFrame = df.groupBy("sex").mean("age")
    newDF.show()
+---+--------+
|sex|avg(age)|
+---+--------+
| 男|    17.0|
| 女|    20.6|
+---+--------+
  • sum(colNames: String*)方法,获取分组中指定字段或者所有的数字类型字段的和值,只能作用于数字型字段
    val df: DataFrame = list.toDF()
    val newDF: DataFrame = df.groupBy("sex").sum("age")
    newDF.show()
+---+--------+
|sex|sum(age)|
+---+--------+
| 男|     102|
| 女|     103|
+---+--------+
  • count()方法,获取分组中的元素个数
    val df: DataFrame = list.toDF()
    val newDF: DataFrame = df.groupBy("sex").count()
    newDF.show()
+---+-----+
|sex|count|
+---+-----+
| 男|    6|
| 女|    5|
+---+-----+

distinct

distinct:返回一个不包含重复记录的DataFrame
  返回当前DataFrame中不重复的Row记录。该方法和接下来的dropDuplicates()方法不传入指定字段时的结果相同。

    val df: DataFrame = list.toDF()
    val newDF: DataFrame = df.distinct()
    newDF.show()

因为没有列是重复的数据所以就不展示了

dropDuplicates:根据指定字段去重
  根据指定字段去重。类似于select distinct a, b操作

按照年龄剔重

    val df: DataFrame = list.toDF()
    val newDF: DataFrame = df.dropDuplicates("age")
    newDF.show()
+---+------+---+---+
| id|  name|age|sex|
+---+------+---+---+
|  8|  刘秀| 13| 男|
|  2|  绣花| 16| 女|
|  9|王菲菲| 20| 女|
|  5|  翠花| 19| 女|
| 11|  小惠| 23| 女|
| 12|  梦雅| 25| 女|
| 10|  乐乐| 21| 男|
|  7|  张鹏| 14| 男|
|  1|  张三| 18| 男|
+---+------+---+---+

聚合

聚合操作调用的是agg方法,该方法有多种调用方式。一般与groupBy方法配合使用。

    val df: DataFrame = list.toDF()
    val newDF: DataFrame = df.agg(
      "age" -> "max",
      "age" -> "avg",
      "age" -> "min",
      "age" -> "sum",
      "age" -> "count"
    )
   newDF.show()
+--------+------------------+--------+--------+----------+
|max(age)|          avg(age)|min(age)|sum(age)|count(age)|
+--------+------------------+--------+--------+----------+
|      25|18.636363636363637|      13|     205|        11|
+--------+------------------+--------+--------+----------+

union

重新整理一下数据

    val female=List(
      Student(2,"绣花",16,"女",1),
      Student(5,"翠花",19,"女",2),
      Student(9,"王菲菲",20,"女",1),
      Student(11,"小惠",23,"女",1),
      Student(12,"梦雅",25,"女",3)
    )

    val boys=List(
      Student(1,"张三",18,"男",3),
      Student(3,"李四",18,"男",2),
      Student(4,"王五",18,"男",2),
      Student(7,"张鹏",14,"男",1),
      Student(8,"刘秀",13,"男",2),
      Student(10,"乐乐",21,"男",1)
    )

新增一个classid

case class Student(id:Int,name:String,age:Int,sex:String,classId:Int)

示例

    val femaleDF: DataFrame = female.toDF()
    val boysDF: DataFrame = boys.toDF()
    val value: Dataset[Row] = femaleDF.union(boysDF)

    value.foreach(println(_))
[2,绣花,16,女,1]
[5,翠花,19,女,2]
[11,小惠,23,女,1]
[9,王菲菲,20,女,1]
[12,梦雅,25,女,3]
[1,张三,18,男,3]
[3,李四,18,男,2]
[4,王五,18,男,2]
[8,刘秀,13,男,2]
[7,张鹏,14,男,1]
[10,乐乐,21,男,1]

unionAll方法:对两个DataFrame进行组合
  类似于SQL中的UNION ALL操作。

    val femaleDF: DataFrame = female.toDF()
    val boysDF: DataFrame = boys.toDF()
    val value: Dataset[Row] = femaleDF.unionAll(boysDF)

    value.foreach(println(_))
[5,翠花,19,女,2]
[2,绣花,16,女,1]
[11,小惠,23,女,1]
[9,王菲菲,20,女,1]
[12,梦雅,25,女,3]
[1,张三,18,男,3]
[3,李四,18,男,2]
[4,王五,18,男,2]
[7,张鹏,14,男,1]
[8,刘秀,13,男,2]
[10,乐乐,21,男,1]

join

重点来了。在SQL语言中用得很多的就是join操作,DataFrame中同样也提供了join的功能。
  接下来隆重介绍join方法。在DataFrame中提供了六个重载的join方法。
笛卡尔积

    val femaleDF: DataFrame = female.toDF()
    val boysDF: DataFrame = boys.toDF()
    val value: Dataset[Row] = femaleDF.join(boysDF)

    value.foreach(println(_))
[2,绣花,16,女,1,3,李四,18,男,2]
[2,绣花,16,女,1,7,张鹏,14,男,1]
[2,绣花,16,女,1,8,刘秀,13,男,2]
[2,绣花,16,女,1,1,张三,18,男,3]
[5,翠花,19,女,2,3,李四,18,男,2]
[5,翠花,19,女,2,7,张鹏,14,男,1]
[5,翠花,19,女,2,8,刘秀,13,男,2]
[5,翠花,19,女,2,1,张三,18,男,3]
[9,王菲菲,20,女,1,3,李四,18,男,2]
[9,王菲菲,20,女,1,7,张鹏,14,男,1]
[9,王菲菲,20,女,1,8,刘秀,13,男,2]
[11,小惠,23,女,1,3,李四,18,男,2]
[9,王菲菲,20,女,1,1,张三,18,男,3]
[11,小惠,23,女,1,7,张鹏,14,男,1]
[11,小惠,23,女,1,8,刘秀,13,男,2]
[12,梦雅,25,女,3,3,李四,18,男,2]
[12,梦雅,25,女,3,7,张鹏,14,男,1]
[11,小惠,23,女,1,1,张三,18,男,3]
[12,梦雅,25,女,3,8,刘秀,13,男,2]
[2,绣花,16,女,1,4,王五,18,男,2]
[12,梦雅,25,女,3,1,张三,18,男,3]
[2,绣花,16,女,1,10,乐乐,21,男,1]
[5,翠花,19,女,2,4,王五,18,男,2]
[5,翠花,19,女,2,10,乐乐,21,男,1]
[9,王菲菲,20,女,1,4,王五,18,男,2]
[9,王菲菲,20,女,1,10,乐乐,21,男,1]
[11,小惠,23,女,1,4,王五,18,男,2]
[11,小惠,23,女,1,10,乐乐,21,男,1]
[12,梦雅,25,女,3,4,王五,18,男,2]
[12,梦雅,25,女,3,10,乐乐,21,男,1]

using一个字段形式
  下面这种join类似于a join b using column1的形式,需要两个DataFrame中有相同的一个列名,

    import sparkSession.implicits._

    val femaleDF: DataFrame = female.toDF()
    val boysDF: DataFrame = boys.toDF()
    val value: Dataset[Row] = femaleDF.join(boysDF,"classId")

    value.foreach(println(_))
[2,5,翠花,19,女,8,刘秀,13,男]
[1,11,小惠,23,女,7,张鹏,14,男]
[2,5,翠花,19,女,3,李四,18,男]
[3,12,梦雅,25,女,1,张三,18,男]
[1,11,小惠,23,女,10,乐乐,21,男]
[1,9,王菲菲,20,女,7,张鹏,14,男]
[2,5,翠花,19,女,4,王五,18,男]
[1,2,绣花,16,女,7,张鹏,14,男]
[1,9,王菲菲,20,女,10,乐乐,21,男]
[1,2,绣花,16,女,10,乐乐,21,男]

using多个字段形式
  除了上面这种using一个字段的情况外,还可以using多个字段,如下

    val femaleDF: DataFrame = female.toDF()
    val boysDF: DataFrame = boys.toDF()
    val value: Dataset[Row] = femaleDF.join(boysDF,Seq("classId","id"))

    value.foreach(println(_))

指定join类型
  两个DataFrame的join操作有inner, outer, left_outer, right_outer, leftsemi类型。在上面的using多个字段的join情况下,可以写第三个String类型参数,指定join的类型,如下所示
left_outer

    val femaleDF: DataFrame = female.toDF()
    val boysDF: DataFrame = boys.toDF()
    val value: Dataset[Row] = femaleDF.join(boysDF,Seq("classId"),"left_outer")

    value.foreach(println(_))
[1,11,小惠,23,女,10,乐乐,21,男]
[2,5,翠花,19,女,8,刘秀,13,男]
[1,9,王菲菲,20,女,10,乐乐,21,男]
[1,2,绣花,16,女,10,乐乐,21,男]
[2,5,翠花,19,女,4,王五,18,男]
[1,2,绣花,16,女,7,张鹏,14,男]
[1,11,小惠,23,女,7,张鹏,14,男]
[2,5,翠花,19,女,3,李四,18,男]
[1,9,王菲菲,20,女,7,张鹏,14,男]
[3,12,梦雅,25,女,1,张三,18,男]

right_outer

    val femaleDF: DataFrame = female.toDF()
    val boysDF: DataFrame = boys.toDF()
    val value: Dataset[Row] = femaleDF.join(boysDF,Seq("classId"),"right_outer")

    value.foreach(println(_))
[2,5,翠花,19,女,8,刘秀,13,男]
[2,5,翠花,19,女,3,李四,18,男]
[3,12,梦雅,25,女,1,张三,18,男]
[1,11,小惠,23,女,7,张鹏,14,男]
[1,11,小惠,23,女,10,乐乐,21,男]
[1,9,王菲菲,20,女,7,张鹏,14,男]
[1,9,王菲菲,20,女,10,乐乐,21,男]
[2,5,翠花,19,女,4,王五,18,男]
[1,2,绣花,16,女,10,乐乐,21,男]
[1,2,绣花,16,女,7,张鹏,14,男]

其他的就演示了
以上案例整理参考:https://blog.csdn.net/dabokele/article/details/52802150
更多API请参考Spark官网

上面使用的是样例类,会自动将字段名称字段类型与表中的字段进行对应

case class Student(id:Int,name:String,age:Int,sex:String,classId:Int)
@Test
  def demo02: Unit ={

    val female=List(
      Student(2,"绣花",16,"女",1),
      Student(5,"翠花",19,"女",2),
      Student(9,"王菲菲",20,"女",1),
      Student(11,"小惠",23,"女",1),
      Student(12,"梦雅",25,"女",3)
    )

    val boys=List(
      Student(1,"张三",18,"男",3),
      Student(3,"李四",18,"男",2),
      Student(4,"王五",18,"男",2),
      Student(7,"张鹏",14,"男",1),
      Student(8,"刘秀",13,"男",2),
      Student(10,"乐乐",21,"男",1)
    )


    import sparkSession.implicits._

    //val femaleDF: DataFrame = female.toDF()
    val boysDF: DataFrame = boys.toDF()

    boysDF.show()
  }
+---+----+---+---+-------+
| id|name|age|sex|classId|
+---+----+---+---+-------+
|  1|张三| 18| 男|      3|
|  3|李四| 18| 男|      2|
|  4|王五| 18| 男|      2|
|  7|张鹏| 14| 男|      1|
|  8|刘秀| 13| 男|      2|
| 10|乐乐| 21| 男|      1|
+---+----+---+---+-------+

使用printSchema 查看字段类型

root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
 |-- sex: string (nullable = true)
 |-- classId: integer (nullable = false)

处理使用样例类,也可以使用元组的形式

  @Test
  def demo03(): Unit ={
    val list=List(
      (1,"张三",18,"男",3),
      (3,"李四",18,"男",2),
      (4,"王五",18,"男",2),
      (7,"张鹏",14,"男",1),
      (8,"刘秀",13,"男",2),
      (10,"乐乐",21,"男",1)
    )

    import sparkSession.implicits._
    val boysDF: DataFrame = list.toDF()

    boysDF.show()
  }

表字段将使用元组索引命名

+---+----+---+---+---+
| _1|  _2| _3| _4| _5|
+---+----+---+---+---+
|  1|张三| 18| 男|  3|
|  3|李四| 18| 男|  2|
|  4|王五| 18| 男|  2|
|  7|张鹏| 14| 男|  1|
|  8|刘秀| 13| 男|  2|
| 10|乐乐| 21| 男|  1|
+---+----+---+---+---+

使用printSchema 查看字段类型

root
 |-- _1: integer (nullable = false)
 |-- _2: string (nullable = true)
 |-- _3: integer (nullable = false)
 |-- _4: string (nullable = true)
 |-- _5: integer (nullable = false)

toDF(colNames: String*)
重新队列进行命名
字段名为_N的形式,不是很友好,可以自行指定

  @Test
  def demo03(): Unit ={
    val list=List(
      (1,"张三",18,"男",3),
      (3,"李四",18,"男",2),
      (4,"王五",18,"男",2),
      (7,"张鹏",14,"男",1),
      (8,"刘秀",13,"男",2),
      (10,"乐乐",21,"男",1)
    )

    import sparkSession.implicits._
    val boysDF: DataFrame = list.toDF("id","name","age","sex","classId")

    boysDF.show()
  }
+---+----+---+---+-------+
| id|name|age|sex|classId|
+---+----+---+---+-------+
|  1|张三| 18| 男|      3|
|  3|李四| 18| 男|      2|
|  4|王五| 18| 男|      2|
|  7|张鹏| 14| 男|      1|
|  8|刘秀| 13| 男|      2|
| 10|乐乐| 21| 男|      1|
+---+----+---+---+-------+

总结一下:

  1. 若数据为元组时,字段名为元组的索引名
  2. 列表中的字段类型必须一致。
    如:第一列为id列,第二行的类型却为字符类型
      (1,"张三",18,"男",3),
      ("3","李四",18,"男",2),
  1. 列表中的参数个数必须一致。
    如: 第一列5个参数,第二行3个参数,这样是不行的。
      (1,"张三",18,"男",3),
      (3,"李四",18),
  1. 可以使用 toDF(colNames: String*)重载方法,设置命名,必须元参数个数保持一致。

RDD 转 DataFrame

除了使用集合.toDF,也可以使用rdd.toDF 将 RDD转为DataFrame

  @Test
  def demo04(): Unit ={
    val list=List(
      (1,"张三",18,"男",3),
      (3,"李四",18,"男",2),
      (4,"王五",18,"男",2),
      (7,"张鹏",14,"男",1),
      (8,"刘秀",13,"男",2),
      (10,"乐乐",21,"男",1)
    )

    // 获取 SparkContext
    val sc: SparkContext = sparkSession.sparkContext
    val rdd: RDD[(Int, String, Int, String, Int)] = sc.parallelize(list, 2)

    // 使用 toDF 必须定义隐式转换
    import sparkSession.implicits._
   
    // RDD 转换成 DataFrame 
    val df: DataFrame = rdd.toDF
    df.show()

  }

使用toDF必须定义隐式转换

DataFrame的创建方式[了解]

上面的所有案例都是采用 toDF 的方式创建,关于DataFrame的创建方式一共有四种创建方式。

  1. 可以通过toDF方法创建
    使用toDF必须进行隐式转换
import sparkSession.implicits._
  1. 通过createDataFrame创建

createDataFrame[A <: Product : TypeTag](rdd: RDD[A])

    val list=List(
      (1,"张三",18,"男",3),
      (3,"李四",18,"男",2),
      (4,"王五",18,"男",2),
      (7,"张鹏",14,"男",1),
      (8,"刘秀",13,"男",2),
      (10,"乐乐",21,"男",1)
    )
    val sc: SparkContext = sparkSession.sparkContext
    val rdd: RDD[(Int, String, Int, String, Int)] = sc.parallelize(list, 2)
    val df: DataFrame = sparkSession.createDataFrame(rdd)
    df.show()
+---+----+---+---+---+
| _1|  _2| _3| _4| _5|
+---+----+---+---+---+
|  1|张三| 18| 男|  3|
|  3|李四| 18| 男|  2|
|  4|王五| 18| 男|  2|
|  7|张鹏| 14| 男|  1|
|  8|刘秀| 13| 男|  2|
| 10|乐乐| 21| 男|  1|
+---+----+---+---+---+

createDataFrame(rowRDD: RDD[Row], schema: StructType)

@Test
  def demo06(): Unit ={
    val list=List(
      Row(1,"张三",18,"男",3),
      Row(3,"李四",18,"男",2),
      Row(4,"王五",18,"男",2),
      Row(7,"张鹏",14,"男",1),
      Row(8,"刘秀",13,"男",2),
      Row(10,"乐乐",21,"男",1)
    )


    val sc: SparkContext = sparkSession.sparkContext
    val rdd: RDD[Row] = sc.parallelize(list)
    // 指定StructType

    val fields=Array(
      StructField("id",IntegerType),
      StructField("name",StringType),
      StructField("age",IntegerType),
      StructField("sex",StringType),
      StructField("classId",IntegerType)
    )

    val schema =StructType(fields)

    val df = sparkSession.createDataFrame(rdd, schema)
    df.show()

  }

相关依赖

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row}

剩下的暂时用不到...
createDataFrame[A <: Product : TypeTag](data: Seq[A])
createDataFrame(rowRDD: JavaRDD[Row], schema: StructType)
createDataFrame(rows: java.util.List[Row], schema: StructType)
createDataFrame(rdd: RDD[_], beanClass: Class[_])
createDataFrame(rdd: JavaRDD[_], beanClass: Class[_])
createDataFrame(data: java.util.List[_], beanClass: Class[_])

示例一:

    val list=List(
      (1,"张三",18,"男",3),
      (3,"李四",18,"男",2),
      (4,"王五",18,"男",2),
      (7,"张鹏",14,"男",1),
      (8,"刘秀",13,"男",2),
      (10,"乐乐",21,"男",1)
    )
    val sc: SparkContext = sparkSession.sparkContext
    val rdd: RDD[(Int, String, Int, String, Int)] = sc.parallelize(list, 2)
    val df: DataFrame = sparkSession.createDataFrame(rdd)
    df.show()
  1. 通过读取文件创建
  2. 通过其他的dataFrame衍生
    上面的很多案例也有演示,就是通过上次结果的DataFrame返回一个新的DataFrame
@Test
  def demo08(): Unit ={

    val female=List(
      Student(2,"绣花",16,"女",1),
      Student(5,"翠花",19,"女",2),
      Student(9,"王菲菲",20,"女",1),
      Student(11,"小惠",23,"女",1),
      Student(12,"梦雅",25,"女",3)
    )

    val boys=List(
      Student(1,"张三",18,"男",3),
      Student(3,"李四",18,"男",2),
      Student(4,"王五",18,"男",2),
      Student(7,"张鹏",14,"男",1),
      Student(8,"刘秀",13,"男",2),
      Student(10,"乐乐",21,"男",1)
    )


    // 导入隐式转换
    import sparkSession.implicits._

    val femaleDf: DataFrame = female.toDF()
    val boysDf: DataFrame = boys.toDF()

    val unionAllDf: DataFrame = femaleDf.unionAll(boysDf)

    val group: RelationalGroupedDataset = unionAllDf.groupBy("sex")

    val resultDf: DataFrame = group.max("age")

    resultDf.show()

  }
+---+--------+
|sex|max(age)|
+---+--------+
| 男|      21|
| 女|      25|
+---+--------+
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,377评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,390评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,967评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,344评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,441评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,492评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,497评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,274评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,732评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,008评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,184评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,837评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,520评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,156评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,407评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,056评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,074评论 2 352

推荐阅读更多精彩内容