Spark开发--Spark SQL--DataFrame(十一)

一、创建DataFrame的几种方式

1. 通过Seq生成

    // Seq生成DataFrame
    val df = spark.createDataFrame(Seq(
      ("数学", "张三", 88), ("语文", "张三", 92),
      ("英语", "张三", 77), ("数学", "王五", 65),
      ("语文", "王五", 87), ("英语", "王五", 90),
      ("数学", "李雷", 67), ("语文", "李雷", 33),
      ("英语", "李雷", 24), ("数学", "宫九", 77),
      ("语文", "宫九", 87), ("英语", "宫九", 90))).toDF("科目", "姓名", "分数")
    // 显示数据
    df.orderBy("科目").show()

+----+----+----+
|科目|姓名|分数|
+----+----+----+
|数学|张三|  88|
|数学|王五|  65|
|数学|宫九|  77|
|数学|李雷|  67|
|英语|张三|  77|
|英语|王五|  90|
|英语|李雷|  24|
|英语|宫九|  90|
|语文|张三|  92|
|语文|宫九|  87|
|语文|王五|  87|
|语文|李雷|  33|
+----+----+----+

2. DataSet生成

Dataset与DataFrame的区别是DataFrame的一行记录中没有指定特定的数据类型,而 Dataset 的一行中的数据都是明确类型的。

    import org.apache.spark.sql.Encoders
    // 指定类型为Encoders.STRING
    val dataSet = spark.createDataset(Array(
      "李明,20,15552211521", "王红,19,13287994007", "刘三,21,15552211523"
    ))(Encoders.STRING)
    spark.read.csv(dataSet).toDF("name", "age", "phone").show()

+----+---+-----------+
|name|age|      phone|
+----+---+-----------+
|李明| 20|15552211521|
|王红| 19|13287994007|
|刘三| 21|15552211523|
+----+---+-----------+

3. 动态创建schema

import java.util
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

    val schema = StructType(List(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("phone", LongType, true)
    ))

    val dataList = new util.ArrayList[Row]()
    dataList.add(Row("李明",20,15552211521L))
    dataList.add(Row("王红",19,13287994007L))
    dataList.add(Row("刘三",21,15552211523L))
    spark.createDataFrame(dataList,schema).show()

+----+---+-----------+
|name|age|      phone|
+----+---+-----------+
|李明| 20|15552211521|
|王红| 19|13287994007|
|刘三| 21|15552211523|
+----+---+-----------+

二、DataFrame API基本操作

1. 数据

people.json

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

2. 基本操作

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object MySpark {

  def main(args: Array[String]) {
    // 定义应用名称
    val conf = new SparkConf().setAppName("mySpark0")
    conf.setMaster("spark://master:7077")
    conf.setJars(Seq("/root/SparkTest.jar"))
    // 创建SparkSession对象
    val spark = SparkSession.builder()
      .appName("DataFrameAPP")
      .config(conf)
      .getOrCreate()

   // 将json文件加载成一个DataFrame
   val peopleDF = spark.read.format("json")
     .load("hdfs://192.168.71.130:9000/people.json")  // 添加hdfs路径

    // 1、输出DataFrame对应的schema信息
    peopleDF.printSchema()
    // 2、输出DataFrame里面的数据,不加参数默认输出前20条
    peopleDF.show(100)

    // 3、查询DataFrame里面某一列数据:select name from table;
    peopleDF.select("name").show()

    // 4、查询某几列所有数据,并对列进行计算:select name,age+10 from table;
    peopleDF.select(peopleDF.col("name"), peopleDF.col("age") + 10).show

    // 5、起个别名:age+10 ==> age2
    peopleDF.select(peopleDF.col("name"),(peopleDF.col("age") + 10)
      .as("age2")).show()

    // 6、过滤,输出年龄大于19岁的
    peopleDF.filter(peopleDF.col("age") > 19).show()

    // 7、根据某一列进行分组,然后再进行聚合操作:select age,count(1) from table group by age
    peopleDF.groupBy("age").count().show()

    //关闭资源
    spark.stop()
  }
}

3. show方法

org.apache.spark.sql.Dataset类的方法:
功能:展示数据

1)show()

show方法有四种调用方式,分别为:
默认,只显示前20条记录

df.show()
2)show(numRows: Int)

显示指定行数:

df.show(1)
3)show(numRows: Int)

显示指定行数:

df.show(1)
4)show(truncate: Boolean)

是否截断长字符串。如果为true,则超过20个字符的字符串将被截断,并且所有单元格都将右对齐。

scala> df.show()
+----+---+------+---+----------------------+-----+
|addr|age|deptid| id|                  name|score|
+----+---+------+---+----------------------+-----+
|天津| 20|    10|  1|赵伟123456789098765...|80.92|

scala> df.show(false)
+----+---+------+---+-----------------------+-----+
|addr|age|deptid|id |name                   |score|
+----+---+------+---+-----------------------+-----+
|天津|20 |10    |1  |赵伟1234567890987654321|80.92|

4)show(numRows: Int, truncate: Int, vertical: Boolean)
scala> df.show(1,3,true)
-RECORD 0-------
 addr   | 天津  
 age    | 20    
 deptid | 10    
 id     | 1     
 name   | 赵伟1 
 score  | 80.   
only showing top 1 row

二、 Spark Scala Sql

数据:

{"id":1,"name":"赵伟","age":20,"addr":"天津","score":80.92,"deptid":10}
{"id":2,"name":"钱枫","age":30,"addr":"北京","score":90,"deptid":20}
{"id":3,"name":"孙斌","age":19,"addr":"上海","score":88,"deptid":30}
{"id":4,"name":"李浩楠","age":19,"addr":"上海","score":87,"deptid":40}
{"id":5,"name":"刘文娟","age":19,"addr":null,"score":96,"deptid":20}

1、查询

 // 按id 与 姓名查询
 df.select("id", "name").show

+---+------+
| id|  name|
+---+------+
|  1|  赵伟|
|  2|  钱枫|
|  3|  孙斌|
|  4|李浩楠|
+---+------+
// 选择字段,并进行计算,必须指定字段别名 as
df.select(df("name"), (df("age")+ 1).as("age1")).show()
+------+----+
|  name|age1|
+------+----+
|  赵伟|  21|
|  钱枫|  31|
|  孙斌|  20|
|李浩楠|  20|
+------+----+

// selectExpr 可以对指定字段进行特殊处理 round 进行四舍五入
df.selectExpr("id", "name as username", "round(score,1)").show(false)

+---+--------+---------------+
|id |username|round(score, 1)|
+---+--------+---------------+
|1  |赵伟    |80.9           |
|2  |钱枫    |90.0           |
|3  |孙斌    |88.0           |
|4  |李浩楠  |88.0           |
+---+--------+---------------+

//  获取指定字段(只能一个,返回为Column)对象
val id1 = df.apply("id")
val id2 = df("id")

2、查询指定字段

col:获取指定字段
  只能获取一个字段,返回对象为Column类型。

val idCol = df.col("id")

apply:获取指定字段
  只能获取一个字段,返回对象为Column类型

  //  获取指定字段(只能一个,返回为Column)
  val id1 = df.apply("id")
  val id2 = df("id")
// 选出两列,把其中一列加1
  df.select(id1, id2 + 1).show

+---+--------+
| id|(id + 1)|
+---+--------+
|  1|       2|
|  2|       3|
|  3|       4|
|  4|       5|
+---+--------+

3、选择指定行

//    first获取第一行
val row = df.first()
print(row.get(0) + "," + row.getAs("name"))

# 执行结果
天津,赵伟

//    head获取第一行,head(n: Int)获取前n行记录
val rows = df.head(2)
for(row<-rows){
  print(row.get(0)+"\t")
  print(row.get(1)+"\t")
  print(row.get(2))
  println()
}
# 执行结果
天津  20  10
北京  30  20

//    take(n: Int)获取前n行数据    
val rows = df.take(2)
for(row<-rows){
  print(row.get(0)+"\t")
  print(row.get(1)+"\t")
  print(row.get(2))
  println()
}
# 执行结果
天津  20  10
北京  30  20

 //  takeAsList(n: Int)获取前n行数据,并以List的形式展现
val rows = df.takeAsList(2)
// 查看输出数据
println(rows)
// 读取0行 0列数据
println(rows.get(0).get(0))
val iterator = rows.iterator()
while(iterator.hasNext){
  val row=iterator.next()
  print(row.get(0)+"  "+row.get(1)+"  "+row.get(2))
  println()
}

# 执行结果
[[天津,20,10,1,赵伟,80.92], [北京,30,20,2,钱枫,90.0]]
天津
天津  20  10
北京  30  20

以Row或者Array[Row]的形式返回一行或多行数据。first和head功能相同。
take和takeAsList方法会将获得到的数据返回到Driver端,所以,使用这两个方法时需要注意数据量,以免Driver发生OutOfMemoryError

4、条件查询

where(conditionExpr: String):SQL语言中where关键字后的条件,传入筛选条件表达式,可以用and和or。得到DataFrame类型的返回结果。

// 条件为id等于1,姓名为刘文娟
df.where("id = 1 or name = '刘文娟'" ).show()

+----+---+------+---+------+-----+
|addr|age|deptid| id|  name|score|
+----+---+------+---+------+-----+
|天津| 20|    10|  1|  赵伟|80.92|
|null| 19|    20|  5|刘文娟| 96.0|
+----+---+------+---+------+-----+

filter:根据字段进行筛选,传入筛选条件表达式,得到DataFrame类型的返回结果。和where使用条件相同

// 条件为id等于1,姓名为刘文娟
df.filter("id = 1 or name = '刘文娟'" ).show()
+----+---+------+---+------+-----+
|addr|age|deptid| id|  name|score|
+----+---+------+---+------+-----+
|天津| 20|    10|  1|  赵伟|80.92|
|null| 19|    20|  5|刘文娟| 96.0|
+----+---+------+---+------+-----+

5、 limit

limit方法获取指定DataFrame的前n行记录,得到一个新的DataFrame对象。和take与head不同的是,limit方法不是Action操作。

df.limit(3).show

+----+---+------+---+----+-----+
|addr|age|deptid| id|name|score|
+----+---+------+---+----+-----+
|天津| 20|    10|  1|赵伟|80.92|
|北京| 30|    20|  2|钱枫| 90.0|
|上海| 19|    30|  3|孙斌| 88.0|
+----+---+------+---+----+-----+

6、删除操作

删除指定字段(列),保留其他字段

//  删除id字段
df.drop("id").show()
+----+---+------+------+-----+
|addr|age|deptid|  name|score|
+----+---+------+------+-----+
|天津| 20|    10|  赵伟|80.92|
|北京| 30|    20|  钱枫| 90.0|
|上海| 19|    30|  孙斌| 88.0|
|上海| 19|    40|李浩楠| 88.0|
+----+---+------+------+-----+

//  删除name字段
df.drop(df("name")).show()
+----+---+------+---+-----+
|addr|age|deptid| id|score|
+----+---+------+---+-----+
|天津| 20|    10|  1|80.92|
|北京| 30|    20|  2| 90.0|
|上海| 19|    30|  3| 88.0|
|上海| 19|    40|  4| 88.0|
+----+---+------+---+-----+

//  删除带有空值的行
df.na.drop().show()
+----+---+------+---+------+-----+
|addr|age|deptid| id|  name|score|
+----+---+------+---+------+-----+
|天津| 20|    10|  1|  赵伟|80.92|
|北京| 30|    20|  2|  钱枫| 90.0|
|上海| 19|    30|  3|  孙斌| 88.0|
|上海| 19|    40|  4|李浩楠| 87.0|
+----+---+------+---+------+-----+

7、排序查询

orderBy|sort($"列名") 升序排列
orderBy|sort($"列名".desc) 降序排列
orderBy|sort($"列1" , $"列2".desc) 按两列排序

// 升序排列,只对数字类型和日期类型生效
df.select("id","name").orderBy(df("id")).show

+---+----+
| id|name|
+---+----+
|  1|赵伟|
|  2|钱枫|
|  3|孙斌|
+---+----+
// 降序排列,只对数字类型和日期类型生效
df.select("id","name").orderBy(df("id").desc).show
df.select("id","name").orderBy(-df("id")).show
+---+------+
| id|  name|
+---+------+
|  4|李浩楠|
|  3|  孙斌|
|  2|  钱枫|
|  1|  赵伟|
+---+------+

// sort降序,sort用法和orderBy相同,可以直接替换
df.select("id","name").sort(df("id").desc).show

// 一列升序,一列降序,同时调整两列
df.select("age","deptid").sort(df("age").desc,df("deptid").asc).show
+---+------+
|age|deptid|
+---+------+
| 30|    20|
| 20|    10|
| 19|    30|
| 19|    40|
+---+------+

//  sortWithinPartitions 和sort类似,但是是使用Partition来对其他字段排序
df.sortWithinPartitions("deptid").show
+----+---+------+---+------+-----+
|addr|age|deptid| id|  name|score|
+----+---+------+---+------+-----+
|天津| 20|    10|  1|  赵伟|80.92|
|北京| 30|    20|  2|  钱枫| 90.0|
|null| 19|    20|  5|刘文娟| 96.0|
|上海| 19|    30|  3|  孙斌| 88.0|
|上海| 19|    40|  4|李浩楠| 87.0|
+----+---+------+---+------+-----+

8、 去重

(1)distinct:返回一个不包含重复记录的Dataframe(整体去重),结果和dropDuplicates不传入指定字段的结果相同。
返回当前不重复的row(行)记录df.distinct()。
(2)dropDuplicates:根据指定字段去重
根据指定字段去重。
加入重复数据:

{"id":4,"name":"李浩楠","age":19,"addr":"上海","score":87,"deptid":40}

去重:

df.distinct().show

+----+---+------+---+------+-----+
|addr|age|deptid| id|  name|score|
+----+---+------+---+------+-----+
|null| 19|    20|  5|刘文娟| 96.0|
|北京| 30|    20|  2|  钱枫| 90.0|
|上海| 19|    40|  4|李浩楠| 87.0|
|上海| 19|    30|  3|  孙斌| 88.0|
|天津| 20|    10|  1|  赵伟|80.92|
+----+---+------+---+------+-----+

指定字段去重

df.dropDuplicates(Seq("age")).show

+----+---+------+---+----+-----+
|addr|age|deptid| id|name|score|
+----+---+------+---+----+-----+
|上海| 19|    30|  3|孙斌| 88.0|
|北京| 30|    20|  2|钱枫| 90.0|
|天津| 20|    10|  1|赵伟|80.92|
+----+---+------+---+----+-----+

9、分组查询--聚合(groupBy&agg)

groupBy方法有两种调用方式,可以传入String类型的字段名,也可传入Column类型的对象。
groupBy("列名", ...).max(列名) 求最大值
groupBy("列名", ...).min(列名) 求最小值
groupBy("列名", ...).avg(列名) 求平均值
groupBy("列名", ...).sum(列名) 求和
groupBy("列名", ...).count() 求个数
groupBy("列名", ...).agg 可以将多个方法进行聚合

// 按年龄分组
df.groupBy("age").count().show()
+---+-----+
|age|count|
+---+-----+
| 19|    2|
| 30|    1|
| 20|    1|
+---+-----+

//  分组计数(各个年龄的人出现了多少次)
df.groupBy("age").count().show()
+---+-----+
|age|count|
+---+-----+
| 19|    4|
| 30|    1|
| 20|    1|
+---+-----+

GroupedData对象:
该方法得到的是GroupedData类型对象,在GroupedData的API中提供了group by之后的操作,比如,
max(colNames: String)方法,获取分组中指定字段或者所有的数字类型字段的最大值,只能作用于数字型字段
min(colNames: String)方法,获取分组中指定字段或者所有的数字类型字段的最小值,只能作用于数字型字段
mean(colNames: String)方法,获取分组中指定字段或者所有的数字类型字段的平均值,只能作用于数字型字段(avg也可)
sum(colNames: String)方法,获取分组中指定字段或者所有的数字类型字段的和值,只能作用于数字型字段
count()方法,获取分组中的元素个数
agg方法,可以对指定字段进行聚合操作。

// 按年龄分组,找出第组最高分、
df.groupBy("age").max("score").show

+---+----------+
|age|max(score)|
+---+----------+
| 19|      96.0|
| 30|      90.0|
| 20|     80.92|
+---+----------+

// 按年龄分组,找出每组平均分、人数
df.groupBy("age").agg(Map("score" -> "max", "score" -> "mean","*" -> "count")).show()

// 如果有相同的列,使用别名将全显示
scala> df.groupBy("age").agg(max("score").alias("max_score"),avg("score").alias("avg_score")).show()
+---+---------+-----------------+
|age|max_score|        avg_score|
+---+---------+-----------------+
| 19|     96.0|90.33333333333333|
| 30|     90.0|             90.0|
| 20|    80.92|            80.92|
+---+---------+-----------------+

注意:
如果聚合中出现相同的字段,只显示最后一个字段。

10、union 合并查询

unionAll方法(已过时,使用union相同):对两个DataFrame进行组合,类似于SQL中的UNION ALL操作。


// 合并不去重
df.union(df.limit(1)).show()

+----+---+------+---+------+-----+
|addr|age|deptid| id|  name|score|
+----+---+------+---+------+-----+
|天津| 20|    10|  1|  赵伟|80.92|
|北京| 30|    20|  2|  钱枫| 90.0|
|上海| 19|    30|  3|  孙斌| 88.0|
|上海| 19|    40|  4|李浩楠| 87.0|
|上海| 19|    40|  4|李浩楠| 87.0|
|null| 19|    20|  5|刘文娟| 96.0|
|天津| 20|    10|  1|  赵伟|80.92|
+----+---+------+---+------+-----+

11、join 连接查询

添加部门数据:

{"deptid":10,"dname":"财务部"}
{"deptid":20,"dname":"研发部"}
{"deptid":30,"dname":"事业部"}
{"deptid":40,"dname":"人力部"}
{"deptid":60,"dname":"项目部"}

1)笛卡尔积

在SQL语言中用得很多的就是join操作,DataFrame中同样也提供了join的功能。

// 将json文件加载成一个DataFrame
val df = spark.read.format("json")
  .load("hdfs://192.168.71.130:9000/people.json") // 添加hdfs路径
val df1 = spark.read.format("json")
  .load("hdfs://192.168.71.130:9000/depart.json") // 添加hdfs路径
// 笛卡尔积
df.join(df1).show()

+----+---+------+---+------+-----+--------------------+------+------+
|addr|age|deptid| id|  name|score|     _corrupt_record|deptid| dname|
+----+---+------+---+------+-----+--------------------+------+------+
|天津| 20|    10|  1|  赵伟|80.92|                null|    10|财务部|
|天津| 20|    10|  1|  赵伟|80.92|{"deptid":20"dnam...|  null|  null|
|天津| 20|    10|  1|  赵伟|80.92|{"deptid":30"dnam...|  null|  null|
|天津| 20|    10|  1|  赵伟|80.92|{"deptid":40"dnam...|  null|  null|
|北京| 30|    20|  2|  钱枫| 90.0|                null|    10|财务部|
|北京| 30|    20|  2|  钱枫| 90.0|{"deptid":20"dnam...|  null|  null|
|北京| 30|    20|  2|  钱枫| 90.0|{"deptid":30"dnam...|  null|  null|
|北京| 30|    20|  2|  钱枫| 90.0|{"deptid":40"dnam...|  null|  null|
|上海| 19|    30|  3|  孙斌| 88.0|                null|    10|财务部|
|上海| 19|    30|  3|  孙斌| 88.0|{"deptid":20"dnam...|  null|  null|
|上海| 19|    30|  3|  孙斌| 88.0|{"deptid":30"dnam...|  null|  null|
|上海| 19|    30|  3|  孙斌| 88.0|{"deptid":40"dnam...|  null|  null|
|上海| 19|    40|  4|李浩楠| 87.0|                null|    10|财务部|
|上海| 19|    40|  4|李浩楠| 87.0|{"deptid":20"dnam...|  null|  null|
|上海| 19|    40|  4|李浩楠| 87.0|{"deptid":30"dnam...|  null|  null|
|上海| 19|    40|  4|李浩楠| 87.0|{"deptid":40"dnam...|  null|  null|
|上海| 19|    40|  4|李浩楠| 87.0|                null|    10|财务部|
|上海| 19|    40|  4|李浩楠| 87.0|{"deptid":20"dnam...|  null|  null|
|上海| 19|    40|  4|李浩楠| 87.0|{"deptid":30"dnam...|  null|  null|
|上海| 19|    40|  4|李浩楠| 87.0|{"deptid":40"dnam...|  null|  null|
+----+---+------+---+------+-----+--------------------+------+------+
only showing top 20 rows

常见问题:

A. Either: use the CROSS JOIN syntax to allow cartesian products between these

解决方案:设置spark.sql.crossJoin.enabled=true
在配置中添加参数。

 // 定义应用名称
    val conf = new SparkConf().setAppName("mySpark0")
    conf.setMaster("spark://master:7077")
    conf.setJars(Seq("/root/SparkTest.jar"))
    conf.set("spark.sql.crossJoin.enabled", "true")
B. Spark-Shell中设置
spark.conf.set("spark.sql.crossJoin.enabled", "true")

2)using一个字段形式

join类似于table-a join table-b using column1的形式,需要两个DataFrame中有相同的一个列名

// 将json文件加载成一个DataFrame
val df = spark.read.format("json")
  .load("hdfs://192.168.71.130:9000/people.json") // 添加hdfs路径
val df1 = spark.read.format("json")
  .load("hdfs://192.168.71.130:9000/depart.json") // 添加hdfs路径
// 使用deptid进行关联,using一个字段形式 
df.join(df1, "deptid").show()

+------+----+---+---+------+-----+------+
|deptid|addr|age| id|  name|score| dname|
+------+----+---+---+------+-----+------+
|    10|天津| 20|  1|  赵伟|80.92|财务部|
|    20|北京| 30|  2|  钱枫| 90.0|研发部|
|    30|上海| 19|  3|  孙斌| 88.0|事业部|
|    40|上海| 19|  4|李浩楠| 87.0|人力部|
|    40|上海| 19|  4|李浩楠| 87.0|人力部|
|    20|null| 19|  5|刘文娟| 96.0|研发部|
+------+----+---+---+------+-----+------+

using多个字段形式:
除了上面这种using一个字段的情况外,还可以using多个字段

 df.join(df1, Seq("deptid","name")).show()
3) 指定join类型

两个DataFrame的join操作有inner, outer, left_outer, right_outer, leftsemi类型。在上面的using多个字段的join情况下,可以写第三个String类型参数,指定join的类型。

// 内连接
df.join(df1, Seq("deptid"),"inner").show()

inner join是一定要找到左右表中满足join条件的记录,我们在写sql语句或者使用DataFrmae时,可以不用关心哪个是左表,哪个是右表,在spark sql查询优化阶段,spark会自动将大表设为左表,即streamIter,将小表设为右表,即buildIter。这样对小表的查找相对更优。

// 左外连接:
df.join(df1, Seq("deptid"),"left_outer").show()

left outer join是以左表为准,在右表中查找匹配的记录,如果查找失败,则返回一个所有字段都为null的记录。我们在写sql语句或者使用DataFrmae时,一般让大表在左边,小表在右边。

// 右外连接
df.join(df1, Seq("deptid"),"right_outer").show()

right outer join是以右表为准,在左表中查找匹配的记录,如果查找失败,则返回一个所有字段都为null的记录。所以说,右表是streamIter,左表是buildIter,我们在写sql语句或者使用DataFrmae时,一般让大表在右边,小表在左边。

// 全连接
df.join(df1, Seq("deptid"),"outer").show()

full outer join相对来说要复杂一点,总体上来看既要做left outer join,又要做right outer join,但是又不能简单地先left outer join,再right outer join,最后union得到最终结果,因为这样最终结果中就存在两份inner join的结果了。因为既然完成left outer join又要完成right outer join,所以full outer join仅采用sort merge join实现,左边和右表既要作为streamIter,又要作为buildIter。

// 半连接
df.join(df1, Seq("deptid"),"leftsemi").show()
// 半连接
df.join(df1, Seq("deptid"),"leftanti").show()

说明:
left semi join是以左表为准,在右表中查找匹配的记录,如果查找成功,则仅返回左边的记录,否则返回null。
left anti join与left semi join相反,是以左表为准,在右表中查找匹配的记录,如果查找成功,则返回null,否则仅返回左边的记录。

4)使用Column类型来join

如果不用using模式,灵活指定join字段的话,可以使用如下形式:

df.join(df1, df("deptid")=== df1( "deptid")).show()

+----+---+------+---+------+-----+------+------+
|addr|age|deptid| id|  name|score|deptid| dname|
+----+---+------+---+------+-----+------+------+
|天津| 20|    10|  1|  赵伟|80.92|    10|财务部|
|北京| 30|    20|  2|  钱枫| 90.0|    20|研发部|
|上海| 19|    30|  3|  孙斌| 88.0|    30|事业部|
|上海| 19|    40|  4|李浩楠| 87.0|    40|人力部|
|上海| 19|    40|  4|李浩楠| 87.0|    40|人力部|
|null| 19|    20|  5|刘文娟| 96.0|    20|研发部|
+----+---+------+---+------+-----+------+------+

// 在指定join字段同时指定join类型 
 df.join(df1, df("deptid")=== df1( "deptid"),"inner").show()

12、获取指定字段统计信息

stat方法可以用于计算指定字段或指定字段之间的统计信息,比如方差,协方差等。这个方法返回一个DataFramesStatFunctions类型对象。
根据score字段,统计该字段值出现频率在30%以上的内容。在df中字段score的出现频率,大于0.3

df.stat.freqItems(Seq ("score") , 0.3).show()
+---------------+
|score_freqItems|
+---------------+
|   [96.0, 87.0]|
+---------------+

freqItems用于计算一列或几列中出现频繁的的值的集合。

13、获取两个DataFrame中共有的记录(交集)

数据:
student1

{"id":2,"name":"钱枫","age":30,"addr":"北京","score":90,"deptid":20}
{"id":3,"name":"孙斌","age":19,"addr":"上海","score":88,"deptid":30}
{"id":4,"name":"李浩楠","age":19,"addr":"上海","score":87,"deptid":40}
{"id":5,"name":"刘文娟","age":19,"addr":null,"score":96,"deptid":20}

intersect方法可以计算出两个DataFrame中相同的记录,

df.intersect(df1.limit(1)).show(false)

14、获取一个DataFrame中有另一个DataFrame中没有的记录(差集)

数据同:13

 df.except(df1.limit(1)).show(false)

15、操作字段名

(1)withColumnRenamed:重命名DataFrame中的指定字段名

如果指定的字段名不存在,不进行任何操作。下面示例中将df中的id字段重命名为idx。

scala> df.withColumnRenamed( "id" , "idx" ).show()
+----+---+------+---+------+-----+
|addr|age|deptid|idx|  name|score|
+----+---+------+---+------+-----+
|天津| 20|    10|  1|  赵伟|80.92|
+----+---+------+---+------+-----+

(2)withColumn:往当前DataFrame中新增一列

whtiColumn(colName: String , col: Column)方法根据指定colName往DataFrame中新增一列,如果colName已存在,则会覆盖当前列。
以下代码往df中新增一个名为id2的列:

scala>  df.withColumn("id2", df("id")).show( false)
+----+---+------+---+------+-----+---+
|addr|age|deptid|id |name  |score|id2|
+----+---+------+---+------+-----+---+
|天津|20 |10    |1  |赵伟  |80.92|1  |
+----+---+------+---+------+-----+---+

16、行、列转换

数据:

    val df = spark.createDataFrame(Seq(
      ("数学", "张三", 88), ("语文", "张三", 92),
      ("英语", "张三", 77), ("数学", "王五", 65),
      ("语文", "王五", 87), ("英语", "王五", 90),
      ("数学", "李雷", 67), ("语文", "李雷", 33),
      ("英语", "李雷", 24), ("数学", "宫九", 77),
      ("语文", "宫九", 87), ("英语", "宫九", 90))).toDF("科目", "姓名", "分数")

1)行转列:

用PIVOT透视函数即可实现行转列(重塑数据)

    // 创建临时视图
    df.createOrReplaceTempView("scores")
    // 注意:分数的符号为`,而不是单引号
    // 说明:按分数进行分组(内容) 显示表头为姓名
    val sql ="select * from scores pivot (sum(`分数`) for `姓名` in ('张三','王五','李雷','宫九'))"
    // 数据查询显示
    spark.sql(sql).show()

// 执行结果
+----+----+----+----+----+
|科目|张三|王五|李雷|宫九|
+----+----+----+----+----+
|英语|  77|  90|  24|  90|
|语文|  92|  87|  33|  87|
|数学|  88|  65|  67|  77|
+----+----+----+----+----+

2)列转行:(逆透视Unpivot)

Spark没有提供内置函数来实现unpivot操作,不过我们可以使用Spark SQL提供的stack函数来间接实现需求。
stack()
stack(n, expr1, ..., exprk) - 会将expr1, ..., exprk 分割为n行.

// stack函数
    // 创建临时视图
    df.createOrReplaceTempView("scores")
    // 注意:分数的符号为`,而不是单引号
    // 说明:按分数进行分组(内容) 显示表头为姓名
    val sql ="select * from scores pivot (sum(`分数`) for `姓名` in ('张三','王五','李雷','宫九'))"
    // 数据查询显示
    val pivot=spark.sql(sql)
    // 创建视图
    pivot.createOrReplaceTempView("v_pivot")
    // 4 为分割行数 '张三'为姓名显示内容 `张三`为引用值即为分数, as 内容为显示表头
    val  stack_sql="select `科目`,stack(4, '张三', `张三`, '王五', `王五`, '李雷', `李雷`, '宫九', `宫九`) as (`姓名`, `分数` )" +
      " from  v_pivot "
    //  执行SQL
    val df_stack = spark.sql(stack_sql)
    df_stack.show()

// 执行结果
+----+----+----+
|科目|姓名|分数|
+----+----+----+
|英语|张三|  77|
|英语|王五|  90|
|英语|李雷|  24|
|英语|宫九|  90|
|语文|张三|  92|
|语文|王五|  87|
|语文|李雷|  33|
|语文|宫九|  87|
|数学|张三|  88|
|数学|王五|  65|
|数学|李雷|  67|
|数学|宫九|  77|
+----+----+----+

17、数据拆分与合并

1) 数据合并

concat_ws() 函数

    import org.apache.spark.sql.functions._
    val df = spark.createDataFrame(Seq(
      ("张三", 20, "篮球"), ("王五", 22, "游泳")))
      .toDF("姓名", "年龄", "爱好")

    // 数据合并,三列合一列
    val ds = df.select(concat_ws(",", df("姓名"), df("年龄"), df("爱好"))
      .cast(StringType).as("value"))
    ds.show()

// 执行结果
+------------+
|       value|
+------------+
|张三,20,篮球|
|王五,22,游泳|
+------------+

2) 数据拆分

explode函数可以把数组分割为多行

     import org.apache.spark.sql.functions._
    val df = spark.createDataFrame(Seq(
      ("张三", 20, "篮球,羽毛球"), ("王五", 22, "游泳,慢跑,编程")))
      .toDF("姓名", "年龄", "爱好")
    // 数据分割
    val dataset = df.withColumn("爱好",
      explode(split(df.col("爱好"), ",")))
    dataset.show()

// 执行结果
+----+----+------+
|姓名|年龄|  爱好|
+----+----+------+
|张三|  20|  篮球|
|张三|  20|羽毛球|
|王五|  22|  游泳|
|王五|  22|  慢跑|
|王五|  22|  编程|
+----+----+------+

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,843评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,538评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,187评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,264评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,289评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,231评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,116评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,945评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,367评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,581评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,754评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,458评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,068评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,692评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,842评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,797评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,654评论 2 354

推荐阅读更多精彩内容