Azure Data Factory 中的 Data Flow主要用于做数据mapping等操作。
本文以一个示例说明如何通过Azure Data Factory 的 Data Flow组件,把多个source源的相关数据联系起来并输出。
数据源1: AzureSQL/AdventureWorks/Product
数据源2: AzureSQL/AdventureWorks/ProductModel
关联字段: Product.ProductModelID = ProductModel.ProductModelID
最终的数据结果集:所有Product,不管是否存在ProductModel都需要输出,如果存在ProductModel,则输出额外的ModelName,如果不存在,则ModleName设置为 N/A
说明:该示例只是为了演示Data Flow的用法,实际业务场景中,这种情况就直接用sql join来实现了。
1. 创建Data Flow
2. 创建Mapping Flow
2.1 Data Flow工作区域说明
下图你可以看到有这几部分
- 1.新建的数据源
- Data Flow debug - enable了这个可以一边设计一边data preview
- 这个区域拖拽DataFlow流程中的组件,可以通过+,组件很丰富
-
这个区域是对区域3部分组件的属性设置
image.png
-
DataFlow Component1
|
DataFlow Component2
|
---|
2.2 Data Flow相关组件说明
下面这个DataFlow就是为了实现本文一开始说的业务流程的。
- 数据源的定义,这里有两个源,分别是来自Azure SQL的两张表
- 定义关联方式(left,inner等),关联字段
- 定义output输出的字段
- 在3的基础上,业务需要额外把ProductModelName为空的数据设置为N/A
- 输出定义对应的data set
对于组件5 Sink,
需要注意,这里有个Optimize属性,设置partition。
- Use current partition
- Single partition (只有一个partition,如果数据量大的话性能会有影响)
-
Set partition (可以设置不同的partition type和partition个数)
由于DataFlow的所有信息会自动被Azure提交到Azure Databrick组件计算,所以如果partition设置为 use current partition, 那么Azure就会按照default partition机制运算,并且将结果保存在blob storage上。而不是我们期望的一个文件。具体看2.3测试部分的结果
image.png
2.3 测试
由于DataFlow这个组件自身没法运行,需要新建Pipeline中通过DataFlow组件使用,如下图
点击Debug
2.3.1 DataFlow/Sink(5)组件Partition - Current partition
得到的在blob container中看到的结果如下
2.3.2 DataFlow/Sink(5)组件Parition - Single partition
除了Optimize的parition组件选择 Single parititon,如果需要输出到一个文件里面,还需要设置 File name option -> Output to single file
Storage Explorer 可以查看到products container下面的文件