15.3 使用UDAFs构建自定义聚合
在前面的小节中,您快速回顾了聚合数据,在简单数据集上执行了聚合操作,并最终处理了真实的数据。在这些操作中,使用了包括max()、avg()和min()在内的标准聚合操作。Spark并没有实现所有可能的数据聚合。
在本节中,您将通过构建自己的聚合函数来扩展Spark。用户定义的聚合函数(UDAFs),可以执行自定义聚合。
想象一下下面的用例:您是一个在线零售商,想要给客户忠诚度积分。每位顾客每订购一件商品可得一分,但每次订购最多可得三分。
解决这个问题的一种方法是在您的order dataframe中添加一个point列并匹配点归属规则,但是您将使用一个聚合函数来解决这个问题(您可以自己轻松地使用point列来解决这个问题)。
图15.6显示了将要使用的数据集。它类似于本章第一节15.1节中所使用的方法。
图15.6 应用自定义UDAF来计算每个客户每个订单获得多少忠诚点
操作的结果是一个客户及其关联点的列表,如下面的清单所示。
#清单15.23客户及其关联积分
+------------+--------+-----+-------------+-----+
| firstName|lastName|state|sum(quantity)|point|
+------------+--------+-----+-------------+-----+|
Ginni | Rometty| NY | 7 | 3 |
|Jean-Georges| Perrin | NC | 3 | 3 |
| Holden | Karau | CA | 10 | 6 |
|Jean-Georges| Perrin | CA | 4 | 3 |
+------------+--------+-----+-------------+-----+
实验
这个实验的代码在net.jgp.books.spark.ch13.lab400_udaf包。该应用程序称为PointsPerOrderApp.java,UDAF代码在PointAttributionUdaf.java中
调用UDAF并不比调用任何聚合函数复杂。有几个步骤:
在Spark会话中使用udf().register()方法注册这个函数。
使用callUDF()函数调用该函数。
下面的清单显示了调用UDAF的过程。
//清单15.24注册和调用UDAF
package net.jgp.books.spark.ch13.lab400_udaf;
import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.sum;
import static org.apache.spark.sql.functions.when;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class PointsPerOrderApp {
public static void main(String[] args ) {
PointsPerOrderApp app = new PointsPerOrderApp();
app .start();
}
private void start() {
SparkSession spark = SparkSession.builder()
.appName( "Orders loyalty point" )
.master( "local[*]" )
.getOrCreate();
spark
.udf().register( "pointAttribution" , new PointAttributionUdaf());
Dataset<Row> df = spark .read().format( "csv" )
.option( "header" , true )
.option( "inferSchema" , true )
.load( "data/orders/orders.csv" );
Dataset<Row> pointDf = df
col( "firstName" ), col( "lastName" ), col( "state" ))
.agg(
sum( "quantity" ),
callUDF( "pointAttribution" , col( "quantity" )).as( "point" ));
pointDf .show(20);
}
}
调用UDAF就像这样简单:
callUDF( "pointAttribution" , col( "quantity" ))
在这种情况下,您的UDAF只接受一个参数,但如果有必要,函数可以接受多个参数。如果UDAF需要更多的参数,只需添加这些参数:将它们添加到您的调用和输入模式中(参见清单15.24)。
在深入代码之前,让我们先理解UDAF的架构。每一行将被处理,结果可以存储在一个聚合缓冲区中(在worker点上)。请注意,缓冲区不必反映传入数据的结构:您将定义它的模式,它可以存储其他元素。图15.7说明了聚合及其结果缓冲区的机制。
图15.7 当你的代码正在分析数据集的每一行时,中间结果可以保存在一个缓冲区中。
现在,让我们看看如何实现UDAF本身。当你的应用程序调用这个函数时,它就是一个函数;然而,当涉及到它的实现时,它是一个完整的类。这个类必须扩展UserDefinedAggregateFunction(在org.apache .spark.sql.expressions包)。
因此,实现UDAF的类必须实现以下方法:
bufferSchema()——定义缓冲区的模式。
dataType()——表示来自聚合函数的数据类型。
deterministic()——当Spark通过分割数据来执行时,它会分别处理数据块并将它们组合在一起。如果UDAF逻辑使结果独立于处理和组合数据的顺序,则UDAF是确定性的。
evaluate()——根据给定的聚合缓冲区计算该UDAF的最终结果。
initialize()——初始化给定的聚合缓冲区。
inputSchema()——描述发送到UDAF的输入的模式。
merge()——合并两个聚合缓冲区并存储更新后的缓冲区值。当我们将两个部分聚合的数据元素合并在一起时,将调用此方法。
update()——用新的输入数据更新给定的聚合缓冲区。每个输入行调用一次此方法。
现在,您拥有了构建UDAF所需的所有元素,如清单15.25所示。注意,这个类继承了UserDefinedAggregateFunction,它实现了Serializable。您将需要定义一个serialVersionUID变量,但最重要的是,该类的每个元素也需要是可序列化的。
//清单15.25关注于UDAF:
inputSchema()方法定义了发送给函数的数据的模式。在本例中,您接收到的是一个整数,表示订单中的原始项数。Spark中的一个模式,你已经用过几次了,是用StructType实现的:
@Override public StructType inputSchema() { List<StructField> inputFields = new ArrayList<>(); inputFields .add( DataTypes.createStructField( "_c0" , DataTypes. IntegerType , true )); return DataTypes.createStructType( inputFields ); }
bufferSchema()方法定义聚合缓冲区的模式,用于存储中间结果。在本例中,您只需要一列存储整数。对于更复杂的聚合流程,可能需要更多的列。
@OverridepublicStructTypebufferSchema(){List bufferFields =newArrayList<>(); bufferFields .add(DataTypes.createStructField("sum", DataTypes. IntegerType ,true));returnDataTypes.createStructType( bufferFields ); }@OverridepublicDataTypedataType(){returnDataTypes. IntegerType ; }@Overridepublicbooleandeterministic(){returntrue; }
很好,initialize()方法初始化内部缓冲区。在这种情况下,由于这是一个相当简单的聚合,缓冲区将被设置为0。
然而,由类履行的契约需要遵循这个基本规则。在两个初始缓冲区上应用merge()方法应该返回初始缓冲区本身;例如,merge(initialBuffer, initialBuffer) = initialBuffer。
@Overridepublicvoidinitialize(MutableAggregationBuffer buffer ){ buffer .update(0,0); }
该操作发生在update()方法中。您将在这里处理数据。你接收到的缓冲区可能包含数据,也可能不包含数据,所以不能忽略它:在第一次调用中,它将不包含初始化数据以外的数据。然而,在随后的调用中,数据已经在缓冲区中了,所以不应该忽略它:
@Overridepublicvoidupdate(MutableAggregationBuffer buffer , Row input ){...intinitialValue = buffer .getInt(0);intinputValue = input .getInt(0);intoutputValue =0;if( inputValue < MAX_POINT_PER_ORDER ) { outputValue = inputValue ;}else{ outputValue = MAX_POINT_PER_ORDER ; } outputValue += initialValue ;buffer .update(0, outputValue ); }
merge()方法合并两个聚合缓冲区,并将更新后的缓冲区值存储回聚合缓冲区中。在这个场景中,当有两个包含忠诚度点的缓冲区时,只需相加它们:
@Override public void merge(MutableAggregationBuffer buffer , Row row ) {buffer.update(0,buffer.getInt(0) +row.getInt(0)); }
最后,evaluate()方法根据给定的聚合缓冲区计算这个UDAF的最终结果:
@OverridepublicIntegerevaluate(Row row ){returnrow .getInt(0); } }
在本节中,您已经使用并构建了自己的用户定义聚合函数,这有点棘手。您遵循的用例是一个简单的忠诚度点归属,但您可以想象其他类型的场景。
如果您有兴趣了解更多关于聚合如何工作的信息,可以在Log4j.properties中激活跟踪日志:
log4j.logger.net.jgp= DEBUG//修改为log4j.logger.net.jgp= TRACE
在下一次执行时,你将得到详细的输出:
...alize(PointAttributionUdaf.java:79):->initialize()-bufferas1row(s)...alize(PointAttributionUdaf.java:79):->initialize()-bufferas1row(s)...pdate(PointAttributionUdaf.java:92):->update(),inputrowhas1args...pdate(PointAttributionUdaf.java:97):->update(0, 1) ...
总结
聚合是一种对数据进行分组的方法,这样您就可以从更高或更宏观的级别查看数据。
Apache Spark可以使用Spark SQL(通过创建一个视图)或dataframe API对dataframe进行聚合。
groupBy()方法等价于SQL GROUP BY语句。
在执行聚合之前,需要准备和清理数据。这些步骤可以通过转换来完成(第12章)。
聚合可以通过groupBy()方法之后链接的方法执行,也可以通过agg()方法内部的静态函数执行。
Spark的聚合可以通过自定义的自定义聚合函数(UDAFs)进行扩展。
一个UDAF必须在你的Spark会话中通过名字注册。
使用callUDF()方法和UDAF名称来调用UDAF。
一个UDAF作为一个类实现,它应该实现几个方法。
使用agg()方法一次对多个列执行聚合。
您可以使用sum()方法和静态函数来计算集合的和。
可以使用avg()方法和静态函数来计算集合的平均值。
可以使用max()方法和静态函数来提取集合的最大值。
可以使用min()方法和静态函数来提取集合的最小值。
其他聚合函数包括许多统计方法,如:approx_count_distinct() , collect_list() , collect_set() , corr() , count() , countDistinct() , covar_pop() , covar_samp() , first() , grouping() , grouping _id() , kurtosis() , last() , mean() , skewness() , stddev() , stddev_pop() , stddev_samp() , sumDistinct() , var_pop() , var_samp() , 和variance()