DBT (1) (latest)

Words

scalable 可扩展
wrangle 处理
wraps up 包括
procurement 采购
hardware 硬件
high analytical workloads 高分析工作负载
set up auto scaling 设置自动缩放
peak workloads 峰值工作负载
decouple 解耦
storage component 存储组件

0. Environment Setup

0.1 Create the project folder

Create a folder named dbt_workspace_v1 in your working directory.

1. Use the venv module to create a virtual environment in that folder:

python -m venv dbt_venv

2. Activate the dbt_venv virtual environment (on Windows):

.\dbt_venv\Scripts\Activate.ps1

3. Install the Snowflake adapter package:

pip install dbt-snowflake
4. Create dbt's config directory in your home directory:

mkdir $home\.dbt

5. Create your first project, named dbtlearn:

dbt init dbtlearn

6. Follow the prompts step by step; when finished, open ~/.dbt/profiles.yml and check that the parameters are correct.

Note: with recent Snowflake versions, you need to manually change the account field into the accountname1-accountname2 format.
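A minimal sketch of what ~/.dbt/profiles.yml can look like for this project; the user, password, and role values below are placeholders, while database AIRBNB, schema DEV, and warehouse COMPUTE_WH match the ones used later in these notes:

dbtlearn:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: accountname1-accountname2
      user: <your_user>
      password: <your_password>
      role: <your_role>
      database: AIRBNB
      warehouse: COMPUTE_WH
      schema: DEV
      threads: 1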
7. Check whether the configuration succeeded; enter the project with cd dbtlearn, then run:

dbt debug

0.2 Configure dbt Power User in VS Code

In VS Code's settings, configure the dbt Power User extension.
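A minimal settings.json sketch (assumed setup, not from the original screenshot; check the dbt Power User docs for the extension's own keys):

{
  // Highlight dbt models as Jinja-flavoured SQL (assumed; needs a
  // jinja-sql language provider such as the Better Jinja extension)
  "files.associations": {
    "*.sql": "jinja-sql"
  }
}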

1. Concepts of Data Engineering

1.1 What are ETL and ELT?

1. ETL (traditional): data is Extracted and Transformed first, and only then Loaded into the warehouse.

2. ELT: storage has become very cheap, and transforming raw data up front is time-consuming and less flexible when structures change, so the raw data is Loaded first and Transformed inside the warehouse.

1.2 Why do we use the cloud?

A traditional on-premises data warehouse costs a lot: it is time-consuming (going through the procurement process, replacing broken hardware), different people must be hired to maintain it, and the storage is harder to scale.

  • From SMP systems to the modern data stack
  • Decoupling of storage and compute
  • Row-oriented to column-oriented: row-oriented databases are not good at analytical workloads; column-oriented storage is optimized for analytical reads and computation.

1.3 Data lake vs. data lakehouse

1. A data lake only stores data; it has no analysis capability. It can store unstructured, semi-structured, and structured data, such as videos, images, and text.
2. A data lakehouse combines the data lake and the data warehouse: it can both store and analyze data.

2. DBT Data Flow


2.1 Models

  1. Write the first model, src_listings.sql:
WITH raw_listings AS (
    SELECT * FROM AIRBNB.RAW.RAW_LISTINGS
)
SELECT
    id AS listing_id,
    name AS listing_name,
    listing_url,
    room_type,
    minimum_nights,
    host_id,
    price AS price_str,
    created_at,
    updated_at
FROM
    raw_listings

Note: the statement inside the WITH must spell out the fully qualified schema and table name.
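Build the model after saving it; with no extra config, dbt materializes it as a view in the target schema:

dbt run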

2.2 Materializations

dbt supports four main materializations: view, table, incremental, and ephemeral.

2.3 Models


2.3.1 Create the staging layer

  • src_listings.sql, src_hosts.sql & src_reviews.sql; for example, src_reviews.sql:
WITH raw_reviews AS (
    SELECT * FROM AIRBNB.RAW.RAW_REVIEWS
)
SELECT
    listing_id,
    date AS review_date,
    reviewer_name,
    comments AS review_text,
    sentiment AS review_sentiment
FROM
    raw_reviews

2.3.2 Create the core layer

  1. Create fct_reviews.sql. This is an incremental model: add a config block to set the materialization, and use a Jinja if block with is_incremental() to pick up only the latest data:
{{
  config(
    materialized = 'incremental',
    on_schema_change = 'fail'
    )
}}

WITH src_reviews AS (
    SELECT * FROM {{ ref("src_reviews") }}
)
SELECT * FROM src_reviews
WHERE review_text IS NOT NULL
{% if is_incremental() %}
  AND review_date >= (SELECT MAX(review_date) FROM {{ this }})
{% endif %}
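On the first run (or with --full-refresh), is_incremental() returns false, so the whole table is built; on later runs only rows with review_date at or after the current maximum in the existing table are inserted.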
  2. Create dim_listings_cleaned.sql: it cleans the listings data:
{{
  config(
    materialized = 'view',
    )
}}
WITH src_listings AS (
 SELECT  *  FROM {{ ref("src_listings") }}
)
SELECT
    listing_id,
    listing_name,
    room_type,
    CASE
        WHEN minimum_nights = 0 THEN 1
        ELSE minimum_nights
    END AS minimum_nights,
    host_id,
    REPLACE(price_str, '$') :: NUMBER(10, 2) AS price,
    {# equivalent: CAST(REPLACE(price_str, '$', '') AS NUMBER(10, 2)) #}
    created_at,
    updated_at
FROM
    src_listings
  3. Create dim_listings_w_hosts.sql: join the cleaned models into one dim table:
{{
  config(
    materialized = 'view',
    )
}}
WITH l AS (
    SELECT * FROM {{ ref("dim_listings_cleaned") }}
),
h AS (
    SELECT * FROM {{ ref("dim_hosts_cleaned") }}
)

SELECT
    l.listing_id,
    l.listing_name,
    l.room_type,
    l.minimum_nights,
    l.price,
    l.host_id,
    h.host_name,
    h.is_superhost AS host_is_superhost,
    l.created_at,
    GREATEST(l.updated_at, h.updated_at) AS updated_at
FROM l
LEFT JOIN h ON (h.host_id = l.host_id)
  4. Edit dbt_project.yml to change the materializations: dim models become tables and src models become ephemeral (their old src_* views can then be dropped from Snowflake):
models:
  dbtlearn:
    +materialized: view
    dim:
      +materialized: table
    src:
      +materialized: ephemeral

2.4 Seeds & sources

2.4.1 Upload local files to the data warehouse using seeds

  1. Put the CSV files into the seeds folder, then run:

dbt seed
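dbt discovers seed files through the seed-paths setting in dbt_project.yml; a minimal sketch showing the current default:

seed-paths: ["seeds"]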

2.4.2 Use sources to manage tables

  • Create source.yml under the models folder:
version: 2
sources:
  - name: airbnb
    schema: raw
    tables:
      - name: listings
        identifier: raw_listings
      - name: hosts
        identifier: raw_hosts
      - name: reviews
        identifier: raw_reviews
        loaded_at_field: date
        freshness:
          warn_after: {count: 1, period: hour}
          error_after: {count: 24, period: hour}
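Because reviews has loaded_at_field and freshness configured, you can check whether the source data is stale with:

dbt source freshness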
  • How to use it, in src_listings.sql:
WITH raw_listings AS (
    SELECT * FROM {{ source('airbnb', 'listings') }}
)

2.5 Snapshots

2.6 Tests

2.6.1 Generic tests

Using the built-in generic tests

  • Create schema.yml under the models folder and codify a simple test for dim_listings_cleaned checking the listing_id column:
version: 2

models:
  - name: dim_listings_cleaned
    columns:
      - name: listing_id
        tests:
          - unique
          - not_null
  • Run the tests:
dbt test

More built-in generic tests

  • Some platforms (for example, data lakes such as Databricks) do not enforce constraints like unique, so we can check the relationship between two tables instead: verify that every value in column a.1 of table a is fully matched by column b.2 of table b.
version: 2

models:
  - name: dim_listings_cleaned
    columns:
      - name: host_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_listings_w_hosts') # the related table to check
              field: host_id # the column in the related table
  • Check that a column only contains specified values:
models:
  - name: dim_listings_cleaned
    columns:
      - name: room_type
        tests:
          - accepted_values:
              values: [
                'Entire home/apt',
                'Private room',
                'Shared room',
                'Hotel room'
              ]

2.6.2 Singular tests

  • You can define any kind of SQL statement in a singular test; the test passes when the query returns no rows.
    Create t_dim_listings_minimum_nights.sql under the tests folder:
SELECT * FROM {{ ref("dim_listings_cleaned") }}
WHERE minimum_nights < 1
LIMIT 10
  • Run the tests:
dbt test

2.7 Macros & custom tests

2.7.1 Macros

  • Create a macro that checks whether a table has NULL values in any column; create no_nulls_in_columns.sql under the macros folder:
{% macro no_nulls_in_columns(model) %}
    SELECT * FROM {{ model }} WHERE
    {% for col in adapter.get_columns_in_relation(model) -%}
        {{ col.column }} IS NULL OR
    {% endfor %}
    FALSE
{% endmacro %}
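For a hypothetical model my_model with columns a and b, the macro compiles to roughly the following SQL; the trailing FALSE absorbs the OR left by the last loop iteration, and the test passes when the query returns no rows:

SELECT * FROM my_model WHERE
    a IS NULL OR
    b IS NULL OR
    FALSE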
  • Use the no_nulls_in_columns macro from a test; create a new file under the tests folder (for example, no_nulls_in_dim_listings.sql):
{{ no_nulls_in_columns(ref("dim_listings_cleaned")) }}
  • Run the tests for a single model:
dbt test --select dim_listings_cleaned

2.7.2 Custom generic tests

  • In 2.6.2 we defined a "< 1" test, but it only applied to one table. "< 1" really means "the value is not positive", which ought to be a reusable rule, so we turn it into a custom generic test.
  • Create positive_value.sql under the macros folder:
{% test positive_value(model, column_name) %}
    SELECT * FROM {{ model }}
    WHERE {{ column_name }} < 1
{% endtest %}
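When this test is attached to a column in schema.yml, dbt fills in the model and column_name arguments automatically; like a singular test, it passes when the query returns no rows.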
  • Modify schema.yml under the models folder to add our rule to the minimum_nights column:
version: 2

models:
  - name: dim_listings_cleaned
    columns:
      - name: minimum_nights
        tests:
          - positive_value
  • Run the test:
dbt test --select dim_listings_cleaned

2.7.3 Install third-party packages

  1. Create packages.yml in the project root and add the packages to install:
packages:
  - package: dbt-labs/dbt_utils
    version: 1.1.1

2. Install the dependencies:

dbt deps

3. Add a new review_id hash column to the fct_reviews table, built from several columns. Modify fct_reviews.sql (see the dbt_utils documentation for generate_surrogate_key):

{{
  config(
    materialized = 'incremental',  
    on_schema_change = 'fail'
    )
}}

WITH src_reviews AS (
    SELECT * FROM {{ ref("src_reviews") }}
)
SELECT
    {{ dbt_utils.generate_surrogate_key(['listing_id', 'review_date', 'reviewer_name', 'review_text']) }} AS review_id,
    *
FROM src_reviews
WHERE review_text IS NOT NULL
{% if is_incremental() %}
  AND review_date >= (SELECT MAX(review_date) FROM {{ this }})
{% endif %}

4. Run dbt:

dbt run --full-refresh --select fct_reviews

Note: we do not simply use dbt run here, because fct_reviews is an incremental model and by default (on_schema_change = 'fail') it cannot change the table's structure, so --full-refresh rebuilds it from scratch.
5. After it succeeds, open Snowflake: the original table now has a new review_id column.

2.8 Documentation

2.8.1 Create a simple docs site

  1. Modify schema.yml and add descriptions to the documented columns, as sketched below.
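A minimal sketch of what the description fields can look like (the wording is illustrative, not from the original screenshot):

version: 2

models:
  - name: dim_listings_cleaned
    description: Cleansed table of Airbnb listings.
    columns:
      - name: listing_id
        description: Primary key for the listing.
        tests:
          - unique
          - not_null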
  2. Generate the docs:
dbt docs generate
  3. Serve them with the built-in server; specify a port, since the default 8080 conflicts with Airflow:
dbt docs serve --port 8001

2.8.2 Add more detailed docs to a table's columns

1. Create docs.md under the models folder:

{% docs dim_listing_cleaned__minimum_nights %}
Minimum number of nights required to rent this property. 
Keep in mind that old listings might have `minimum_nights` set 
to 0 in the source tables. Our cleansing algorithm updates this to `1`.
{% enddocs %}
  2. Modify schema.yml to reference the doc block:
      - name: minimum_nights
        description: '{{ doc("dim_listing_cleaned__minimum_nights") }}'
        tests:
          - positive_value

3. View the result in the generated docs site.

2.8.3 Customize the overview page and add images

  1. Add overview.md under the models folder:
{% docs __overview__ %}
# Airbnb pipeline
Hey, welcome to our Airbnb pipeline documentation!
Here is the schema of our input data:
![input schema](https://dbtlearn.s3.us-east-2.amazonaws.com/input_schema.png)
{% enddocs %}
  2. Generate and serve again.
  • Sometimes we need to use local images instead of ones hosted online:
  1. Create an assets folder and put the images into it.
  2. Register the folder in dbt_project.yml so that dbt picks it up (see the sketch after this list).
  3. Generate, serve, done.
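A minimal dbt_project.yml addition using dbt's asset-paths setting, matching the folder created above:

asset-paths: ["assets"]

Images in overview.md can then be referenced with a relative path such as assets/input_schema.png.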

2.9 Hooks

2.9.1 Create a hook

  1. Create a new user with no table-level privileges, only warehouse, database, and schema usage:
USE ROLE ACCOUNTADMIN;
CREATE ROLE IF NOT EXISTS REPORTER;
CREATE USER IF NOT EXISTS PRESET
 PASSWORD='presetPassword123'
 LOGIN_NAME='preset'
 MUST_CHANGE_PASSWORD=FALSE
 DEFAULT_WAREHOUSE='COMPUTE_WH'
 DEFAULT_ROLE='REPORTER'
 DEFAULT_NAMESPACE='AIRBNB.DEV'
 COMMENT='Preset user for creating reports';
GRANT ROLE REPORTER TO USER PRESET;
GRANT ROLE REPORTER TO ROLE ACCOUNTADMIN;
GRANT ALL ON WAREHOUSE COMPUTE_WH TO ROLE REPORTER;
GRANT USAGE ON DATABASE AIRBNB TO ROLE REPORTER;
GRANT USAGE ON SCHEMA AIRBNB.DEV TO ROLE REPORTER;
  2. Create a post-hook applying to every model under dbtlearn by modifying dbt_project.yml:
models:
  dbtlearn:
    +materialized: view
    +post-hook:
      - "GRANT SELECT ON {{ this }} TO ROLE REPORTER"
    dim:
      +materialized: table
    src:
      +materialized: ephemeral
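dbt runs the post-hook after each model in the project is built, so every new view and table automatically receives the GRANT.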
  3. Run dbt; we can now see all of the project's tables under DEV.

2.10 Third-party tests: dbt-expectations

1. Add dbt_expectations to packages.yml:

packages:
  - package: calogica/dbt_expectations
    version: [">=0.9.0", "<0.10.0"]
  2. Install the packages into the project:
dbt deps

2.10.1 Test a table

Modify schema.yml to add a table-level test:

  - name: dim_listings_w_hosts
    tests:
      - dbt_expectations.expect_table_column_count_to_be_between:
          max_value: 8 # (Optional)

2.10.2 Test columns

    columns: 
      - name: price
        tests:
          - dbt_expectations.expect_column_quantile_values_to_be_between:
              quantile: .95
              min_value: 50
              max_value: 500 
          - dbt_expectations.expect_column_max_to_be_between:
              max_value: 5000 
              config:
                severity: warn 
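Setting severity: warn makes a failing test report a warning instead of an error, so it will not fail the dbt test run.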