ADF realtime Scenario

0.项目地址

Azure Data Factory Real Time Scenarios

1. Mapping DataFlow处理错误的数据

文件:department_error_date.csv

  • 情景:一张有错误数据的csv需要Copy到sql表里,例如Date日期列的格式有错误,需要将正确的数据存入到目标表里,错误的数据记录起来

1.1 重点

  • DataFlow的condiction split用于将错误和正确的分表
  • DataFlow的Derived column 用于1.派生新列,可以是其他列的运算 2. 转换格式不对的列的格式
  • csv to sql时,列的type一定要正确

1.2 例子

image.png

2. 动态获取folder的所有文件名称

  • 情景:获取blob名为raw文件夹下的所有文件名

2.1 实现

  1. 使用getMetadata 获取文件夹的所有childItems


    image.png
  2. ForEach循环childItems
    3.在ForEach循环里获取每个childItem的name(之后就可以用来复制)

3. 增量复制最新的编辑的表/删除60天之前的表

Incrementallly copy new and changed files bases on last modified

  • 情景(Copy):一个blob-input文件夹里,时不时的会进来新的一批文件,需要每隔5分钟检查,并且将新进来csv文件复制output文件夹里
  • 情景(Delete): 删除input文件夹里60天之前的所有文件

3.1重点

copy

  • Timbling windwos trigger 设置
  • copy 里的filter by last modified
  • 通配符匹配所有csv文件
  • 当前时间utcnow()以及adddays(utcnow(),-day)
    delete
  • 设置filter by last modified
  • for循环动态删除文件

3.2例子

pass

5. 使用dataflow修复一列都为字符串的csv文件

  • 情景:有一个一行都是string的文件,需要根据他的字段分成3列


    image.png

5.1 重点

  • 使用DataFlow里的Derived修改
  • 使用dynamic content的字符串方法substring
  • 使用dataflow的select方法选择需要的列

5.2 使用derived修改数据都在一行的表

  • 现有一张表的数据如下,表头都在一行,表的值也都在一行


    image.png

    0.整体流程


    image.png
  1. 读取source,注意不要选择first row as header,之后使用skip line跳过这个表头,就得到了纯净的数据


    image.png

    2.derived column,这里我们添加新的列,内容是substring 字符串


    image.png
  2. 使用select 删除之前错误的列,保留新的2列数据
  3. 设置sink,注意如果只想输出一个文件,选择single partition
    image.png

7.使用dataflow删除重复的行

文件:employee_duplicated.csv

  • 情景:现有一张csv表,其中有几行数据是重复的,其中除了ID重复,里面的字段也是重复的


    image.png

7.1 去重方法一:

1.先根据name和country对employee进行分组group by


image.png
  1. 然后对剩下不是country 和name的列,只取他们的第一列


    image.png

    3.结果


    image.png

7.1 去重方法二:使用sha2创建finger print

8. 使用dataflow合并一个无ID的表

文件:employee_key0,employee_key1
思路:首先计算出来原来有Id的文件的最大文件id
1.derivedColumn:在有ID的文件中,创建一个虚拟列,dummpy
2.groupby dummpy然后计算出最大的max id
3.join:然后将计算好的于没有ID的key0文件cross join ,条件是1==1
4.surrogate key:然后添加一个自增的surrogate key
5.derivedColumn:添加一个新的id列,用最大id+surrogate key,就得到了新的id
6.select:选择需要的列
7.new branch: key1添加新的分支
8.union: 新表和Key1进行union
9.sink:完成

9.滚动 和 running total

使用window

10. log ADF pipeline

image.png

需要两个data flow,判断今天是否存在了log,有就append,无就create

  • data flow1:df_log该df主要用来创建日期为2022-01-01_log.csv的文件
    1.创建一个dummy.csv用来做起始文件(注意:dummy文件一定是正确的cvs格式,不然写不进去数据)
    2.derived column:添加log所需要要的字段
    image.png

    3.由于ADFName这些字段是从pipeline的参数过来的,所以我们需要创建parameters用来接收一会传递进来的系统参数
    image.png

    4.select:将原来的dummy的列删除,只保留Log需要的列
    5.sink:由于文件名是以当天时间为准,所以我们需要动态的添加文件名,在parameter里添加LogFileName
    image.png
  • data flow2:df_log_append:该df主要是在已有当天Log文件前提下,添加新的log
  1. clone上面的data flow1

11. 慢修改

读取新的数据,如果有更新且新增了数据,更改原表将新数据合并 alter row,目标数据库是sql.

  • 使用upsert,"Upsert" 一词是 "update" 和 "insert" 的合成,它表示一种在目标数据存储中执行更新操作,如果记录不存在则执行插入操作的操作。具体来说,当在数据源中找到匹配的记录时,执行更新操作;当没有找到匹配记录时,执行插入操作。

12. 获取文件夹的所有文件数

注意:.childItmes返回的是array.length可以读取array的长度,返回 是一个object,需要转为string


image.png

13. 在copy过程中,添加新的列

直接在设置里,选择


image.png

14-15. 使用concat和join将array转为string

16. 验证文件得格式

image.png

17. 慢修改2

image.png

每次有新的数据,将旧的数据的isActive改为0,新的为1,并且新表的添加一个Surrkey来区分

19.执行带参数和返回值的procedure

直接使用pipleline里的procedure是无法执行带返回值的,需要使用loop up直接


image.png

20. 获取blob里最新的文件get latest file in blob

21. 动态的Mapping

就是将mapping的json,写去表里,或者文件中,使用getmeta读出来


image.png

22. 将多行合并成一行

image.png
  • 将左边的表 合并成位右表:使用collect 方法

23. 当报错时候,发送提醒 send email when pipeline fails

没看

24. 一行拆成多行

image.png
  • 将上面的表分为多行和22相反,先用split分开,再用flatten将array扁平

28. 时区转换

用转为时间戳然后进行加减法

29. 执行一个活动,如果任何一个分支发生了错误

Run an activity if any one of set of activities fail

30. Get Error message of Failed activities in Pipeline

36. 在look up中执行创建表的sql query

create table employee (id int)
select 1 as abc

37. 查看某天是否在某周之内

  • 查看 2023-10-01 是否是 2023- 10-07的周

39. 查看数据更改的列

使用sha2和exists来判断

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,193评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,306评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,130评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,110评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,118评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,085评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,007评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,844评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,283评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,508评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,395评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,985评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,630评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,797评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,653评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,553评论 2 352

推荐阅读更多精彩内容