ADF项目(一)

0. 项目地址

项目地址

1. 创建本地sql的专用user,并且将密码存储到azure key-value里

1.1 创建本地专用user并授权

  1. 创建账号db101lg
create login db101lg with password= 'shang123'

create user db101lg for login db101lg
  1. 给该账号授权数据库和数据库的使用权限


    image.png

1.2 创建azure key-value存储该账号的信息

  • 使用之前先需要在access control里给当前的用户添加权限
    1.对密码设置,用户名不用设置,方便管理


    image.png

2.创建integrationg run time 提取本地数据库到Azure(E,L)

image.png

3. 复制onPrem的表到datalake里

3.1 创建link service,只使用secret

  • 在使用secret之前,需要给data factory——df101lg在access control里授权


    image.png
  1. 配置Link service


    image.png

    image.png

3.2 复制所有的表到blob里的broze

3.2.1动态获取所有数据库的schema的表

1.查询出来schema是sales的所有表

SELECT  
s.name AS SchemaName,
t.name AS TableName
FROM sys.tables t
INNER JOIN sys.schemas s
ON t.schema_id = s.schema_id
where s.name ='Sales'
image.png

3.2.2 创建pipleline

①lookup获取所有表名

区别于从blob里拿所有文件,这里我们从sql拿所有表的数据,需要先查询出表名,所以使用lookup,可以简单的理解,表用lookup,文件用get metadata

image.png

②循环表名,动态拼接sql,复制表

  1. 设置forEach循环


    image.png
  2. 设置forEach循环里copy
  • source里的表,一样不设置表明需要动态获取
@{concat('select * from ',item().SchemaName,'.',item().TableName)}
image.png

③设置sink层级关系

sink一样不用设置表名,动态添加

  1. 先添加两个参数


    image.png
  2. 将参数填写到文件路径,根据文件层级拼接


    image.png
#folder
@{concat(dataset().schema_name,'/',dataset().table_name)}
#fileName
@{concat(dataset().table_name,'.parquet')}

3.将ForEach循环的参数传递进入


image.png

④这里如果系统没有安装java,会报错,无法解析parquet

1.下载jdk8u201

  1. 安装java 默认路径安装
    3.设置系统环境变量
    (1)搜索系统环境变量
    (2)设置path
    image.png

    (3)添加java的路径,复制安装位置的bin
    image.png

    (4)删除之前默认添加的环境变量,将刚才添加的上移到第一个位置
    如何安装

⑤ Trigger,运行成功

4.使用Databricks对原始数据进行处理(T)

4.1挂载blob里的container

1.创建bronze的mount

dbutils.fs.mount(source = 'wasbs://bronze@dl108lg.blob.core.windows.net',
                          mount_point= '/mnt/blob108_bronze',
                          extra_configs = {'fs.azure.account.key.dl108lg.blob.core.windows.net':'wOPYFJZ3KTUGBifHK+FRNytCVVFhobkoiCHRTDdGsoaRltNER'}
)

4.1.1 改进:将Mount里的Key改为secrets

  1. 进入到databricks里的Create Secret Scope,https://<databricks-instance>#secrets/createScope
https://adb-1495688896694953.13.azuredatabricks.net/?o=1495688896694953#secrets/createScope
  1. 填写内容


    image.png

    image.png
  2. 创建好之后,在databricks页面时找不到的管理这个页面的,需要从databricks的cli里进入


    image.png

    4.给dl108lg的access-key创建一个screct


    image.png
  3. 使用刚才我们创建好的名称和密码,在databricks里Mount
dbutils.fs.mount(source = 'wasbs://bronze@dl108lg.blob.core.windows.net',
                          mount_point= '/mnt/blob108_bronze',
{'fs.azure.account.key.dl108lg.blob.core.windows.net': dbutils.secrets.get('dl108lgScope','storage108lgKey')}

2.查看当前的所有Mounts

dbutils.fs.mounts()
image.png

4.2 处理各种数据的Transform(T)

4.2.1 bronze to silver修改日期格式并存储为delta

将所有的bronze的日期改为date格式,并且存入到silver层,以delta的格式

from pyspark.sql.functions import *
from pyspark.sql.types import *

table_name_silver = [i.name.split('/')[0] for i in dbutils.fs.ls('/mnt/blob108_bronze')]

for i in table_name:
    path = '/mnt/blob108_bronze/Sales/'+ i + '/' +i + '.parquet'
    df = spark.read.parquet(path)
    columns = df.columns

    for col in columns:
        if 'Date' in col or 'date' in col:
            df = df.withColumn(col, date_format(from_utc_timestamp(df[col].cast(TimestampType()),"UTC"),"yyyy-MM-dd"))

    silver_path = '/mnt/blob108_silver/Sales'+ i +'/'
    df.write.format('delta').mode('overwrite').save(silver_path)

4.2.2 silver to gold

将表名格式为AxxxBxxx改为Axxx_Bxxx

table_name_silver = [i.name.split('/')[0] for i in dbutils.fs.ls('/mnt/blob108_silver')]

for i in table_name_silver:
    path = '/mnt/blob108_silver/'+ i 
    df = spark.read.format('delta').load(path)
    columns = df.columns

    for old_col_name in columns:
        new_col_name= "".join(["_" + char if char.isupper() and not old_col_name[i -1].isupper() else char for i, char in enumerate(old_col_name)]).lstrip("_")
        df = df.withColumnRenamed(old_col_name, new_col_name) 
        
    out_put = '/mnt/blob108_gold/Sales/'+i+'/'
    df.write.format('delta').mode('overwrite').save(out_put)

4.2.3 创建pipeline运行这两个Notebook

  1. 创建Link Servic


    image.png
  2. 配置:这里使用access token登录


    image.png
  3. 去databricks里面配置access token


    image.png
  4. 填写好Access tokens,配置完成

4.3 Use Synapese 保存view(L)

4.3.1创建serverless数据库gold_db用来将我们需要的表格转为视图

1.创建一个动态的procedure用来将表转为视图

Use gold_db
GO

CREATE OR ALTER PROC CreateSQLserverlessView_gold @ViewName nvarchar(100)
AS
BEGIN

DECLARE @statement VARCHAR(MAX)

    SET @statement =  'CREATE OR ALTER VIEW ' + @ViewName + ' AS
        SELECT *
        FROM 
            OPENROWSET(
            BULK ''https://dl108lg.dfs.core.windows.net/gold/Sales/' + @ViewName + '/'',
            FORMAT = ''DELTA''
            ) as [result]
    '
    
EXEC(@statement)

END
GO

注意:拼接字符串都用的单引!!!

2.创建serverlessdb的LinkService,由于是内置的,订阅里没有,需要手动填写;用户密码选择系统


image.png

3.去synapse里面找到endpoin


image.png
  1. 将需要使用serverless的用户添加,这里是df105lg


    image.png

    5.测试链接成功

  2. 创建pipeline将gold里的表格全部加载到serverless里的
    ①创建get metadata获取


    image.png

    ②ForEach循环输出表名,将表名传递给procedure


    image.png
  3. 运行pipeline所有的表都进入到了view里

4.4 poweBI 链接Synapse

  • 一般使用的是microsoft认证,只需要将需要的人添加到里面即可;这里我们使用sql认证,为了简单直接在synapse里修改


    image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容