1. pyspark添加列,并向udf中传递多个参数
场景:现在有个keyword的list,需要对输入的每行数据的token字段进行判断,判断token是否在keyword中,并把判别的结果添加到新的列中。比如token为"union",那么返回1,token为"union1",返回0。
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf,col
keyword_list=['union','workers','strike','pay','rally','free','immigration']
def label_maker_topic(tokens,topic_words):
if tokens in topic_words:
return 1
else:
return 0
def make_topic_word(topic_words):
return udf(lambda c: label_maker_topic(c, topic_words),IntegerType())
df.withColumn("topics", make_topic_word(keyword_list)(col("token"))
注意:上面的keyword_list是一个变量,同样可以传递多个变量,通过df.withColumn("topics", make_topic_word(keyword_list,param2,param3,param4....)(col("token"))引用即可,同时函数的签名也需要变化。
2.filter中对数据进行模糊过滤
场景:现在需要对df中的数据进行过滤,类似sql中where...like...条件,下面对name列按包含jack关键字进行过滤
df.filter(df.name.like("%jack%")).show()
参考:
Passing a data frame column and external list to udf under withColumn
pyspark函数手册
PySpark - Pass list as parameter to UDF