注意:
如需执行如下代码,请从官方github下载数据包 , 安装所需spark环境
执行如下创建Dataframe数据集代码创建好所需的Dataframe才能用接下来的代码对数据进行操作。
为了美观下面的例如.option() .load()为换行展示,真正输入代码时要在一行输入。
所需创建的DataFrame数据集(第四行数据集路径根据自己下载到本地的数据集地址进行修改):
// Scala
val df = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/data/retail-data/all/*.csv")
.coalesce(5)
df.cache()
df.createOrReplaceTempView("dfTable")
7.3 Window Functions 窗口函数
在Spark里面每一行数据就是一个row,窗口函数是指定所需行(row)的数据组成一个数据集进行计算,窗口可以进行排名,分析,聚合操作。
下面对数据集进行处理,组成新的dfWithDate, 每行添加日期以便更好的直观的展现操作。
// Scala
import org.apache.spark.sql.functions.{col, to_date}
val dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")
第一步
就是利用窗口函数创建一个窗口,partitionBy是根据客户的id和购买日期进行分组,rowsBetween代表了这个窗口里面包含了哪几行(实例中是从"前面所有行"->"当前行")
// Scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col
val windowSpec = Window
.partitionBy("CustomerId", "date")
.orderBy(col("Quantity").desc)
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
第二步
下面根据这个窗口计算并返回所需列
- 求出每个用户购买最多的股票Stock数目
import org.apache.spark.sql.functions.max
val maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)
- 求出该顾客购买不同股票Stock数量的排名,dense rank表明按照排名往下排序不和数量绑定排名例如有1个第一名和2个第二名那么接下来的第三名则是从正常的第三名开始计数,而rank 则是从第四名开始计数。
import org.apache.spark.sql.functions.{dense_rank, rank}
val purchaseDenseRank = dense_rank().over(windowSpec)
val purchaseRank = rank().over(windowSpec)
第三步
这时候maxPurchaseQuantity, purchaseDenseRank, purchaseRank将会返回三个列,下面我们将这三个列和指定的dataframe的列一同返回得出所需结论:
// Scala
import org.apache.spark.sql.functions.col
dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")
.select(
col("CustomerId"),
col("date"),
col("Quantity"),
purchaseRank.alias("quantityRank"),
purchaseDenseRank.alias("quantityDenseRank"),
maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()
结果如下:
7.4 Grouping Sets 分组集合
group-by可以使我们对组内进行聚合操作,但是有些时候我们需要跨组进行操作,这个时候我们就需要用到分组集合。
首先我们对上面已经处理好的dfWithDate 这个Dataframe进行去空.drop()操作:
// in Scala
val dfNoNull = dfWithDate.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")
注意: Grouping Sets对空值有效,所以需要去除空值保证结果不受影响。
下面通过SQL来得出所有的股票代码和持有该股票的用户以及每个用户持有该股票的总数目:
-- SQL
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode))
ORDER BY CustomerId DESC, stockCode DESC
如果我们想求整个股票的数量而不是根据客户和股票分组则通过下面这个SQL语句:
-- SQL
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode),())
ORDER BY CustomerId DESC, stockCode DESC
然而分组集合仅仅在SQL里面提供,spark则使用 rullup 和 cube.
Rollups
目前讨论的都是显式数据集,及提供确定的数据集给Spark进行操作,下面将根据日期+不同国家股票创建rollups求出每个日期下面不同国家股票的购买总数:
Rollup可以返回的结果:
- 单个日期不同国家股票购买总数
- 单个日期所有国家股票购买总数
- 所有日期所有国家股票购买总数
val rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity"))
.selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")
.orderBy("Date")
rolledUpDF.show()
结果如下:
可以看到上图结果两个都为空的那个row为股票总数,可通过如下语句取出:
rolledUpDF.where("Country IS NULL").show()
rolledUpDF.where("Date IS NULL").show()
Cube
Cube相比Rollups更为深层次的返回结果:
- 可以返回所有不论国家和日期的股票总额
- 可以返回单个日期的所有国家的股票总额
- 可以返回单个国家的每个日期的股票总额
- 可以返回单个国家的不同日期的股票总额
和Rollup调用的方法差不多:
// Scala
dfNoNull.cube("Date", "Country").agg(sum(col("Quantity")))
.select("Date", "Country", "sum(Quantity)").orderBy("Date").show()
结果如图:
Grouping Metadata
这个的意思就是希望显示的分组级别及从最细的到最粗的分组,而不是像cube那样一股脑的全部显示:
- 级别0 可以返回所有股票总额
- 级别1 可以返回单个客户所有购买的股票总额
- 级别2 可以返回单个股票所有用户购买的总额
- 级别3 可以返回单个客户单个股票编码的股票总额(最高级别)
由grouping_id来控制
// Scala
import org.apache.spark.sql.functions.{grouping_id, sum, expr}
dfNoNull.cube("customerId", "stockCode").agg(grouping_id(), sum("Quantity"))
.orderBy(expr("grouping_id()").desc)
.show()
结果和上面的cube的结果差不多,只不过根据分组级别进行了分组.
Pivot
可以将行转换为列进行操作,如下示例我们可以将给定的国家按照日期来计算股票的数额:
// Scala
val pivoted = dfWithDate.groupBy("date").pivot("Country").sum()
由于数目庞大仅展示美国的日期大于2011-12-05的数额.
// Scala
pivoted.where("date > '2011-12-05'").select("date" ,"`USA_sum(Quantity)`").show()
7.5 User-Defined Aggregation Functions 用户自定义聚合函数(UDAF)
注意:
目前UDAF只能在Spark2.3以上版本及Java和Scala上实现.
可以根据业务逻辑自己制定分组聚合,然而必须继承基类:
// Scala
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
class BoolAnd extends UserDefinedAggregateFunction {
def inputSchema: org.apache.spark.sql.types.StructType =
StructType(StructField("value", BooleanType) :: Nil)
def bufferSchema: StructType = StructType(
StructField("result", BooleanType) :: Nil
)
def dataType: DataType = BooleanType
def deterministic: Boolean = true
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = true
}
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getAs[Boolean](0) && input.getAs[Boolean](0)
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getAs[Boolean](0) && buffer2.getAs[Boolean](0)
}
def evaluate(buffer: Row): Any = {
buffer(0)
}
}
现在我们只需注册进udf中便可使用.
// Scala
val ba = new BoolAnd
spark.udf.register("booland", ba)
import org.apache.spark.sql.functions._
spark.range(1)
.selectExpr("explode(array(TRUE, TRUE, TRUE)) as t")
.selectExpr("explode(array(TRUE, FALSE, TRUE)) as f", "t")
.select(ba(col("t")), expr("booland(f)"))
.show()