Introducing Pandas UDF for PySpark
更新:此博客于 2018 年 2 月 22 日更新,以包含一些更改。
这篇博文在即将发布的 Apache Spark 2.3 版本中引入了 Pandas UDFs(即 Vectorized UDFs) 特性,这大大提高了 Python 中用户定义函数(UDF)的性能和可用性。
在过去的几年中,Python 已经成为数据科学家的默认语言。像 pandas,numpy,statsmodel 和 scikit-learn 这样的软件包已经获得了广泛的采用并成为主流工具包。同时,Apache Spark 已成为处理大数据的事实标准。为了使数据科学家能够利用大数据的价值,Spark 在 0.7 版中添加了 Python API,并支持user-defined functions。这些用户定义的函数一次只能操作一行,因此会遭遇高序列化和调用开销。因此,许多数据管道在 Java 和 Scala 中定义 UDF,然后从 Python 中调用它们。
基于 Apache Arrow 构建的 Pandas UDF 为您提供了两全其美的功能 - 完全用 Python 定义低开销,高性能 UDF的能力。
在 Spark 2.3 中,将会有两种类型的 Pandas UDF: 标量(scalar)和分组映射(grouped map)。接下来,我们使用四个示例程序来说明它们的用法:Plus One,累积概率,减去平均值,普通最小二乘线性回归。
Scalar Pandas UDFs
Scalar Pandas UDFs 用于向量化标量运算。要定义一个标量 Pandas UDF,只需使用 @pandas_udf
来注释一个 Python 函数,该函数接受 pandas.Series
作为参数并返回另一个相同大小的 pandas.Series
。下面我们用两个例子来说明:Plus One 和 Cumulative Probability。
Plus One
计算 v + 1 是演示 row-at-a-time UDFs 和 scalar Pandas UDFs 之间差异的简单示例。请注意,在这种情况下内置的列运算符可能执行得更快。
使用一次一行的 UDF:
from pyspark.sql.functions import udf
# 使用 udf 定义一个 row-at-a-time 的 udf
@udf('double')
# 输入/输出都是单个 double 类型的值
def plus_one(v):
return v + 1
df.withColumn('v2', plus_one(df.v))
使用 Pandas UDFs:
from pyspark.sql.functions import pandas_udf, PandasUDFType
# 使用 pandas_udf 定义一个 Pandas UDF
@pandas_udf('double', PandasUDFType.SCALAR)
# 输入/输出都是 double 类型的 pandas.Series
def pandas_plus_one(v):
return v + 1
df.withColumn('v2', pandas_plus_one(df.v))
上面的例子定义了一次一行的 UDF “plus_one” 和一个执行相同的“加一”计算的 scala Pandas UDF “pandas_plus_one”。除了函数装饰器之外,UDF 的定义是相同的:“udf” vs “pandas_udf”。
在一次一行的版本中,用户定义的函数接收一个 double 类型的参数 “v” 并将 “v + 1” 的结果作为 double 来返回。在 Pandas 版本中,用户定义函数接收 pandas.Series
类型的参数 “v”,并将 “v + 1” 的结果作为pandas.Series
返回。因为 “v + 1” 是在 pandas.Series
上进行矢量化的,所以 Pandas 版本比 row-at-a-time 的版本快得多。
请注意,使用 scala pandas UDF 时有两个重要要求:
- 输入和输出序列必须具有相同的大小。
- 如何将一列分割为多个
pandas.Series
是Spark的内部的事,因此用户定义函数的结果必须独立于分割。
累积概率
这个例子展示了 scalar Pandas UDF 更实际的用法:使用 scipy 包计算正态分布 N(0,1) 中值的累积概率。
import pandas as pd
from scipy import stats
@pandas_udf('double')
def cdf(v):
return pd.Series(stats.norm.cdf(v))
df.withColumn('cumulative_probability', cdf(df.v))
stats.norm.cdf
在标量值和 pandas.Series
上都是可用的,并且此示例也可以使用一次一行的 UDF 编写。与前面的例子类似,Pandas 版本运行速度更快,如后面的“性能比较”部分所示。
Grouped Map Pandas UDFs
Python 用户对数据分析中的 split-apply-combine 模式非常熟悉。Grouped Map Pandas UDF 是针对这种情况设计的,它们针对某些组的所有数据进行操作,例如“针对每个日期应用此操作”。
Grouped Map Pandas UDF 首先根据 groupby 运算符中指定的条件将 Spark DataFrame
分组,然后将用户定义的函数(pandas.DataFrame
-> pandas.DataFrame
)应用于每个组,并将结果组合并作为新的 Spark DataFrame
返回。
Grouped map Pandas UDF 使用与 scalar Pandas UDF 使用相同的函数装饰器 pandas_udf
,但它们有一些区别:
-
用户定义函数的输入:
- Scalar:
pandas.Series
- Grouped map:
pandas.DataFrame
- Scalar:
-
用户定义函数的输出:
- Scalar:
pandas.Series
- Grouped map:
pandas.DataFrame
- Scalar:
-
分组语义:
- Scalar: 无分组语义
- Grouped map: 由 “groupby” 从句定义
-
输出大小:
- Scalar: 和输入大小一样
- Grouped map: 任何尺寸
-
函数装饰器中的返回类型:
- Scalar: 一个
DataType
,用于指定返回的pandas.Series
的类型 - Grouped map: 一个
StructType
,用于指定返回的pandas.DataFrame
中每列的名称和类型
- Scalar: 一个
接下来,让我们通过两个示例来说明 grouped map Pandas UDF 的使用场景。
Subtract Mean
此示例显示了简单使用 grouped map Pandas UDFs:从组中的每个值中减去平均值。
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
# Input/output are both a pandas.DataFrame
def subtract_mean(pdf):
return pdf.assign(v=pdf.v - pdf.v.mean())
df.groupby('id').apply(subtract_mean)
在这个例子中,我们从每个组的 v 值中减去 v 的均值。分组语义由 “groupby” 函数定义,即每个输入到用户定义函数的 pandas.DataFrame
具有相同的 “id” 值。这个用户定义函数的输入和输出模式是相同的,所以我们将“df.schema” 传递给装饰器 pandas_udf
来指定模式。
Grouped map Pandas UDF 也可以作为驱动程序上的独立 Python 函数调用。这对于调试非常有用,例如:
sample = df.filter(id == 1).toPandas()
# Run as a standalone function on a pandas.DataFrame and verify result
subtract_mean.func(sample)
# Now run with Spark
df.groupby('id').apply(substract_mean)
在上面的示例中,我们首先将 Spark DataFrame
的一个小子集转换为 pandas.DataFrame
,然后将 subtract_mean 作为独立的 Python 函数运行。验证函数逻辑后,我们可以在整个数据集上使用 Spark 调用 UDF。
普通最小二乘线性回归
最后一个示例显示了如何使用 statsmodels 为每个组运行 OLS 线性回归。对于每个组,我们根据统计模型 Y = bX + c 计算对于 X = (x1,x2) 的 beta b = (b1,b2)。
import statsmodels.api as sm
# df has four columns: id, y, x1, x2
group_column = 'id'
y_column = 'y'
x_columns = ['x1', 'x2']
schema = df.select(group_column, *x_columns).schema
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
# Input/output are both a pandas.DataFrame
def ols(pdf):
group_key = pdf[group_column].iloc[0]
y = pdf[y_column]
X = pdf[x_columns]
X = sm.add_constant(X)
model = sm.OLS(y, X).fit()
return pd.DataFrame([[group_key] + [model.params[i] for i in x_columns]], columns=[group_column] + x_columns)
beta = df.groupby(group_column).apply(ols)
此示例演示了 grouped map Pandas UDF 可以与任何任意的 python 函数一起使用:pandas.DataFrame
-> pandas.DataFrame
。返回的 pandas.DataFrame
可以具有与输入不同的行数和列数。
性能比较
最后,我们想要显示 row-at-a-time UDF 和 Pandas UDF 之间的性能比较。我们为以上三个示例运行微基准测试(plus one, cumulative probability 和 subtract mean)。
配置和方法
我们在 Databricks 社区版的单节点 Spark 群集上运行了基准测试。
配置细节:
数据:带有 Int 列和 Double 列的 10M 行 DataFrame
集群:6.0 GB 内存,0.88 核心,1 个 DBU
Databricks 运行时版本:Latest RC(4.0,Scala 2.11)
有关基准的详细实现,请查看 Pandas UDF Notebook。
如图表所示,Pandas UDF 的表现比 row-at-a-time UDF 好得多,范围从 3倍到 100倍 不等。
结论和未来工作
即将推出的 Spark 2.3 版本为基本改进Python中用户定义函数的功能和性能奠定了基础。今后,我们计划在聚合和窗口函数中引入对 Pandas UDF 的支持。相关工作可以在 SPARK-22216 中进行跟踪。
Pandas UDFs 是 Spark 社区努力的一个很好的例子。我们要感谢 Bryan Cutler, Hyukjin Kwon, Jeff Reback, Liang-Chi Hsieh, Leif Walsh, Li Jin, Reynold Xin, Takuya Ueshin, Wenchen Fan, Wes McKinney, Xiao Li 以及其他人的贡献。最后,特别感谢 Apache Arrow 社区让这项工作成为可能。
下一步是什么
您可以尝试 Pandas UDF notebook ,并且此功能现在作为 Databricks Runtime 4.0 测试版的一部分提供.