一、项目介绍:
Adventure Works Cycles是Adventure Works样本数据库所虚构的一家大型跨国制造公司,该公司生产和销售自行车到全球各个市场。
- 产品介绍:
目前公司主要有四个产品线,一是生成的自行车(是最大的销售源);二是自行车部件,例如车轮、踏板或制动组件;三是自行车周边,例如服装等;四是自行车配件等。
二、项目目的:
- 1.随着公司销量日益增加的情况,需要增强公司数据化管理,同时使业务同事实现自主分析,从而实现对市场的快速判断。因此需要于数据部分进行沟通分析业务指标需求,构建可视化看板。
- 2.业务指标需求:
- 最近的销售情况:对销量、销售额、客单价等指标分别从当天、前一天、当月、当季、当年进行分析;
- 销售趋势:对最近21天的销售情况进行统计;
-
总体销售情况:总体销售额分布,地区客单价比较等。
三、项目过程
- MySQL数据源观察
- 数据指标构建
- 数据表构建
- linux定时部署
1.MySQL数据源观察
了解到数据库主要有3张表,分别如下:
ods_sales_orders:订单明细表
ods_customers:用户信息表
dim_date_df:日期维度表
2.数据指标构建
构建销量、销售额、达标率、环比、同比等指标,分别从当日、昨日、当月、当季、当年日期维度进行分析;
3.数据表构建
(1)dw_order_info:销售详情表
- 从mysql数据源读取订单明细表、客户信息表、日期维度表,将订单明细表与客户信息表通过"customer_key"字段合并,再与日期维度表通过"create_date"进行合并。
- 以下是部分python脚本代码:
def read_date(adventure_conn_read):
try:
# 订单明细表
sql_1 = "SELECT * from ods_sales_orders"
ods_sales_orders = pd.read_sql(sql_1,adventure_conn_read)
# 客户信息表
sql_2 = "SELECT customer_key,gender,chinese_territory,chinese_province,chinese_city from ods_customer"
ods_customer = pd.read_sql(sql_2,adventure_conn_read).drop_duplicates("customer_key")
# 日期维度表
sql_3 = "SELECT create_date, is_today, is_yesterday, is_21_day, is_current_month, is_current_quarter, is_current_year, is_last_year from dim_date_df"
dim_date_df = pd.read_sql(sql_3,adventure_conn_read)
return ods_sales_orders, ods_customer, dim_date_df
except Exception as e:
logger.info("read_date报错信息:{}".format(e))
# 合并订单明细表和客户信息表
def merge1(ods_sales_orders, ods_customer):
try:
df1 = pd.merge(ods_sales_orders, ods_customer, on = "customer_key")
return df1
except Exception as e:
logger.info("merge1报错信息:{}".format(e))
# 合并df1和日期维度表
def merge2(df1, dim_date_df):
try:
df1["create_date"] = pd.to_datetime(df1["create_date"])
dim_date_df["create_date"] = pd.to_datetime(dim_date_df["create_date"])
dw_orders_info = pd.merge(df1, dim_date_df, on = "create_date")
dw_orders_info["create_date"] = dw_orders_info["create_date"].apply(lambda x:x.strftime("%Y-%m-%d"))
return dw_orders_info
except Exception as e:
logger.info("merge2报错信息:{}".format(e))
# 保存至sql
def save_tosql(dw_orders_info,adventure_conn_tosql):
try:
dw_orders_info.to_sql("dw_orders_info_{}".format(my_name),con =adventure_conn_tosql, if_exists = 'replace', index=False)
except Exception as e:
logger.info("save_tosql报错信息{}".format(e))
(2)dw_order_by_day:每日聚合表
- 从sql读取销售订单表,利用groupby将每日信息进行聚合重组,最后计算每日环比、达标率等指标
- 以下是部分python脚本代码:
def sum_amount_order(adventure_conn_read):
"""sum_amount_order:销量订单聚合表
具体:读取ods_sales_orders(订单明细表),根据create_date聚合,求总销售额、客户数量、客单价"""
try:
sum_amount_order = pd.read_sql_query("select * from ods_sales_orders",
con=adventure_conn_read) # 读取ods_sales_orders表数据
# 聚合
sum_amount_order = sum_amount_order.groupby("create_date").agg({'customer_key':'count','unit_price':'sum'}).reset_index()
# 计算客单价
sum_amount_order['amount_div_order'] = sum_amount_order['unit_price']/sum_amount_order['customer_key']
# 修改列名
sum_amount_order.rename(columns = {'customer_key':'order_counts',
"unit_price":"sum_amount"},inplace = True)
return sum_amount_order
# 如果错误,则填写错误日志
except Exception as e:
logger.info("sum_amount_order异常,报错信息:{}".format(e))
# 生成目标销售额和销量
def add_order_goal(sum_amount_order):
"""add_order_goal:生成目标金额及目标销量
具体:利用空列表及循环生成对应随机值,与销量订单聚合表合并形成sum_amount_order_goal(销量订单聚合目标表)"""
try:
# 定义两个列表,一个为每日销售目标、一个为每日销量目标
sum_amount_goal_list = []
# 创建一个日期列表
create_date_list = list(sum_amount_order['create_date']) # 获取sum_amount_order中的create_date
# 根据日期遍历每日销售额、销量目标
for i in create_date_list:
a = random.uniform(0.85,1.1)
amount_goal = list(sum_amount_order[sum_amount_order['create_date']==i]['sum_amount'] * a)[0]
sum_amount_goal_list.append(amount_goal)
# 组合每日销量表和每日目标
sum_amount_order_goal = pd.concat([sum_amount_order,pd.DataFrame({"sum_amount_goal":sum_amount_goal_list})],axis = 1)
sum_amount_order_goal["compliance rate"] = sum_amount_order_goal["sum_amount"] / sum_amount_order_goal["sum_amount_goal"]
return sum_amount_order_goal
except Exception as e:
logger.info("add_order_goal异常,报错信息:{}".format(e))
# 读取日期维度表
def date_data(adventure_conn_read):
"""数据库读取dim_date_df日期维度表"""
try:
date_sql = "SELECT create_date,is_today,is_yesterday,is_21_day,is_current_month,is_current_quarter,is_current_year,is_last_year FROM dim_date_df"
date_info = pd.read_sql_query(date_sql, con=adventure_conn_read)
return date_info
except Exception as e:
logger.info("date_data异常,报错信息:{}".format(e))
# 融合销量订单聚合目标表和日期维度表
def merge_data(sum_amount_order_goal, date_info):
"""参数解释:sum_amount_order_goal销量订单聚合目标表,
date_info日期维度表(来自date_data函数)
输出:amount_order_by_day销量订单聚合目标及日期维度表
"""
try:
# 将create_date设置为标准日期格式
date_info['create_date'] = pd.to_datetime(date_info['create_date'],format = "%Y-%m-%d")
sum_amount_order_goal['create_date'] = pd.to_datetime(sum_amount_order_goal['create_date'],format = "%Y-%m-%d")
# 融合两张表
amount_order_by_day = pd.merge(sum_amount_order_goal,date_info,on = "create_date",how = "inner")
amount_order_by_day["create_date"] = amount_order_by_day["create_date"].apply(lambda x:x.strftime("%Y-%m-%d"))
return amount_order_by_day
except Exception as e:
logger.info("merge_data异常,报错信息:{}".format(e))
# 储存数据表
def save_to_mysql(amount_order_by_day, adventure_conn_tosql):
"""将amount_order_by_day数据追加到数据库dw_order_by_day(每日环比表)当中"""
try:
# 每日销量、销售额环比情况
amount_order_by_day['order_diff'] = amount_order_by_day['order_counts'].pct_change().fillna(0)
amount_order_by_day['amount_diff']= amount_order_by_day['sum_amount'].pct_change().fillna(0)
# 追加至数据库
amount_order_by_day.to_sql('dw_order_by_day_{}'.format(my_name),con = adventure_conn_tosql,
if_exists = 'replace',index = False)
except Exception as e:
logger.info("save_to_mysql异常,报错信息:{}".format(e))
(3)dw_amount_diff:数据同比表
- 读取dw_order_by_day数据,按照当天、昨天、当月、当季、当年、去年日期维度进行聚合重组,分别于去年的数据进行比较。
- 以下是部分python脚本:
def diff(stage, indictor):
try:
'''stage:日期维度的判断,如:is_today 内有[0,1]
indictor:需取值字段,如:sum_amount(总金额),sum_order(总订单量)
输出:当前时间维度下总和,去年同期总和'''
# 当前stage维度,indictor之和
current_stage_indictor = dw_order_by_day[dw_order_by_day[stage]==1][indictor].sum()
# 前一年日期维度列表
before_stage_list = list(dw_order_by_day[dw_order_by_day[stage]==1]['create_date'] - datetime.timedelta(366))
# 前一年stage维度,indictor之和
before_stage_indictor = dw_order_by_day[dw_order_by_day['create_date'].isin(before_stage_list)][indictor].sum()
return current_stage_indictor, before_stage_indictor
except Exception as e:
logger.info("diff异常,报错信息:{}".format(e))
def delete_table():
try:
pd.read_sql_query("Truncate table dw_amount_diff_{}".format(my_name), con=adventure_conn_tosql)
except: # 因为删除插入更新操作没有返回值,程序会抛出ResourceClosedError,并终止程序。使用try捕捉此异常。
print('继续')
logger.info("继续运行")
if __name__ == "__main__":
'''目的:生成dw_amount_diff当日维度表(按当天/昨天/当月/当季/当年的同比)'''
"""各阶段的金额"""
today_amount, before_year_today_amount = diff('is_today', 'sum_amount')
yesterday_amount, before_year_yesterday_amount = diff('is_yesterday', 'sum_amount')
month_amount, before_year_month_amount = diff('is_current_month', 'sum_amount')
quarter_amount, before_year_quarter_amount = diff('is_current_quarter', 'sum_amount')
year_amount, before_year_year_amount = diff('is_current_year', 'sum_amount')
"""各阶段的订单数"""
today_order, before_year_today_order = diff('is_today', 'order_counts')
yesterday_order, before_year_yesterday_order = diff('is_yesterday', 'order_counts')
month_order, before_year_month_order = diff('is_current_month', 'order_counts')
quarter_order, before_year_quarter_order = diff('is_current_quarter', 'order_counts')
year_order, before_year_year_order = diff('is_current_year', 'order_counts')
# 生成当天/昨天/本月/本季度/今年日期维度的同比指标 (今日数据/去年今日数据 - 1)
try:
amount_dic = {'today_diff': [today_amount / before_year_today_amount - 1,
today_order / before_year_today_order - 1,
(today_amount / today_order) / (before_year_today_amount /
before_year_today_order) - 1],
'yesterday_diff': [yesterday_amount / before_year_yesterday_amount - 1,
yesterday_order / before_year_yesterday_order - 1,
(yesterday_amount / yesterday_order) / (before_year_yesterday_amount /
before_year_yesterday_order) - 1],
'month_diff': [month_amount / before_year_month_amount - 1,
month_amount / before_year_month_amount - 1,
(month_amount / month_order) / (before_year_month_amount /
before_year_month_order) - 1],
'quarter_diff': [quarter_amount / before_year_quarter_amount - 1,
quarter_amount / before_year_quarter_amount - 1,
(quarter_amount / quarter_order) / (before_year_quarter_amount /
before_year_quarter_order) - 1],
'year_diff': [year_amount / before_year_year_amount - 1,
year_amount / before_year_year_amount - 1,
(year_amount / year_order) / (before_year_year_amount /
before_year_year_order) - 1],
'flag': ['amount', 'order', 'avg']} # 做符号简称,横向提取数据方便;销售额、销量、客单价
amount_diff = pd.DataFrame(amount_dic)
amount_diff["create_date"] = today_date.strftime("%Y-%m-%d")
4. linux服务器定时部署
- 在linux服务器上创建指定时间部署脚本,边可以实现数据每日自动更新,而不需要自己手动去操作;
(1)创建定时任务脚本
- 使用python的schedule库,创建定时脚本;
- 部分代码如下:
python执行代码
os.system("/home/anaconda3/bin/python3 /home/frog005/adventure_yxs/dw_amount_diff.py >> /home/frog005/adventure/yxs_logs/dw_amount_diff_schedule.log 2>&1 &")
schedule定时任务代码,分别将三张表的自动更新时间设置为6:20,7:00,7:10
schedule.every().day.at('06:20').do(job0)
schedule.every().day.at('07:00').do(job1)
schedule.every().day.at('07:10').do(job2)
(2)linux服务器部署脚本
- 在linux服务器上,加入nohup即可使退出终端,脚本依然执行,&可令脚本自动挂在后台执行。
nohup python schedule_job_yxs.py > schedule_test.log 2>&1 &
- 若想查看后台运行情况 需要使用 ps aux|grep 代码实现
- 若想杀掉进程,则需要使用 kill -9 <进程号>
四、连接PowerBI部署展示
完成前面的步骤后,将powerbi与数据库连接,即可将每日数据进行展示。
在PowerBI中,一共有三页,分别为销售数据(可指定维度),销售趋势、总体销售情况
(1)销售数据页面
- 指标:订单量、销售额、客单价、购买客户性别、当日达标率、销售额环比、自行车销售额排名、销量占比、销售额城市占比
-
时间维度:可选择今日、昨日、本月、本季度、今年
(2)销售趋势页面
- 指标:可从地区、城市、产品种类方面,查看销售额、自行车类别销售额、客单价
-
时间维度:最近21天、当月、当季、今年
(3)总体销售情况页面
从省份、区域角度可以查看总体销售情况以及客单价情况
可视化地址:点击