《Spark: The Definitive Guide 》第7章:Aggregations 聚合 中文学习笔记

注意:
如需执行如下代码,请从官方github下载数据包 , 安装所需spark环境
执行如下创建Dataframe数据集代码创建好所需的Dataframe才能用接下来的代码对数据进行操作。
为了美观下面的例如.option() .load()为换行展示,真正输入代码时要在一行输入。

所需创建的DataFrame数据集(第四行数据集路径根据自己下载到本地的数据集地址进行修改):

// Scala
val df = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/data/retail-data/all/*.csv")
.coalesce(5)
df.cache()
df.createOrReplaceTempView("dfTable")

7.3 Window Functions 窗口函数

在Spark里面每一行数据就是一个row,窗口函数是指定所需行(row)的数据组成一个数据集进行计算,窗口可以进行排名,分析,聚合操作。


窗口示例

下面对数据集进行处理,组成新的dfWithDate, 每行添加日期以便更好的直观的展现操作。

// Scala
import org.apache.spark.sql.functions.{col, to_date}
val dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")
原始DF和添加日期DF比较

第一步

就是利用窗口函数创建一个窗口,partitionBy是根据客户的id和购买日期进行分组,rowsBetween代表了这个窗口里面包含了哪几行(实例中是从"前面所有行"->"当前行")

// Scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col
val windowSpec = Window
.partitionBy("CustomerId", "date")
.orderBy(col("Quantity").desc)
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

第二步

下面根据这个窗口计算并返回所需列

  • 求出每个用户购买最多的股票Stock数目
import org.apache.spark.sql.functions.max
val maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)
  • 求出该顾客购买不同股票Stock数量的排名,dense rank表明按照排名往下排序不和数量绑定排名例如有1个第一名和2个第二名那么接下来的第三名则是从正常的第三名开始计数,而rank 则是从第四名开始计数。
import org.apache.spark.sql.functions.{dense_rank, rank}
val purchaseDenseRank = dense_rank().over(windowSpec)
val purchaseRank = rank().over(windowSpec)

第三步

这时候maxPurchaseQuantity, purchaseDenseRank, purchaseRank将会返回三个列,下面我们将这三个列和指定的dataframe的列一同返回得出所需结论:

// Scala
import org.apache.spark.sql.functions.col

dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")
  .select(
    col("CustomerId"),
    col("date"),
    col("Quantity"),
    purchaseRank.alias("quantityRank"),
    purchaseDenseRank.alias("quantityDenseRank"),
    maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()

结果如下:


image.png

7.4 Grouping Sets 分组集合

group-by可以使我们对组内进行聚合操作,但是有些时候我们需要跨组进行操作,这个时候我们就需要用到分组集合。

首先我们对上面已经处理好的dfWithDate 这个Dataframe进行去空.drop()操作:

// in Scala
val dfNoNull = dfWithDate.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")

注意: Grouping Sets对空值有效,所以需要去除空值保证结果不受影响。

下面通过SQL来得出所有的股票代码和持有该股票的用户以及每个用户持有该股票的总数目:

-- SQL
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode))
ORDER BY CustomerId DESC, stockCode DESC

如果我们想求整个股票的数量而不是根据客户和股票分组则通过下面这个SQL语句:

-- SQL
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode),())
ORDER BY CustomerId DESC, stockCode DESC

然而分组集合仅仅在SQL里面提供,spark则使用 rullup 和 cube.

Rollups

目前讨论的都是显式数据集,及提供确定的数据集给Spark进行操作,下面将根据日期+不同国家股票创建rollups求出每个日期下面不同国家股票的购买总数:
Rollup可以返回的结果:

  • 单个日期不同国家股票购买总数
  • 单个日期所有国家股票购买总数
  • 所有日期所有国家股票购买总数
val rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity"))
  .selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")
  .orderBy("Date")
rolledUpDF.show()

结果如下:


image.png

可以看到上图结果两个都为空的那个row为股票总数,可通过如下语句取出:

rolledUpDF.where("Country IS NULL").show()
rolledUpDF.where("Date IS NULL").show()

Cube

Cube相比Rollups更为深层次的返回结果:

  • 可以返回所有不论国家和日期的股票总额
  • 可以返回单个日期的所有国家的股票总额
  • 可以返回单个国家的每个日期的股票总额
  • 可以返回单个国家的不同日期的股票总额

和Rollup调用的方法差不多:

// Scala
dfNoNull.cube("Date", "Country").agg(sum(col("Quantity")))
  .select("Date", "Country", "sum(Quantity)").orderBy("Date").show()

结果如图:

image.png

image.png

Grouping Metadata

这个的意思就是希望显示的分组级别及从最细的到最粗的分组,而不是像cube那样一股脑的全部显示:

  • 级别0 可以返回所有股票总额
  • 级别1 可以返回单个客户所有购买的股票总额
  • 级别2 可以返回单个股票所有用户购买的总额
  • 级别3 可以返回单个客户单个股票编码的股票总额(最高级别)

由grouping_id来控制

// Scala
import org.apache.spark.sql.functions.{grouping_id, sum, expr}

dfNoNull.cube("customerId", "stockCode").agg(grouping_id(), sum("Quantity"))
.orderBy(expr("grouping_id()").desc)
.show()

结果和上面的cube的结果差不多,只不过根据分组级别进行了分组.

Pivot

可以将行转换为列进行操作,如下示例我们可以将给定的国家按照日期来计算股票的数额:

// Scala
val pivoted = dfWithDate.groupBy("date").pivot("Country").sum()

由于数目庞大仅展示美国的日期大于2011-12-05的数额.

// Scala
pivoted.where("date > '2011-12-05'").select("date" ,"`USA_sum(Quantity)`").show()
image.png

7.5 User-Defined Aggregation Functions 用户自定义聚合函数(UDAF)

注意:
目前UDAF只能在Spark2.3以上版本及Java和Scala上实现.

可以根据业务逻辑自己制定分组聚合,然而必须继承基类:

// Scala
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
class BoolAnd extends UserDefinedAggregateFunction {
  def inputSchema: org.apache.spark.sql.types.StructType =
    StructType(StructField("value", BooleanType) :: Nil)
  def bufferSchema: StructType = StructType(
    StructField("result", BooleanType) :: Nil
  )
  def dataType: DataType = BooleanType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = true
  }
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Boolean](0) && input.getAs[Boolean](0)
  }
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Boolean](0) && buffer2.getAs[Boolean](0)
  }
  def evaluate(buffer: Row): Any = {
    buffer(0)
  }
}

现在我们只需注册进udf中便可使用.

// Scala
val ba = new BoolAnd
spark.udf.register("booland", ba)
import org.apache.spark.sql.functions._
spark.range(1)
  .selectExpr("explode(array(TRUE, TRUE, TRUE)) as t")
  .selectExpr("explode(array(TRUE, FALSE, TRUE)) as f", "t")
  .select(ba(col("t")), expr("booland(f)"))
  .show()
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352