Azure data factory 05

创建data factory

  • 需要我们在azure里,有一个sql database, 一个sql server服务,一个gen2 data lake account


    image.png
  • 选择Data Factories服务并创建,创建成功后,我们launch进去,发现和synapse像


    image.png

使用data factory 将datalake的数据transform到sql pool里

1.创建Data source的链接


image.png
  1. 创建成功,我们可以看到里面的内容和文件


    image.png

    3.创建Destination的链接


    image.png
  2. 后面和Syanpse的pipeline一样

使用pipeline将csv文件转化为parquet 07

  1. 这次我们直接创建一个 新的pipeline


    image.png
  2. Source选择csv文件,Sink 需要我们新建一个gen2的任务并且将数据格式选为parquet
image.png
image.png
  1. 发布pipeline,在发布前记得validate,发布成功后,add trigger执行pipeline,会出现一个错误


    image.png

    4.上面错误的原因是,parquet的列名是不能有空格的,为了解决这个问题, 我们进入到刚才出错的pipeline里,找到mapping,这里的Mapping是azure自动给我们分配的表格的对应结构,我们修改里面的parquet的列名去掉空格


    image.png
  2. 修改完成后,validata成功后,然后publish,成功


    image.png

执行多个步骤的pipeline

我们的需求是:将datalake里的csv文件转换成parquet后,存入container的parquet文件夹后,在自动进行将这个parquet文件导入到sql pool里的pool_logdata_qarquet表里。我们将复用上面的已经创建好的转换格式的pipeline

  1. 首先,删除datalake01lg的container的parquet文件,确保我们pipeline1是有用的
  2. 我们在刚才的pipeline界面里,添加一个新的pipeline


    image.png

    4.设置Source,注意这里的source dataset不是具体的文件,因为在执行这2个pipeline前,还没有文件生成,所以这里的Source只是一个地址,指向了container里的parquet文件夹,然后我们选择woldcard file,来通配里面的parquet文件,因为这里只有一个logparquet,实际中,会有多个文件,慎重使用通配符


    image.png
  3. 设置Sink


    image.png

    6.设置成功后我们建立链接,pipeline1成功后,在执行pipeline2


    image.png
  4. publish and add trigger,成功后,我们可以看到datalake01lg里的log.csv,成功的转换了parquet格式,并且存储在了container的parquet文件夹下,并且这个parquet文件也被迁移到了synapse里pool的logdata_parquet表里


    image.png

自定义我们需要的数据类型,在传输的过程中

上面的传输,我们都使用了默认的格式,可以看出Time时间类型,在这里被默认成了string类型,

  1. 修改csv文件转parquet时后的Time类型,改为DateTime


    image.png
  2. 删除pool里的dlogdata_parquet表,创建新的表,将Time字段改为DateTime


    image.png

    3.建表语句

CREATE TABLE [logdata_parquet]
(
    [Correlationid] [varchar](200) NULL,
    [Operationname] [varchar](200) NULL,
    [Status] [varchar](100) NULL,
    [Eventcategory] [varchar](100) NULL,
    [Level] [varchar](100) NULL,
    [Time] [datetime] NULL,
    [Subscription] [varchar](200) NULL,
    [Eventinitiatedby] [varchar](1000) NULL,
    [Resourcetype] [varchar](1000) NULL,
    [Resourcegroup] [varchar](1000) NULL,
    [Resource] [varchar](2000) NULL
)
  1. 然后发布pipeline,然后trigger,这样2个pipeline都运行成功,container里有了log.parquet 且被传到了sql pool里

使用query来进行数据Transfer 012

1.建立于sqldatabase的链接,
2.在sql pool里面创建factSales的事实表

  1. 创建pipeline,使用query


    image.png
  2. 执行pipeline,数据迁移成功


    image.png

使用copy command 复制数据到synapse 015

  • 使用copy command


    image.png

使用ploybase 复制数据到synapse 016

  1. 选择使用ploybase


    image.png

    2.在datalake的container里创建一个缓存文件夹staging


    image.png

    3.修改setting,添加刚才的staging
    image.png

Mapping data flows 017-019

我们在进行复杂的transformation的时候可以使用这个功能

使用mapping data flows 迁移fact table的数据 018

我们根据我们的刚才创建FactSales表的query来创建Data flow

SELECT dt.[ProductID],dt.[SalesOrderID],dt.[OrderQty],dt.[UnitPrice],hd.[OrderDate],hd.[CustomerID],hd.[TaxAmt]
  FROM [SalesLT].[SalesOrderDetail] dt
  LEFT JOIN [SalesLT].[SalesOrderHeader] hd
  ON dt.[SalesOrderID]=hd.[SalesOrderID]
  1. 添加一个source data 从detail表里


    image.png
  2. 添加left jion的另外一个表作为source data


    image.png
  3. 加入一个join


    image.png
  4. 设置join


    image.png
  5. 添加sink,就是destination table


    image.png

    6.设置sink里的Mapping


    image.png
  6. 设置完成后,点击validate all,确认无误,然后点击publish
  7. running我们的mapping data flow
  8. 创建一个pipeline,这里选择的是Data flow


    image.png

    10.设置Settings


    image.png

    11.设置完成,validate成功,publish,trigger的时候,会花很多的时间,相对于普通的transfer数据用时几秒,这里花费很多时间的原因是因为她再创建spark 集群服务
  • 为什么使用mapping data flow呢,因为它可以在不改变原始数据的情况下,进行数据传输,给了我们更加灵活的取数方法

Mapping data flow 迁移Dimension Table 020-021

DimCustomer表
  1. 在sql pool里创建一个dimension table DimCustomer
CREATE TABLE [dbo].[DimCustomer](
    [CustomerID] [int] NOT NULL,
    [CompanyName] varchar(200) NOT NULL,
    [SalesPerson] varchar(300) NOT NULL
)
    WITH  
(   
    DISTRIBUTION = REPLICATE
)   
  1. 创建一个新的data flow,添加source 数据


    image.png
  2. 添加一个sink(destination),注意里面的mapping,表头的对应关系


    image.png
  3. validate全部没错,点击Publish,发布成功,注意此时并没有执行,表里是没数据的
  4. 添加一个data flow的pipeline,并且add trigger


    image.png
DimProduct 表

1.在synapse 的Pool 里创建一个表,这里使用都是REPLICATE,因为他是dimension table

CREATE TABLE [dbo].[DimProduct](
    [ProductID] [int] NOT NULL,
    [ProductModelID] [int] NOT NULL,
    [ProductcategoryID] [int] NOT NULL,
    [ProductName] varchar(50) NOT NULL, 
    [ProductModelName] varchar(50) NULL,
    [ProductCategoryName] varchar(50) NULL
)
WITH  
(   
    DISTRIBUTION = REPLICATE
)   
  1. 根据下面sql创建我们的data flow
SELECT prod.[ProductID],prod.[Name] as ProductName,model.[ProductModelID],model.[Name] as ProductModelName,category.[ProductcategoryID],category.[Name] AS ProductCategoryName
FROM [SalesLT].[Product] prod
LEFT JOIN [SalesLT].[ProductModel] model ON prod.[ProductModelID] = model.[ProductModelID]
LEFT JOIN [SalesLT].[ProductCategory] category ON prod.[ProductcategoryID]=category.[ProductcategoryID]

2.1 创建一个Product 的source
2.2 创建一个ProductModel的source
2.3创建一个ProductCategory的source


image.png
  1. 根据FROM [SalesLT].[Product] prod LEFT JOIN [SalesLT].[ProductModel] model ON prod.[ProductModelID] = model.[ProductModelID]创建一个left join
    image.png

    4.再根据LEFT JOIN [SalesLT].[ProductCategory] category ON prod.[ProductcategoryID]=category.[ProductcategoryID] 创建另外一个left join
    image.png

    5.创建一个sink
    image.png
  2. 注意:每次添加sink的时候,都需要调整Mapping


    image.png

    7.最后,创建pipeline,注意,这里我们是两个Data flow的pipeline,所以,在trigger这个pipeline的时候,里面的所有data flow都会执行

添加一个derived column(派生列) 022

  • 我们可以在原来的Stream添加,也可以在新建的stream里添加。
  1. 创建一个新的FactSales表,增加TotalAmount 列
-- Lab - Derived Column

CREATE TABLE [dbo].[FactSales](
    [ProductID] [int] NOT NULL,
    [SalesOrderID] [int] NOT NULL,
    [CustomerID] [int] NOT NULL,
    [OrderQty] [smallint] NOT NULL,
    [UnitPrice] [money] NOT NULL,
    [OrderDate] [datetime] NULL,
    [TaxAmt] [money] NULL,
    [TotalAmount] [money] NOT NULL
)
WITH  
(   
    DISTRIBUTION = HASH (CustomerID)
)

2.在之前生成FactSales的Steam里的Sink之前,添加一个select column来选择我们需要的列


image.png

3.设置我们需要的列,这里有个技巧,先全选,然后使用filter 选掉我们需要的列,然后删除不需要的


image.png

4.在这个之后,添加一个derived column


image.png

5.添加一个需要的列和操作,这里我们需要使用价格*数量,算出总额


image.png

6.到sink里,添加新的列,这里注意,我们需要重新获取一下表格的schema,不然totoalAmount列出不来

image.png
  1. 这样sink里的就有了新的列TotalAmount


    image.png
  2. validate all 然后publish,最后add trigger,总额计算出来


    image.png

在Dimention Table里创建 surrogate keys 026 (未完待续)

  1. 修改DimProduct,添加一个surrogate key名为Product SK
CREATE TABLE [dbo].[DimProduct](
    [ProductSK] [int] NOT NULL,
    [ProductID] [int] NOT NULL,
    [ProductModelID] [int] NOT NULL,
    [ProductcategoryID] [int] NOT NULL,
    [ProductName] varchar(50) NOT NULL, 
    [ProductModelName] varchar(50) NULL,
    [ProductCategoryName] varchar(50) NULL
)
WITH  
(   
    DISTRIBUTION = REPLICATE
)
  1. 和上面一样,在sink之前先添加一个select的stream选择我们需要的列,然后修改mapping


    image.png

3.接着添加一个surrogate key


image.png

4.修改Sink里的schema,最后发布,整个流程结束


image.png

cache sink

  1. 创建一个新的Customer表,并且加上CustomerSK字段
CREATE TABLE [dbo].[DimCustomer](
    [CustomerSK] [int] NOT NULL,
    [CustomerID] [int] NOT NULL,
    [CompanyName] varchar(200) NOT NULL,
    [SalesPerson] varchar(300) NOT NULL
)
    WITH  
(   
    DISTRIBUTION = REPLICATE
)       
  1. 此时,我们更换source将之前的sql database里的customer表换成data lake gen2里container的Csv文件


    image.png

3.创建一个选择列的Stream,将csv里面我们需要的列选出来


image.png

4.更新一下sink里的mapping让customerSK显示出来

  1. publish and trigger, 成功,查看DimCustomer


    image.png

避免重复 handling duplicate 028

1.添加一个stream用来处理最新的写入的product的数据


image.png

2.添加一个处理重复的stream,在之前的ProductStream上


image.png

3.设置exsist,如果productID有就不插入
image.png

Filtering rows 029

  1. 创建一个新的data flow,将datalake里的parquet作为数据源
    2.创建一个sink,用来接收数据到pool里
  2. 创建一个filter,用来添加过滤的条件


    image.png

Generating Joson data 030

  1. 将parquet转换为Json数据


    image.png

Load json into SQL pool 031

1.在上面的Data flow上,我们可以添加一个Copy Data的pipeline


image.png

2.设置copy datad的 source 和sink


image.png

image.png
  1. 修改data flow里, sink输出的json名称


    image.png

    4.完成后,执行pipeline


    image.png

处理json数据 032

  1. 在container里有一个customer的json,结构如下


    image.png

2.需要将找个json格式的表,存入到pool里面的customer表里,表的结构如下

CREATE TABLE [Customercourse]
(
    [CustomerID] int,
    [CustomerName] varchar(200),
    [Registered] BIT,
    [Courses] varchar(200)
)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,546评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,224评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,911评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,737评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,753评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,598评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,338评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,249评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,696评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,888评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,013评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,731评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,348评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,929评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,048评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,203评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,960评论 2 355

推荐阅读更多精彩内容