0. 项目地址
1. 创建本地sql的专用user,并且将密码存储到azure key-value里
1.1 创建本地专用user并授权
- 创建账号db101lg
create login db101lg with password= 'shang123'
create user db101lg for login db101lg
-
给该账号授权数据库和数据库的使用权限
1.2 创建azure key-value存储该账号的信息
-
使用之前先需要在access control里给当前的用户添加权限
1.对密码设置,用户名不用设置,方便管理
2.创建integrationg run time 提取本地数据库到Azure(E,L)
3. 复制onPrem的表到datalake里
3.1 创建link service,只使用secret
-
在使用secret之前,需要给data factory——df101lg在access control里授权
-
配置Link service
3.2 复制所有的表到blob里的broze
3.2.1动态获取所有数据库的schema的表
1.查询出来schema是sales
的所有表
SELECT
s.name AS SchemaName,
t.name AS TableName
FROM sys.tables t
INNER JOIN sys.schemas s
ON t.schema_id = s.schema_id
where s.name ='Sales'
3.2.2 创建pipleline
①lookup获取所有表名
区别于从blob里拿所有文件,这里我们从sql拿所有表的数据,需要先查询出表名,所以使用lookup,可以简单的理解,表用lookup,文件用get metadata
②循环表名,动态拼接sql,复制表
-
设置forEach循环
- 设置forEach循环里copy
- source里的表,一样不设置表明需要动态获取
@{concat('select * from ',item().SchemaName,'.',item().TableName)}
③设置sink层级关系
sink一样不用设置表名,动态添加
-
先添加两个参数
-
将参数填写到文件路径,根据文件层级拼接
#folder
@{concat(dataset().schema_name,'/',dataset().table_name)}
#fileName
@{concat(dataset().table_name,'.parquet')}
3.将ForEach循环的参数传递进入
④这里如果系统没有安装java,会报错,无法解析parquet
- 安装java 默认路径安装
3.设置系统环境变量
(1)搜索系统环境变量
(2)设置path
(3)添加java的路径,复制安装位置的bin
(4)删除之前默认添加的环境变量,将刚才添加的上移到第一个位置
如何安装
⑤ Trigger,运行成功
4.使用Databricks对原始数据进行处理(T)
4.1挂载blob里的container
1.创建bronze的mount
dbutils.fs.mount(source = 'wasbs://bronze@dl108lg.blob.core.windows.net',
mount_point= '/mnt/blob108_bronze',
extra_configs = {'fs.azure.account.key.dl108lg.blob.core.windows.net':'wOPYFJZ3KTUGBifHK+FRNytCVVFhobkoiCHRTDdGsoaRltNER'}
)
4.1.1 改进:将Mount里的Key改为secrets
- 进入到databricks里的Create Secret Scope,
https://<databricks-instance>#secrets/createScope
https://adb-1495688896694953.13.azuredatabricks.net/?o=1495688896694953#secrets/createScope
-
填写内容
-
创建好之后,在databricks页面时找不到的管理这个页面的,需要从databricks的cli里进入
4.给dl108lg的access-key创建一个screct
- 使用刚才我们创建好的名称和密码,在databricks里Mount
dbutils.fs.mount(source = 'wasbs://bronze@dl108lg.blob.core.windows.net',
mount_point= '/mnt/blob108_bronze',
{'fs.azure.account.key.dl108lg.blob.core.windows.net': dbutils.secrets.get('dl108lgScope','storage108lgKey')}
2.查看当前的所有Mounts
dbutils.fs.mounts()
4.2 处理各种数据的Transform(T)
4.2.1 bronze to silver修改日期格式并存储为delta
将所有的bronze的日期改为date格式,并且存入到silver层,以delta的格式
from pyspark.sql.functions import *
from pyspark.sql.types import *
table_name_silver = [i.name.split('/')[0] for i in dbutils.fs.ls('/mnt/blob108_bronze')]
for i in table_name:
path = '/mnt/blob108_bronze/Sales/'+ i + '/' +i + '.parquet'
df = spark.read.parquet(path)
columns = df.columns
for col in columns:
if 'Date' in col or 'date' in col:
df = df.withColumn(col, date_format(from_utc_timestamp(df[col].cast(TimestampType()),"UTC"),"yyyy-MM-dd"))
silver_path = '/mnt/blob108_silver/Sales'+ i +'/'
df.write.format('delta').mode('overwrite').save(silver_path)
4.2.2 silver to gold
将表名格式为AxxxBxxx
改为Axxx_Bxxx
table_name_silver = [i.name.split('/')[0] for i in dbutils.fs.ls('/mnt/blob108_silver')]
for i in table_name_silver:
path = '/mnt/blob108_silver/'+ i
df = spark.read.format('delta').load(path)
columns = df.columns
for old_col_name in columns:
new_col_name= "".join(["_" + char if char.isupper() and not old_col_name[i -1].isupper() else char for i, char in enumerate(old_col_name)]).lstrip("_")
df = df.withColumnRenamed(old_col_name, new_col_name)
out_put = '/mnt/blob108_gold/Sales/'+i+'/'
df.write.format('delta').mode('overwrite').save(out_put)
4.2.3 创建pipeline运行这两个Notebook
-
创建Link Servic
-
配置:这里使用access token登录
-
去databricks里面配置access token
- 填写好Access tokens,配置完成
4.3 Use Synapese 保存view(L)
4.3.1创建serverless数据库gold_db
用来将我们需要的表格转为视图
1.创建一个动态的procedure用来将表转为视图
Use gold_db
GO
CREATE OR ALTER PROC CreateSQLserverlessView_gold @ViewName nvarchar(100)
AS
BEGIN
DECLARE @statement VARCHAR(MAX)
SET @statement = 'CREATE OR ALTER VIEW ' + @ViewName + ' AS
SELECT *
FROM
OPENROWSET(
BULK ''https://dl108lg.dfs.core.windows.net/gold/Sales/' + @ViewName + '/'',
FORMAT = ''DELTA''
) as [result]
'
EXEC(@statement)
END
GO
注意:拼接字符串都用的单引!!!
2.创建serverlessdb的LinkService,由于是内置的,订阅里没有,需要手动填写;用户密码选择系统
3.去synapse里面找到endpoin
-
将需要使用serverless的用户添加,这里是df105lg
5.测试链接成功
-
创建pipeline将gold里的表格全部加载到serverless里的
①创建get metadata获取
②ForEach循环输出表名,将表名传递给procedure
- 运行pipeline所有的表都进入到了view里
4.4 poweBI 链接Synapse
-
一般使用的是microsoft认证,只需要将需要的人添加到里面即可;这里我们使用sql认证,为了简单直接在synapse里修改