1. Snowflake: create user / role / warehouse / database / table
1.1 Initializing the Snowflake account
- Overall flow: ① create a role; ② create a warehouse; ③ create the database and schema; ④ create a dedicated user; ⑤ grant the transform role to that user, which effectively makes it the warehouse administrator (transform) for this data warehouse.
- Create the transform role and grant it to the ACCOUNTADMIN role:
CREATE ROLE IF NOT EXISTS transform;
GRANT ROLE TRANSFORM TO ROLE ACCOUNTADMIN;
- List all current roles:
show ROLES
- Create a warehouse:
CREATE WAREHOUSE IF NOT EXISTS COMPUTE_WH;
- Create a dbt user:
CREATE USER IF NOT EXISTS dbt
PASSWORD='shangxi123'
LOGIN_NAME='dbt'
MUST_CHANGE_PASSWORD=FALSE
DEFAULT_WAREHOUSE='COMPUTE_WH'
DEFAULT_ROLE='transform'
DEFAULT_NAMESPACE='AIRBNB.RAW'
COMMENT='DBT user used for data transformation';
- Create the database and schema:
CREATE DATABASE IF NOT EXISTS AIRBNB;
CREATE SCHEMA IF NOT EXISTS AIRBNB.RAW;
- Grant the transform role all privileges the project needs.
-- Grant OPERATE on the warehouse to transform
GRANT OPERATE ON WAREHOUSE COMPUTE_WH TO ROLE TRANSFORM;
-- Grant the transform role to the dbt user
GRANT ROLE transform to USER dbt;
-- Grant all privileges on the warehouse to transform
GRANT ALL ON WAREHOUSE COMPUTE_WH TO ROLE transform;
-- Grant all privileges on the database to transform
GRANT ALL ON DATABASE AIRBNB to ROLE transform;
-- Grant all privileges on all existing schemas in the database to transform
GRANT ALL ON ALL SCHEMAS IN DATABASE AIRBNB to ROLE transform;
-- Grant all privileges on future schemas in the database to transform
GRANT ALL ON FUTURE SCHEMAS IN DATABASE AIRBNB to ROLE transform;
-- Grant all privileges on existing and future tables in AIRBNB.RAW to transform
GRANT ALL ON ALL TABLES IN SCHEMA AIRBNB.RAW to ROLE transform;
GRANT ALL ON FUTURE TABLES IN SCHEMA AIRBNB.RAW to ROLE transform;
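As an optional quick check (not part of the original script), you can list what the role ended up with:
SHOW GRANTS TO ROLE transform;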
1.2 Create the tables the project needs and import the data from S3
-- Set up the defaults
USE WAREHOUSE COMPUTE_WH;
USE DATABASE airbnb;
USE SCHEMA RAW;
-- Create our three tables and import the data from S3
CREATE OR REPLACE TABLE raw_listings
(id integer,
listing_url string,
name string,
room_type string,
minimum_nights integer,
host_id integer,
price string,
created_at datetime,
updated_at datetime);
COPY INTO raw_listings (id,
listing_url,
name,
room_type,
minimum_nights,
host_id,
price,
created_at,
updated_at)
from 's3://dbtlearn/listings.csv'
FILE_FORMAT = (type = 'CSV' skip_header = 1
FIELD_OPTIONALLY_ENCLOSED_BY = '"');
CREATE OR REPLACE TABLE raw_reviews
(listing_id integer,
date datetime,
reviewer_name string,
comments string,
sentiment string);
COPY INTO raw_reviews (listing_id, date, reviewer_name, comments, sentiment)
from 's3://dbtlearn/reviews.csv'
FILE_FORMAT = (type = 'CSV' skip_header = 1
FIELD_OPTIONALLY_ENCLOSED_BY = '"');
CREATE OR REPLACE TABLE raw_hosts
(id integer,
name string,
is_superhost string,
created_at datetime,
updated_at datetime);
COPY INTO raw_hosts (id, name, is_superhost, created_at, updated_at)
from 's3://dbtlearn/hosts.csv'
FILE_FORMAT = (type = 'CSV' skip_header = 1
FIELD_OPTIONALLY_ENCLOSED_BY = '"');
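A quick sanity check (not part of the original script) to confirm the three loads succeeded:
SELECT COUNT(*) FROM raw_listings;
SELECT COUNT(*) FROM raw_reviews;
SELECT COUNT(*) FROM raw_hosts;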
2. Writing models
2.1 Adding the project layer structure
1. Add staging and marts folders to structure the project (see the folder sketch at the end of this subsection).
2. Inside staging, add an airbnb folder to hold the stg models that transform the airbnb data. For example, the following model creates a listings view in Snowflake:
WITH stg_listings AS (
SELECT
*
FROM
AIRBNB.RAW.RAW_LISTINGS
)
SELECT
id AS listing_id,
name AS listing_name,
listing_url,
room_type,
minimum_nights,
host_id,
price AS price_str,
created_at,
updated_at
FROM
stg_listings
3. Edit the models section of dbt_project.yml to change the default materialization for each part of the structure:
models:
  dbt_airflow:
    # Config indicated by + and applies to all files under models/example/
    staging:
      +materialized: view
    marts:
      core:
        +materialized: table
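A sketch of the resulting models folder layout (the file names are taken from later steps in these notes):
models/
├── staging/
│   └── airbnb/
│       ├── src_airbnb.yml
│       └── stg_listings.sql
└── marts/
    └── core/
        └── dim_airbnb/
            └── dim_listings_cleansed.sql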
2.2 Move the hard-coded database names from 2.1 into sources
1. Add a new file staging/airbnb/src_airbnb.yml that declares the source tables:
version: 2

sources:
  - name: airbnb        # name used inside dbt
    database: AIRBNB    # actual database name
    schema: RAW         # schema name inside the database
    tables:
      - name: RAW_HOSTS     # table names inside the database
      - name: RAW_REVIEWS
      - name: RAW_LISTINGS

models:
  - name: stg_listings
    columns:
      - name: LISTING_ID
        tests:
          - not_null
          - unique
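With the source declared, the hard-coded AIRBNB.RAW.RAW_LISTINGS reference from 2.1 can be replaced by the source() function; a minimal sketch of the updated stg_listings model:
WITH raw_listings AS (
    SELECT * FROM {{ source('airbnb', 'RAW_LISTINGS') }}
)
SELECT
    id AS listing_id,
    name AS listing_name,
    listing_url,
    room_type,
    minimum_nights,
    host_id,
    price AS price_str,
    created_at,
    updated_at
FROM raw_listings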
3. Materializations
- Defines how a model's SQL is persisted in the warehouse.
- Four kinds: ① view: becomes a Snowflake view; ② table: becomes a Snowflake table; ③ ephemeral: dbt-only, the model is inlined as a CTE into downstream models; ④ incremental: typically used for dim and fact tables, so that only new or changed data is loaded incrementally.
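As an illustration of the incremental kind, here is a minimal sketch of an incremental model; the model name stg_reviews and the column review_date are assumptions for illustration, not objects defined in these notes:
{{
    config(
        materialized='incremental',
        on_schema_change='fail'
    )
}}
WITH src_reviews AS (
    SELECT * FROM {{ ref('stg_reviews') }}  -- assumed staging model
)
SELECT * FROM src_reviews
{% if is_incremental() %}
  -- on incremental runs, only pick up rows newer than what the target table already holds
  WHERE review_date > (SELECT MAX(review_date) FROM {{ this }})
{% endif %}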
3.1 Creating the different layers inside the marts folder
1. Inside marts, create a core/dim_airbnb folder; this is where all the dimension-table models live.
2. Create the file that builds the dimension table, dim_listings_cleansed.sql:
WITH stg_listings as (
SELECT * FROM {{ ref('stg_listings') }}
)
SELECT
listing_id,
listing_name,
room_type,
CASE WHEN minimum_nights = 0 THEN 1 ELSE minimum_nights END AS minimum_nights,
host_id,
REPLACE( price_str, '$' ) :: NUMBER( 10, 2 ) AS price,
created_at,
updated_at
FROM
stg_listings
- Change the schema so the dimension tables land in Snowflake's PUBLIC schema. If you rely on dbt's built-in macro, it concatenates the target schema with the custom schema and creates a new schema named RAW_PUBLIC, which is not what we want here. Overriding it with a custom schema macro lets us put different tables into schemas of our choosing.
① Add a macro that overrides this default behaviour, generate_schema.sql:
{% macro generate_schema_name(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%}
{%- if custom_schema_name is none -%}
{{ default_schema }}
{%- else -%}
{{ custom_schema_name | trim }}
{%- endif -%}
{%- endmacro %}
② Edit dbt_project.yml to change the schema for all models under the folders we need:
models:
  dbt_airflow:
    # Config indicated by + and applies to all files under models/example/
    staging:
      +materialized: view
    marts:
      core:
        +schema: PUBLIC
        +materialized: table
③ Run dbt; all the dimension tables are now created in Snowflake's PUBLIC schema.
3.2 Choosing materializations per table as needed
- The staging models are really intermediate steps, so the stg_-prefixed models should not be kept as views, which only adds overhead in the database. They should be ephemeral instead: once changed to ephemeral, they are no longer persisted as views and are simply inlined into downstream models and discarded.
models:
  dbt_airflow:
    # Config indicated by + and applies to all files under models/example/
    staging:
      +materialized: ephemeral
2. Since marts/core/dim_airbnb holds the dimension models, put them into PUBLIC and make them views by default. Per the requirements, two of them are intermediate models that stay as views, while one is a business table that will actually be queried, so its materialization is overridden to table inside the model file itself (see the sketch after this block).
marts:
  core:
    +schema: PUBLIC
    dim_airbnb:
      +materialized: view
    fact_airbnb:
      +materialized: table
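The in-file override mentioned above is just a config() call at the top of the model; a minimal sketch, using dim_listings_w_hosts (a model that appears later in these notes) as the assumed business table:
{{ config(materialized='table') }}
WITH listings AS (
    SELECT * FROM {{ ref('dim_listings_cleansed') }}
)
SELECT * FROM listings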
4. Snapshots for generating SCDs (slowly changing dimensions)
1. Create a snapshot, snapshots/scd_raw_listings.sql:
{% snapshot scd_raw_listings %}
{{
    config(
        target_database='AIRBNB',
        target_schema='RAW',
        unique_key='ID',
        strategy='timestamp',
        updated_at='updated_at'
    )
}}
select * from {{ source('airbnb', 'RAW_LISTINGS') }}
{% endsnapshot %}
Note: target_database/target_schema control where the snapshot table is created, unique_key is the unique column, and strategy='timestamp' requires the table to have an update timestamp column (updated_at).
2. Run dbt snapshot to take the snapshot; this creates the snapshot table:
dbt snapshot
3. Update a row in the source table:
UPDATE AIRBNB.RAW.RAW_LISTINGS SET MINIMUM_NIGHTS=30,
updated_at=CURRENT_TIMESTAMP() WHERE ID=3176;
4. Run dbt snapshot again and inspect the snapshot table: dbt has added several columns for us (dbt_valid_from, dbt_valid_to, ...); a null dbt_valid_to means that row is the currently active version.
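To see this, query the snapshot table directly (the table name and location follow the snapshot config above; dbt_valid_from/dbt_valid_to are the standard snapshot columns):
SELECT ID, MINIMUM_NIGHTS, DBT_VALID_FROM, DBT_VALID_TO
FROM AIRBNB.RAW.SCD_RAW_LISTINGS
WHERE ID = 3176;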
Note: when the table has no reliable update timestamp, use the check strategy instead and list the columns to compare in check_cols:
{% snapshot scd_raw_reviews %}
{{
config(
target_schema='RAW',
strategy='check',
unique_key='LISTING_ID',
check_cols=['LISTING_ID', 'REVIEWER_NAME'],
)
}}
select * from {{ source('airbnb', 'RAW_REVIEWS') }}
{% endsnapshot %}
5. Macros
5.1 Writing macros
- The SQL without any macro features:
select
order_id,
sum(case when payment_method = 'bank_transfer' then amount end) as bank_transfer_amount,
sum(case when payment_method = 'credit_card' then amount end) as credit_card_amount,
sum(case when payment_method = 'gift_card' then amount end) as gift_card_amount,
sum(amount) as total_amount
from {{ ref('raw_payments') }}
group by 1
- Improvement 1: use a loop to cut down the repetition:
select
order_id,
{% for payment_method in ['bank_transfer','credit_card','gift_card'] %}
sum(case when payment_method = '{{payment_method}}' then amount end) as {{payment_method}}_amount,
{% endfor %}
sum(amount) as total_amount
from {{ ref('raw_payments') }}
group by 1
- Improvement 2: move the hard-coded list into a variable:
{% set payment_methods = ["bank_transfer", "credit_card", "gift_card"] %}
select
order_id,
{% for payment_method in payment_methods %}
sum(case when payment_method = '{{payment_method}}' then amount end) as {{payment_method}}_amount,
{% endfor %}
sum(amount) as total_amount
from {{ ref('raw_payments') }}
group by 1
- Improvement 3: the loop leaves a trailing , after the last column, which breaks the SQL, so add an if check:
{% set review_sentiments = ["negative", "neutral", "positive"] %}
select
LISTING_ID,
{% for review_sentiment in review_sentiments %}
sum(case when review_sentiment = '{{review_sentiment}}' then 1 else 0 end) as {{review_sentiment}}_amount
{% if not loop.last %},{% endif %}
{% endfor %}
from {{ ref('full_moon_reviews') }}
group by 1
- Improvement 4: the compiled SQL from improvement 3 contains a lot of stray whitespace; use Jinja's - whitespace control to trim it:
{%- set review_sentiments = ["negative", "neutral", "positive"] -%}
select
LISTING_ID,
{%- for review_sentiment in review_sentiments -%}
sum(case when review_sentiment = '{{review_sentiment}}' then 1 else 0 end) as {{review_sentiment}}_amount
{%- if not loop.last -%},{%- endif -%}
{% endfor %}
from {{ ref('full_moon_reviews') }}
group by 1
Note: the last {% endfor %} keeps no -, because some whitespace is still needed before the FROM clause.
- Improvement 5: review_sentiments is still hard-coded in the model; assume instead that the list comes from a macro, e.g. a file get_sentiments_method.sql (a sketch of such a macro follows this block):
{%- set review_sentiments = get_payment_methods() -%}
select
LISTING_ID,
{%- for review_sentiment in review_sentiments -%}
sum(case when review_sentiment = '{{review_sentiment}}' then 1 else 0 end) as {{review_sentiment}}_amount
{%- if not loop.last -%}
,
{%- endif -%}
{% endfor %}
from {{ ref('full_moon_reviews') }}
group by 1
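The macro file itself is not shown in the notes; a minimal sketch that simply returns the hard-coded list (named get_payment_methods() to match the call above, although the notes refer to the file as get_sentiments_method.sql):
{% macro get_payment_methods() %}
{{ return(["negative", "neutral", "positive"]) }}
{% endmacro %}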
- Improvement 6: the list inside that macro is itself hard-coded; change it so the returned list is read from another table at compile time:
{% macro get_payment_methods() %}
{% set payment_methods_query %}
select distinct
payment_method
from {{ ref('raw_payments') }}
order by 1
{% endset %}
{% set results = run_query(payment_methods_query) %}
{% if execute %}
{# Return the first column #}
{% set results_list = results.columns[0].values() %}
{% else %}
{% set results_list = [] %}
{% endif %}
{{ return(results_list) }}
{% endmacro %}
- Improvement 7: the payment_method column and the ref'd table are still hard-coded; generalise the macro so it can read the distinct values of any column from any relation:
{% macro get_column_values(column_name, relation) %}
{% set relation_query %}
select distinct
{{ column_name }}
from {{ relation }}
order by 1
{% endset %}
{% set results = run_query(relation_query) %}
{% if execute %}
{# Return the first column #}
{% set results_list = results.columns[0].values() %}
{% else %}
{% set results_list = [] %}
{% endif %}
{{ return(results_list) }}
{% endmacro %}
{% macro get_payment_methods() %}
{{ return(get_column_values('payment_method', ref('raw_payments'))) }}
{% endmacro %}
5.2 Macros from third-party packages (dbt_utils / codegen)
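These macros come from community packages, installed through packages.yml followed by dbt deps; the version numbers below are illustrative, not from the notes:
packages:
  - package: dbt-labs/dbt_utils
    version: 1.1.1
  - package: calogica/dbt_expectations
    version: 0.10.1
  - package: dbt-labs/codegen
    version: 0.12.1
Then run:
dbt deps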
6. Testing
6.1 Table-level tests with dbt_expectations
- Test whether the current table and a target table contain the same number of rows.
0. First use codegen to generate the yml skeleton for dim_listings_w_hosts:
{{ codegen.generate_model_yaml(
model_names=['dim_listings_w_hosts']
) }}
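One way to execute this (besides compiling it in the dbt IDE) is with dbt run-operation:
dbt run-operation generate_model_yaml --args '{"model_names": ["dim_listings_w_hosts"]}'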
- Create test_dim_listings_w_hosts.yml in the same directory and add the test, which checks that the model's row count equals that of source('airbnb', 'RAW_LISTINGS'):
version: 2

models:
  - name: dim_listings_w_hosts
    tests:
      - dbt_expectations.expect_table_row_count_to_equal_other_table:
          compare_model: source('airbnb', 'RAW_LISTINGS')
    description: ""
    columns:
      - name: listing_id
        data_type: number
        description: ""
- Run the test:
dbt test --select dim_listings_w_hosts
- The test passes.
6.2 Column-level tests with dbt_expectations
1. Test that the price column of the dim table lies between 0 and 500 and that its type is number:
- name: price
  data_type: number
  description: ""
  tests:
    - dbt_expectations.expect_column_values_to_be_between:
        min_value: 0 # (Optional)
        max_value: 500 # (Optional)
        strictly: False
        config:
          severity: warn
    - dbt_expectations.expect_column_values_to_be_of_type:
        column_type: number
- Test that a column of the stg model matches a pattern, using a regex:
version: 2

models:
  - name: stg_listings
    columns:
      - name: LISTING_ID
        tests:
          - not_null
          - unique
      - name: price_str
        tests:
          - dbt_expectations.expect_column_values_to_match_regex:
              regex: "$+"