DBT(Kahan)

1.创建项目

1.1 创建多个项目

这次我们不使用命令行配置,直接更改profiles.yml

  1. profiles.yml新建一个环境dbt_kahan,项目里的yml和profiles.yml名字要对应
    image.png
  2. 填写profiles.yml的配置信息(多项目)
    image.png

    3.测试
dbt debug

1.2 创建多个staging来区分不同的database

image.png
  1. 根据snowflake上面的数据库名称创建对应的staging


    image.png
  2. 创建需要的表的schema.yml
version: 2

sources:
  - name: snowflake_sample_data   #dbt里引用的名字
    database: SNOWFLAKE_SAMPLE_DATA    #实际数据库的库名
    schema: TPCDS_SF10TCL_OLD   #数据库的schema名称
    tables: 
      - name: STORE_SALES  #数据库里的表名
  1. 创建一个dag名为ssr_store_sales.sql,用来将sources里的表,移动到demo_db的view里
WITH ssr_store_sales as(
    select * from {{ source('snowflake_sample_data', 'STORE_SALES') }}
)
select * from ssr_store_sales

2. 定制自己的schema

  • 默认情况下,我们在最开始配置profiles.yml已经hard code了schema为public
    image.png

    但是,实际工作中,我们根据业务的处理用到很多的schema,所以需要我们根据业务的不同场景来切换staging
  1. 添加macros,用来更改默认的schema的generate_schema.sql
{% macro generate_schema_name(custom_schema_name, node) -%}

    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}

        {{ default_schema }}

    {%- else -%}

        {{ custom_schema_name | trim }}

    {%- endif -%}

{%- endmacro %}
  1. 修改dbt_project.yml,添加名为staging的schema,这样在staing文件夹下的所有的表,都会进入到snowflake的staging下
models:
  dbt_kahan:
    # Config indicated by + and applies to all files under models/example/
    staging:
      +materialized: view
      +schema: staging
image.png

3. 对dbt的表进行持久化materialization

3.1 方法一:整体修改dbt_project.yml

  • 如果我们想对表进行各种持久化的操作需要用materialization,例如将表改为table,view等,可以在dbt_project.yml直接修改
  1. 将staging下example的表改为table,默认是view
models:
  dbt_kahan:
    # Config indicated by + and applies to all files under models/example/
    staging:
      +materialized: view
      +schema: staging
      example:
        +materialized: table
image.png

3.2 方法二:对单个modol进行修改

  • 将上面的first example改为view,在my_first_example_dbt.sql里添加

image.png
  1. macros 本质是test
  2. seed 里面的数据应该是很少变动的,用来被引用的

3.3 ephemeral

  • 不是snowflake或者其他工具的内置类型,只是dbt自定义的,所以将model设置为ephemeral不会出现在snowflake里,只会在dbt内部当作CTE来使用
  • 优点:可以当作表来使用,可以保持数据库干净
  • 缺点:难以调试和显示

4. 添加变量

  1. dbt_project.yml添加变量
name: 'dbt_kahan'
version: '1.0.0'
config-version: 2

vars:
  current_champion: Lakers
  1. 使用变量
with champion_team as (
    select *,
        case when team = '{{var("current_champion")}}' then 'is_champion else NULL END as flag_champion
    from teams

6. Hooks

当我们想在model执行的之前或者之后执行一些重复操作的时候,不需要每个Model都添加操作,可以使用Hooks来

6.1 dbt运行前或者后执行操作

  • dbt run之前和之后,执行我们定义的sql statement,在dbt test这类操作也可以添加测试前的hooks
on-run-start:
  - "{% for schema in schemas %}grant usage on schema {{ schema }} to group reporter; {% endfor %}"

on-run-end:
  "{{ grant_select(schemas) }}"

models:
  dbt_kahan:
    # Config indicated by + and applies to all files under models/example/
    staging:
      +materialized: view
      +schema: staging
      example:
        +materialized: table

6.2 model级别的Hooks

  • 有两种添加方式,
  1. 方法一:直接去dbt_project.yml指定的model下添加
  • 执行所有Model之前,先给example下的所有Model授权给REPORTER角色,然后执行完所有Model之后,在移除PC的使用权限
models:
  dbt_kahan:
    staging:
      +materialized: view
      +schema: staging
      example:
        +pre-hook:
          - "GRANT SELECT ON {{ this }} TO ROLE REPORTER"
        +post-hook:
          - "REMOVE SELECT ON {{ this }} TO ROLE PC"
        +materialized: table

  1. 方法二:去指定的model的.sql里添加
{{ config(
  post_hook = "unload ('select from {{ this }}') to 's3:/bucket_name/{{ this }}"
) }}

select ...

7. 用docs文件夹管理开发文档

  1. 在model文件夹下创建一个docs文件夹,用来管理所有的docs


    image.png
  2. 创建一个关于所有销售数据的md文档sales_details.md
{% docs salers_type %}
This is type of salers:TOP1 GOLDEN SALER

{% enddocs %}

{% docs salers_name %}
This is name of salers: PJJ,GJJ

{% enddocs %}

{% docs salers_department %}
This is department of salers: IT, HR

{% enddocs %}

  1. 使用,分别在example和snowflake_sample_data里的schema.yml使用

version: 2

models:
  - name: my_first_dbt_model
    description: "A starter dbt model"
    columns:
      - name: id
        description: '{{doc("salers_type")}}'
        tests:
          - unique
          - not_null

  - name: my_second_dbt_model
    description: "A starter dbt model"
    columns:
      - name: id
        description: '{{doc("salers_name")}}'
        tests:
          - unique
          - not_null

8.freshness & add snowflake query tag

  1. 检查数据和当前时间的差,并定义错误,不是UTC时间需要转

    tables:
      - name: customers # this will use the freshness defined above

      - name: orders
        freshness: # make this a little more strict
          warn_after: {count: 6, period: hour}
          error_after: {count: 12, period: hour}
          # Apply a where clause in the freshness query
          filter: datediff('day', _etl_loaded_at, current_timestamp) < 2
  1. 添加snowflake query tag
{{
  config(
    query_tag = 'demo',
    )
}}

select *
from {{ ref('my_first_dbt_model') }}
where id = 1

9. 引用其他项目的文件到自己的

  • 假设我们需要引用一个其他项目dbtlearn的一些文件
  1. 创建一个Package将其他的项目复制的本地,在本地package.yml里添加
packages:
  - git: "https://github.com/CXTV/dbt_demo.git"
    revision: master
  1. 编译包,此时我们当前的项目就有了导入的其他项目
dbt deps
image.png

3.使用dbt_demo里的macros

select *, {{dbt_packages.dbtlearn.new_macors('Mike')}} as test_col from teams

4.修改本地项目的.gitignore防止上传了其他项目代码

10. 使用dbt创建uuid&自动清理文件夹clean&添加tag

10.1创建uuid

  • dbt内置了自动创建唯一标识符的功能,同一个执行的上下文,是相同的,这样可以用来审核一些执行
select *, '{{ invocation_id}}' as invocation_id from teams

10.2 自动清理文件夹clean

1.配置我们需要清理的文件夹在dbt_project.yml

clean-targets:       
  - "target"  
  - "dbt_packages"
  - "logs"
  1. 执行清理
dbt clean

10.3 添加Tag

  1. 添加一个tag,dbt_project.yml
models:
  dbt_kahan:
    staging:
      +materialized: view
      +schema: staging
      example:
        +tag: p1
        +materialized: table
  1. 方法二:在Model里用config添加
{{ config(materialized='view',tags=['special']) }}
  1. 执行指定的tag
dbt run -m tag:p1

11. DBT一些知识

  1. 可以指定每个库使用的snowflake的 warehourse
models:
  dbt_kahan:
    staging:
      +materialized: view
      +schema: staging 
      example:
        +tag: p1
        +materialized: ephemeral
        snowflake_warehouse: pc_small
      marts:
        +materialized: table
        snowflake_warehouse: pc_large
  1. 文件结构需要有staging(处理数据的文件夹,一般都是view),marts(最后处理完成的表,一般是table,snowflake里的实表)
  2. 最好给Models加上这个,用来测试


    image.png

4.package-codegen

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容