创建data factory
-
需要我们在azure里,有一个sql database, 一个sql server服务,一个gen2 data lake account
image.png -
选择Data Factories服务并创建,创建成功后,我们launch进去,发现和synapse像
image.png
使用data factory 将datalake的数据transform到sql pool里
1.创建Data source的链接
-
创建成功,我们可以看到里面的内容和文件
image.png
3.创建Destination的链接
image.png - 后面和Syanpse的pipeline一样
使用pipeline将csv文件转化为parquet 07
-
这次我们直接创建一个 新的pipeline
image.png - Source选择csv文件,Sink 需要我们新建一个gen2的任务并且将数据格式选为parquet
-
发布pipeline,在发布前记得validate,发布成功后,add trigger执行pipeline,会出现一个错误
image.png
4.上面错误的原因是,parquet的列名是不能有空格的,为了解决这个问题, 我们进入到刚才出错的pipeline里,找到mapping,这里的Mapping是azure自动给我们分配的表格的对应结构,我们修改里面的parquet的列名去掉空格
image.png -
修改完成后,validata成功后,然后publish,成功
image.png
执行多个步骤的pipeline
我们的需求是:将datalake里的csv文件转换成parquet后,存入container的parquet文件夹后,在自动进行将这个parquet文件导入到sql pool里的pool_logdata_qarquet表里。我们将复用上面的已经创建好的转换格式的pipeline
- 首先,删除datalake01lg的container的parquet文件,确保我们pipeline1是有用的
-
我们在刚才的pipeline界面里,添加一个新的pipeline
image.png
4.设置Source,注意这里的source dataset不是具体的文件,因为在执行这2个pipeline前,还没有文件生成,所以这里的Source只是一个地址,指向了container里的parquet文件夹,然后我们选择woldcard file,来通配里面的parquet文件,因为这里只有一个logparquet,实际中,会有多个文件,慎重使用通配符
image.png -
设置Sink
image.png
6.设置成功后我们建立链接,pipeline1成功后,在执行pipeline2
image.png -
publish and add trigger,成功后,我们可以看到datalake01lg里的log.csv,成功的转换了parquet格式,并且存储在了container的parquet文件夹下,并且这个parquet文件也被迁移到了synapse里pool的logdata_parquet表里
image.png
自定义我们需要的数据类型,在传输的过程中
上面的传输,我们都使用了默认的格式,可以看出Time时间类型,在这里被默认成了string类型,
-
修改csv文件转parquet时后的Time类型,改为DateTime
image.png -
删除pool里的dlogdata_parquet表,创建新的表,将Time字段改为DateTime
image.png
3.建表语句
CREATE TABLE [logdata_parquet]
(
[Correlationid] [varchar](200) NULL,
[Operationname] [varchar](200) NULL,
[Status] [varchar](100) NULL,
[Eventcategory] [varchar](100) NULL,
[Level] [varchar](100) NULL,
[Time] [datetime] NULL,
[Subscription] [varchar](200) NULL,
[Eventinitiatedby] [varchar](1000) NULL,
[Resourcetype] [varchar](1000) NULL,
[Resourcegroup] [varchar](1000) NULL,
[Resource] [varchar](2000) NULL
)
- 然后发布pipeline,然后trigger,这样2个pipeline都运行成功,container里有了log.parquet 且被传到了sql pool里
使用query来进行数据Transfer 012
1.建立于sqldatabase的链接,
2.在sql pool里面创建factSales的事实表
-
创建pipeline,使用query
image.png -
执行pipeline,数据迁移成功
image.png
使用copy command 复制数据到synapse 015
-
使用copy command
image.png
使用ploybase 复制数据到synapse 016
-
选择使用ploybase
image.png
2.在datalake的container里创建一个缓存文件夹staging
image.png
3.修改setting,添加刚才的staging
image.png
Mapping data flows 017-019
我们在进行复杂的transformation的时候可以使用这个功能
使用mapping data flows 迁移fact table的数据 018
我们根据我们的刚才创建FactSales表的query来创建Data flow
SELECT dt.[ProductID],dt.[SalesOrderID],dt.[OrderQty],dt.[UnitPrice],hd.[OrderDate],hd.[CustomerID],hd.[TaxAmt]
FROM [SalesLT].[SalesOrderDetail] dt
LEFT JOIN [SalesLT].[SalesOrderHeader] hd
ON dt.[SalesOrderID]=hd.[SalesOrderID]
-
添加一个source data 从detail表里
image.png -
添加left jion的另外一个表作为source data
image.png -
加入一个join
image.png -
设置join
image.png -
添加sink,就是destination table
image.png
6.设置sink里的Mapping
image.png - 设置完成后,点击validate all,确认无误,然后点击publish
- running我们的mapping data flow
-
创建一个pipeline,这里选择的是Data flow
image.png
10.设置Settings
image.png
11.设置完成,validate成功,publish,trigger的时候,会花很多的时间,相对于普通的transfer数据用时几秒,这里花费很多时间的原因是因为她再创建spark 集群服务
- 为什么使用mapping data flow呢,因为它可以在不改变原始数据的情况下,进行数据传输,给了我们更加灵活的取数方法
Mapping data flow 迁移Dimension Table 020-021
DimCustomer表
- 在sql pool里创建一个dimension table DimCustomer
CREATE TABLE [dbo].[DimCustomer](
[CustomerID] [int] NOT NULL,
[CompanyName] varchar(200) NOT NULL,
[SalesPerson] varchar(300) NOT NULL
)
WITH
(
DISTRIBUTION = REPLICATE
)
-
创建一个新的data flow,添加source 数据
image.png -
添加一个sink(destination),注意里面的mapping,表头的对应关系
image.png - validate全部没错,点击Publish,发布成功,注意此时并没有执行,表里是没数据的
-
添加一个data flow的pipeline,并且add trigger
image.png
DimProduct 表
1.在synapse 的Pool 里创建一个表,这里使用都是REPLICATE,因为他是dimension table
CREATE TABLE [dbo].[DimProduct](
[ProductID] [int] NOT NULL,
[ProductModelID] [int] NOT NULL,
[ProductcategoryID] [int] NOT NULL,
[ProductName] varchar(50) NOT NULL,
[ProductModelName] varchar(50) NULL,
[ProductCategoryName] varchar(50) NULL
)
WITH
(
DISTRIBUTION = REPLICATE
)
- 根据下面sql创建我们的data flow
SELECT prod.[ProductID],prod.[Name] as ProductName,model.[ProductModelID],model.[Name] as ProductModelName,category.[ProductcategoryID],category.[Name] AS ProductCategoryName
FROM [SalesLT].[Product] prod
LEFT JOIN [SalesLT].[ProductModel] model ON prod.[ProductModelID] = model.[ProductModelID]
LEFT JOIN [SalesLT].[ProductCategory] category ON prod.[ProductcategoryID]=category.[ProductcategoryID]
2.1 创建一个Product 的source
2.2 创建一个ProductModel的source
2.3创建一个ProductCategory的source
- 根据
FROM [SalesLT].[Product] prod LEFT JOIN [SalesLT].[ProductModel] model ON prod.[ProductModelID] = model.[ProductModelID]
创建一个left join
image.png
4.再根据LEFT JOIN [SalesLT].[ProductCategory] category ON prod.[ProductcategoryID]=category.[ProductcategoryID]
创建另外一个left join
image.png
5.创建一个sink
image.png -
注意:每次添加sink的时候,都需要调整Mapping
image.png
7.最后,创建pipeline,注意,这里我们是两个Data flow的pipeline,所以,在trigger这个pipeline的时候,里面的所有data flow都会执行
添加一个derived column(派生列) 022
- 我们可以在原来的Stream添加,也可以在新建的stream里添加。
- 创建一个新的FactSales表,增加TotalAmount 列
-- Lab - Derived Column
CREATE TABLE [dbo].[FactSales](
[ProductID] [int] NOT NULL,
[SalesOrderID] [int] NOT NULL,
[CustomerID] [int] NOT NULL,
[OrderQty] [smallint] NOT NULL,
[UnitPrice] [money] NOT NULL,
[OrderDate] [datetime] NULL,
[TaxAmt] [money] NULL,
[TotalAmount] [money] NOT NULL
)
WITH
(
DISTRIBUTION = HASH (CustomerID)
)
2.在之前生成FactSales的Steam里的Sink之前,添加一个select column来选择我们需要的列
3.设置我们需要的列,这里有个技巧,先全选,然后使用filter 选掉我们需要的列,然后删除不需要的
4.在这个之后,添加一个derived column
5.添加一个需要的列和操作,这里我们需要使用价格*数量,算出总额
6.到sink里,添加新的列,这里注意,我们需要重新获取一下表格的schema,不然totoalAmount列出不来
-
这样sink里的就有了新的列TotalAmount
image.png -
validate all 然后publish,最后add trigger,总额计算出来
image.png
在Dimention Table里创建 surrogate keys 026 (未完待续)
- 修改DimProduct,添加一个surrogate key名为Product SK
CREATE TABLE [dbo].[DimProduct](
[ProductSK] [int] NOT NULL,
[ProductID] [int] NOT NULL,
[ProductModelID] [int] NOT NULL,
[ProductcategoryID] [int] NOT NULL,
[ProductName] varchar(50) NOT NULL,
[ProductModelName] varchar(50) NULL,
[ProductCategoryName] varchar(50) NULL
)
WITH
(
DISTRIBUTION = REPLICATE
)
-
和上面一样,在sink之前先添加一个select的stream选择我们需要的列,然后修改mapping
image.png
3.接着添加一个surrogate key
4.修改Sink里的schema,最后发布,整个流程结束
cache sink
- 创建一个新的Customer表,并且加上CustomerSK字段
CREATE TABLE [dbo].[DimCustomer](
[CustomerSK] [int] NOT NULL,
[CustomerID] [int] NOT NULL,
[CompanyName] varchar(200) NOT NULL,
[SalesPerson] varchar(300) NOT NULL
)
WITH
(
DISTRIBUTION = REPLICATE
)
-
此时,我们更换source将之前的sql database里的customer表换成data lake gen2里container的Csv文件
image.png
3.创建一个选择列的Stream,将csv里面我们需要的列选出来
4.更新一下sink里的mapping让customerSK显示出来
-
publish and trigger, 成功,查看DimCustomer
image.png
避免重复 handling duplicate 028
1.添加一个stream用来处理最新的写入的product的数据
2.添加一个处理重复的stream,在之前的ProductStream上
3.设置exsist,如果productID有就不插入
Filtering rows 029
- 创建一个新的data flow,将datalake里的parquet作为数据源
2.创建一个sink,用来接收数据到pool里 -
创建一个filter,用来添加过滤的条件
image.png
Generating Joson data 030
-
将parquet转换为Json数据
image.png
Load json into SQL pool 031
1.在上面的Data flow上,我们可以添加一个Copy Data的pipeline
2.设置copy datad的 source 和sink
-
修改data flow里, sink输出的json名称
image.png
4.完成后,执行pipeline
image.png
处理json数据 032
-
在container里有一个customer的json,结构如下
image.png
2.需要将找个json格式的表,存入到pool里面的customer表里,表的结构如下
CREATE TABLE [Customercourse]
(
[CustomerID] int,
[CustomerName] varchar(200),
[Registered] BIT,
[Courses] varchar(200)
)