0.项目地址
Azure Data Factory Real Time Scenarios
1. Mapping DataFlow处理错误的数据
文件:department_error_date.csv
- 情景:一张有错误数据的csv需要Copy到sql表里,例如Date日期列的格式有错误,需要将正确的数据存入到目标表里,错误的数据记录起来
1.1 重点
- DataFlow的condiction split用于将错误和正确的分表
- DataFlow的Derived column 用于1.派生新列,可以是其他列的运算 2. 转换格式不对的列的格式
- csv to sql时,列的type一定要正确
1.2 例子
2. 动态获取folder的所有文件名称
- 情景:获取blob名为raw文件夹下的所有文件名
2.1 实现
-
使用getMetadata 获取文件夹的所有childItems
- ForEach循环childItems
3.在ForEach循环里获取每个childItem的name(之后就可以用来复制)
3. 增量复制最新的编辑的表/删除60天之前的表
Incrementallly copy new and changed files bases on last modified
- 情景(Copy):一个blob-input文件夹里,时不时的会进来新的一批文件,需要每隔5分钟检查,并且将新进来csv文件复制output文件夹里
- 情景(Delete): 删除input文件夹里60天之前的所有文件
3.1重点
copy
- Timbling windwos trigger 设置
- copy 里的filter by last modified
- 通配符匹配所有csv文件
- 当前时间
utcnow()
以及adddays(utcnow(),-day)
delete - 设置filter by last modified
- for循环动态删除文件
3.2例子
pass
5. 使用dataflow修复一列都为字符串的csv文件
-
情景:有一个一行都是string的文件,需要根据他的字段分成3列
5.1 重点
- 使用DataFlow里的Derived修改
- 使用dynamic content的字符串方法
substring
- 使用dataflow的select方法选择需要的列
5.2 使用derived修改数据都在一行的表
-
现有一张表的数据如下,表头都在一行,表的值也都在一行
0.整体流程
-
读取source,注意不要选择first row as header,之后使用skip line跳过这个表头,就得到了纯净的数据
2.derived column,这里我们添加新的列,内容是substring 字符串
- 使用select 删除之前错误的列,保留新的2列数据
- 设置sink,注意如果只想输出一个文件,选择single partition
7.使用dataflow删除重复的行
文件:employee_duplicated.csv
-
情景:现有一张csv表,其中有几行数据是重复的,其中除了ID重复,里面的字段也是重复的
7.1 去重方法一:
1.先根据name和country对employee进行分组group by
-
然后对剩下不是country 和name的列,只取他们的第一列
3.结果
7.1 去重方法二:使用sha2创建finger print
8. 使用dataflow合并一个无ID的表
文件:employee_key0,employee_key1
思路:首先计算出来原来有Id的文件的最大文件id
1.derivedColumn:在有ID的文件中,创建一个虚拟列,dummpy
2.groupby dummpy然后计算出最大的max id
3.join:然后将计算好的于没有ID的key0文件cross join ,条件是1==1
4.surrogate key:然后添加一个自增的surrogate key
5.derivedColumn:添加一个新的id列,用最大id+surrogate key,就得到了新的id
6.select:选择需要的列
7.new branch: key1添加新的分支
8.union: 新表和Key1进行union
9.sink:完成
9.滚动 和 running total
使用window
10. log ADF pipeline
需要两个data flow,判断今天是否存在了log,有就append,无就create
- data flow1:df_log该df主要用来创建日期为
2022-01-01_log.csv
的文件
1.创建一个dummy.csv用来做起始文件(注意:dummy文件一定是正确的cvs格式,不然写不进去数据)
2.derived column:添加log所需要要的字段
3.由于ADFName这些字段是从pipeline的参数过来的,所以我们需要创建parameters用来接收一会传递进来的系统参数
4.select:将原来的dummy的列删除,只保留Log需要的列
5.sink:由于文件名是以当天时间为准,所以我们需要动态的添加文件名,在parameter里添加LogFileName
- data flow2:df_log_append:该df主要是在已有当天Log文件前提下,添加新的log
- clone上面的data flow1
11. 慢修改
读取新的数据,如果有更新且新增了数据,更改原表将新数据合并 alter row,目标数据库是sql.
- 使用upsert,"Upsert" 一词是 "update" 和 "insert" 的合成,它表示一种在目标数据存储中执行更新操作,如果记录不存在则执行插入操作的操作。具体来说,当在数据源中找到匹配的记录时,执行更新操作;当没有找到匹配记录时,执行插入操作。
12. 获取文件夹的所有文件数
注意:.childItmes返回的是array.length可以读取array的长度,返回 是一个object,需要转为string
13. 在copy过程中,添加新的列
直接在设置里,选择
14-15. 使用concat和join将array转为string
16. 验证文件得格式
17. 慢修改2
每次有新的数据,将旧的数据的isActive改为0,新的为1,并且新表的添加一个Surrkey来区分
19.执行带参数和返回值的procedure
直接使用pipleline里的procedure是无法执行带返回值的,需要使用loop up直接
20. 获取blob里最新的文件get latest file in blob
21. 动态的Mapping
就是将mapping的json,写去表里,或者文件中,使用getmeta读出来
22. 将多行合并成一行
- 将左边的表 合并成位右表:使用collect 方法
23. 当报错时候,发送提醒 send email when pipeline fails
没看
24. 一行拆成多行
- 将上面的表分为多行和22相反,先用split分开,再用flatten将array扁平
28. 时区转换
用转为时间戳然后进行加减法
29. 执行一个活动,如果任何一个分支发生了错误
Run an activity if any one of set of activities fail
30. Get Error message of Failed activities in Pipeline
36. 在look up中执行创建表的sql query
create table employee (id int)
select 1 as abc
37. 查看某天是否在某周之内
- 查看 2023-10-01 是否是 2023- 10-07的周
39. 查看数据更改的列
使用sha2和exists来判断