0. 前言
- spark python提供丰富的库函数,比较容易学习。但是对于新手来说,如何完成一个完整的数据查询和处理的spark,存在一些迷惑
- 因此本文将详细的对一个入门demo讲述各个部分的作用
1. 基础操作
#python脚本里
spark = SparkSession.builder.appName(job_name).getOrCreate()
- spark-submit 设置运行参数
#spark安装地址
spark_home="xxx/spark-2.3/"
spark_submit=${spark_home}/bin/spark-submit
#要执行的Python脚本
py_file=$1
${spark_submit} \
--master yarn \
--queue xxxxx \
--num-executors 250 \
--executor-cores 4 \ #executor的核数,每个核可运行一个进程,核越多说明可并行程度越高
--executor-memory 16G \ #executor所占内存
--files adapter.py \
--conf spark.sql.catalogImplementation=hive \
--conf spark.dynamicAllocation.enable=false \
--conf spark.yarn.priority=NORMAL \
--conf spark.default.parallelism=1200 \
$py_file
#平常执行的sql语句
sql_str = ""
#执行sql语句
spark.sql(sql_str)
- 读取文本
#定义文本的schema 表示文本的结构
midlog_schema = T.StructType([
T.StructField("q_stra", T.StringType(), True),
T.StructField("query", T.StringType(), True),
T.StructField("qfreq", T.StringType(), True),
T.StructField("date", T.StringType(), True),
])
#读取文本为DataFrame对象
midlog_data = spark.read.csv(text_path_str, sep='\001', schema=midlog_schema, header=None, inferSchema=False, mode='FAILFAST')
- 处理数据
- 一般为处理特征,过滤无效值。包括很多函数,涉及到udf。udf是用户自定义的函数,灵活性相比spark提供的函数更高,但是pyspark的udf性能较低(dataframe自带的函数可以绕过python对象->java对象->spark底层通信,但是udf避免不了,因此会存在多次文件的序列化,性能不高)
- 声明udf
#udf(函数,返回类型)
disp_info_udf = F.udf(get_disp_info, T.ArrayType(T.StringType(), True))
- 调用udf
# 由于spark内部不支持一次性传入多个参数,使用struct 可以传入多个参数
data.withColumn('disp_info', F.lit(disp_info_udf(F.struct('disp_result', 'event_day'))))
- udf 定义
def get_disp_info(self, disp_args):
#F.struct在函数中是元组,根据元组方式获取对应的参数
disp_result = disp_args[0]
day = disp_args[1]
disp_info = []
if disp_result is None:
return disp_info
#处理其他步骤
- 查询对应的数据
- 无固定的方式,根据自己的目的。
- 分组求和,获取某个条件的所有数据等等
- demo 代码
# pyspark 相关的库
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import SQLContext
reload(sys)
sys.setdefaultencoding('utf-8')
job_name = "%s_monitor_data_%s" % (user_name, day)
#启动spark任务,可在该语句增加spark任务配置(executor memory,executor个数等)
#具体配置参数可查找spark文档
spark = SparkSession.builder.appName(job_name).getOrCreate()
#定义sql语句
#读取文件有两种方式:sql读表;读取文本(见1.2)
sql_str = "select event_day, search_id, " \
"disp_result " \
"from data_table " \
"where event_day = %s " \
"and is_spam != '1' " \
"and page_no = '1' " % day
#执行sql语句
data = spark.sql(sql_str).cache()
#定义udf
disp_info_udf = F.udf(get_disp_info, T.ArrayType(T.StringType(), True))
#调用udf
data = data.withColumn('disp_info', F.lit(disp_info_udf(F.struct('disp_result', 'event_day'))))
data\
.groupBy(['event_day'])\
.agg(
F.countDistinct('search_id').alias('pv')
).coalesce(1)\
.write.csv("/user/%s/tmp_table/search_pv/%s' % (user_name, day), sep='\t', mode='overwrite')