When building reports, it is common to aggregate data from several tables into a single wide table. A typical setup is a PySpark script scheduled with crontab.
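For example, the job could be triggered by a crontab entry like the one below; the spark-submit path, the script name etl_order.py, and the log path are assumptions for illustration, and note that % must be escaped as \% inside crontab:
# run at 01:00 every day and pass yesterday's date to the script
0 1 * * * /usr/local/spark/bin/spark-submit /home/hadoop/etl_order.py $(date -d "1 day ago" +\%Y-\%m-\%d) >> /home/hadoop/etl_order.log 2>&1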
1. Create the SparkSession
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("app")\
    .config("spark.driver.memory", "512m")\
    .enableHiveSupport()\
    .getOrCreate()
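In a crontab-driven job the partition date is usually taken from the command line instead of being hard-coded; a minimal sketch, assuming the entry above passes the date as the first argument (the rest of this example keeps dt = "2019-01-01" for readability):
import sys

# e.g. spark-submit etl_order.py 2019-01-01
dt = sys.argv[1] if len(sys.argv) > 1 else "2019-01-01"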
2. Build the order wide table
df_order = spark.read.csv("file:///home/hadoop/order.csv", sep="\t", header=True)
df_user = spark.read.csv("file:///home/hadoop/user.csv", sep="\t", header=True)
# join on user_id and keep every order row (left join)
df_user_temp = df_user.selectExpr("id as u_id", "name as user_name", "reg_time", "age")
df_order_all = df_order.join(df_user_temp, df_order.user_id == df_user_temp.u_id, 'left').drop(df_user_temp.u_id)
table = "order_all"
temp_table = "temp_order_20190101"  # avoid '-' in the view name, it breaks the SQL below
df_order_all.createOrReplaceTempView(temp_table)
spark.catalog.cacheTable(temp_table)
# columns to write into the target table
cols = "id,create_time,user_id,user_name,reg_time,age"
# partition date and insert statement
dt = "2019-01-01"
sql = "insert overwrite table bi.{table} partition (dt='{dt}') select {cols} from {temp_table}" .format(table=table, dt=dt, cols=cols, temp_table=temp_table)
spark.sql(sql)
spark.catalog.uncacheTable(temp_table)
spark.stop()
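The insert overwrite ... partition (dt='...') statement assumes the partitioned target table bi.order_all already exists in Hive. A minimal sketch of its DDL, run once beforehand (the column types and parquet storage are assumptions; since the CSVs are read without a schema, all columns are declared as string):
spark.sql("""
    create table if not exists bi.order_all (
        id string,
        create_time string,
        user_id string,
        user_name string,
        reg_time string,
        age string
    )
    partitioned by (dt string)
    stored as parquet
""")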