概述
结构化API可以用来处理各种数据类型,包括非结构化的日志文件、半结构化的csv文件以及结构化的parquet文件。Spark中的结构化API主要是指以下三种核心分布式集合类型的API:
- DataSet
- DataFrame
- SQL表和视图
这些结构化API基本都适用于流处理和批处理,这意味着只需要修改较少的代码即可实现二者的转化。
- DataFrame和Dataset
DF和DS是具有行和列的类似于数据表的集合。所有列的行数相同,缺省值用null补充,并且某一列的类型在所有行中必须保持一致。
在实际应用中通常使用SQL来操作DF,而不是执行DF专用代码。
Spark内部有一个名为Catalyst的引擎,所有其他语言API都会在内部转化为Catalyst执行。
DF与DS比较
类型不同:DF是非类型化(类型由Spark维护,为Row);DS是类型化的,类型需要我们自行维护,且只适用于使用JVM的语言(Scala、Java),通过case或java beans指定类型。
- DF的行和列
列:是一个简单类型(如:int或String等)或复杂类型(map或array等)或null。spark记录所有这些类型并提供多种转换方法。
行:一行对应一条记录。通过对DF调用collect方法可以展示,DF中每条记录都是Row类型。
Spark类型与其他语言类型对应关系(见官网)
结构化API执行
- 编写DF/DS/SQL代码;
- 如果代码能够有效执行,Spark将其转换为一个逻辑执行计划(logical plan)
- Spark将这个逻辑执行计划转换为一个物理执行计划(Physical Paln),检查可行的优化策略,并在此过程中检查优化
- Spark在集群上执行该物理计划(RDD操作)
基本的结构化操作
介绍DF的基本操作。
DF由多条record构成,record是Row类型(类比表中的一条数据)。每条record由说个列组成,一系列的操作都是对列进行的。
数据模式(Schema)定义了DF列名及数据类型;DF的分区定义了DF在集群上的物理分布;划分模式定义了partition的分配方式,可以自定义也可以选择随机分配。
DF创建示例:
val df = spark.read.format("json").load("/usr/root/input/2015-summary.json") //创建DF
df.printSchema() //查看DF的schema信息,即名字与类型
1 数据模式(Schema)
schema定义了DF列的名字及数据类型,它可以由数据源来定义(称为读时模式,schema-on-read),也可以显式指定。
根据实际应用场景决定定义schema的方式。
查看DF的schema:
df.schema
//运行结果:
res1: org.apache.spark.sql.types.StructType = StructType(StructField(DEST_COUNTRY_NAME,StringType,true),
StructField(ORIGIN_COUNTRY_NAME,StringType,true),
StructField(count,LongType,true))
由上述运行结果可知,一个schema是由许多字段构成的StructType。这些字段就是StructField,每个StructField包含了名称、类型、和一个布尔值(指定该列是否可以包含缺失值或空值),用户还可以在此指定与该列关联的元数据(metadata)。
自定义schema示例:
//自定义schema
val myManualSchema = StructType(
Array(StructField("DEST_COUNTRY_NAME",StringType,true),
StructField("ORIGIN_COUNTRY_NAME",StringType,true),
StructField("count",LongType,false,
Metadata.fromJson("{\"hello\":\"world\"}"))
))
//创建DF时指定自定义的schema
val df = spark.read.format("json").scheme(myManualSchema).load("/usr/root/input/2015-summary.json")
2、列和表达式
对DF中的列进行选择、transform操作和删除等操作的表示称为表达式。在Spark中,列是逻辑结构,要操作一个列必须有一个行,有行则必须有一个DF,因此对列的操作都在DF上进行。
列
构造和引用列的方法col函数和column函数。使用时要传入列名:
col("someColumnName")
column("someColumnName")
- 显式列引用
直接在DF上使用col方法:
df.col("count")
表达式(expression)
表达式是对一个DF中的某一个或多个值的一组转换操作。(可以理解为一个函数,将一个或多个列名作为输入,解析它们,然后对每条记录应用该表达式得到一个单值。这个单值可以是一个复杂类型,如array或map等)
- expr:创建表达式函数
expr("someCol") //等同于col("someCol")
//几个相同的转换操作表达式
expr("someCol - 5")
col("someCol") -5
expr("someCol") – 5
//相同的转换操作,代码不同但转换为底层逻辑树都是一样的
(((col("someCol") + 5) * 200) - 6) < col("otherCol")
expr("(((someCol + 5) * 200) - 6) < otherCol")
- printSchema:查询DF的所有列
- columns:访问DF的所有列
df.columns
3、记录和行
DF中的每一行就是一个记录,都是Row类型的对象。表达式操作的就是Row对象,Row对象内部是字节数组,Spark无法直接访问这些数组,只能使用表达式去操纵。
- first:查看DF中的一行数据
df.first()
- 创建行和使用行
import org.apache.spark.sql.Row
val myRow = Row("Hello",null,1,false) //创建行
//指定想要的位置,使用java或scala时,必须用辅助方法或者显示指定值的类型
myRow(0) // 任意类型
myRow(0).asInstanceOf[String] // 字符串
myRow.getString(0) // 字符串
myRow.getInt(2) // 整型
4、DataFrame转换操作
几种主要操作:
- 添加行或列
- 删除行或列
- 行列互转
- 根据列值排列行的顺序
- read:从原始数据源中创建DF
- createOrReplaceTempView:注册临时表
val df = spark.read.format("json").load("/usr/root/input/2015-summary.json")
df.createOrReplaceTempView("dfTable") //创建临时表
- createDataFrame:用RDD创建DF
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
val myManualSchema = new StructType(Array(
new StructField("some", StringType, true),
new StructField("col", StringType, true),
new StructField("names", LongType, false))) //创建自定义schema
val myRows = Seq(Row("Hello", null, 1L)) //构造Row对象合集
val myRDD = spark.sparkContext.parallelize(myRows) //构造RDD
val myDf = spark.createDataFrame(myRDD, myManualSchema)
myDf.show() //创建DF并打印出来
还可以使用toDF函数实现Sep到DF的隐式转换,该方法对null的支持不稳定,需酌情使用。
val myDF = Seq(("Hello", 2, 1L)).toDF("col1", "col2", "col3")
- select函数和selectExpr函数
处理DF三个常用的工具:处理列和表达式的select方法;处理字符串表达式的selectExpr方法;org.apache.spark.sql.functions包内的其他方法。
1)select函数实现对DF的SQL查询
df.select("DEST_COUNTRY_NAME").show(2)
//等同于SQL
SELECT DEST_COUNTRY_NAME FROM dfTable LIMIT 2
//使用多种方式引用列
import org.apache.spark.sql.functions.{expr, col, column}
df.select(
df.col("DEST_COUNTRY_NAME"),
col("DEST_COUNTRY_NAME"),
column("DEST_COUNTRY_NAME"),
'DEST_COUNTRY_NAME,
$"DEST_COUNTRY_NAME",
expr("DEST_COUNTRY_NAME"))
.show(2)
expr是目前最灵活的方式,可以引用列也可以引用SQL字符串。
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)
//相当于sql
SELECT DEST_COUNTRY_NAME as destination FROM dfTable LIMIT 2
//使用alias方法再将列名改回去
df.select(expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME"))
.show(2)
2)selectExpr表达式
等同于select表达式加expr的写法。可以使用selectExpr构建复杂表达式来创建DF。
//判断两个列的值是否相等,用比较结果构建新的列
df.selectExpr(
"*", //所有列
"(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")
.show(2)
//等同于SQL
SELECT *, (DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry FROM dfTable LIMIT 2
还可以使用系统预定义的聚合函数操作DF。
df.selectExpr("avg(count)","count(distinct(DEST_COUNTRY_NAME))").show(2)
//相当于SQL
SELECT avg(count), count(distinct(DEST_COUNTRY_NAME)) FROM dfTable LIMIT 2
- lit:传递字面量(literal)
import org.apache.spark.sql.functions.lit
df.select(expr("*"), lit(1).as("One")).show(2)
//相当于SQL
SELECT *, 1 as One FROM dfTable LIMIT 2
- withColumn:添加列,两个参数分别为列名和赋值过程
//添加一个值为1的列
df.withColumn("numberOne",lit(1)).show(2)
//相当于SQL
SELECT *,1 AS numberOne FROM dfTable LIMIT 2
//使用测试数据,判断出发地与目的地在相同国家的数据
df.withColumn("withinCountry",expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME")).show(2)
//还可以用来重命名列
df.withColumn("Destination",expr("DEST_COUNTRY_NAME"))
- withColumnRenamed重命名列。两个参数,第一个是待修改的列名,第二个为修改后的列名。
df.withColumnRenamed("DEST_COUNTRY_NAME","dest").columns
- 保留字与关键字:使用反引号(`)实现
import org.apache.spark.sql.functions.expr
val dfWithColName = ddf.withColumn("This Long Column-name",expr("DEST_COUNTRY_NAME")) //修改列名
dfWithColName。selectExpr("`This Long Column-name`","`This Long Column-name` as `newcol`") //引用列名时需要进行转义
//相当于SQL
SELECT `This Long Column-name`,`This Long Column-name` as `newcol` FROM dfTableLong LIMIT 2
区分大小写: set spark.sql.caseSensitive true
drop:删除列
当然可以通过select实现(只选出部分列后生成新的RDD)。drop后可以跟多个参数列。
df.drop("ORIGIN_COUNTRY_NAME").columns
- withColumn+cast:强制转换列的数据类型
df.withColumn("count2",col("count").cast("long"))
//相当于SQL
SELECT *, cast(count as long) AS count2 FROM dfTable
- where或filter:过滤行
构建表达式来判断该表达式是true还是false,过滤掉false的行。相当于SQL中的where子句。
df.filter(col("count") < 2).show(2)
//相当于SQL
SELECT * FROM dfTable WHERE count < 2 LIMIT 2
//多过滤条件关联
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia").show(2)
//相当于SQL
SELECT * FROM dfTable WHERE count < 2 AND RIGIN_COUNTRY_NAME != "Croatia"
LIMIT 2
注意在使用多条件过滤时,Spark会同时执行所有操作,不管过滤条件的先后顺序,所以使用的时候并不总是能有效实现我们的需求。使用时只需将所有条件串在一起即可,不需考虑先后顺序。
- distinct:去重
df.select("ORIGIN_COUNTRY_NAME","DEST_COUNTRY_NAME").distinct().count()
//相当于SQL
SELECT COUNT(DISTINCT(ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME)) FROM dfTable
- sample:随机抽样,其中参数withReplacement的值决定抽样是否放回,true为有放回抽样
val seed = 5
val withReplacement = false
var fraction = 0.5
df.sample(withReplacement,fraction,seed).count
- randomSplit:当需要将原始DataFrame随机分割成多个分片时,可以使用随机分割。
由于随机分割是一种随机方法,所以我们还需要指定一个随机种子(只需在代码中用特定数字来替换seed)。需要注意的是,如果一个DataFrame的分
割比例的和不为1,则比例参数会被自动归一化。
val dataFrames = df.randomSplit(Array(0.25,0.75),seed)
dataFrames(0).count() > dataFrames(1).count()
- union:联合操作
由于DF是不可变的,这意味着我们不能向DF中追加行。可以通过union的方式将两个DF联合起来实现。这种操作必须保证两个DF具有相同的列数和schema。
import org.apache.spark.sql.Row
//创建新的DF
val schema = df.schema
val newRows = Sep(Row("New Country","Other Country",5L),Row("New Country 2","Other Country 2",1L))
val parallelizedRows = spark.sparkContext.parallelize(newRows)
val newDF = spark.createDataFrame(parallelizedRows,schema)
//联合两个DF
df.union(newDF).where("count = 1").where($"ORIGIN_COUNTRY_NAME" =!= "United States").show()
当DataFrame追加了记录后,需要对产生的新DataFrame进行引用。一个常见的方式是将这个新DataFrame变成视图(View)或者注册成一个数据表,以便在代码中使用。
- sort和orderBy:行排序(二者等价)
df.sort("count").show(5)
//指定升降序
import org.apache.spark.sql.functions.{desc, asc}
df.orderBy(expr("count desc")).show(2)
df.orderBy(desc("count"),asc("DEST_COUNTRY_NAME")).show(2)
//相当于SQL
SELECT * FROM dfTable ORDER BY count DESC, DEST_COUNTRY_NAME ASC LIMIT 2
你可以指定空值在排序列表中的位置,asc_nulls_first、esc_nulls_first、asc_nulls_last、desc_nulls_last
- sortWithinPartitions:对每个分区进行内部排序
spark.read.format("json").load("/data/flight-data/json/*-summary.json")
.sortWithinPartitions("count")
- limit:只取几条
df.limit(5).show()
df.orderBy(expr("count desc")).limit(5).show()
//相当于SQL
SELECT * FROM dfTable ORDER BY count desc LIMIT 5
- repartitions和coalesce:重划分和合并
另一个重要的优化是根据一些经常过滤的列对数据进行分区,控制跨群集数据的物理布局,包括分区方案和分区数。
不管是否有必要,重新分区都会导致数据的全面洗牌。如果将来的分区数大于当前的分区数,或者当你想要基于某一组特定列来进行分区时,通常只能重新分区。
//获取分区数
df.rdd.getNumPartitions
//设置重分区数
df.repartitions(5)
//如果你知道你经常按某一列执行过滤操作,则根据该列进行重新分区
df.repartitions(col("DEST_COUNTRY_NAME"))
//合并操作(coalesce)不会导致数据的全面洗牌,但会尝试合并分区
df.repartitions(5,col("DEST_COUNTRY_NAME")).coalesce(2) //先分为5个分区再合并它们
- toLocalIterator:驱动器获取行
为了遍历整个数据集,还有一种让驱动器获取行的方法,即toLocalIterator函数。toLocalIterator函数式一个迭代器,将每个分区的数据返回给驱动器。这个函数允许你以串行的方式一个一个分区地迭代整个数据集。
val collectDF = df.limit(10)
collectDF.take(5) // 获取整数行
collectDF.show() // 更友好的打印
collectDF.show(5, false)
collectDF.collect()
collectDF.toLocalIterator()
【注意】:将数据集合传递给驱动器的代价很高。当数据集很大时调用collect函数,可能会导致驱动器崩溃。如果使用toLocalIterator,并且分区很大,则容易使驱动器节点崩溃并丢失应用程序的状态,代价也是巨大的。因此我们可以一个一个分区进行操作,而不是并行运行。
5、处理不同类型的数据
- 处理布尔型数据
布尔型数据通常用于条件过滤,由and、or、true、false四要素构成。
import org.apache.spark.sql.functions.col
df.where(col("InvoiceNo").equalTo(536365)) //等同于col("InvoiceNo")===536365
.select("InvoiceNo","Description")
.show(5,false)
【关于spark中的相等判断】:使用Scala进行相等判断时,应该使用===(等于)或者=!=(不等于)符号,另外还可以使用not或equalTo函数实现。python中则使用传统的==和!=。
还可以直接使用字符串表达式进行比较,这种方式比较简洁:
df.where("InvoiceNo = 536365")
.show(5,false)
df.where("InvoiceNo <> 536365")
.show(5,false)
多个表达式的连接
虽然可以通过and或or将布尔表达式连接在一起。但在spark中,最好以链式连接的方式进行组合,形成顺序执行的过滤器。spark会将所有的过滤器合并为一条语句同时执行,隐式创建and。or语句需要在同一个语句中指定。
val priceFilter = col("UnitPrice") > 600
val descripFilter = col("Description").contains("POSTAGE")
df.where(col("StockCode").isin("DOT")).where(priceFilter.or(descripFilter)).show()
//串行连接时直接使用“.”,or在同一语句中指定
//相当于SQL
SELECT * FROM dfTable WHERE StockCode in ("DOT") AND(UnitPrice > 600 OR
instr(Description, "POSTAGE") >= 1)
其他过滤器的实现
实现过滤并不一定要使用布尔表达式。例如:
val DOTCodeFilter = col("StockCode") === "DOT"
val priceFilter = col("UnitPrice") > 600
val descripFilter = col("Description").contains("POSTAGE")
df.withColumn("isExpensive",DOTCodeFilter.and(priceFilter.or(descripFilter)))
.where("isExpensive")
.select("unitPrice","isExpensive").show(5) //构建isExpensive列,保留值为true的行
//相当于SQL
SELECT UnitPrice, (StockCode = 'DOT' AND
(UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)) as isExpensive
FROM dfTable
WHERE (StockCode = 'DOT' AND
(UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1))
使用SQL表达式和使用DF接口的效果是等价的,不会有性能差异。例如下列两种方式实现的效率是相同的。
import org.apache.spark.sql.functions.{expr, not, col}
df.withColumn("isExpensive", not(col("UnitPrice").leq(250)))
.filter("isExpensive")
.select("Description", "UnitPrice").show(5)
df.withColumn("isExpensive", expr("NOT UnitPrice <= 250"))
.filter("isExpensive")
.select("Description", "UnitPrice").show(5)
【注意】在执行布尔表达式时,如果数据出现空值就会有问题,需要使用不同方式处理。例如:
df.where(col("Description").eqNullSafe("hello")).show()
- 处理数值类型
1)DF操作进行数值计算
//求(Quantity*UnitPrice)^2+5
import org.apache.spark.sql.functions.{expr,pow}
val fabricatedQuantity = pow(col("Quantity")*col("UnitPrice"),2)+5 //自定义计算函数
df.select(expr("CustomerId"),fabricatedQuantity.alias("realQuantity")).show(2)
2)使用SQL表达式进行计算
df.selectExpr(
"CustomerId",
"(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity").show(2)
取整操作:round(向上取整)、bround(向下取整)。可以指定精度。
import org.apache.spark.sql.functions.{round, bround,lit}
df.select(round(col("UnitPrice"), 1).alias("rounded"), col("UnitPrice")).show(5)
df.select(round(lit("2.5")), bround(lit("2.5"))).show(2)
求列的Pearson相关系数corr
import org.apache.spark.sql.functions.{corr}
df.stat.corr("Quantity", "UnitPrice")
df.select(corr("Quantity", "UnitPrice")).show()
计算列的统计信息(计数、均值、标准差、最值):describe
df.describe().show()
也可以通过导入函数实现上述聚合操作需求并得到更精确的值
import org.apache.spark.sql.functions.{count, mean, stddev_pop, min, max}
statFunction包中封装了许多统计函数。如approxQuantile用来计算数据的精确分位数或近似分位数、crosstab来查看交叉列表或频繁项对、monotonically_increasing_id函数为每行添加一个唯一的ID,它会从0开始,为每行生成一个唯一值。
//approxQuantile用法
val colName = "UnitPrice"
val quantileProbs = Array(0.5)
val relError = 0.05
df.stat.approxQuantile("UnitPrice", quantileProbs, relError) // 2.51
//crosstab用法
df.stat.crosstab("StockCode", "Quantity").show()
df.stat.freqItems(Seq("StockCode", "Quantity")).show()
//monotonically_increasing_id用法
import org.apache.spark.sql.functions.monotonically_increasing_i
df.select(monotonically_increasing_id()).show(2)
- 处理字符串类型
1)大小写转换
initcap:将字符串在每个单词的首字母转大写
lower:字符串转小写
upper:字符串转大写
ltrim:清除字符串左边空字符
rtrim:清除字符串右边空字符
trim:清除字符串两边边空字符
lpad:(待确认)
rpad:(待确认)
import org.apache.spark.sql.functions.{lit, ltrim, rtrim, rpad, lpad, trim}
df.select(
ltrim(lit(" HELLO ")).as("ltrim"),
rtrim(lit(" HELLO ")).as("rtrim"),
trim(lit(" HELLO ")).as("trim"),
lpad(lit("HELLO"), 3, " ").as("lp"),
rpad(lit("HELLO"), 10, " ").as("rp")).show(2)
2)正则
regexp_extract和regexp_replace:分别用于提取和替换值
//将列中颜色都替换为COLOR
import org.apache.spark.sql.functions.regexp_replace
val simpleColors = Seq("black", "white", "red", "green", "blue")
val regexString = simpleColors.map(_.toUpperCase).mkString("|")
// "|"在正则表达式中是"或"的意思,构建的正则表达式为'BLACK|WHITE|RED|GREEN|BLUE'
df.select(
regexp_replace(col("Description"), regexString, "COLOR").alias("color_clean"),
col("Description")).show(2)
//提取颜色数据
import org.apache.spark.sql.functions.regexp_extract
val regexString = simpleColors.map(_.toUpperCase).mkString("(", "|", ")")
// "|"是正则表达式中的"或"的意思,构建的正则表达式'(BLACK|WHITE|RED|GREEN|BLUE)'
df.select(
regexp_extract(col("Description"), regexString, 1).alias("color_clean"),
col("Description")).show(2)
translate:直接替换
import org.apache.spark.sql.functions.translate
df.select(translate(col("Description"), "LEET", "1337"), col("Description"))
.show(2)
contains:检查字符串是否存在
val containsBlack = col("Description").contains("BLACK")
val containsWhite = col("DESCRIPTION").contains("WHITE")
df.withColumn("hasSimpleColor", containsBlack.or(containsWhite))
.where("hasSimpleColor")
.select("Description").show(3, false)
当需要匹配查找的值很多时,可以使用以下方式(暂不理解)
val simpleColors = Seq("black", "white", "red", "green", "blue")
val selectedColumns = simpleColors.map(color => {
col("Description").contains(color.toUpperCase).alias(s"is_$color")
}):+expr("*") // could also append this value
df.select(selectedColumns:_*).where(col("is_white").or(col("is_red")))
.select("Description").show(3, false)
//在这种情况下,用Python来实现就很容易。将使用locate函数,它返回整数位置 (从1开始),然后将返回值转换为布尔值之后再使用它
- 处理日期和时间戳
current_date,current_timestamp:构建日期和时间戳
date_add,date_sub:日期的增减
datediff:日期之间相隔的天数
months_between:日期之间相隔的月数
to_date:字符串转换为日期
to_timestamp:字符串转换为时间戳
import org.apache.spark.sql.functions.{date_add, date_sub}
dateDF.select(date_sub(col("today"), 5), date_add(col("today"), 5)).show(1)
import org.apache.spark.sql.functions.{datediff, months_between, to_date}
dateDF.withColumn("week_ago", date_sub(col("today"), 7))
.select(datediff(col("week_ago"), col("today"))).show(1)
dateDF.select(
to_date(lit("2016-01-01")).alias("start"),
to_date(lit("2017-05-22")).alias("end"))
.select(months_between(col("start"), col("end"))).show(1)
import org.apache.spark.sql.functions.{to_date, lit}
spark.range(5).withColumn("date", lit("2017-01-01"))
.select(to_date(col("date"))).show(1)
import org.apache.spark.sql.functions.to_date
val dateFormat = "yyyy-dd-MM"
val cleanDateDF = spark.range(1).select(
to_date(lit("2017-12-11"), dateFormat).alias("date"),
to_date(lit("2017-20-12"), dateFormat).alias("date2"))
cleanDateDF.createOrReplaceTempView("dateTable2")
import org.apache.spark.sql.functions.to_timestamp
cleanDateDF.select(to_timestamp(col("date”), dateFormat)).show()
【注意】在字符串转时间格式时,如果不指定格式直接进行隐式转换,当字符串格式不符时程序不会报错,只会处理成null,因此不建议在生产中使用隐式转换。
日期的比较:直接使用lit引用的字符串会字符串文本进行比较
cleanDateDF.filter(col("date2") > lit("2017-12-12")).show()
cleanDateDF.filter(col("date2") > "'2017-12-12'").show()
- 处理数据中的空值
在spark中建议使用null表示空值。
coalesce:返回一组列中第一个非空值
ifnull:如果第一个值为空, 则允许选择第二个值, 并将其默认为第一个
nullif:如果两个值相等, 则返回null, 否则返回第二个值
nvl:如果第一个值为null,则返回第二个值,否则返回第一个
nvl2:如果第一个不为null,返回第二个值;否则,它将返回最后一个指定值
SELECT
ifnull(null, 'return_value'),
nullif('value', 'value'),
nvl(null, 'return_value'),
nvl2('not_null', 'return_value', "else_value")
FROM dfTable LIMIT 1
--DataFrame上的select表达式中使用它们同理
drop:用于删除包含null的行,默认删除包含null值的行
df.na.drop()
//指定“any”作为参数,当存在一个值是null时,就删除改行
df.na.drop("any")
//指定“all”作为参数,当所有的值为null或者NaN时才能删除该行
df.na.drop("all")
//指定某几个字段同时为空时删除
df.na.drop("all", Seq("StockCode", "InvoiceNo"))
fill:使用一组值填充一列或多列
//将某字符串类型列中null值替换为指定字符串
df.na.fill("All Null values become this string")
//填充int类型的列中的null,其他类型同理
df.na.fill(5:Integer)
df.na.fill(5:Double)
//填充多列
df.na.fifill(5, Seq("StockCode", "InvoiceNo"))
//使用map填充多列,key为列名,value为填充值
val fifillColValues = Map("StockCode" -> 5, "Description" -> "No Value")
df.na.fifill(fifillColValues)
repalce:替换操作
df.na.replace("Description", Map("" -> "UNKNOWN"))
asc_nulls_first、desc_nulls_first、asc_nulls_last、desc_nulls_last:对null值排序
- 处理复杂类型
1)结构体
可以将结构体STRUCT视为DF中的DF。
//从DF中构建STRUCT
df.selectExpr("(Description, InvoiceNo) as complex", "*")
df.selectExpr("struct(Description, InvoiceNo) as complex", "*")
//操作STRUCT
//创建
import org.apache.spark.sql.functions.struct
val complexDF = df.select(struct("Description", "InvoiceNo").alias("complex"))
complexDF.createOrReplaceTempView("complexDF")
//访问
complexDF.select("complex.Description")
complexDF.select(col("complex").getField("Description"))
//访问结构体中的所有列
complexDF.select("complex.*")
2) 数组
主要操作是将DF中的列转换为数组类型和使用这些数组。
//构造数组
import org.apache.spark.sql.functions.split
df.select(split(col("Description"), " ")).show(2)
//通过下标访问数组元素
df.select(split(col("Description"), " ").alias("array_col"))
.selectExpr("array_col[0]").show(2)
//获取数组大小
import org.apache.spark.sql.functions.size
df.select(size(split(col("Description"), " "))).show(2)
//判断数组是否包含某些元素
import org.apache.spark.sql.functions.array_contains
df.select(array_contains(split(col("Description"), " "), "WHITE")).show(2)
//展开数组构造列
import org.apache.spark.sql.functions.{split, explode}
df.withColumn("splitted", split(col("Description"), " "))
.withColumn("exploded", explode(col("splitted")))
.select("Description", "InvoiceNo", "exploded").show(2)
3)map
//构建map类型的列
import org.apache.spark.sql.functions.map
df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map")).show(2)
//使用map类型的列
df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map"))
.selectExpr("complex_map['WHITE METAL LANTERN']").show(2)
//展开map类型构造为列
df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map"))
.selectExpr("explode(complex_map)").show(2)
- 处理JSON类型
Spark中对json数据有独特的支持。
//构建json字符串的列
val jsonDF = spark.range(1).selectExpr("""
'{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString""")
//查询json对象,仅有一层嵌套时可以使用json_tuple
import org.apache.spark.sql.functions.{get_json_object, json_tuple}
jsonDF.select(get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]") as "column",
json_tuple(col("jsonString"), "myJSONKey")).show(2)
//StructType转为json
import org.apache.spark.sql.functions.to_json
df.selectExpr("(InvoiceNo, Description) as myStruct")
.select(to_json(col("myStruct")))
//解析json数据
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val parseSchema = new StructType(Array(
new StructField("InvoiceNo",StringType,true),
new StructField("Description",StringType,true)))
df.selectExpr("(InvoiceNo, Description) as myStruct")
.select(to_json(col("myStruct")).alias("newJSON"))
.select(from_json(col("newJSON"), parseSchema), col("newJSON")).show(2)
- 用户自定义函数(UDF)
//第1步:设计一个程序实现接受一个数字并返回它的三次幂。
val udfExampleDF = spark.range(5).toDF("num")
def power3(number:Double):Double = number * number * number
power3(2.0)
//第2步:注册UDF,只能在DF操作中使用
import org.apache.spark.sql.functions.udf
val power3udf = udf(power3(_:Double):Double)
//第3步:使用udf
udfExampleDF.select(power3udf(col("num"))).show()
//还可以将函数注册为sparkSQL函数,以便于在sparksql中使用
spark.udf.register("power3", power3(_:Double):Double)
udfExampleDF.selectExpr("power3(num)").show(2)
【注意】所以建议使用 Scala 或 Java编写UDF,不仅编写程序的时间少,还能提高性能。使用Python做UDF的代价较高。
为了确保我们的函数正常工作,还要做的一件事是指定返回类型。最后,还可以使用Hive语法来创建UDF/UDAF。为了实现这一点, 首先必须在创建SparkSession 时启用Hive支持(SparkSession.builder().enableHiveSupport()来启用)。然后, 你可以在SQL中注册UDF。这仅支持预编译的Scala和Java包, 因此你需要将它们指定为依赖项。
CREATE TEMPORARY FUNCTION myFunc AS 'com.organization.hive.udf.FunctionName'
此外, 还能通过删除TEMPORARY将其注册为Hive Metastore中的永久函数。
6、聚合操作
聚合操作是数据分析中的常用基本操作。一般情况下用户会对数据进行分组后的各组内数据的行汇总,汇总运算可能是求和、累乘、计数等。spark中可以将任何类型的值聚合称为array、list、map等。
测试数据准备:
val df = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/data/retail-data/all/*.csv")
.coalesce(5) //重新分区减少分区数,防止小文件过多
df.cache() //缓存起来以便于快速访问
df.createOrReplaceTempView("dfTable")
//执行最简单的聚合操作
df.count() //count()是一个action操作,还有个作用是将整个DF缓存到内存
- 聚合函数
大多数聚合函数可以在org.apache.spark.sql.functions包中找到。Scala和python中导入的函数与SQL中的函数有些不一样。
1)count
在用作聚合函数时,count是一个transformation操作。可以指定列计数,也可以使用*或1整行计数.
import org.apache.spark.sql.functions.count
df.select(count("StockCode")).show()
【注意】执行count(*)时,spark会对null进行计数,指定某列计数时不会对null进行计数。
2)countDistinct
去重计数,只在正对某列计数时使用才有意义。
import org.apache.spark.sql.functions.countDistinct
df.select(countDistinct("StockCode")).show()
3)approx_count_distinct
处理超大型数据时,近似地统计数据量,可以指定参数调节近似度。
import org.apache.spark.sql.functions.approx_count_distinct
df.select(approx_count_distinct("StockCode", 0.1)).show()
4)first和last
基于DF中行的顺序获取首尾位置的值。
5)min和max
提取DF中的最值。
6)sum 累加值、sumDistinct 去重累加、avg 求均值、mean 求均值
import org.apache.spark.sql.functions.{first, last}
df.select(first("StockCode"), last("StockCode")).show()
import org.apache.spark.sql.functions.{min, max}
df.select(min("Quantity"), max("Quantity")).show()
import org.apache.spark.sql.functions.sum
df.select(sum("Quantity")).show()
import org.apache.spark.sql.functions.sumDistinct
df.select(sumDistinct("Quantity")).show()
import org.apache.spark.sql.functions.{sum, count, avg, expr}
df.select(
count("Quantity").alias("total_transactions"),
sum("Quantity").alias("total_purchases"),
avg("Quantity").alias("avg_purchases"),
expr("mean(Quantity)").alias("mean_purchases"))
.selectExpr(
"total_purchases/total_transactions",
"avg_purchases",
"mean_purchases").show()
7)方差和标准差
spark中支持样本的标准差方差计算,也支持总体的方差标准差计算。
variance函数和stddev函数默认是计算样本方差和样本标准差。
import org.apache.spark.sql.functions.{var_pop, stddev_pop}
import org.apache.spark.sql.functions.{var_samp, stddev_samp}
df.select(var_pop("Quantity"), var_samp("Quantity"),
stddev_pop("Quantity"), stddev_samp("Quantity")).show()
8)偏度系数和峰度系数
偏度系数(skewness)和峰度系数(kurtosis)都是对数据集中的极端数据点的衡量指标。偏度系数衡量数据相对于平均值的不对称程度,而峰度系数衡量数据分布形态陡缓程度。在将数据建模为随机变量的概率分布时,它们都很重要。
import org.apache.spark.sql.functions.{skewness, kurtosis}
df.select(skewness("Quantity"), kurtosis("Quantity")).show()
9)协方差和相关性
比较两个不同列的值之间的相互关系。相关性采用Pearson相关系数来衡量,范围是-1~+1。协方差的范围由数据中的输入决定。协方差分样本协方差和整体协方差,需要在使用时指定。
import org.apache.spark.sql.functions.{corr, covar_pop, covar_samp}
df.select(corr("InvoiceNo", "Quantity"), covar_samp("InvoiceNo", "Quantity"),
covar_pop("InvoiceNo", "Quantity")).show()
10)聚合输出复杂类型
用户可以在流水线处理的后续操作中再访问该集合,或者将整个集合传递给用户自定义函数(UDF)
import org.apache.spark.sql.functions.{collect_set, collect_list}
df.agg(collect_set("Country"), collect_list("Country")).show()
- 分组
分两个阶段进行分组:首先指定要对其进行分组的一列或多列,然后指定一个或多个聚合操作。
df.groupBy("InvoiceNo", "CustomerId").count().show()
1)使用表达式分组
如前所述,计数有点特殊,因为它是作为一种方法存在的。为此,通常我们更喜欢使用count函数。我们不是将该函数作为表达式传递到select语句中,而是在agg中指定它。这使得仅需指定一些聚合操作,即可传入任意表达式。甚至可以在转换某列之后给它取别名,以便在之后的数据流处理中使用。
import org.apache.spark.sql.functions.count
df.groupBy("InvoiceNo").agg(
count("Quantity").alias("quan"),
expr("count(Quantity)")).show()
2)使用map进行分组
有时将转换操作指定为一系列Map会更方便,其中键为列,值为要执行的字符串形式的聚合函数。如果以inline方式指定也可以重用多个列名。
df.groupBy("InvoiceNo").agg("Quantity"->"avg", "Quantity"->"stddev_pop").show()
- window函数
spark支持三种串口函数:排名函数、解析函数、聚合函数。
//在原DF中添加date列,该列只包含日期信息
import org.apache.spark.sql.functions.{col, to_date}
val dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"),
"MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")
//第一步:创建窗口规范
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col
val windowSpec = Window
.partitionBy("CustomerId", "date") //根据指定列进行分组,与之前认识的partition by无关
.orderBy(col("Quantity").desc) //分组内的排序规则
.rowsBetween(Window.unboundedPreceding, Window.currentRow) //frame配置即window中的数据范围,本例中指包含当前行和之前所有行的数据范围
//第二步:编写业务需求,求有史以来最大购买量和对应客户
val maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)
//创建购买量排名,使用dense_rank而不是rank,是为了避免在有等值的情况下避免排序结果不连续
import org.apache.spark.sql.functions.{dense_rank, rank}
val purchaseDenseRank = dense_rank().over(windowSpec)
val purchaseRank = rank().over(windowSpec)
//第三步:使用select执行并查看计算结果
import org.apache.spark.sql.functions.col
dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")
.select(
col("CustomerId"),
col("date"),
col("Quantity"),
purchaseRank.alias("quantityRank"),
purchaseDenseRank.alias("quantityDenseRank"),
maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()
3)分组集 GROUPING SET
分组集是用于将多组聚合操作组合在一起的底层工具,使得能够在group-by语句中创建任意的聚合操作。分组集需要过滤null值,否则会得到错误的结果。分组集就是实现了将各种分组统计的结果union到一起。
//去掉可空值,准备测试数据
val dfNoNull = dfWithDate.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")
--SQL实现分组集操作
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode
ORDER BY CustomerId DESC, stockCode DESC
--GROUPING SET 实现分组集
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode))
ORDER BY CustomerId DESC, stockCode DESC
--多分组聚合
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode),())
ORDER BY CustomerId DESC, stockCode DESC
GROUPING SET只能在SQL中使用,在DF操作中,需要使用rollup和cube实现。
rollup
当我们设置分组的key为多个列时,Spark会分析这些列,并根据各列中存在的实际数值,确定列值组合作为分组的key。而rollup分组聚合是一种多维聚合操作,可以执行多种不同group-by风格的计算。
//根据Date和Country创建rollup分组。结果包含:Date分组聚合的数据、以Date和Country分组聚合的数据以及所有数据聚合的数据。
val rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity"))
.selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")
.orderBy("Date")
rolledUpDF.show()
//得到的结果中有列存在null值:每列的null值表示不区分该列的总数(比如Country列为null值表示该日期所有地点的总数),而如果在两列中都是null值则表示所有日期和地点的总数
cube
cube分组聚合是对所有参与的列值进行所有维度的全组合聚合。
//获得所有Date所有Country的聚合数据、每个Date所有Country的聚合数据、每个Date每个Country的聚合数据、所有Date每个Country的聚合数据
dfNoNull.cube("Date", "Country").agg(sum(col("Quantity")))
.select("Date", "Country", "sum(Quantity)").orderBy("Date").show()
对元数据进行分组
使用cube和rollup进行分组聚合时,可以增加一列grouping_id表示聚合级别。示例:
[图片上传失败...(image-ebab88-1640316912601)]
import org.apache.spark.sql.functions.{grouping_id, sum, expr}
dfNoNull.cube("customerId", "stockCode").agg(grouping_id(), sum("Quantity"))
.orderBy(expr("grouping_id()").desc)
.show()
透视转换
透视转换可以根据某列中的不同行创建多个列。
//使用透视转换,在使用了透视转换后,现在DataFrame会为每一个Country和数值型列组合产生一个新列,以及之前的date列。
val pivoted = dfWithDate.groupBy("date").pivot("Country").sum()
//查看透视转换后的数据
pivoted.where("date > '2011-12-05'").select("date" ,"`USA_sum(Quantity)`").show()
- 用户自定义聚合函数 UDAF
若要创建UDAF,必须继承UserDefinedAggregateFunction基类并实现以下方法:
inputSchema用于指定输入参数,输入参数类型为StructType。
bufferSchema用于指定UDAF中间结果,中间结果类型为StructType。
dataType用于指定返回结果,返回结果的类型为DataType。
deterministic是一个布尔值,它指定此UDAF对于某个输入是否会返回相同的结果。
initialize初始化聚合缓冲区的初始值。
update描述应如何根据给定行更新内部缓冲区。
merge描述应如何合并两个聚合缓冲区。
evaluate将生成聚合最终结果。
//编写UDAF函数,实现返回(给定列)所有的行是否为 true;如果不是,则返回 false
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
class BoolAnd extends UserDefinedAggregateFunction{
def inputSchema:org.apache.spark.sql.types.StructType =
StructType(StructField("value",BooleanType) :: Nil)
def bufferSchema:StructType = StructType(
StructField("result",BooleanType) :: Nil
)
def dataType: DataType = BooleanType
def determinstic: Boolean = true
def initialize(buffer:MutableAggregationBuffer):Unit = {
buffer(0) = true
}
def update(buffer:MutableAggregationBuffer,input:Row):Unit = {
buffer(0) = buffer.getAs[Boolean](0) && input.getAs[Boolean](0)
}
def merge(buffer1:MutableAggregationBuffer,buffer2:Row):Unit = {
buffer1(0) = buffer1.getAs[Boolean](0) && buffer2.getAs[Boolean](0)
}
def evaluate(buffer:Row):Any = {
buffer(0)
}
}
//实例化该类并注册成函数在表达式中调用
val ba = new BoolAnd
spark.udf.register("booland", ba)
import org.apache.spark.sql.functions._
spark.range(1)
.selectExpr("explode(array(TRUE, TRUE, TRUE)) as t")
.selectExpr("explode(array(TRUE, FALSE, TRUE)) as f", "t")
.select(ba(col("t")), expr("booland(f)"))
.show()
【注意】UDAF仅在Scala和java中可用。
7、连接操作
连接操作不局限于单个数据集,结合spark的能力,可以实现各种数据使用。
Spark中提供的连接类型:
- inner join,内连接,保留左、右数据集内某个键都存在的行
- outer join,外链接,保留左侧或右侧数据集中具有某个键的行
- left outer join,左外连接,保留左侧数据集中具有某个键的行
- right outer join,右外连接,保留右侧数据集中具有某个键的行
- left semi join,左半连接,如果某键在右侧数据行中出现,则保留且仅保留左侧数据行
- left anti join,左反连接,如果某键在右侧数据行中没出现,则保留且仅保留左侧数据行
- nutural join,自然连接,通过隐式匹配两个的数据集之间具有相同名称的列来执行连接
- cross join,笛卡尔连接,将左侧数据集中的每一行与右侧数据集中的每一行匹配
//准备示例数据
val person = Seq(
(0, "Bill Chambers", 0, Seq(100)),
(1, "Matei Zaharia", 1, Seq(500, 250, 100)),
(2, "Michael Armbrust", 1, Seq(250, 100)))
.toDF("id", "name", "graduate_program", "spark_status")
val graduateProgram = Seq(
(0, "Masters", "School of Information", "UC Berkeley"),
(2, "Masters", "EECS", "UC Berkeley"),
(1, "Ph.D.", "EECS", "UC Berkeley"))
.toDF("id", "degree", "department", "school")
val sparkStatus = Seq(
(500, "Vice President"),
(250, "PMC Member"),
(100, "Contributor"))
.toDF("id", "status")
person.createOrReplaceTempView("person")
graduateProgram.createOrReplaceTempView("graduateProgram")
sparkStatus.createOrReplaceTempView("sparkStatus")
//1、内连接
//编写连接条件
val joinExpression = person.col("graduate_program") === graduateProgram.col("id")
//使用连接,默认就是内连接
person.join(graduateProgram, joinExpression).show()
var joinType = "inner" //指定连接参数
//使用连接参数指定内连接
person.join(graduateProgram, joinExpression, joinType).show()
//相当于SQL
SELECT * FROM person INNER JOIN graduateProgram ON person.graduate_program = graduateProgram.id
//2、外链接
joinType = "outer"
person.join(graduateProgram, joinExpression, joinType).show()
//相当于SQL
SELECT * FROM person FULL OUTER JOIN graduateProgram ON graduate_program = graduateProgram.id
//3、左外连接
joinType = "left_outer"
graduateProgram.join(person, joinExpression, joinType).show()
//4、右外连接
joinType = "right_outer"
person.join(graduateProgram, joinExpression, joinType).show()
//5、左半连接
joinType = "left_semi"
graduateProgram.join(person, joinExpression, joinType).show()
val gradProgram2 = graduateProgram.union(Seq(
(0, "Masters", "Duplicated Row", "Duplicated School")).toDF())
gradProgram2.createOrReplaceTempView("gradProgram2")
gradProgram2.createOrReplaceTempView("gradProgram2")
gradProgram2.join(person, joinExpression, joinType).show()
//6、左反连接
joinType = "left_anti"
graduateProgram.join(person, joinExpression, joinType).show()
//7、自然连接
//自然连接(natural join)不必指定连接条件,它对你执行连接操作中可能要基于的列进行隐式猜测(往往是根据相同的列名),找出匹配列并返回连接结果。目前自然连接支持左连接、右连接和外连接。隐示的操作始终是危险的!
//8、笛卡尔连接(交叉连接)
joinType = "cross"
graduateProgram.join(person, joinExpression, joinType).show()
//也可以显式地使用笛卡尔连接
person.crossJoin(graduateProgram).show()
//【注意】只有在真的百分之百需要交叉连接的时候才使用它。在Spark中定义交叉连接时,需要清楚这种做法很危险!数据量会剧增。
连接操作常见问题与解决方案
- 对复杂类型的连接操作
任何返回Boolean值的表达式都是有效的连接。
import org.apache.spark.sql.functions.expr
person.withColumnRenamed("id", "personId")
.join(sparkStatus, expr("array_contains(spark_status, id)")).show()
//相当于SQL
SELECT * FROM
(select id as personId, name, graduate_program, spark_status FROM person)
INNER JOIN sparkStatus ON array_contains(spark_status, id)
- 处理重复的列名
当两个DF关联时,如果判断条件中的列名相同会报错。
//构建测试数据集
val gradProgramDupe = graduateProgram.withColumnRenamed("id", "graduate_program")
val joinExpr = gradProgramDupe.col("graduate_program") === person.col(
"graduate_program")
person.join(gradProgramDupe,joinExpr).select("graduate_program").show() //直接使用就会报错
//解决办法一:将布尔表达式改为字符串表达式
person.join(gradProgramDupe,"graduate_program").select("graduate_program").show()
//解决办法二:在链接后删除同名列
person.join(gradProgramDupe, joinExpr).drop(person.col(“graduate_program"))
.select(“graduate_program").show()
//解决办法三:在链接前将同名列重命名
val gradProgram3 = graduateProgram.withColumnRenamed("id","grad_id")
val joinExpr = person.col("graduate_program") === gradProgram3.col("grad_id")
person.join(gradProgram3,joinExpr).show()
8、数据源
spark的核心数据源:
1)CSV
2)JSON
3)Parquet
4)ORC
5)JDBC/ODBC连接
6)纯文本文件
社区数据源:
1)Cassandra
2)HBase
3)MongoDB
4)AWS Redshift
5)XML
6)其他数据源
8.1 数据源API(DataSource API)
- Read API的结构:
DataFrameReader.format(...).option("key", "value").schema(...).load()
此结构可以用来读取所有的数据源。参数介绍:
DataFrameReader:通过sparkSession的read属性得到,spark.read
format:指定要解析的数据源,默认为Parquet
schema:选择指定的schema
option:配置k-v对来执行读取数据的方式
spark.read.format(“csv")
.option(“mode", “FAILFAST")
.option(“inferSchema", “true")
.option(“path", “path/to/file(s)")
.schema(someSchema)
.load()
读取模式参考:
[图片上传失败...(image-55e993-1640316912601)]
默认是permissive。
- Write API结构
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
DataFrameWriter:通过DataFrame的write属性来获取,DF.write
format:写数据格式,默认为Parquet
option:k-v对配置写出数据的方法,PartitionBy,bucketBy和sortBy仅适用于基于文件的数据源,可以基于这些方法控制写出目标文件的具体结构
save模式:option中配置的保存模式
dataframe.write.format(“csv")
.option(“mode", “OVERWRITE")
.option(“dateFormat", “yyyy-MM-dd")
.option(“path", “path/to/file(s)")
.save()
保存模式参考:
默认值为errorIfExists,也就是说如果目标路径已有数据存在数据则Spark立即写入失败。
对各种数据源的option选项参见《权威指南》157页-167页
关于SQL数据库、纯文本文件、高级IO分片分桶等介绍见1665页-177页