1.基础版本只更新一个表
- 创建一个sql表,用来存储数据Fifa数据
CREATE TABLE [dbo].[tbl_FIFAData](
[SKey] [int] NOT NULL,
[ID] [nvarchar](9) NOT NULL,
[Name] [nvarchar](60) NOT NULL,
[Age] [nvarchar](7) NULL,
[Nationality] [nvarchar](60) NULL,
[Club] [nvarchar](60) NULL,
[Wage] [nvarchar](10) NULL,
[PreferredFoot] [nvarchar](60) NULL,
[Position] [nvarchar](60) NULL,
[Active] [int] NULL,
[ActiveStartTime] [datetime2](7) NULL,
[ActiveEndTime] [datetime2](7) NULL
) ON [PRIMARY]
GO
-
具体流程
①创建scd的参数;②分别创建两个source,一个是读取db里的旧数据,一个是读取blob里的新数据;③对newData和oldData的scd参数进行hash;④ 使用exists方法,对比新旧数据的哈希值,找到更新或者新增的所有数据;⑤为刚才改变的数据增加surrogateKey;⑥添加ActiveStatus的三个列;⑦选择有用的列;⑧找出只进行了更新操作的列
1. 创建scd的参数
创建一个SCD_ID
参数,为了当数据进行更新时候使用;创建一个SDC_Columns
参数,用来和旧的数据对比是否发生了变化
2. 创建source
创建一个读取sql里的旧数据的source 和一个读取blob里的新数据
3. 分别对newData和oldData里的SCD参数进行hash
使用Derived Column 分别生成对scd_id和scd_columns的hash列
注意:DataFlow里如果想要根据列名取数需要使用byname或者bynames方法
4. 使用exists方法获得所有改变了的数据
所有改变的数据包括了更新或者新增,所以我们的判断条件是scd_id和scd_columns,进行Doesn' t exists
只获得newData变化的数据
5. 为发生改变的数据增加surrogateKey
- 在newData分支创建一个名字为SKey的surrogateKey,刚好对应的是sql数据库里的SKey,但是这里有个bug,每次数据都是从1开始的,这不符合我们写入到sql表里的时候的需要接着旧表里的SKey。所以,我们需要首先读取到旧表里最大的SKey,然后加上新的SKey
-
计算oldData里的最大的SKey,直接使用Agggregate计算Max
注意:这里的获取sql里的最大SKey的时候,一样使用了byname,由于他不是一个参数,所以SKey需要加引号,其次由于计算max需要的数字类型,原表里的是string,所以需要转换
2.newData里新建一个名字为SKey的surrogateKey,步长为1,起步为1,必须和sql表里的一致
3.join两个数据,将MaxSKey添加到每一行
注意:这里之所以使用cross join且条件是MaxSKey == SKey || true()的意思是永远都是真,其实就是想在所有的SKey后面添加一列MaxSKey,方便以后的计算
- 添加一个derived columns列,计算MaxSKey和SKey的和
注意:当首次使用的时候,由于旧表里的数据为空,所以计算出来的MaxSKey是null, 直接和SKey相加得到的也是null,所以使用iifnull()将null值转为0
6. 添加Active Status所需要的列
我们新增3个列,分别是Active表示该数据是否在使用,ActiveStartTime表示新数据写入的时间,ActiveEndTime表示数据什么时候结束使用的,null表示还在使用
注意:之所以这里的ActiveStartTime可以直接使用currentTime的原因是由于它使用的hash来区分新旧数据的,如果使用waterMarts方法的scd,时间必须是新数据表里的数据时间;如果想让时间类型显示为null,必须使用上面的写法
7. 使用select选择我们需要的列,删除不需要的列
这里我们使用role-based模式,使用表达式,直接排除我们不要列,留下有用的列。
至此,更新过的数据和新写入的数据的处理,我们都已经完成了,但是由于我们还需要将找出那些更新过的数据的,将Active Status 进行更新,例如将旧的数据的Active 状态改为0
8. 找出只进行了更新的列
- 我们需要在oldData里的hash和newData里的已经匹配到的所有更新或者添加的数进行exists,这样就找到了oldData里哪些数据进行了更新
- 在hashOldData里派生出source
-
使用派生的source与newData里计算的所有更新或者更改的数据进行exists,判断的条件是hash_id,这样就可以找到,只进行了更新的数据,因为更新的数据他的ID没有变,是在原来的ID上面进行的操作
9. 更改旧数据的状态为0
-
这里,将旧数据Active改为0,并且将当前的时间添加为ActiveEndTime
10. 选择相关的列
同上面一样
11. 使用alter rows确认更新
12. 使用alter rows在newData上确认insert
13. union 链接oldData的更新操作和newData的insert操纵
14. 设置sink将所有的插入和更新写入数据库
注意:这里在设置sink的时候,一定要将update勾选,不然会出现bug,无法更新;由于,更新时我们的标识符时ID字段,所以这里传入的时ID
2.动态的SCD typ2,对所有的表适用
- 更改上面的hard code的dataset,将dataset的信息写为动态的