数仓--DW--Hadoop数仓实践Case-04-数据定期装载

数据装载--定期装载

销售订单定期装载关系图.PNG

创建ods.ods_cdc_time

  • 本案例中source_order_dim维度表和sale_order_fact事实表使用基于时间戳的CDC装载模式。 为此在ods库中建立一个名为cdc_time的时间戳表, 这个表里有last_load和current_load两个字段。 之所以需要两个字段, 是因为抽取到的数据可能会多于本次需要处理的数据。 比如, 两点执行ETL过程, 则零点到两点这两个小时的数据不会在本次处理。 为了确定这个截止时间点, 需要给时间戳设定一个上限条件, 即这里的current_load字段值。 本示例的时间粒度为每天, 所以时间戳只要保留日期部分即可, 因此数据类型选为date。 这两个字段的初始值是“初始加载”执行日期的前一天, 本示例中为'2019-04-13'。 当开始装载时, current_load设置为当前日期。 在开始定期装载实验前, 先使用下面的脚本建立时间戳表。
    脚本创建命令:
-- ****************************************************
-- @Author:  LiYahui
-- @Date:  Created in  2019/04/13 10:32
-- @Description: TODO  初始化hive ods.ods_cdc_time表
-- @Version: V1.0 
-- ****************************************************
-- 创建ods.cdc_time
use ods;
drop table if exists ods_cdc_time;
-- 创建表
create external  table ods.ods_cdc_time(
last_load date,
current_load date
)
row format delimited
fields terminated by '\t'
lines terminated by '\n'
NULL defined as ''
STORED AS textfile
location "/user/root/dw/ods/ods_cdc_time";
-- 设置hive参数
--昨天的日期
set hivevar:last_load = date_add(current_date(),-1); 
-- 设置当前日期
set hivevar:cur_date = current_date();
-- 将数据插入到表中
insert overwrite table ods.ods_cdc_time
select
    ${hivevar:last_load},${hivevar:cur_date};

定期装载

sqoop定期装载

  • 利用sqoop定期装载整理拉取和增量拉取部分数据到hive ods层
  • 开发 "1-sqoop_extract_mysql2hive_daily.sh"
    执行脚本如下:
#!/bin/bash
# @Author:  LiYahui
# @Date:  Created in  2019/04/13 12:20
# @Description: TODO  sqoop定时抽取msyql中表到hive ods层
# @Version: V1.0
# 将consumer全量导入,overwrite的方式
echo "全量抽取consumer表"
sqoop import \
--connect jdbc:mysql://liyahui-02:3306/dw_source_data \
--username root \
--password liyahui \
--table consumer \
--num-mappers 4 \
--hive-import \
--fields-terminated-by "\t" \
--hive-overwrite \
--hive-table ods.ods_consumer

#将product全量导入的方式
echo "全量抽取product表"
sqoop import \
--connect jdbc:mysql://liyahui-02:3306/dw_source_data \
--username root \
--password liyahui \
--table product \
--num-mappers 4 \
--hive-import \
--fields-terminated-by "\t" \
--hive-overwrite \
--hive-table ods.ods_product
#对sale_order进行增量抽取
echo "增量抽取sale_order表"
sqoop job --exec myjob_incremental_import
echo "从mysql中抽取数据结束"

hive定期ETL

设置数据处理时间窗口ods.ods_cdc_time

  • 对于事实表, 我们采用基于时间戳的CDC增量装载模式, 时间粒度为天。 因此需要两个时间点, 分别是本次装载的起始时间点和终止时间点, 这两个时间点定义了本次处理的时间窗口, 即装载这个时间区间内的数据。 还要说明一点, 这个区间是左包含的, 就是处理的数据包括起始时间点, 但不包括终止时间点。 这样设计的原因是,我们既要处理完整的数据, 不能有遗漏, 又不能重复装载数据, 这就要求时间处理窗口既要连续, 又不能存在重叠的部分。 对于维度表, 除了要求相邻两个数据版本的时间段连续且不重叠之外, 为了表示当前版本的截止时间, 还需要一个很大的时间值, 大到足以满足数据仓库整个生命周期的需要, 本示例设置的是9999-12-31。
  • 为此我们在脚本中设置三个变量, 分别赋予起始时间点、 终止时间点、 最大时间点的值, 并且将时间戳表rds.cdc_time的last_load和current_load字段分别设置为起始时间点和终止时间点。 这些变量会在后面的脚本中多次引用。 顺便提一下, 这样设计还有一个好处是, 如果因为某种原因需要手工执行一个时间段内的数据装载,只需改变变量的赋值, 而不用修改脚本的执行逻辑。
  • 开发 "1-update_ods_cdc_time.hql " hive脚本
-- ****************************************************
-- @Author:  LiYahui
-- @Date:  Created in  2019/04/13 13:30
-- @Description: TODO  更新ods.ods_cdc_time窗口时间,时间粒度天 day
-- @Version: V1.0 
-- ****************************************************
-- 设置SCD的生效时间和结束时间
--结束日期(昨天得日期)
set hivevar:cur_date = current_date();
set hivevar:pre_date = date_add(${hivevar:cur_date},-1); 
-- 设置最大有效日期
set hivevar:max_date = cast('9999-12-31' as date); 
-- 设置CDC的上限时间
insert overwrite table ods.ods_cdc_time
select 
        last_load, ${hivevar:cur_date}
from
    ods.ods_cdc_time;
  • 说明:此项可以当作是需要进行初始化的操作,在定期脚本执行时默认ods.ods_cdc_time表已经创建,执行任务的时候只需要进行使用即可。

装载客户维度

  • 客户维度表的consumer_street_addresses字段值变化时采用SCD2, 需要新增版本, consumer_name字段值变化时采用SCD1, 直接覆盖更新。 如果一个表的不同字段有的采用SCD2, 有的采用SCD1, 就像客户维度表这样, 那么是先处理SCD2, 还是先处理SCD1呢? 为了回答这个问题, 我们看一个简单的例子。 假设有一个维度表包含c1, c2、 c3、 c4四个字段, c1是代理键, c2是业务主键, c3使用SCD1, c4使用SCD2。 源数据从1、 2、 3变为1、 3、 4。 如果先处理SCD1, 后处理SCD2, 则维度表的数据变化过程是先从1、 1、 2、 3变为1、 1、 3、 3, 再新增一条记录2、 1、 3、 4。 此时表中的两条记录是1、 1、 3、 3和2、 1、 3、 4。 如果先处理SCD2, 后处理SCD1, 则数据的变化过程是先新增一条记录2、 1、 2、 4, 再把1、 1、 2、 3和2、 1、 2、 4两条记录变为1、 1、 3、 3和2、 1、 3、 4。 可以看出, 无论谁先谁后, 最终的结果是一样的, 而且结果中都会出现一条实际上从未存在过的记录: 1、 1、 3、 3。 因为SCD1本来就不保存历史变化, 所以单从c2字段的角度看, 任何版本的记录值都是正确的, 没有差别。 而对于c3字段, 每个版本的值是不同的, 需要跟踪所有版本的记录。 我们从这个简单的例子可以得出以下结论: SCD1和SCD2的处理顺序不同, 但最终结果是相同的, 并且都会
    产生实际不存在的临时记录。 因此从功能上说, SCD1和SCD2的处理顺序并不关键, 只需要记住对SCD1的字段, 任意版本的值都正确, 而SCD2的字段需要跟踪所有版本。 但在性能上看, 先处理SCD1应该更好些, 因为更新的数据行更少。 本示例我们先处理SCD2。
  • 开发 "2-update_source_consumer_dim_scd2_scd1.hql" hive脚本
  • 业务处理逻辑为:先处理SCD2,在处理SCD1,再添加mysql表中新增的数据;SCD2中需要先修改变化列的过期时间,再将变化列的内容导入维表中。具体逻辑详见如下脚本:
part-01 处理SCD2变化列过期时间:
-- **********************************************
-- 装载客户维度,处理consumer_street_address  SCD2类型
-- 2-update_source_consumer_dim_scd2_scd1.hql    part-01部分
-- *********************************************
-- 设置SCD的生效时间和结束时间
--结束日期
set hivevar:cur_date = current_date();
-- 结束日期
set hivevar:pre_date = date_add(${hivevar:cur_date},-1); 
-- 设置最大有效日期
set hivevar:max_date = cast('9999-12-31' as date); 
set hivevar:expired_indicator=cast('Expired' as string);
--更新 设置已删除记录和 consumer_street_address 改变的 列上 SCD2过期日期 、consumer_indicator的值为Expired
update
        source.source_consumer_dim
set consumer_valid_to=${hivevar:pre_date} ,
    consumer_indicator=${hivevar:expired_indicator}
where 
    source_consumer_dim.consumer_key
in
(
select a.consumer_key 
from
-- 选择过期时间为"9999-12-31"的表内容,作为表a
    (select source_consumer_dim.consumer_key,consumer_number,consumer_street_address
        from 
            source.source_consumer_dim
        where 
            consumer_valid_to=${hivevar:max_date}
    ) a 
    -- join得出地址变化的列
    left join ods.ods_consumer b
    on
        a.consumer_number=b.customer_number
    -- 过滤出删除的列和地址改变的列
    where 
            b.customer_number is null 
        or
            a.consumer_street_address <> b.customer_street_address
 );
  • 语句说明,part-01是将source_consumer_dim中consumer_street_address变化的列过期时间设置为执行加载的前一天,内层的查询获取所有当前版本的数据。 外层查询使用一个左外连接查询出地址列发生变化的记录的代理键, 然后在update语句的where子句中用IN操作符, 更新这些记录的过期时间列。 left join的逻辑查询处理顺序是:
    (1) 执行a和b两个表的笛卡尔积。
    (2) 应用on过滤器: on a.consumer_number = b.consumer_number。
    (3) 添加外部行: a为保留表, 将不满足on条件的a表记录添加到结果集中。
    (4) 应用where过滤器: where b.consumer_number is null or a.consumer_street_address <> b.consumer_street_address, 其中b.consumer_number is null过滤出源数据中已经删除但维度表还存在的记录, a.consumer_street_address <> b.consumer_street_address过滤出源数据修改了地址信息的记录。
part-02 处理SCD2新增行

hive 脚本如下:

-- **********************************************
-- 装载客户维度,处理consumer_street_address  SCD2类型   新增行
-- 2-update_source_consumer_dim_scd2_scd1.hql    part-02部分
-- *********************************************
-- 设置SCD的生效时间和结束时间
--结束日期
set hivevar:cur_date = current_date();
-- 结束日期
set hivevar:pre_date = date_add(${hivevar:cur_date},-1); 
-- 设置最大有效日期
set hivevar:max_date = cast('9999-12-31' as date); 
set hivevar:expired_indicator=cast('Expired' as string);
-- 处理source_consumer_dim的列customer_street_addresses上的新增行 SCD2方式
insert into
    source.source_consumer_dim
select 
-- 生成新的代理键
        row_number() over(order by t1.consumer_number)+t2.sk_max,
        t1.consumer_number,t1.consumer_name,t1.consumer_street_address,t1.consumer_zip_code,
        t1.consumer_city,t1.consumer_province,t1.consumer_valid_from,t1.consumer_valid_to,
        "Current" as consumer_indicator,t1.consumer_version
from
    (
    select 
            b.customer_number as consumer_number,
            b.customer_name as consumer_name,
            b.customer_street_address as consumer_street_address,
            b.customer_zip_code as consumer_zip_code,
            b.customer_city as consumer_city,
            b.customer_state as consumer_province,
            a.consumer_version+1 as consumer_version,
            ${hivevar:pre_date} as consumer_valid_from,
            ${hivevar:max_date} as  consumer_valid_to
    from 
        source.source_consumer_dim  a
    inner join
        ods.ods_consumer b
    on
        a.consumer_number=b.customer_number and a.consumer_valid_to=${hivevar:pre_date}
    left join
        source.source_consumer_dim  c
    on
        a.consumer_number=c.consumer_number and c.consumer_valid_to=${hivevar:max_date}
    where 
        a.consumer_street_address <> b.customer_street_address and c.consumer_key is null
    ) t1
cross join
    (select coalesce(max(consumer_key),0) as sk_max from source.source_consumer_dim) t2;
  • 语句说明:上面这条语句插入SCD2的新增版本行。 子查询中用inner join获取当期版本号和源数据信息。 left join连接是必要的, 否则如果多次执行该语句, 会生成多条重复的记录。 最后用row_number()方法生成新记录的代理键。 新记录的版本号加1, 开始日期为执行时的前一天, 过期日期为“9999-12-31”。
part-03 处理SCD1
  • hive脚本执行如下:
-- **********************************************
-- 装载客户维度,处理consumer_name   SCD1类型  
-- 2-update_source_consumer_dim_scd2_scd1.hql    part-03部分
-- *********************************************
--处理consumer_name 的SCD1,修改所有记录中的consumer_name
drop table if exists source.source_consumer_tmp;
-- 创建临时过渡表
create table source.source_consumer_tmp
as
select 
        a.consumer_key,a.consumer_number,
        b.customer_name,b.customer_street_address,b.customer_zip_code,
        a.consumer_city,a.consumer_province,a.consumer_valid_from,a.consumer_valid_to,a.consumer_indicator,a.consumer_version
from 
    source.source_consumer_dim a,ods.ods_consumer b
where 
    a.consumer_number=b.customer_number and (a.consumer_name <> b.customer_name);
--删除 source.source_consumer_dim中consumer_name修改的列
delete 
from 
    source.source_consumer_dim
where 
    consumer_key
in (select consumer_key from source.source_consumer_tmp);
-- 将source.source_consumer_tmp中数据插入到source.source_consumer_dim中;
insert into 
source.source_consumer_dim
select * from source.source_consumer_tmp;
part-04 处理ods.ods_consumer新增行
  • 执行脚本如下:
-- **********************************************
-- 装载客户维度,处理ods.ods_consumer 新增行
-- 2-update_source_consumer_dim_scd2_scd1.hql    part-04部分
-- *********************************************
-- 设置SCD的生效时间和结束时间
--结束日期
set hivevar:cur_date = current_date();
-- 结束日期
set hivevar:pre_date = date_add(${hivevar:cur_date},-1); 
-- 设置最大有效日期
set hivevar:max_date = cast('9999-12-31' as date); 
set hivevar:expired_indicator=cast('Expired' as string);
-- 处理新增的customer数据
insert into 
    source.source_consumer_dim
select
        row_number() over(order by t1.consumer_number)+t2.sk_max,
        t1.consumer_number,t1.consumer_name,t1.consumer_street_address,t1.consumer_zip_code,
        t1.consumer_city,t1.consumer_province,
        ${hivevar:pre_date},
        ${hivevar:max_date},
        'Current',
        1
from
    (
    select 
            a.customer_number as consumer_number,
            a.customer_name as consumer_name,
            a.customer_street_address as consumer_street_address,
            a.customer_zip_code as consumer_zip_code,
            a.customer_city as consumer_city,
            a.customer_state as consumer_province
    from
        ods.ods_consumer a
    left join 
        source.source_consumer_dim b
    on
        a.customer_number=b.consumer_number
    where 
        b.consumer_key is null
    ) t1
cross join
    (select coalesce(max(consumer_key),0) as sk_max from source.source_consumer_dim) t2
  • 语句说明:上面的语句装载新增的客户记录。 内层子查询使用rds.customer和dw.customer_dim的左外链接获取新增的数据。 新数据的版本号为1, 开始日期为执行时的前一天, 过期日期为“9999-12-31”。 同样使用row_number()方法生成代理键。

客户维度定期装载汇总脚本

  • 客户维度我们采用先处理SCD2类型,再处理SCD1类型。当然也可以采用处理SCD1再处理SCD类型。但是,处理方式内部的顺序必须是有序进行的,下面将脚本进行汇总。
-- ****************************************************
-- @Author:  LiYahui
-- @Date:  Created in  2019/04/13 13:30
-- @Description: TODO  定期装载source.source_consumer_dim,先对consumer_street_address 进行SCD2处理,再对consumer_name SCD1处理
-- @Version: V1.0 
-- ****************************************************
-- 装载客户维度
-- consumer_street_address  SCD2
-- consumer_name  SCD1
-- 设置SCD的生效时间和结束时间
--结束日期
set hivevar:cur_date = current_date();
-- 结束日期
set hivevar:pre_date = date_add(${hivevar:cur_date},-1); 
-- 设置最大有效日期
set hivevar:max_date = cast('9999-12-31' as date); 
set hivevar:expired_indicator=cast('Expired' as string);
-- ***************************************************************
--更新 设置已删除记录和 consumer_street_address 改变的 列上 SCD2过期日期 、consumer_indicator的值为Expired
update
        source.source_consumer_dim
set consumer_valid_to=${hivevar:pre_date} ,
    consumer_indicator=${hivevar:expired_indicator}
where 
    source_consumer_dim.consumer_key
in
(
select a.consumer_key 
from
-- 选择过期时间为"9999-12-31"的表内容,作为表a
    (select source_consumer_dim.consumer_key,consumer_number,consumer_street_address
        from 
            source.source_consumer_dim
        where 
            consumer_valid_to=${hivevar:max_date}
    ) a 
    -- join得出地址变化的列
    left join ods.ods_consumer b
    on
        a.consumer_number=b.customer_number
    -- 过滤出删除的列和地址改变的列
    where 
            b.customer_number is null 
        or
            a.consumer_street_address <> b.customer_street_address
 );
-- ******************************************************
-- 处理source_consumer_dim的列customer_street_addresses上的新增行 SCD2方式
insert into
    source.source_consumer_dim
select 
-- 生成新的代理键
        row_number() over(order by t1.consumer_number)+t2.sk_max,
        t1.consumer_number,t1.consumer_name,t1.consumer_street_address,t1.consumer_zip_code,
        t1.consumer_city,t1.consumer_province,t1.consumer_valid_from,t1.consumer_valid_to,
        "Current" as consumer_indicator,t1.consumer_version
from
    (
    select 
            b.customer_number as consumer_number,
            b.customer_name as consumer_name,
            b.customer_street_address as consumer_street_address,
            b.customer_zip_code as consumer_zip_code,
            b.customer_city as consumer_city,
            b.customer_state as consumer_province,
            a.consumer_version+1 as consumer_version,
            ${hivevar:pre_date} as consumer_valid_from,
            ${hivevar:max_date} as  consumer_valid_to
    from 
        source.source_consumer_dim  a
    inner join
        ods.ods_consumer b
    on
        a.consumer_number=b.customer_number and a.consumer_valid_to=${hivevar:pre_date}
    left join
        source.source_consumer_dim  c
    on
        a.consumer_number=c.consumer_number and c.consumer_valid_to=${hivevar:max_date}
    where 
        a.consumer_street_address <> b.customer_street_address and c.consumer_key is null
    ) t1
cross join
    (select coalesce(max(consumer_key),0) as sk_max from source.source_consumer_dim) t2;
-- ************************************************************
--处理consumer_name 的SCD1,修改所有记录中的consumer_name
drop table if exists source.source_consumer_tmp;
-- 创建临时过渡表
create table source.source_consumer_tmp
as
select 
        a.consumer_key,a.consumer_number,
        b.customer_name,b.customer_street_address,b.customer_zip_code,
        a.consumer_city,a.consumer_province,a.consumer_valid_from,a.consumer_valid_to,a.consumer_indicator,a.consumer_version
from 
    source.source_consumer_dim a,ods.ods_consumer b
where 
    a.consumer_number=b.customer_number and (a.consumer_name <> b.customer_name);
--删除 source.source_consumer_dim中consumer_name修改的列
delete 
from 
    source.source_consumer_dim
where 
    consumer_key
in (select consumer_key from source.source_consumer_tmp);
-- 将source.source_consumer_tmp中数据插入到source.source_consumer_dim中;
insert into 
source.source_consumer_dim
select * from source.source_consumer_tmp;
-- ************************************************
-- 处理新增的customer数据
insert into 
    source.source_consumer_dim
select
        row_number() over(order by t1.consumer_number)+t2.sk_max,
        t1.consumer_number,t1.consumer_name,t1.consumer_street_address,t1.consumer_zip_code,
        t1.consumer_city,t1.consumer_province,
        ${hivevar:pre_date},
        ${hivevar:max_date},
        'Current',
        1
from
    (
    select 
            a.customer_number as consumer_number,
            a.customer_name as consumer_name,
            a.customer_street_address as consumer_street_address,
            a.customer_zip_code as consumer_zip_code,
            a.customer_city as consumer_city,
            a.customer_state as consumer_province
    from
        ods.ods_consumer a
    left join 
        source.source_consumer_dim b
    on
        a.customer_number=b.consumer_number
    where 
        b.consumer_key is null
    ) t1
cross join
    (select coalesce(max(consumer_key),0) as sk_max from source.source_consumer_dim) t2;

装载产品维度

  • 产品维度表的所有属性都使用SCD2, 处理方法和客户表类似。
  • 开发脚本 "3-update_source_product_dim_scd2.hql"
-- ****************************************************
-- @Author:  LiYahui
-- @Date:  Created in  2019/04/13 13:30
-- @Description: TODO   定期加载source.source_product_dim scd2
-- @Version: V1.0 
-- ****************************************************
-- 装载产品维度表
-- 设置SCD的生效时间和结束时间
--结束日期
set hivevar:cur_date = current_date();
-- 结束日期
set hivevar:pre_date = date_add(${hivevar:cur_date},-1); 
-- 设置最大有效日期
set hivevar:max_date = cast('9999-12-31' as date); 
set hivevar:expired_indicator=cast('Expired' as string);
-- 设置已删除记录和 product_name product_category 的SCD2过期时间
update 
    source.source_product_dim
set product_valid_to=${hivevar:pre_date},product_indicator=${hivevar:expired_indicator}
where 
    source_product_dim.product_key
in
(
    select  
        a.product_key
    from
    ( 
    select 
        product_key,product_code,product_name,product_category
    from 
        source.source_product_dim
    where 
        product_valid_to=${hivevar:max_date}
    ) a
    left join 
        ods.ods_product b
    on
        a.product_code=b.product_code
    where 
        b.product_code is null
        or
        (a.product_name <> b.product_name or a.product_category <> b.product_category)
);
-- 处理product_name、product_category的变化内容
insert into 
    source.source_product_dim
select
        row_number() over(order by t1.product_code)+t2.sk_max,
        t1.product_code,t1.product_name,t1.product_category,
        t1.product_valid_from,t1.product_valid_to,t1.product_indicator,
        t1.product_version
from
(
    select 
            t2.product_code as product_code,
            t2.product_name as product_name,
            t2.product_category as product_category,
            ${hivevar:pre_date} as product_valid_from,
            ${hivevar:max_date} as product_valid_to,
            'Current' as product_indicator,
            t1.product_version+1 as product_version
    from
        source.source_product_dim t1
    inner join 
        ods.ods_product t2
    on 
        t1.product_code=t2.product_code and t1.product_valid_to=${hivevar:pre_date}
    left join
        source.source_product_dim t3
    on
        t1.product_code=t3.product_code  and t3.product_valid_to=${hivevar:max_date}
    where
        t3.product_key is null  and (t1.product_name <> t2.product_name or t1.product_category <> t2.product_category)
) t1
cross join
    (select coalesce(max(product_key),0) as sk_max from source.source_product_dim) t2;

-- 处理新增的product记录
insert into 
        source.source_product_dim
select
        row_number() over(order by t1.product_code)+t2.sk_max,
        t1.product_code,t1.product_name,t1.product_category,
        ${hivevar:pre_date} as product_valid_from,
        ${hivevar:max_date} as product_valid_to,
        'Current' as product_indicator,
        1 as product_version
from
(
    select 
            t1.*
    from
        ods.ods_product t1
    left join
        source.source_product_dim t2
    on
        t1.product_code=t2.product_code
    where 
        t2.product_key is null
) t1
cross join
    (select coalesce(max(product_key),0) as sk_max from source.source_product_dim) t2;

定期装载订单维度

  • 订单维度表的装载比较简单, 因为不涉及维度历史变化, 只要将新增的订单号插入source.source_order_dim表就可以了。
  • 开发脚本 "4-update_source_order_dim.hql"
-- ****************************************************
-- @Author:  LiYahui
-- @Date:  Created in  2019/04/13 13:30
-- @Description: TODO  定期装载source.source_order_dim
-- @Version: V1.0 
-- ****************************************************
-- 订单表不涉及到维度变化,直接插入就行,注意代理键
-- 设置SCD的生效时间和结束时间
--结束日期
set hivevar:cur_date = current_date();
-- 结束日期
set hivevar:pre_date = date_add(${hivevar:cur_date},-1); 
-- 设置最大有效日期
set hivevar:max_date = cast('9999-12-31' as date); 
set hivevar:expired_indicator=cast('Expired' as string);
insert into 
    source.source_order_dim
select
        row_number() over(order by t1.order_number)+t2.sk_max,
        t1.order_number,t1.order_valid_from,
        t1.order_valid_to,t1.order_indicator,t1.order_version
from
    (
    select 
            order_number,
            order_date as order_valid_from,
            ${hivevar:max_date} as  order_valid_to,
            1 as order_version
    from
        ods.ods_sale_order,ods.ods_cdc_time
    where 
        entry_date>=last_load and entry_date < current_load
    ) t1
cross join
    (select coalesce(max(order_key),0) as sk_max from source.source_order_dim) t2;
  • 语句说明:上面语句的子查询中, 将过渡区库的订单表和时间戳表关联, 用时间戳表中的两个字段值作为时间窗口区间的两个端点, 用entry_date >= last_load AND entry_date <current_load条件过滤出上次执行定期装载的日期到当前日期之间的所有销售订单, 装载到order_dim维度表。

装载销售订单事实表

  • 为了装载dw.sale_order_fact事实表, 需要关联ods.ods_sale_order与source库中的四个维度表, 获取维度表的代理键和源数据的度量值。 这里只有销售金额字order_amount一个度量。 和订单维度一样, 也要关联时间戳表, 获取时间窗口作为确定新增数据的过滤条件。
  • 开发"5-update_dw_sale_order_fact.hql"
-- ****************************************************
-- @Author:  LiYahui
-- @Date:  Created in  2019/04/13 13:30
-- @Description: TODO  定期装载dw.sale_order_fact
-- @Version: V1.0 
-- ****************************************************
-- 装载订单事实表
insert into 
    dw.sale_order_fact
select 
        order_key,
        consumer_key,
        product_key,
        day_key,
        order_amount
from
    ods.ods_sale_order a,
    source.source_order_dim b,
    source.source_consumer_dim c,
    source.source_product_dim d,
    source.source_date_dim e,
    ods.ods_cdc_time f
where
        a.order_number=b.order_key
    and
        a.customer_number=c.consumer_number
    and
        a.order_date >=c.consumer_valid_from
    and
        a.entry_date < c.consumer_valid_to
    and
        a.product_code=d.product_code
    and
        a.order_date > d.product_valid_from
    and
        a.order_date < d.product_valid_to
    and
        to_date(a.order_date) = e.day_key
    and
        a.entry_date >= f.last_load
    and 
        a.entry_date < f.current_load
;

装载完毕,更新ods.ods_cdc_time

  • 最后更新时间戳表的数据, 将最后装载时间改为当前日期。
  • 开发脚本 "6-update_ods_cdc_time.hql"
-- ****************************************************
-- @Author:  LiYahui
-- @Date:  Created in  2019/04/13 13:30
-- @Description: TODO  定期装载完毕后,更新ods.ods_cdc_time
-- @Version: V1.0 
-- ****************************************************
-- 更新ods.ods_cdc_time 中的时间窗口
-- 更新时间戳表的last_load字段
-- 当前日期
set hivevar:cur_date = current_date();
-- 明天的日期
set hivevar:tomorrow_date = date_add(${hivevar:cur_date},1); 
insert overwrite 
table
  ods.ods_cdc_time 
select
    current_load, ${hivevar:tomorrow_date}
from
ods.ods_cdc_time;

定期加载总结

  • 定期装载中6步,合并在一起是整个的定期处理的逻辑。
  • 可以在mysql中添加或修改一些数据进行单元测试。
  • 可以采用SparkSql进行操作,将脚本语句进行修改即可。

Azkaban任务流调度

  • Azkaban任务流调度需要按照下面的逻辑顺序进行执行


    Azkaban调度流.PNG
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,761评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,953评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,998评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,248评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,130评论 4 356
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,145评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,550评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,236评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,510评论 1 291
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,601评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,376评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,247评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,613评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,911评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,191评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,532评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,739评论 2 335