1.创建项目
1.1 创建多个项目
这次我们不使用命令行配置,直接更改profiles.yml
- 在
profiles.yml
新建一个环境dbt_kahan,项目里的yml和profiles.yml名字要对应
image.png - 填写
profiles.yml
的配置信息(多项目)
image.png
3.测试
dbt debug
1.2 创建多个staging来区分不同的database
image.png
-
根据snowflake上面的数据库名称创建对应的staging
image.png - 创建需要的表的
schema.yml
version: 2
sources:
- name: snowflake_sample_data #dbt里引用的名字
database: SNOWFLAKE_SAMPLE_DATA #实际数据库的库名
schema: TPCDS_SF10TCL_OLD #数据库的schema名称
tables:
- name: STORE_SALES #数据库里的表名
- 创建一个dag名为
ssr_store_sales.sql
,用来将sources里的表,移动到demo_db的view里
WITH ssr_store_sales as(
select * from {{ source('snowflake_sample_data', 'STORE_SALES') }}
)
select * from ssr_store_sales
2. 定制自己的schema
- 默认情况下,我们在最开始配置
profiles.yml
已经hard code了schema为public
image.png
但是,实际工作中,我们根据业务的处理用到很多的schema,所以需要我们根据业务的不同场景来切换staging
- 添加macros,用来更改默认的schema的
generate_schema.sql
{% macro generate_schema_name(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%}
{%- if custom_schema_name is none -%}
{{ default_schema }}
{%- else -%}
{{ custom_schema_name | trim }}
{%- endif -%}
{%- endmacro %}
- 修改
dbt_project.yml
,添加名为staging
的schema,这样在staing文件夹下的所有的表,都会进入到snowflake的staging下
models:
dbt_kahan:
# Config indicated by + and applies to all files under models/example/
staging:
+materialized: view
+schema: staging
image.png
3. 对dbt的表进行持久化materialization
3.1 方法一:整体修改dbt_project.yml
- 如果我们想对表进行各种持久化的操作需要用materialization,例如将表改为table,view等,可以在dbt_project.yml直接修改
- 将staging下example的表改为table,默认是view
models:
dbt_kahan:
# Config indicated by + and applies to all files under models/example/
staging:
+materialized: view
+schema: staging
example:
+materialized: table
image.png
3.2 方法二:对单个modol进行修改
- 将上面的first example改为view,在my_first_example_dbt.sql里添加
image.png
- macros 本质是test
- seed 里面的数据应该是很少变动的,用来被引用的
3.3 ephemeral
- 不是snowflake或者其他工具的内置类型,只是dbt自定义的,所以将model设置为ephemeral不会出现在snowflake里,只会在dbt内部当作CTE来使用
- 优点:可以当作表来使用,可以保持数据库干净
- 缺点:难以调试和显示
4. 添加变量
- 在
dbt_project.yml
添加变量
name: 'dbt_kahan'
version: '1.0.0'
config-version: 2
vars:
current_champion: Lakers
- 使用变量
with champion_team as (
select *,
case when team = '{{var("current_champion")}}' then 'is_champion else NULL END as flag_champion
from teams
6. Hooks
当我们想在model执行的之前或者之后执行一些重复操作的时候,不需要每个Model都添加操作,可以使用Hooks来
6.1 dbt运行前或者后执行操作
- 在
dbt run
之前和之后,执行我们定义的sql statement,在dbt test
这类操作也可以添加测试前的hooks
on-run-start:
- "{% for schema in schemas %}grant usage on schema {{ schema }} to group reporter; {% endfor %}"
on-run-end:
"{{ grant_select(schemas) }}"
models:
dbt_kahan:
# Config indicated by + and applies to all files under models/example/
staging:
+materialized: view
+schema: staging
example:
+materialized: table
6.2 model级别的Hooks
- 有两种添加方式,
- 方法一:直接去
dbt_project.yml
指定的model下添加
- 执行所有Model之前,先给example下的所有Model授权给REPORTER角色,然后执行完所有Model之后,在移除PC的使用权限
models:
dbt_kahan:
staging:
+materialized: view
+schema: staging
example:
+pre-hook:
- "GRANT SELECT ON {{ this }} TO ROLE REPORTER"
+post-hook:
- "REMOVE SELECT ON {{ this }} TO ROLE PC"
+materialized: table
- 方法二:去指定的model的
.sql
里添加
{{ config(
post_hook = "unload ('select from {{ this }}') to 's3:/bucket_name/{{ this }}"
) }}
select ...
7. 用docs文件夹管理开发文档
-
在model文件夹下创建一个docs文件夹,用来管理所有的docs
image.png - 创建一个关于所有销售数据的md文档
sales_details.md
{% docs salers_type %}
This is type of salers:TOP1 GOLDEN SALER
{% enddocs %}
{% docs salers_name %}
This is name of salers: PJJ,GJJ
{% enddocs %}
{% docs salers_department %}
This is department of salers: IT, HR
{% enddocs %}
- 使用,分别在example和snowflake_sample_data里的schema.yml使用
version: 2
models:
- name: my_first_dbt_model
description: "A starter dbt model"
columns:
- name: id
description: '{{doc("salers_type")}}'
tests:
- unique
- not_null
- name: my_second_dbt_model
description: "A starter dbt model"
columns:
- name: id
description: '{{doc("salers_name")}}'
tests:
- unique
- not_null
8.freshness & add snowflake query tag
- 检查数据和当前时间的差,并定义错误,不是UTC时间需要转
tables:
- name: customers # this will use the freshness defined above
- name: orders
freshness: # make this a little more strict
warn_after: {count: 6, period: hour}
error_after: {count: 12, period: hour}
# Apply a where clause in the freshness query
filter: datediff('day', _etl_loaded_at, current_timestamp) < 2
- 添加snowflake query tag
{{
config(
query_tag = 'demo',
)
}}
select *
from {{ ref('my_first_dbt_model') }}
where id = 1
9. 引用其他项目的文件到自己的
- 假设我们需要引用一个其他项目dbtlearn的一些文件
- 创建一个Package将其他的项目复制的本地,在本地
package.yml
里添加
packages:
- git: "https://github.com/CXTV/dbt_demo.git"
revision: master
- 编译包,此时我们当前的项目就有了导入的其他项目
dbt deps
image.png
3.使用dbt_demo里的macros
select *, {{dbt_packages.dbtlearn.new_macors('Mike')}} as test_col from teams
4.修改本地项目的.gitignore
防止上传了其他项目代码
10. 使用dbt创建uuid&自动清理文件夹clean&添加tag
10.1创建uuid
- dbt内置了自动创建唯一标识符的功能,同一个执行的上下文,是相同的,这样可以用来审核一些执行
select *, '{{ invocation_id}}' as invocation_id from teams
10.2 自动清理文件夹clean
1.配置我们需要清理的文件夹在dbt_project.yml
里
clean-targets:
- "target"
- "dbt_packages"
- "logs"
- 执行清理
dbt clean
10.3 添加Tag
- 添加一个tag,
dbt_project.yml
里
models:
dbt_kahan:
staging:
+materialized: view
+schema: staging
example:
+tag: p1
+materialized: table
- 方法二:在Model里用config添加
{{ config(materialized='view',tags=['special']) }}
- 执行指定的tag
dbt run -m tag:p1
11. DBT一些知识
- 可以指定每个库使用的snowflake的 warehourse
models:
dbt_kahan:
staging:
+materialized: view
+schema: staging
example:
+tag: p1
+materialized: ephemeral
snowflake_warehouse: pc_small
marts:
+materialized: table
snowflake_warehouse: pc_large
- 文件结构需要有staging(处理数据的文件夹,一般都是view),marts(最后处理完成的表,一般是table,snowflake里的实表)
-
最好给Models加上这个,用来测试
image.png
4.package-codegen