Spark实战第二版(涵盖Spark3.0)-第15章. 聚合数据

15.3 使用UDAFs构建自定义聚合

在前面的小节中,您快速回顾了聚合数据,在简单数据集上执行了聚合操作,并最终处理了真实的数据。在这些操作中,使用了包括max()、avg()和min()在内的标准聚合操作。Spark并没有实现所有可能的数据聚合。

在本节中,您将通过构建自己的聚合函数来扩展Spark。用户定义的聚合函数(UDAFs),可以执行自定义聚合。

想象一下下面的用例:您是一个在线零售商,想要给客户忠诚度积分。每位顾客每订购一件商品可得一分,但每次订购最多可得三分。

解决这个问题的一种方法是在您的order dataframe中添加一个point列并匹配点归属规则,但是您将使用一个聚合函数来解决这个问题(您可以自己轻松地使用point列来解决这个问题)。

图15.6显示了将要使用的数据集。它类似于本章第一节15.1节中所使用的方法。

图15.6 应用自定义UDAF来计算每个客户每个订单获得多少忠诚点

操作的结果是一个客户及其关联点的列表,如下面的清单所示。

#清单15.23客户及其关联积分

+------------+--------+-----+-------------+-----+

| firstName|lastName|state|sum(quantity)|point|

+------------+--------+-----+-------------+-----+| 

Ginni      | Rometty| NY  | 7          | 3  |

|Jean-Georges| Perrin | NC  | 3          | 3  |

| Holden    | Karau  | CA  | 10          | 6  |

|Jean-Georges| Perrin | CA  | 4          | 3  | 

+------------+--------+-----+-------------+-----+

实验

这个实验的代码在net.jgp.books.spark.ch13.lab400_udaf包。该应用程序称为PointsPerOrderApp.java,UDAF代码在PointAttributionUdaf.java中

调用UDAF并不比调用任何聚合函数复杂。有几个步骤:

在Spark会话中使用udf().register()方法注册这个函数。

使用callUDF()函数调用该函数。

下面的清单显示了调用UDAF的过程。

//清单15.24注册和调用UDAF

package net.jgp.books.spark.ch13.lab400_udaf;

import static org.apache.spark.sql.functions.callUDF;

import static org.apache.spark.sql.functions.col;

import static org.apache.spark.sql.functions.sum;

import static org.apache.spark.sql.functions.when;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;

public class PointsPerOrderApp {

    public static void main(String[] args ) {

      PointsPerOrderApp app = new PointsPerOrderApp();

      app .start();

    }

    private void start() {

      SparkSession spark = SparkSession.builder()

            .appName( "Orders loyalty point" )

            .master( "local[*]" )

            .getOrCreate();

      spark

.udf().register( "pointAttribution" , new PointAttributionUdaf());

      Dataset<Row> df = spark .read().format( "csv" )

            .option( "header" , true )

            .option( "inferSchema" , true )

            .load( "data/orders/orders.csv" );

      Dataset<Row> pointDf = df

col( "firstName" ), col( "lastName" ), col( "state" ))

            .agg(

sum( "quantity" ),

callUDF( "pointAttribution" , col( "quantity" )).as( "point" ));

      pointDf .show(20);

    }

}

调用UDAF就像这样简单:

callUDF( "pointAttribution" , col( "quantity" ))

在这种情况下,您的UDAF只接受一个参数,但如果有必要,函数可以接受多个参数。如果UDAF需要更多的参数,只需添加这些参数:将它们添加到您的调用和输入模式中(参见清单15.24)。

在深入代码之前,让我们先理解UDAF的架构。每一行将被处理,结果可以存储在一个聚合缓冲区中(在worker点上)。请注意,缓冲区不必反映传入数据的结构:您将定义它的模式,它可以存储其他元素。图15.7说明了聚合及其结果缓冲区的机制。

图15.7 当你的代码正在分析数据集的每一行时,中间结果可以保存在一个缓冲区中。

现在,让我们看看如何实现UDAF本身。当你的应用程序调用这个函数时,它就是一个函数;然而,当涉及到它的实现时,它是一个完整的类。这个类必须扩展UserDefinedAggregateFunction(在org.apache .spark.sql.expressions包)。

因此,实现UDAF的类必须实现以下方法:

bufferSchema()——定义缓冲区的模式。

dataType()——表示来自聚合函数的数据类型。

deterministic()——当Spark通过分割数据来执行时,它会分别处理数据块并将它们组合在一起。如果UDAF逻辑使结果独立于处理和组合数据的顺序,则UDAF是确定性的。

evaluate()——根据给定的聚合缓冲区计算该UDAF的最终结果。

initialize()——初始化给定的聚合缓冲区。

inputSchema()——描述发送到UDAF的输入的模式。

merge()——合并两个聚合缓冲区并存储更新后的缓冲区值。当我们将两个部分聚合的数据元素合并在一起时,将调用此方法。

update()——用新的输入数据更新给定的聚合缓冲区。每个输入行调用一次此方法。

现在,您拥有了构建UDAF所需的所有元素,如清单15.25所示。注意,这个类继承了UserDefinedAggregateFunction,它实现了Serializable。您将需要定义一个serialVersionUID变量,但最重要的是,该类的每个元素也需要是可序列化的。

//清单15.25关注于UDAF:

inputSchema()方法定义了发送给函数的数据的模式。在本例中,您接收到的是一个整数,表示订单中的原始项数。Spark中的一个模式,你已经用过几次了,是用StructType实现的:

@Override    public StructType inputSchema() {      List<StructField> inputFields = new ArrayList<>();      inputFields .add(            DataTypes.createStructField( "_c0" , DataTypes. IntegerType , true ));      return DataTypes.createStructType( inputFields );     }

bufferSchema()方法定义聚合缓冲区的模式,用于存储中间结果。在本例中,您只需要一列存储整数。对于更复杂的聚合流程,可能需要更多的列。

@OverridepublicStructTypebufferSchema(){List bufferFields =newArrayList<>();      bufferFields .add(DataTypes.createStructField("sum", DataTypes. IntegerType ,true));returnDataTypes.createStructType( bufferFields );    }@OverridepublicDataTypedataType(){returnDataTypes. IntegerType ;    }@Overridepublicbooleandeterministic(){returntrue;    }

很好,initialize()方法初始化内部缓冲区。在这种情况下,由于这是一个相当简单的聚合,缓冲区将被设置为0。

然而,由类履行的契约需要遵循这个基本规则。在两个初始缓冲区上应用merge()方法应该返回初始缓冲区本身;例如,merge(initialBuffer, initialBuffer) = initialBuffer。

@Overridepublicvoidinitialize(MutableAggregationBuffer buffer ){      buffer .update(0,0);    }

该操作发生在update()方法中。您将在这里处理数据。你接收到的缓冲区可能包含数据,也可能不包含数据,所以不能忽略它:在第一次调用中,它将不包含初始化数据以外的数据。然而,在随后的调用中,数据已经在缓冲区中了,所以不应该忽略它:

@Overridepublicvoidupdate(MutableAggregationBuffer buffer , Row input ){...intinitialValue = buffer .getInt(0);intinputValue = input .getInt(0);intoutputValue =0;if( inputValue < MAX_POINT_PER_ORDER ) {        outputValue = inputValue ;}else{        outputValue = MAX_POINT_PER_ORDER ;      }      outputValue += initialValue ;buffer .update(0, outputValue );    }

merge()方法合并两个聚合缓冲区,并将更新后的缓冲区值存储回聚合缓冲区中。在这个场景中,当有两个包含忠诚度点的缓冲区时,只需相加它们:

@Override    public void merge(MutableAggregationBuffer buffer , Row row ) {buffer.update(0,buffer.getInt(0) +row.getInt(0));    }

最后,evaluate()方法根据给定的聚合缓冲区计算这个UDAF的最终结果:

@OverridepublicIntegerevaluate(Row row ){returnrow .getInt(0);    } }

在本节中,您已经使用并构建了自己的用户定义聚合函数,这有点棘手。您遵循的用例是一个简单的忠诚度点归属,但您可以想象其他类型的场景。

如果您有兴趣了解更多关于聚合如何工作的信息,可以在Log4j.properties中激活跟踪日志:

log4j.logger.net.jgp= DEBUG//修改为log4j.logger.net.jgp= TRACE

在下一次执行时,你将得到详细的输出:

...alize(PointAttributionUdaf.java:79):->initialize()-bufferas1row(s)...alize(PointAttributionUdaf.java:79):->initialize()-bufferas1row(s)...pdate(PointAttributionUdaf.java:92):->update(),inputrowhas1args...pdate(PointAttributionUdaf.java:97):->update(0, 1) ...

总结

聚合是一种对数据进行分组的方法,这样您就可以从更高或更宏观的级别查看数据。

Apache Spark可以使用Spark SQL(通过创建一个视图)或dataframe API对dataframe进行聚合。

groupBy()方法等价于SQL GROUP BY语句。

在执行聚合之前,需要准备和清理数据。这些步骤可以通过转换来完成(第12章)。

聚合可以通过groupBy()方法之后链接的方法执行,也可以通过agg()方法内部的静态函数执行。

Spark的聚合可以通过自定义的自定义聚合函数(UDAFs)进行扩展。

一个UDAF必须在你的Spark会话中通过名字注册。

使用callUDF()方法和UDAF名称来调用UDAF。

一个UDAF作为一个类实现,它应该实现几个方法。

使用agg()方法一次对多个列执行聚合。

您可以使用sum()方法和静态函数来计算集合的和。

可以使用avg()方法和静态函数来计算集合的平均值。

可以使用max()方法和静态函数来提取集合的最大值。

可以使用min()方法和静态函数来提取集合的最小值。

其他聚合函数包括许多统计方法,如:approx_count_distinct() , collect_list() , collect_set() , corr() , count() , countDistinct() , covar_pop() , covar_samp() , first() , grouping() , grouping _id() , kurtosis() , last() , mean() , skewness() , stddev() , stddev_pop() , stddev_samp() , sumDistinct() , var_pop() , var_samp() , 和variance()

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,377评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,390评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,967评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,344评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,441评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,492评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,497评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,274评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,732评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,008评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,184评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,837评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,520评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,156评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,407评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,056评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,074评论 2 352

推荐阅读更多精彩内容