pyspark: sql.functions以及udf函数

大纲

  1. 选取列 select
  2. 常数列 lit
  3. 条件分支 when otherwise
  4. 数学函数
  5. 时间函数
  6. 窗口函数 row_number
  7. 自定义函数 udf
  8. split & exploda

本文主要是列举一些pyspark中类似于sql的相关函数,以及如何自定义函数。首先,创建一个dataframe。以下都是在pyspark的交互界面下执行,版本为2.1.1

from pyspark.sql import Row
from pyspark.sql import functions as sf
rdd = sc.parallelize([Row(name='Alice', level='a', age=5, height=80),Row(name='Bob', level='a', age=5, height=80),Row(name='Cycy', level='b', age=10, height=80),Row(name='Didi', level='b', age=12, height=75),Row(name='EiEi', level='b', age=10, height=70)])
df = rdd.toDF()
print df.show()
"""
+---+------+-----+-----+
|age|height|level| name|
+---+------+-----+-----+
|  5|    80|    a|Alice|
|  5|    80|    a|  Bob|
| 10|    80|    b| Cycy|
| 12|    75|    b| Didi|
| 10|    70|    b| EiEi|
+---+------+-----+-----+
"""

1. 选取列 select

除了选取现有的列,还可以增加新列,并且还可以将列的顺序重排。

df1 = df.select("name", (df.age+1).alias("new_age"))
print df1.show()
"""
+-----+-------+
| name|new_age|
+-----+-------+
|Alice|      6|
|  Bob|      6|
| Cycy|     11|
| Didi|     13|
| EiEi|     11|
+-----+-------+
"""

2.常数列 lit

df2 = df.select("name", (df.age+1).alias("new_age"), sf.lit(2))
print df2.show()
"""
+-----+-------+---+
| name|new_age|  2|
+-----+-------+---+
|Alice|      6|  2|
|  Bob|      6|  2|
| Cycy|     11|  2|
| Didi|     13|  2|
| EiEi|     11|  2|
+-----+-------+---+
"""
# 也可以重命名
df2 = df.select("name", (df.age+1).alias("new_age"), sf.lit(2).alias("constant"))
print df2.show()
"""
+-----+-------+--------+
| name|new_age|constant|
+-----+-------+--------+
|Alice|      6|       2|
|  Bob|      6|       2|
| Cycy|     11|       2|
| Didi|     13|       2|
| EiEi|     11|       2|
+-----+-------+--------+
"""

当然新增列的方式还可以用withColumn,这里不赘述了。

3.条件分支 when otherwise

当多个条件时,一直使用when进行连接,直到使用otherwise。注意当逻辑判断中出现多个判断,则需单个使用()后再进行&或|连接,比如(df.age>=7)&(df.age<11); 否则会报错。

df3 = df.withColumn("when", sf.when(df.age<7, "kindergarten").when((df.age>=7)&(df.age<11), 'low_grade').otherwise("high_grade"))
print df3.show()
"""
+---+------+-----+-----+------------+
|age|height|level| name|        when|
+---+------+-----+-----+------------+
|  5|    80|    a|Alice|kindergarten|
|  5|    80|    a|  Bob|kindergarten|
| 10|    80|    b| Cycy|   low_grade|
| 12|    75|    b| Didi|  high_grade|
| 10|    70|    b| EiEi|   low_grade|
+---+------+-----+-----+------------+
"""

4. 数学函数

数学函数不在此枚举,包括简单的+、-、*、/,log、pow、各三角函数,以及还有round、floor等。具体可见官网 pyspark.sql.functions

5. 时间函数

  • 获取时间current_date()、current_timestamp()、
  • 格式转换date_format()、year()、month()、等
  • 时间运算date_add()、date_sub()等

6. 窗口函数 row_number

from pyspark.sql.window import Window
df_r = df.withColumn('row_number', sf.row_number().over(Window.partitionBy("level").orderBy("age")).alias("rowNum"))
# 其他写法
df_r = df.withColumn('row_number', sf.row_number().over(Window.partitionBy(df.level).orderBy(df.age)).alias("rowNum"))
print df_r.show()
"""
+---+------+-----+-----+----------+                                             
|age|height|level| name|row_number|
+---+------+-----+-----+----------+
| 10|    80|    b| Cycy|         1|
| 10|    70|    b| EiEi|         2|
| 12|    75|    b| Didi|         3|
|  5|    80|    a|  Bob|         1|
|  5|    80|    a|Alice|         2|
"""

表示逆序,或者根据多个字段分组

df_r = df.withColumn('row_number', sf.row_number().over(Window.partitionBy(df.level, df.age).orderBy(sf.desc("name"))).alias("rowNum"))
# 另一种写法
df_r = df.withColumn('row_number', sf.row_number().over(Window.partitionBy("level", "age").orderBy(sf.desc("name"))).alias("rowNum"))
print df_r.show()
"""
+---+------+-----+-----+----------+
|age|height|level| name|row_number|
+---+------+-----+-----+----------+
|  5|    80|    a|  Bob|         1|
|  5|    80|    a|Alice|         2|
| 10|    70|    b| EiEi|         1|
| 10|    80|    b| Cycy|         2|
| 12|    75|    b| Didi|         1|
+---+------+-----+-----+----------+
"""

可是,下面这种写法是错误的。

df_r = df.withColumn('row_number', sf.row_number().over(Window.partitionBy(df.level, df.age).orderBy(sf.desc(df.name))).alias("rowNum"))
"""
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/user/local/spark-2.1.1/python/pyspark/sql/functions.py", line 39, in _
    jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col)
  File "/home/user/local/spark-2.1.1/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/home/user/local/spark-2.1.1/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/home/user/local/spark-2.1.1/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 323, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.desc. Trace:
py4j.Py4JException: Method desc([class org.apache.spark.sql.Column]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
"""

7. 自定义函数 udf

udf只能对每一行进行操作,无法对groupBy后的数据处理。

from pyspark.sql import types as st
def ratio(a, b):
    if a is None or b is None or b == 0:
        r = -1.0
    else:
        r = 1.0 * a / b
    return r
col_ratio = udf(ratio, st.DoubleType())
df_udf = df.withColumn("ratio", col_ratio(df.age, df.height))
print df_udf.show()
"""
+---+------+-----+-----+-------------------+
|age|height|level| name|              ratio|
+---+------+-----+-----+-------------------+
|  5|    80|    a|Alice|             0.0625|
|  5|    80|    a|  Bob|             0.0625|
| 10|    80|    b| Cycy|              0.125|
| 12|    75|    b| Didi|               0.16|
| 10|    70|    b| EiEi|0.14285714285714285|
+---+------+-----+-----+-------------------+
"""

2.3版本以后有pandas_udf,用法比udf更多,可以进行groupBy后的聚合。但由于目前我使用的pyspark版本限制,无法进行实验。

8. split & exploda

待补充

参考资料

  1. 官网 pyspark.sql.functions
  2. Spark DataFrame ETL教程
    3.PySpark pandas udf
    4.pyspark dataframe之udf
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,039评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,223评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,916评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,009评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,030评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,011评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,934评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,754评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,202评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,433评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,590评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,321评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,917评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,568评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,738评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,583评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,482评论 2 352

推荐阅读更多精彩内容