基于binlog构建拉链表

前言:缓慢变化知识点回顾

缓慢变化分三种类型:

type1:当数据发生变化时,直接覆盖旧值


type2:当数据发生变化时,新增一行

这里的DWID 也就是我们日常项目使用的代理键CustomerKey

type3:当数据发生变化时,新增一列

传统的拉链表分类(按照实现时的方式):

增量拉链表:

        1.适用范围:         1).有增量字段可以识别数据更新,且业务数据变动时,增量时间会更新;2).数据不存在物理删除

        2.实现方式:         2).基于mysql表的增量数据实现,此处我们假设增量数据存放于stg_a001_table1,全量表存放于ods_a001_table1,uuid为构造字段,具体逻辑为,md5(concat(nvl(a),nvl(b)...)),也就是拉链比对字段空处理后拼接,再MD5的值

则具体实现可体现为:

select

    oat.ods_start_dt,

    if(sat.id is not null and oat.uuid<>sat.uuid,'${ymd}',oat.ods_end_dt) as ods_end_dt -- 对更新数据进行关链

    oat.uuid,

    oat.其它字段

from ods_a001_table1 oat

left join stg_a001_table1 sat on oat.id=sat.id and sat.ymd='${ymd}' -- stg_a001_table1最好设置为分区表

where oat.ods_start_dt<='${ymd}' and oat.ods_end_dt>'${ymd}'

union all

select

    '${ymd}' as ods_start_dt,

    '99991231' as ods_end_dt, -- 新增和更新数据进行开链

    sat.uuid,

    sat.其它字段

from stg_a001_table1 sat

where sat.ymd='${ymd}'

全量拉链表:

 1.适用范围:         1).无增量字段可以识别数据更新;2).数据存在物理删除

 2.实现方式:         2).基于mysql表的增量数据实现,此处我们假设同步全量数据存放于stg_a001_table1,全量表存放于ods_a001_table1

select

    oat.ods_start_dt,

    case when sat.id is null or (sat.id is not null and oat.uuid<>sat.uuid),'${ymd}',oat.ods_end_dt) as ods_end_dt, -- 对更新和删除进行关链

    oat.uuid,

    oat.其它字段

from ods_a001_table1 oat

left join stg_a001_table1 sat on oat.id=sat.id and sat.ymd='${ymd}' -- stg_a001_table1最好设置为分区表

where oat.ods_start_dt<='${ymd}' and oat.ods_end_dt>'${ymd}'

union all

select

    '${ymd}' as oat.ods_start_dt,

    '99991231' as ods_end_dt, -- 对插入和更新的开链

    sat.uuid,

    sat.其它字段

from stg_a001_table1 sat

left join ods_a001_table1 oat on oat.id=sat.id and oat.ods_start_dt<='${ymd}' and oat.ods_end_dt>'${ymd}'-- stg_a001_table1最好设置为分区表

where sat.ymd='${ymd}'

and (oat.id is null or (oat.id is not null and oat.uuid<>sat.uuid))

基于binlog的实现

binlog的数据我们通过canel已经写入kafka,所以此处只需要通过flume去实时消费对应的topic并写入hdfs即可。

1.对每天路径下的数据进行去重

1).创建外表tmp_stg_a001_table1映射到hdfs上的增量数据存储路径

2).通过row_number函数,按照主键,对数据进行去重,只保留最后一条数据,将数据写入表stg_a001_table1

2.基于去重数据对数据进行拉链处理( binlog_type为每条数据的操作类型 D标识删除 I为插入、U为更新)

select

    oat.ods_start_dt,

    if((sat.id is not null and oat.uuid<>sat.uuid) or sat.binlog_type='D','${ymd}',oat.ods_end_dt) as ods_end_dt -- 对更新和删除数据进行关链

    oat.uuid,

    oat.其它字段

from ods_a001_table1 oat

left join stg_a001_table1 sat on oat.id=sat.id and sat.ymd='${ymd}' -- stg_a001_table1最好设置为分区表

where oat.ods_start_dt<='${ymd}' and oat.ods_end_dt>'${ymd}'

union all

select

    '${ymd}' as ods_start_dt,

    '99991231' as ods_end_dt, -- 新增和更新数据进行开链

    sat.uuid,

    sat.其它字段

from stg_a001_table1 sat

where sat.ymd='${ymd}' and sat.binlog_type<>'D' -- 当日新增且当日删除的数据无需处理

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容