1.提交任务
spark-submit \
  --driver-memory 10g \
  --queue queue.name \
  --conf spark.dynamicAllocation.minExecutors=160 \
  --conf spark.dynamicAllocation.maxExecutors=800 \
  --conf spark.default.parallelism=1600 \
  --py-files utils.zip \
  pycode.py param
- user defined function (UDF)
1.创建普通的python函数
def toDate(s):
    """Return the string form of *s* with a trailing '-' appended (example UDF body)."""
    return ''.join([str(s), '-'])
2.注册自定义函数
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Declare the Spark SQL type matching the Python function's return type:
# toDate returns a Python str, which corresponds to StringType.
# (Original note used C-style `//` comments, which are a SyntaxError in Python.)
toDateUDF = udf(toDate, StringType())
3.使用自定义函数
df1.withColumn('color',toDateUDF('color')).show()
- window函数
from pyspark.sql import Window
from pyspark.sql import functions as func  # missing in the original note; required for func.row_number()

# Rank rows within each (pid, time_grp) group by timestamp descending,
# so topn == 1 marks the most recent row of each group.
window = Window.partitionBy("pid", "time_grp").orderBy(train_info_df2["timestamp"].desc())
train_info_df2 = train_info_df2.withColumn('topn', func.row_number().over(window))