累计快照事实表概述
在生产系统,msyql表的设计会提前考虑到变化的因素,不会中途修改表结构,甚至于重构。本文在于学习,非生产环境所为
- 累积快照事实表用于定义业务过程开始、 结束以及期间的可区分的里程碑事件。 通常在此类事实表中针对过程中的关键步骤都包含日期外键, 并包含每个步骤的度量, 这些度量的产生一般都会滞后于数据行的创建时间。 累积快照事实表中的一行, 对应某一具体业务的多个状态。 例如, 当订单产生时会插入一行, 当该订单的状态改变时, 累积事实表行被访问并修改。 这种对累积快照事实表行的一致性修改在三种类型的事实表中具有独特性, 对于前面介绍的两类事实表只追加数据, 不会对已经存在的行进行更新操作。 除了日期外键与每个关键过程步骤关联外, 累积快照事实表中还可以包含其他维度和可选退化维度的外键。
- 累积快照事实表在库存、 采购、 销售、 电商等业务领域都有广泛应用。 比如在电商订单里面, 下单的时候只有下单时间, 但是在支付的时候, 又会有支付时间, 同理, 还有发货时间, 完成时间等。 下面以销售订单数据仓库为例, 讨论累积快照事实表的实现。
- 假设希望跟踪以下5个销售订单的里程碑: 下订单、 分配库房、 打包、 配送和收货, 分别用状态N、 A、 P、 S、 R表示。 这5个里程碑的日期及其各自的数量来自源数据库的销售订单表。 一个订单完整的生命周期由5行数据描述: 下订单时生成一条销售订单记录; 订单商品被分配到相应库房时, 新增一条记录, 存储分配时间和分配数量;产品打包时新增一条记录, 存储打包时间和数量; 类似的, 订单配送和订单客户收货时也都分别新增一条记录, 保存各自的时间戳与数量。 为了简化示例, 不考虑每种状态出现多条记录的情况(例如, 一条订单中的产品可能是在不同时间点分多次出库) , 并且假设这5个里程碑是以严格的时间顺序正向进行的。
- 对订单的每种状态新增记录只是处理这种场景的多种设计方案之一。 如果里程碑的定义良好并且不会轻易改变, 也可以考虑在源订单事务表中新增每种状态对应的数据列, 例如, 新增8列, 保存每个状态的时间戳和数量。 新增列的好处是仍然能够保证订单号的唯一性, 并保持相对较少的记录数。 但是, 这种方案还需要额外增加一个last_modified字段记录订单的最后修改时间, 用于Sqoop增量数据抽取。 因为每条订单在状态变更时都会被更新, 所以订单号字段已经不能作为变化数据捕获的比较依据。
- 相当于对前文建立的数仓进行重构,这个在生产环境下是一个比较危险的行为,所以需要在规划数仓的时候进行充分的考虑和论证,不能用脑袋空想,一拍脑袋就决定。
修改mysql数据库模式
- 修改脚本如下:
-- mysql
use source;
-- 修改销售订单事务表
alter table
sales_order
change order_date status_date datetime,
add order_status varchar(1) afterstatus_date,
change order_quantity quantity int;
-- 删除sales_order表的主键
alter table
sales_order change order_number order_number int not null;
alter table sales_order drop primary key;
-- 建立新的主键
alter table
sales_order
add id int unsigned not null auto_increment
primary key comment'主键' first
;
- 语句说明
说明:- 将order_date字段改名为status_date, 因为日期不再单纯指订单日期, 而是指变为某种状态日期。
- 将order_quantity字段改名为quantity, 因为数量变为某种状态对应的数量。
- 在status_date字段后增加order_status字段, 存储N、 A、 P、 S、 R等订单状态之一。 它描述了status_date列对应的状态值, 例如, 如果一条记录的状态为N, 则status_date列是下订单的日期。 如果状态是R, status_date列是收货日期。
- 每种状态都会有一条订单记录, 这些记录具有相同的订单号, 因此订单号不能再作为事务表的主键, 需要删除order_number字段上的自增属性与主键约束。
- 新增id字段作为销售订单表的主键, 它是表中的第一个字段。
- 依据源数据库事务表的结构, 执行下面的脚本修改Hive中相应的过渡区表。
-- 修改过渡表
use rds;
alter table
sales_order
change order_date status_date timestamp comment'status date';
alter table
sales_order
change order_quantity quantity int comment'quantity';
alter table
sales_order
add
columns (order_status varchar(1) comment'order status'
);
- 语句说明
- 将销售订单事实表中order_date和order_quantity字段的名称修改为与源表一致。
增加订单状态字段。 - rds.sales_order并没有增加id列, 原因有两个: 一是该列只作为增量检查列, 不用在过渡表中存储; 二是不需要再重新导入已有数据。
- 将销售订单事实表中order_date和order_quantity字段的名称修改为与源表一致。
- 执行下面的脚本将数据仓库中的事务事实表改造成累积快照事实表。
-- 修改事实表
use dw;
-- 事实表增加八列
alter table
sales_order_fact rename to sales_order_fact_old;
create table
sales_order_fact
(
order_number int comment'order number',
customer_sk int comment'customer SK',
customer_zip_code_sk int comment'customer zip code SK',
shipping_zip_code_sk int comment'shipping zip code SK',
product_sk int comment'product SK',
sales_order_attribute_sk int comment'sales order attribute SK',
order_date_sk int comment'order date SK',
allocate_date_sk int comment 'allocate date SK',
allocate_quantity int comment 'allocate quantity',
packing_date_sk int comment 'packing date SK',
packing_quantity int comment 'packing quantity',
ship_date_sk int comment 'ship date SK',ship_quantity int comment 'ship quantity',
receive_date_sk int comment 'receive date SK',
receive_quantity int comment 'receive quantity',
request_delivery_date_sk int comment'request delivery date SK',
order_amount decimal(10,2) comment'order amount',
order_quantity int comment'order quantity'
)
clustered by (order_number) into 8 buckets
stored as
orc tblproperties ('transactional'='true');
-- 将数据插入到新建事实表结构
insert into
sales_order_fact
select
order_number,
customer_sk,
customer_zip_code_sk,
shipping_zip_code_sk,
product_sk,
sales_order_attribute_sk,
order_date_sk,
null, null, null, null, null, null, null, null,
request_delivery_date_sk,
order_amount,
order_quantity
from
sales_order_fact_old;
-- 删除旧表
drop table sales_order_fact_old;
-- 建立4个日期维度视图
create view
allocate_date_dim
(allocate_date_sk, allocate_date, month
, month_name, quarter, year
)
as select
date_sk, date, month,
month_name, quarter, year
from
date_dim ;
create view
packing_date_dim
(packing_date_sk, packing_date, month, month_name, quarter, year
)
as select
date_sk, date, month, month_name, quarter, year
from
date_dim ;
create view
ship_date_dim
(ship_date_sk, ship_date, month, month_name, quarter, year
)
as select
date_sk, date, month, month_name, quarter, year
from
date_dim ;
create view
receive_date_dim
(receive_date_sk, receive_date, month, month_name, quarter, year
)
as select
date_sk, date, month, month_name, quarter, year
from
date_dim ;
- 语句说明
- 在销售订单事实表中新增加8个字段存储4个状态的日期代理键和度量值。
- 新增8个字段的初始值为空。
- 建立4个日期角色扮演维度视图, 用来获取相应状态的日期代理键。
- ORC表增加字段需要重建表以重新组织数据。
重建sqoop作业
- 使用下面的脚本重建Sqoop作业, 因为源表会有多个相同的order_number, 所以不能再用它作为检查字段, 将检查字段改为id。 抽取的字段名称也要做相应修改。
last_value=`sqoop job --show myjob_incremental_import --meta-connect
jdbc:hsqldb:hsql://cdh2:16000/sqoop | grep incremental.last.value | awk '{print $3}'`
sqoop job --delete myjob_incremental_import --meta-connectjdbc:hsqldb:hsql://cdh2:16000/sqoop
sqoop job \
--meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop \
--create myjob_incremental_import \
-- \
import \
--connect "jdbc:mysql://cdh1:3306/source?useSSL=false&user=root&password=mypassword" \
--table sales_order \
--columns "order_number, customer_number, product_code, status_date, entry_date,
order_amount, quantity, request_delivery_date, verification_ind, credit_check_flag,
new_customer_ind, web_order_flag, order_status" \
--hive-import \
--hive-table rds.sales_order \
--incremental append \
--check-column id \
--last-value $last_value
修改定期装载脚本
--更新定期装载脚本
insert into
sales_order_fact
select
a.order_number,
c.customer_sk,
i.customer_zip_code_sk,
j.shipping_zip_code_sk,
d.product_sk,
g.sales_order_attribute_sk,
e.order_date_sk,
null, null, null, null, null, null, null, null,
f.request_delivery_date_sk,
order_amount,
quantity
from
rds.sales_order a,
customer_dim c,
product_dim d,
order_date_dim e,
request_delivery_date_dim f,
sales_order_attribute_dim g,
customer_zip_code_dim i,
shipping_zip_code_dim j,
rds.customer k,
rds.cdc_time l
where
a.order_status = 'N'
and
a.customer_number = c.customer_number
and
a.status_date >= c.effective_date
and
a.status_date < c.expiry_date
and
a.customer_number = k.customer_number
and
k.customer_zip_code = i.customer_zip_code
and
a.status_date >= i.effective_date
and
a.status_date <= i.expiry_date
and
k.shipping_zip_code = j.shipping_zip_code
and
a.status_date >= j.effective_date
and
a.status_date <= j.expiry_date
and
a.product_code = d.product_code
and
a.status_date >= d.effective_date
and
a.status_date < d.expiry_date
and
to_date(a.status_date) = e.order_date
and
to_date(a.request_delivery_date) = f.request_delivery_date
and
a.verification_ind = g.verification_ind
and
a.credit_check_flag = g.credit_check_flag
and
a.new_customer_ind = g.new_customer_ind
and
a.web_order_flag = g.web_order_flag
and
a.entry_date >= l.last_load
and
a.entry_date < l.current_load ;
-- 更新分配库房时间代理键和度量
drop table if exists tmp;
create table
tmp
as select
t0.order_number order_number,
t0.customer_sk customer_sk,
t0.customer_zip_code_sk customer_zip_code_sk,
t0.shipping_zip_code_sk shipping_zip_code_sk,
t0.product_sk product_sk,
t0.sales_order_attribute_sk sales_order_attribute_sk,
t0.order_date_sk order_date_sk,
t2.allocate_date_sk allocate_date_sk,
t1.quantity allocate_quantity,
t0.packing_date_sk packing_date_sk,
t0.packing_quantity packing_quantity,
t0.ship_date_sk ship_date_sk,
t0.ship_quantity ship_quantity,
t0.receive_date_sk receive_date_sk,
t0.receive_quantity receive_quantity,
t0.request_delivery_date_sk request_delivery_date_sk,
t0.order_amount order_amount,
t0.order_quantity order_quantity
from
sales_order_fact t0,
rds.sales_order t1,
allocate_date_dim t2,
rds.cdc_time t4
where
t0.order_number = t1.order_number
and
t1.order_status = 'A'
and
to_date(t1.status_date) = t2.allocate_date
and
t1.entry_date >= t4.last_load andt1.entry_date < t4.current_load;
delete from
sales_order_fact
where
sales_order_fact. order_number
in
(select
order_number
from
tmp
);
insert into
sales_order_fact
select * from
tmp;
-- 更新打包时间代理键和度量, 关联packing_date_dim维度视图, order_status = 'P'
-- 更新配送时间代理键和度量, 关联ship_date_dim维度视图, order_status = 'S'
-- 更新收货时间代理键和度量, 关联receive_date_dim维度视图, order_status = 'R'
-- 重载pa客户维度 ...
-- 更新时间戳表的last_load字段
- 语句说明:
- 需要修改定期数据装载中的事实表部分, 针对5个里程碑分别处理。 首先装载新增的订单。 在装载事务事实表时, 只用entry_date >= last_load and entry_date < current_load条件就可以过滤出新增的订单。 但是对于累积快照, 一个登记日期下会有多种状态的订单, 因此需要增加订单状态order_status = 'N'的判断。 装载新增订单时要连接过渡区的销售订单表以及所有相关的维度表, 从过渡表中获取订单金额和订单产品数量度量值, 从维度表中获取相关维度的代理键。
- 其他4个状态的处理和新增订单有所不同。 因为此时订单记录已经存在, 除了与特定状态相关的日期维度代理键和状态数量需要更改, 其他的信息不需要更新。 例如,当一个订单的状态由新增变为分配库房时, 只要使用订单号字段关联累积快照事实表和过渡区的事务表, 以事务表的order_status = 'A'为筛选条件, 更新累积快照事实表的状态日期代理键和状态数量两个字段即可。 对其他三个状态的处理是类似的, 只要将过滤条件换成对应的状态值, 并关联相应的日期维度视图获取日期代理键。
- 注意, 本示例中的累积周期快照表仍然是以订单号字段作为逻辑上的主键, 数据装载过程实际上是做了一个行转列的操作, 用源数据表中的状态行信息更新累积快照的状态列。 Hive目前没有多表更新功能, 所以用先删除再插入的方式替代。 之所以可以这样做, 是因为事务事实表中的订单号字段起到了主键的作用, 它能够唯一标识一行数据。 虽然处理方式相同, 但对于每种状态还是需要编写单独的HiveQL语句进行处理。