DBT复习

1.Snowflake创建 用户/角色/warehouse/Database/Table

1.1 Snowflake账户的初始化

  • 大概流程:①创建role;②创建warehouse;③创建database和schema表④创建独立的user⑤给user赋予transform角色,类似于给user一个该数据仓库的仓库管理员(transform)角色;
  1. 创建角色transform,并且授权ACCOUNTADMIN
CREATE ROLE IF NOT EXISTS transform;
GRANT ROLE TRANSFORM TO ROLE ACCOUNTADMIN;
  1. 查康当前的所有roles
show ROLES  
  1. 创建一个warehouse
CREATE WAREHOUSE IF NOT EXISTS COMPUTE_WH;
  1. 创建一个dbt账户,
CREATE USER IF NOT EXISTS dbt
  PASSWORD='shangxi123'
  LOGIN_NAME='dbt'
  MUST_CHANGE_PASSWORD=FALSE
  DEFAULT_WAREHOUSE='COMPUTE_WH'
  DEFAULT_ROLE='transform'
  DEFAULT_NAMESPACE='AIRBNB.RAW'
  COMMENT='DBT user used for data transformation';
  1. 创建database和schema
CREATE DATABASE IF NOT EXISTS AIRBNB;
CREATE SCHEMA IF NOT EXISTS AIRBNB.RAW;
  1. 给transform角色授权项目所需要的所有权限
--授权warehouse的操作给transform
GRANT OPERATE ON WAREHOUSE COMPUTE_WH TO ROLE TRANSFORM;
--将transform的角色赋给dbt
GRANT ROLE transform to USER dbt;
--授权warehouse给transform
GRANT ALL ON WAREHOUSE COMPUTE_WH TO ROLE transform; 
--授权schema给transform
GRANT ALL ON DATABASE AIRBNB to ROLE transform;
--授权database给warehouse
GRANT ALL ON DATABASE AIRBNB to ROLE transform;
--授权database给transform
GRANT ALL ON ALL SCHEMAS IN DATABASE AIRBNB to ROLE transform;
--授权schema给transform
GRANT ALL ON FUTURE SCHEMAS IN DATABASE AIRBNB to ROLE transform;
--授权表给transform
GRANT ALL ON ALL TABLES IN SCHEMA AIRBNB.RAW to ROLE transform;
GRANT ALL ON FUTURE TABLES IN SCHEMA AIRBNB.RAW to ROLE transform;

1.2 创建项目所需要的表并且从S3导入数据

-- Set up the defaults
USE WAREHOUSE COMPUTE_WH;
USE DATABASE airbnb;
USE SCHEMA RAW;

-- Create our three tables and import the data from S3
CREATE OR REPLACE TABLE raw_listings
                    (id integer,
                     listing_url string,
                     name string,
                     room_type string,
                     minimum_nights integer,
                     host_id integer,
                     price string,
                     created_at datetime,
                     updated_at datetime);
                    
COPY INTO raw_listings (id,
                        listing_url,
                        name,
                        room_type,
                        minimum_nights,
                        host_id,
                        price,
                        created_at,
                        updated_at)
                   from 's3://dbtlearn/listings.csv'
                    FILE_FORMAT = (type = 'CSV' skip_header = 1
                    FIELD_OPTIONALLY_ENCLOSED_BY = '"');
                    

CREATE OR REPLACE TABLE raw_reviews
                    (listing_id integer,
                     date datetime,
                     reviewer_name string,
                     comments string,
                     sentiment string);
                    
COPY INTO raw_reviews (listing_id, date, reviewer_name, comments, sentiment)
                   from 's3://dbtlearn/reviews.csv'
                    FILE_FORMAT = (type = 'CSV' skip_header = 1
                    FIELD_OPTIONALLY_ENCLOSED_BY = '"');
                    

CREATE OR REPLACE TABLE raw_hosts
                    (id integer,
                     name string,
                     is_superhost string,
                     created_at datetime,
                     updated_at datetime);
                    
COPY INTO raw_hosts (id, name, is_superhost, created_at, updated_at)
                   from 's3://dbtlearn/hosts.csv'
                    FILE_FORMAT = (type = 'CSV' skip_header = 1
                    FIELD_OPTIONALLY_ENCLOSED_BY = '"');

2.编写Model

2.1 添加项目层级

1.将添加staging和marts文件夹,结构化项目


image.png

2.在staging里,添加airbnb的文件夹,用来存放处理airbnb数据的stg,例如创建一个listings的View在snowflake里

WITH stg_listings AS (
 SELECT
 *
 FROM
 AIRBNB.RAW.RAW_LISTINGS
)
SELECT
 id AS listing_id,
 name AS listing_name,
 listing_url,
 room_type,
 minimum_nights,
 host_id,
 price AS price_str,
 created_at,
 updated_at
FROM
 raw_listings

3.修改dbt_project.ymlmodels的内容,将结构的默认表的形式更改

# files using the `{{ config(...) }}` macro.
models:
  dbt_airflow:
    # Config indicated by + and applies to all files under models/example/
    staging:
        +materialized: view
    marts:
      core:
        +materialized: table

2.2 将2.1的hard code的数据库名,改到sources里

1.文件夹新增staging/airbnb/src_airbnb.yml文件,用来添加表

version: 2

sources:
  - name: airbnb   #dbt里使用的名字
    database: AIRBNB    #实际数据库的库名
    schema: RAW   #数据库的schema名称
    tables: 
      - name: RAW_HOSTS  #数据库里的表名
      - name: RAW_REVIEWS  #数据库里的表名
      - name: RAW_LISTINGS  #数据库里的表名

models:
  - name: stg_listings
    columns:
      - name: LISTING_ID
        tests:
          - not_null
          - unique
image.png

3. Materializations

  • models之间如何链接的
  • 四种:①view:视图对应snowflake里的view;②table:对应snowflake里的table;③Ephemeral: 只有DBT有,Models里的CTEs就是;④Incremental: 一般用于dim和fact 表,只对数据进行incremental load

3.1 marts文件夹里创建不同的层级

1.在marts里创建/core/dim_airbnb文件夹,这里存放所有dimention表的处理

image.png

2.创建处理diimention表的文件dim_listings_cleansed.sql

WITH stg_listings as (
    SELECT * FROM {{ ref('stg_listings') }}
)
SELECT
    listing_id,
    listing_name,
    room_type,
    CASE WHEN minimum_nights = 0 THEN 1 ELSE minimum_nights END AS minimum_nights,
    host_id,
    REPLACE( price_str, '$' ) :: NUMBER( 10, 2 ) AS price,
    created_at,
    updated_at
FROM
    stg_listings
  1. 更改schema,将dimention表放入到snowflake的public schema里,如果跟该macros的话,他会创建一个新的schema,名字为RAW_PUBLIC这是一个bug
自定义schema将不同的表放入不同的schema里

①添加一个macros,用来修改bug,generate_schema.sql

{% macro generate_schema_name(custom_schema_name, node) -%}

    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}

        {{ default_schema }}

    {%- else -%}

        {{ custom_schema_name | trim }}

    {%- endif -%}

{%- endmacro %}

②更改dbt_project.yml将我们需要文件夹下的表全部更改他们的schema

models:
  dbt_airflow:
    # Config indicated by + and applies to all files under models/example/
    staging:
      +materialized: view
    marts:
      core:
        +schema: PUBLIC
        +materialized: table

③运行,将所有的dimention表将进入到snowflake的PUBLIC schema里

models:
  dbt_airflow:
    # Config indicated by + and applies to all files under models/example/
    staging:
      +materialized: view
    marts:
      core:
        +materialized: table
        +schema: PUBLIC

3.2 根据需求将表进行Materializations

  1. staging表其实是一个过程表,所以里面的stg_开头的表,是不应该放入到view里,增加数据库开销的,所以这里需ephemeral,改成ephemeral后,他将不会一直存放在数据库的view里,将会自动销毁
models:
  dbt_airflow:
    # Config indicated by + and applies to all files under models/example/
    staging:
      +materialized: ephemeral

2.由于marts/core/dim_airbnb里,存放的都是demention表,所以放入public的并且改为view,根据需求,其中两个是过程表,默认成为view,一个是需要使用的业务表,所以在文件内部更改为table

    marts:
      core:
        +schema: PUBLIC
        dim_airbnb:
          +materialized: view
        fact_airbnb:
          +materialized: table

4. snapshots用于生SDC

1.创建一个snapshot,snapshots/scd_raw_listings.sql

{% snapshot scd_raw_listings %}

{{
    config(
      target_database='AIRBNB', #数据库
      target_schema='RAW', #schema
      unique_key='ID',   #unique 字段
 
      strategy='timestamp',  #模式
      updated_at='updated_at',  #模式的时间
    )
}}

select * from {{ source('airbnb', 'RAW_LISTINGS') }}

{% endsnapshot %}

注意:如果strategy是timestamp,必须有update时间
2.运行dbt snapshot创建快照,用来创建一个快照表

dbt snapshot

3.插入一条数据

UPDATE AIRBNB.RAW.RAW_LISTINGS SET MINIMUM_NIGHTS=30,
 updated_at=CURRENT_TIMESTAMP() WHERE ID=3176;

4.运行dbt snapshot查看,可以看出,dbt给我的创建了很多字段,其中valid to 为null,意思目前再用

image.png

注意:当表里没有 有效的update_time的话,可以使用stragery的check模式,添加多列为group

{% snapshot scd_raw_reviews %}

    {{
        config(
          target_schema='RAW',
          strategy='check',
          unique_key='LISTING_ID',
          check_cols=['LISTING_ID', 'REVIEWER_NAME'],
        )
    }}

    select * from {{ source('airbnb', 'RAW_REVIEWS') }}

{% endsnapshot %}

5. Macros

5.1 Macros的编写

  1. 不使用的macros的sql
select
order_id,
sum(case when payment_method = 'bank_transfer' then amount end) as bank_transfer_amount,
sum(case when payment_method = 'credit_card' then amount end) as credit_card_amount,
sum(case when payment_method = 'gift_card' then amount end) as gift_card_amount,
sum(amount) as total_amount
from {{ ref('raw_payments') }}
group by 1
  1. 改进1,使用循环减少代码量
select
order_id,
{% for payment_method in ['bank_transfer','credit_card','gift_card'] %}
    sum(case when payment_method = '{{payment_method}}' then amount end) as {{payment_method}}_transfer_amount,
{% endfor %}
sum(amount) as total_amount
from {{ ref('raw_payments') }}
group by 1
  1. 改进2,将里面的hard code的列表,改写成为变量
{% set payment_methods =  ["bank_transfer", "credit_card", "gift_card"] %}

select
    order_id,
    {% for payment_method in payment_methods %}
    sum(case when payment_method = '{{payment_method}}' then amount end) as {{payment_method}}_amount,
    {% endfor %}
    sum(amount) as total_amount
from {{ ref('raw_payments') }}
group by 1
  1. 改进3,由于循环后最后有个,会导致sql报错,需要添加if判断
{% set review_sentiments =  ["negative", "neutral", "positive"] %}
select
    LISTING_ID,
    {% for review_sentiment in review_sentiments %}
    sum(case when review_sentiment = '{{review_sentiment}}' then 1 else 0 end) as {{review_sentiment}}_amount
    {% if not loop.last %},{% endif %}
    {% endfor %}
from {{ ref('full_moon_reviews') }}
group by 1
image.png

5.改进4中sql出现了很多空格,可能会造成报错,使用-消除空格

{%- set review_sentiments =  ["negative", "neutral", "positive"] -%}

select
    LISTING_ID,
    {%- for review_sentiment in review_sentiments -%}
    sum(case when review_sentiment = '{{review_sentiment}}' then 1 else 0 end) as {{review_sentiment}}_amount 
    {%- if not loop.last -%},{%- endif -%}
    {% endfor %}
from {{ ref('full_moon_reviews') }}
group by 1

image.png

注意:最后一个endfor是没有-,因为这里需要空格
6.改进5中review_sentiments是hard code,假设review_sentiments都是从一个macros为get_sentiments_method.sql里来的

{%- set review_sentiments = get_payment_methods() -%}

select
    LISTING_ID,
    {%- for review_sentiment in review_sentiments -%}
        sum(case when review_sentiment = '{{review_sentiment}}' then 1 else 0 end) as {{review_sentiment}}_amount 
            {%- if not loop.last -%}
                ,
            {%- endif -%}
    {% endfor %}
from {{ ref('full_moon_reviews') }}
group by 1
  1. 改进6中的get_sentiments_method里面的hard code,将返回的列表改为从其他表里读取出来的
{% macro get_payment_methods() %}

    {% set payment_methods_query %}
    select distinct
        payment_method
    from {{ ref('raw_payments') }}
    order by 1
    {% endset %}

    {% set results = run_query(payment_methods_query) %}

    {% if execute %}
    {# Return the first column #}
    {% set results_list = results.columns[0].values() %}
    {% else %}
    {% set results_list = [] %}
    {% endif %}

    {{ return(results_list) }}

{% endmacro %}

8.改进7,这里的payment和ref的表也是hard code的,我们改进之后可以从其他表读取dinstinct的列,

{% macro get_column_values(column_name, relation) %}

{% set relation_query %}
select distinct
{{ column_name }}
from {{ relation }}
order by 1
{% endset %}

{% set results = run_query(relation_query) %}

{% if execute %}
{# Return the first column #}
{% set results_list = results.columns[0].values() %}
{% else %}
{% set results_list = [] %}
{% endif %}

{{ return(results_list) }}

{% endmacro %}

{% macro get_payment_methods() %}

{{ return(get_column_values('payment_method', ref('raw_payments'))) }}

{% endmacro %}

5.2 第三方包里的macros(dbt_utils/codegen)

6. Testing

6.1 使用dbt_expectations进行Table级别的测试

  • 测试当前表和目标表的行数是否相同
    0.使用codegen先将dim_listings_w_hosts的结构生产出来
{{ codegen.generate_model_yaml(
    model_names=['dim_listings_w_hosts']
    
) }}
  1. 创建test_dim_listings_w_hosts.yml在同一个目录下,引入function测试,测试当前Model的表和 source('airbnb', 'RAW_LISTINGS')的rows是否相等
version: 2

models:
  - name: dim_listings_w_hosts
    tests:
      - dbt_expectations.expect_table_row_count_to_equal_other_table:
          compare_model: source('airbnb', 'RAW_LISTINGS')

    description: ""
    columns:
      - name: listing_id
        data_type: number
        description: ""
  1. 运行测试
dbt test --select dim_listings_w_hosts
  1. 通过测试


    image.png

6.2 使用dbt_expectations进行Column级别的测试

1.测试dim表里的price字段是否在0-500区间,并且type是number

      - name: price
        data_type: number
        description: ""
        tests:
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0  # (Optional)
              max_value: 500 # (Optional)
              strictly: False
              config:
                severity: warn
          - dbt_expectations.expect_column_values_to_be_of_type:
              column_type: number
  1. 测试stg表的字段是否都符合一个格式,使用正则
version: 2

models:
  - name: stg_listings
    columns:
      - name: LISTING_ID
        tests:
          - not_null
          - unique
      - name: price_str
        tests:
          - dbt_expectations.expect_column_values_to_match_regex:
              regex: "$+"
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容