Introducing Pandas UDF for PySpark

Introducing Pandas UDF for PySpark

更新:此博客于 2018 年 2 月 22 日更新,以包含一些更改。

这篇博文在即将发布的 Apache Spark 2.3 版本中引入了 Pandas UDFs(即 Vectorized UDFs) 特性,这大大提高了 Python 中用户定义函数(UDF)的性能和可用性。

在过去的几年中,Python 已经成为数据科学家的默认语言。像 pandasnumpystatsmodelscikit-learn 这样的软件包已经获得了广泛的采用并成为主流工具包。同时,Apache Spark 已成为处理大数据的事实标准。为了使数据科学家能够利用大数据的价值,Spark 在 0.7 版中添加了 Python API,并支持user-defined functions。这些用户定义的函数一次只能操作一行,因此会遭遇高序列化和调用开销。因此,许多数据管道在 Java 和 Scala 中定义 UDF,然后从 Python 中调用它们。

基于 Apache Arrow 构建的 Pandas UDF 为您提供了两全其美的功能 - 完全用 Python 定义低开销,高性能 UDF的能力。

在 Spark 2.3 中,将会有两种类型的 Pandas UDF: 标量(scalar)和分组映射(grouped map)。接下来,我们使用四个示例程序来说明它们的用法:Plus One,累积概率,减去平均值,普通最小二乘线性回归。

Scalar Pandas UDFs

Scalar Pandas UDFs 用于向量化标量运算。要定义一个标量 Pandas UDF,只需使用 @pandas_udf 来注释一个 Python 函数,该函数接受 pandas.Series 作为参数并返回另一个相同大小的 pandas.Series。下面我们用两个例子来说明:Plus One 和 Cumulative Probability。

Plus One

计算 v + 1 是演示 row-at-a-time UDFs 和 scalar Pandas UDFs 之间差异的简单示例。请注意,在这种情况下内置的列运算符可能执行得更快。

使用一次一行的 UDF:

from pyspark.sql.functions import udf

# 使用 udf 定义一个 row-at-a-time 的 udf
@udf('double')
# 输入/输出都是单个 double 类型的值
def plus_one(v):
    return v + 1

df.withColumn('v2', plus_one(df.v))

使用 Pandas UDFs:

from pyspark.sql.functions import pandas_udf, PandasUDFType

# 使用 pandas_udf 定义一个 Pandas UDF
@pandas_udf('double', PandasUDFType.SCALAR)
# 输入/输出都是 double 类型的 pandas.Series

def pandas_plus_one(v):
    return v + 1

df.withColumn('v2', pandas_plus_one(df.v))

上面的例子定义了一次一行的 UDF “plus_one” 和一个执行相同的“加一”计算的 scala Pandas UDF “pandas_plus_one”。除了函数装饰器之外,UDF 的定义是相同的:“udf” vs “pandas_udf”。

在一次一行的版本中,用户定义的函数接收一个 double 类型的参数 “v” 并将 “v + 1” 的结果作为 double 来返回。在 Pandas 版本中,用户定义函数接收 pandas.Series 类型的参数 “v”,并将 “v + 1” 的结果作为pandas.Series 返回。因为 “v + 1” 是在 pandas.Series 上进行矢量化的,所以 Pandas 版本比 row-at-a-time 的版本快得多。

请注意,使用 scala pandas UDF 时有两个重要要求:

  • 输入和输出序列必须具有相同的大小。
  • 如何将一列分割为多个 pandas.Series 是Spark的内部的事,因此用户定义函数的结果必须独立于分割。

累积概率

这个例子展示了 scalar Pandas UDF 更实际的用法:使用 scipy 包计算正态分布 N(0,1) 中值的累积概率

import pandas as pd
from scipy import stats

@pandas_udf('double')
def cdf(v):
    return pd.Series(stats.norm.cdf(v))


df.withColumn('cumulative_probability', cdf(df.v))

stats.norm.cdf 在标量值和 pandas.Series 上都是可用的,并且此示例也可以使用一次一行的 UDF 编写。与前面的例子类似,Pandas 版本运行速度更快,如后面的“性能比较”部分所示。

Grouped Map Pandas UDFs

Python 用户对数据分析中的 split-apply-combine 模式非常熟悉。Grouped Map Pandas UDF 是针对这种情况设计的,它们针对某些组的所有数据进行操作,例如“针对每个日期应用此操作”。

Grouped Map Pandas UDF 首先根据 groupby 运算符中指定的条件将 Spark DataFrame 分组,然后将用户定义的函数(pandas.DataFrame -> pandas.DataFrame)应用于每个组,并将结果组合并作为新的 Spark DataFrame 返回。

Grouped map Pandas UDF 使用与 scalar Pandas UDF 使用相同的函数装饰器 pandas_udf,但它们有一些区别:

  • 用户定义函数的输入:

    • Scalar: pandas.Series
    • Grouped map: pandas.DataFrame
  • 用户定义函数的输出:

    • Scalar: pandas.Series
    • Grouped map: pandas.DataFrame
  • 分组语义:

    • Scalar: 无分组语义
    • Grouped map: 由 “groupby” 从句定义
  • 输出大小:

    • Scalar: 和输入大小一样
    • Grouped map: 任何尺寸
  • 函数装饰器中的返回类型:

    • Scalar: 一个 DataType,用于指定返回的 pandas.Series 的类型
    • Grouped map: 一个 StructType,用于指定返回的 pandas.DataFrame 中每列的名称和类型

接下来,让我们通过两个示例来说明 grouped map Pandas UDF 的使用场景。

Subtract Mean

此示例显示了简单使用 grouped map Pandas UDFs:从组中的每个值中减去平均值。

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
# Input/output are both a pandas.DataFrame
def subtract_mean(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupby('id').apply(subtract_mean)

在这个例子中,我们从每个组的 v 值中减去 v 的均值。分组语义由 “groupby” 函数定义,即每个输入到用户定义函数的 pandas.DataFrame 具有相同的 “id” 值。这个用户定义函数的输入和输出模式是相同的,所以我们将“df.schema” 传递给装饰器 pandas_udf 来指定模式。

Grouped map Pandas UDF 也可以作为驱动程序上的独立 Python 函数调用。这对于调试非常有用,例如:

sample = df.filter(id == 1).toPandas()
# Run as a standalone function on a pandas.DataFrame and verify result
subtract_mean.func(sample)

# Now run with Spark
df.groupby('id').apply(substract_mean)

在上面的示例中,我们首先将 Spark DataFrame 的一个小子集转换为 pandas.DataFrame,然后将 subtract_mean 作为独立的 Python 函数运行。验证函数逻辑后,我们可以在整个数据集上使用 Spark 调用 UDF。

普通最小二乘线性回归

最后一个示例显示了如何使用 statsmodels 为每个组运行 OLS 线性回归。对于每个组,我们根据统计模型 Y = bX + c 计算对于 X = (x1,x2) 的 beta b = (b1,b2)。

import statsmodels.api as sm
# df has four columns: id, y, x1, x2

group_column = 'id'
y_column = 'y'
x_columns = ['x1', 'x2']
schema = df.select(group_column, *x_columns).schema

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
# Input/output are both a pandas.DataFrame
def ols(pdf):
    group_key = pdf[group_column].iloc[0]
    y = pdf[y_column]
    X = pdf[x_columns]
      X = sm.add_constant(X)
    model = sm.OLS(y, X).fit()

    return pd.DataFrame([[group_key] + [model.params[i] for i in   x_columns]], columns=[group_column] + x_columns)

beta = df.groupby(group_column).apply(ols)

此示例演示了 grouped map Pandas UDF 可以与任何任意的 python 函数一起使用:pandas.DataFrame -> pandas.DataFrame。返回的 pandas.DataFrame 可以具有与输入不同的行数和列数。

性能比较

最后,我们想要显示 row-at-a-time UDF 和 Pandas UDF 之间的性能比较。我们为以上三个示例运行微基准测试(plus one, cumulative probability 和 subtract mean)。

配置和方法

我们在 Databricks 社区版的单节点 Spark 群集上运行了基准测试。

配置细节:
数据:带有 Int 列和 Double 列的 10M 行 DataFrame
集群:6.0 GB 内存,0.88 核心,1 个 DBU
Databricks 运行时版本:Latest RC(4.0,Scala 2.11)

有关基准的详细实现,请查看 Pandas UDF Notebook

img

如图表所示,Pandas UDF 的表现比 row-at-a-time UDF 好得多,范围从 3倍100倍 不等。

结论和未来工作

即将推出的 Spark 2.3 版本为基本改进Python中用户定义函数的功能和性能奠定了基础。今后,我们计划在聚合和窗口函数中引入对 Pandas UDF 的支持。相关工作可以在 SPARK-22216 中进行跟踪。

Pandas UDFs 是 Spark 社区努力的一个很好的例子。我们要感谢 Bryan Cutler, Hyukjin Kwon, Jeff Reback, Liang-Chi Hsieh, Leif Walsh, Li Jin, Reynold Xin, Takuya Ueshin, Wenchen Fan, Wes McKinney, Xiao Li 以及其他人的贡献。最后,特别感谢 Apache Arrow 社区让这项工作成为可能。

下一步是什么

您可以尝试 Pandas UDF notebook ,并且此功能现在作为 Databricks Runtime 4.0 测试版的一部分提供.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,752评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,100评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,244评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,099评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,210评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,307评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,346评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,133评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,546评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,849评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,019评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,702评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,331评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,030评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,260评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,871评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,898评论 2 351

推荐阅读更多精彩内容