Data Flow SCD Type 2

1.基础版本只更新一个表

  • 创建一个sql表,用来存储数据Fifa数据
CREATE TABLE [dbo].[tbl_FIFAData](
    [SKey] [int] NOT NULL,
    [ID] [nvarchar](9) NOT NULL,
    [Name] [nvarchar](60) NOT NULL,
    [Age] [nvarchar](7) NULL,
    [Nationality] [nvarchar](60) NULL,
    [Club] [nvarchar](60) NULL,
    [Wage] [nvarchar](10) NULL,
    [PreferredFoot] [nvarchar](60) NULL,
    [Position] [nvarchar](60) NULL, 
    [Active] [int] NULL,
    [ActiveStartTime] [datetime2](7) NULL,
    [ActiveEndTime] [datetime2](7) NULL
) ON [PRIMARY]
GO
  • 具体流程


    image.png

    ①创建scd的参数;②分别创建两个source,一个是读取db里的旧数据,一个是读取blob里的新数据;③对newData和oldData的scd参数进行hash;④ 使用exists方法,对比新旧数据的哈希值,找到更新或者新增的所有数据;⑤为刚才改变的数据增加surrogateKey;⑥添加ActiveStatus的三个列;⑦选择有用的列;⑧找出只进行了更新操作的列

1. 创建scd的参数

创建一个SCD_ID参数,为了当数据进行更新时候使用;创建一个SDC_Columns参数,用来和旧的数据对比是否发生了变化

image.png

2. 创建source

创建一个读取sql里的旧数据的source 和一个读取blob里的新数据

3. 分别对newData和oldData里的SCD参数进行hash

使用Derived Column 分别生成对scd_id和scd_columns的hash列

image.png

注意:DataFlow里如果想要根据列名取数需要使用byname或者bynames方法

4. 使用exists方法获得所有改变了的数据

所有改变的数据包括了更新或者新增,所以我们的判断条件是scd_id和scd_columns,进行Doesn' t exists只获得newData变化的数据

image.png

5. 为发生改变的数据增加surrogateKey
  • 在newData分支创建一个名字为SKey的surrogateKey,刚好对应的是sql数据库里的SKey,但是这里有个bug,每次数据都是从1开始的,这不符合我们写入到sql表里的时候的需要接着旧表里的SKey。所以,我们需要首先读取到旧表里最大的SKey,然后加上新的SKey
  1. 计算oldData里的最大的SKey,直接使用Agggregate计算Max


    image.png

注意:这里的获取sql里的最大SKey的时候,一样使用了byname,由于他不是一个参数,所以SKey需要加引号,其次由于计算max需要的数字类型,原表里的是string,所以需要转换
2.newData里新建一个名字为SKey的surrogateKey,步长为1,起步为1,必须和sql表里的一致

image.png

3.join两个数据,将MaxSKey添加到每一行
image.png

注意:这里之所以使用cross join且条件是MaxSKey == SKey || true()的意思是永远都是真,其实就是想在所有的SKey后面添加一列MaxSKey,方便以后的计算
image.png

  1. 添加一个derived columns列,计算MaxSKey和SKey的和
    image.png

    注意:当首次使用的时候,由于旧表里的数据为空,所以计算出来的MaxSKey是null, 直接和SKey相加得到的也是null,所以使用iifnull()将null值转为0
6. 添加Active Status所需要的列

我们新增3个列,分别是Active表示该数据是否在使用,ActiveStartTime表示新数据写入的时间,ActiveEndTime表示数据什么时候结束使用的,null表示还在使用

image.png

注意:之所以这里的ActiveStartTime可以直接使用currentTime的原因是由于它使用的hash来区分新旧数据的,如果使用waterMarts方法的scd,时间必须是新数据表里的数据时间;如果想让时间类型显示为null,必须使用上面的写法

7. 使用select选择我们需要的列,删除不需要的列

这里我们使用role-based模式,使用表达式,直接排除我们不要列,留下有用的列。


image.png

至此,更新过的数据和新写入的数据的处理,我们都已经完成了,但是由于我们还需要将找出那些更新过的数据的,将Active Status 进行更新,例如将旧的数据的Active 状态改为0

8. 找出只进行了更新的列
  • 我们需要在oldData里的hash和newData里的已经匹配到的所有更新或者添加的数进行exists,这样就找到了oldData里哪些数据进行了更新
  1. 在hashOldData里派生出source
  2. 使用派生的source与newData里计算的所有更新或者更改的数据进行exists,判断的条件是hash_id,这样就可以找到,只进行了更新的数据,因为更新的数据他的ID没有变,是在原来的ID上面进行的操作


    image.png
9. 更改旧数据的状态为0
  • 这里,将旧数据Active改为0,并且将当前的时间添加为ActiveEndTime


    image.png
10. 选择相关的列

同上面一样

11. 使用alter rows确认更新
image.png
12. 使用alter rows在newData上确认insert
image.png
13. union 链接oldData的更新操作和newData的insert操纵
image.png
14. 设置sink将所有的插入和更新写入数据库
image.png

注意:这里在设置sink的时候,一定要将update勾选,不然会出现bug,无法更新;由于,更新时我们的标识符时ID字段,所以这里传入的时ID

2.动态的SCD typ2,对所有的表适用

  1. 更改上面的hard code的dataset,将dataset的信息写为动态的
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容