Spark强大的函数扩展功能

在数据分析领域中,没有人能预见所有的数据运算,以至于将它们都内置好,一切准备完好,用户只需要考虑用,万事大吉。扩展性是一个平台的生存之本,一个封闭的平台如何能够拥抱变化?在对数据进行分析时,无论是算法也好,分析逻辑也罢,最好的重用单位自然还是:函数

故而,对于一个大数据处理平台而言,倘若不能支持函数的扩展,确乎是不可想象的。Spark首先是一个开源框架,当我们发现一些函数具有通用的性质,自然可以考虑contribute给社区,直接加入到Spark的源代码中。我们欣喜地看到随着Spark版本的演化,确实涌现了越来越多对于数据分析师而言称得上是一柄柄利器的强大函数,例如博客文章《Spark 1.5 DataFrame API Highlights: Date/Time/String Handling, Time Intervals, and UDAFs》介绍了在1.5中为DataFrame提供了丰富的处理日期、时间和字符串的函数;以及在Spark SQL 1.4中就引入的Window Function

然而,针对特定领域进行数据分析的函数扩展,Spark提供了更好地置放之处,那就是所谓的“UDF(User Defined Function)”。

UDF的引入极大地丰富了Spark SQL的表现力。一方面,它让我们享受了利用Scala(当然,也包括Java或Python)更为自然地编写代码实现函数的福利,另一方面,又能精简SQL(或者DataFrame的API),更加写意自如地完成复杂的数据分析。尤其采用SQL语句去执行数据分析时,UDF帮助我们在SQL函数与Scala函数之间左右逢源,还可以在一定程度上化解不同数据源具有歧异函数的尴尬。想想不同关系数据库处理日期或时间的函数名称吧!

用Scala编写的UDF与普通的Scala函数没有任何区别,唯一需要多执行的一个步骤是要让SQLContext注册它。例如:

def len(bookTitle: String):Int = bookTitle.length

sqlContext.udf.register("len", len _)

val booksWithLongTitle = sqlContext.sql("select title, author from books where len(title) > 10")

编写的UDF可以放到SQL语句的fields部分,也可以作为where、groupBy或者having子句的一部分。

既然是UDF,它也得保持足够的特殊性,否则就完全与Scala函数泯然众人也。这一特殊性不在于函数的实现,而是思考函数的角度,需要将UDF的参数视为数据表的某个列。例如上面len函数的参数bookTitle,虽然是一个普通的字符串,但当其代入到Spark SQL的语句中,实参title实际上是表中的一个列(可以是列的别名)。

当然,我们也可以在使用UDF时,传入常量而非表的列名。让我们稍稍修改一下刚才的函数,让长度10作为函数的参数传入:

def lengthLongerThan(bookTitle: String, length: Int): Boolean = bookTitle.length > length

sqlContext.udf.register("longLength", lengthLongerThan _)

val booksWithLongTitle = sqlContext.sql("select title, author from books where longLength(title, 10)")

若使用DataFrame的API,则可以以字符串的形式将UDF传入:

val booksWithLongTitle = dataFrame.filter("longLength(title, 10)")

DataFrame的API也可以接收Column对象,可以用$符号来包裹一个字符串表示一个Column。$是定义在SQLContext对象implicits中的一个隐式转换。此时,UDF的定义也不相同,不能直接定义Scala函数,而是要用定义在org.apache.spark.sql.functions中的udf方法来接收一个函数。这种方式无需register:

import org.apache.spark.sql.functions._

val longLength = udf((bookTitle: String, length: Int) => bookTitle.length > length)

import sqlContext.implicits._
val booksWithLongTitle = dataFrame.filter(longLength($"title", $"10"))

注意,代码片段中的sqlContext是之前已经实例化的SQLContext对象。

不幸,运行这段代码会抛出异常:

cannot resolve '10' given input columns id, title, author, price, publishedDate;

因为采用$来包裹一个常量,会让Spark错以为这是一个Column。这时,需要定义在org.apache.spark.sql.functions中的lit函数来帮助:

val booksWithLongTitle = dataFrame.filter(longLength($"title", lit(10)))

普通的UDF却也存在一个缺陷,就是无法在函数内部支持对表数据的聚合运算。例如,当我要对销量执行年度同比计算,就需要对当年和上一年的销量分别求和,然后再利用同比公式进行计算。此时,UDF就无能为力了。

该UDAF(User Defined Aggregate Function)粉墨登场的时候了。

Spark为所有的UDAF定义了一个父类UserDefinedAggregateFunction。要继承这个类,需要实现父类的几个抽象方法:

def inputSchema: StructType

def bufferSchema: StructType

def dataType: DataType

def deterministic: Boolean

def initialize(buffer: MutableAggregationBuffer): Unit

def update(buffer: MutableAggregationBuffer, input: Row): Unit

def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit

def evaluate(buffer: Row): Any

可以将inputSchema理解为UDAF与DataFrame列有关的输入样式。例如年同比函数需要对某个可以运算的指标与时间维度进行处理,就需要在inputSchema中定义它们。

  def inputSchema: StructType = {
    StructType(StructField("metric", DoubleType) :: StructField("timeCategory", DateType) :: Nil)
  }

代码创建了拥有两个StructFieldStructTypeStructField的名字并没有特别要求,完全可以认为是两个内部结构的列名占位符。至于UDAF具体要操作DataFrame的哪个列,取决于调用者,但前提是数据类型必须符合事先的设置,如这里的DoubleTypeDateType类型。这两个类型被定义在org.apache.spark.sql.types中。

bufferSchema用于定义存储聚合运算时产生的中间数据结果的Schema,例如我们需要存储当年与上一年的销量总和,就需要定义两个StructField

  def bufferSchema: StructType = {
    StructType(StructField("sumOfCurrent", DoubleType) :: StructField("sumOfPrevious", DoubleType) :: Nil)
  }

dataType标明了UDAF函数的返回值类型,deterministic是一个布尔值,用以标记针对给定的一组输入,UDAF是否总是生成相同的结果。

顾名思义,initialize就是对聚合运算中间结果的初始化,在我们这个例子中,两个求和的中间值都被初始化为0d:

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0, 0.0)
    buffer.update(1, 0.0)
  }

update函数的第一个参数为bufferSchema中两个Field的索引,默认以0开始,所以第一行就是针对“sumOfCurrent”的求和值进行初始化。

UDAF的核心计算都发生在update函数中。在我们这个例子中,需要用户设置计算同比的时间周期。这个时间周期值属于外部输入,但却并非inputSchema的一部分,所以应该从UDAF对应类的构造函数中传入。我为时间周期定义了一个样例类,且对于同比函数,我们只要求输入当年的时间周期,上一年的时间周期可以通过对年份减1来完成:

case class DateRange(startDate: Timestamp, endDate: Timestamp) {
  def in(targetDate: Date): Boolean = {
    targetDate.before(endDate) && targetDate.after(startDate)
  }
}

class YearOnYearBasis(current: DateRange) extends UserDefinedAggregateFunction {
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (current.in(input.getAs[Date](1))) {
      buffer(0) = buffer.getAs[Double](0) + input.getAs[Double](0)
    }
    val previous = DateRange(subtractOneYear(current.startDate), subtractOneYear(current.endDate))
    if (previous.in(input.getAs[Date](1))) {
      buffer(1) = buffer.getAs[Double](0) + input.getAs[Double](0)
    }
  }
}  

update函数的第二个参数input: Row对应的并非DataFrame的行,而是被inputSchema投影了的行。以本例而言,每一个input就应该只有两个Field的值。倘若我们在调用这个UDAF函数时,分别传入了销量销售日期两个列的话,则input(0)代表的就是销量,input(1)代表的就是销售日期。

merge函数负责合并两个聚合运算的buffer,再将其存储到MutableAggregationBuffer中:

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Double](0) + buffer2.getAs[Double](0)
    buffer1(1) = buffer1.getAs[Double](1) + buffer2.getAs[Double](1)
  }

最后,由evaluate函数完成对聚合Buffer值的运算,得到最后的结果:

  def evaluate(buffer: Row): Any = {
    if (buffer.getDouble(1) == 0.0)
      0.0
    else
      (buffer.getDouble(0) - buffer.getDouble(1)) / buffer.getDouble(1) * 100
  }

假设我们创建了这样一个简单的DataFrame:

    val conf = new SparkConf().setAppName("TestUDF").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    
    import sqlContext.implicits._

    val sales = Seq(
      (1, "Widget Co", 1000.00, 0.00, "AZ", "2014-01-01"),
      (2, "Acme Widgets", 2000.00, 500.00, "CA", "2014-02-01"),
      (3, "Widgetry", 1000.00, 200.00, "CA", "2015-01-11"),
      (4, "Widgets R Us", 2000.00, 0.0, "CA", "2015-02-19"),
      (5, "Ye Olde Widgete", 3000.00, 0.0, "MA", "2015-02-28")
    )

    val salesRows = sc.parallelize(sales, 4)
    val salesDF = salesRows.toDF("id", "name", "sales", "discount", "state", "saleDate")
    salesDF.registerTempTable("sales")

那么,要使用之前定义的UDAF,则需要实例化该UDAF类,然后再通过udf进行注册:

    val current = DateRange(Timestamp.valueOf("2015-01-01 00:00:00"), Timestamp.valueOf("2015-12-31 00:00:00"))
    val yearOnYear = new YearOnYearBasis(current)

    sqlContext.udf.register("yearOnYear", yearOnYear)
    val dataFrame = sqlContext.sql("select yearOnYear(sales, saleDate) as yearOnYear from sales")
    dataFrame.show()

在使用上,除了需要对UDAF进行实例化之外,与普通的UDF使用没有任何区别。但显然,UDAF更加地强大和灵活。如果Spark自身没有提供符合你需求的函数,且需要进行较为复杂的聚合运算,UDAF是一个不错的选择。

通过Spark提供的UDF与UDAF,你可以慢慢实现属于自己行业的函数库,让Spark SQL变得越来越强大,对于使用者而言,却能变得越来越简单。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容