Words
scalable: able to scale capacity up or out easily
wrangle: to process/clean (data)
wraps up: concludes, finishes
procurement: purchasing (of equipment)
hardware
high analytical workloads
set up auto scaling
peak workloads
decouple: to separate (e.g., storage from compute)
storage component
0. Environment Setup
0.1 Create the project files
1. Create a folder named dbt_workspace_v1 in your working directory
2. Use the venv module to create a virtual environment in that folder:
python -m venv dbt_venv
3. Activate the dbt_venv virtual environment on Windows:
.\dbt_venv\Scripts\Activate.ps1
4. Install the package for working with Snowflake:
pip install dbt-snowflake
5. Create dbt's config directory under your home directory:
mkdir $home\.dbt
6. Create the first project, named dbtlearn:
dbt init dbtlearn
7. Follow the prompts step by step; when finished, open profiles.yml under .dbt and check that the parameters are correct.
Note: on newer Snowflake accounts, the account field must be rewritten by hand into the accountname1-accountname2 format.
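For reference, a minimal sketch of what the Snowflake profile can look like; the account, user, password, and role values below are placeholders to replace with your own:
dbtlearn:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: myorg-myaccount  # placeholder, orgname-accountname format
      user: dbt_user            # placeholder
      password: "<password>"    # placeholder
      role: transform           # placeholder
      database: AIRBNB
      warehouse: COMPUTE_WH
      schema: DEV
      threads: 1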
8. Check that everything is configured correctly: cd into dbtlearn and run
dbt debug
0.2 Configure dbt Power User in VS Code
Configure the extension in VS Code's settings.
1. Concepts of Data Engineering
1.1 What are ETL and ELT?
1. ETL (traditional)
2. ELT: storage is cheap, and transforming raw data up front is time-consuming and less flexible when the structure changes, so ELT is used instead.
1.2 Why do we use the cloud?
A traditional data warehouse costs a lot: it is time-consuming (going through procurement, replacing broken hardware), it requires hiring different people to maintain the warehouse, and its storage is harder to scale.
- SMP to the modern data stack: symmetric multiprocessing gave way to massively parallel, cloud-based systems
- decoupling of storage and compute
- row-oriented to column-oriented: row-oriented databases are not good at analytical workloads; column-oriented storage is optimized for analytical reads and computation
1.3 Data lake vs. data lakehouse
1. A data lake only stores data: it has no analysis capability. It can store unstructured, semi-structured, and structured data, such as videos, images, and text.
2. A data lakehouse combines data lakes and data warehouses: it can both store and analyze data.
2. DBT Data Flow
2.1 Models
- Write the first model:
WITH raw_listings AS (
    SELECT * FROM AIRBNB.RAW.RAW_LISTINGS
)
SELECT
    id AS listing_id,
    name AS listing_name,
    listing_url,
    room_type,
    minimum_nights,
    host_id,
    price AS price_str,
    created_at,
    updated_at
FROM raw_listings
Note: the query inside the WITH must spell out the fully qualified schema and table name (database.schema.table).
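After saving the file under the models folder, build it with:
dbt run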
2.2 Materializations
2.3 Model Layers
2.3.1 Create the staging layer
- src_listings.sql, src_hosts.sql, and src_reviews.sql
WITH raw_reviews AS (
    SELECT * FROM AIRBNB.RAW.RAW_REVIEWS
)
SELECT
    listing_id,
    date AS review_date,
    reviewer_name,
    comments AS review_text,
    sentiment AS review_sentiment
FROM raw_reviews
2.3.2 Create the core layer
- Create fct_reviews.sql: this is an incremental table; add a config block to set the materialization, and use an is_incremental() if-block to load only the latest data
{{
    config(
        materialized = 'incremental',
        on_schema_change = 'fail'
    )
}}
WITH src_reviews AS (
    SELECT * FROM {{ ref("src_reviews") }}
)
SELECT * FROM src_reviews
WHERE review_text IS NOT NULL
{% if is_incremental() %}
  AND review_date >= (SELECT MAX(review_date) FROM {{ this }})
{% endif %}
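On the first run (or with --full-refresh) dbt builds the table from scratch; on later runs the is_incremental() branch is active, so only rows with review_date at or after the table's current maximum are inserted.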
- Create dim_listings_cleaned.sql: it is used to clean the listings data
{{
    config(
        materialized = 'view'
    )
}}
WITH src_listings AS (
    SELECT * FROM {{ ref("src_listings") }}
)
SELECT
    listing_id,
    listing_name,
    room_type,
    CASE
        WHEN minimum_nights = 0 THEN 1
        ELSE minimum_nights
    END AS minimum_nights,
    host_id,
    REPLACE(price_str, '$')::NUMBER(10, 2) AS price,
    {# CAST(REPLACE(price_str, '$', '') AS NUMBER(10, 2)), #}
    created_at,
    updated_at
FROM src_listings
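Note that Snowflake's REPLACE takes an optional third argument that defaults to an empty string, so REPLACE(price_str, '$') simply strips the dollar sign before :: casts the result to NUMBER(10, 2).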
- Create dim_listings_w_hosts.sql: join the two cleaned dim tables
{{
    config(
        materialized = 'view'
    )
}}
WITH l AS (
    SELECT * FROM {{ ref("dim_listings_cleaned") }}
),
h AS (
    SELECT * FROM {{ ref("dim_hosts_cleaned") }}
)
SELECT
    l.listing_id,
    l.listing_name,
    l.room_type,
    l.minimum_nights,
    l.price,
    l.host_id,
    h.host_name,
    h.is_superhost AS host_is_superhost,
    l.created_at,
    GREATEST(l.updated_at, h.updated_at) AS updated_at
FROM l
LEFT JOIN h ON (h.host_id = l.host_id)
- Edit dbt_project.yml to change the materializations (the src_xxx views created earlier then need to be dropped from Snowflake by hand):
models:
  dbtlearn:
    +materialized: view
    dim:
      +materialized: table
    src:
      +materialized: ephemeral
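For context: view, table, incremental, and ephemeral are dbt's built-in materializations. Ephemeral models create nothing in the warehouse; they are inlined as CTEs into the models that ref() them, which is why they suit the staging layer.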
2.4 Seeds & Sources
2.4.1 Upload local files to the data warehouse using seeds
- Put the CSV files into the seeds folder, then run
dbt seed
2.4.2 Use sources to manage tables
- Create sources.yml under the models folder
version: 2

sources:
  - name: airbnb
    schema: raw
    tables:
      - name: listings
        identifier: raw_listings
      - name: hosts
        identifier: raw_hosts
      - name: reviews
        identifier: raw_reviews
        loaded_at_field: date
        freshness:
          warn_after: {count: 1, period: hour}
          error_after: {count: 24, period: hour}
- How to use it, in src_listings.sql:
WITH raw_listings AS (
    SELECT * FROM {{ source('airbnb', 'listings') }}
)
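Because loaded_at_field and a freshness block are configured on the reviews source, dbt can report how stale it is:
dbt source freshness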
2.5 Snapshots
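A minimal sketch of a dbt snapshot, placed under the snapshots folder, that tracks changes to the raw listings source with the timestamp strategy; the file name and target_schema here are assumptions:
{% snapshot scd_raw_listings %}

{{
    config(
        target_schema='DEV',
        unique_key='id',
        strategy='timestamp',
        updated_at='updated_at',
        invalidate_hard_deletes=True
    )
}}

SELECT * FROM {{ source('airbnb', 'listings') }}

{% endsnapshot %}
Run it with dbt snapshot; dbt adds dbt_valid_from and dbt_valid_to columns to track row history.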
2.6 Tests
2.6.1 Generic tests
Create tests using the built-in test types
- Create schema.yml under the models folder and write a simple test for dim_listings_cleaned that checks the listing_id column:
version: 2

models:
  - name: dim_listings_cleaned
    columns:
      - name: listing_id
        tests:
          - unique
          - not_null
- Run the tests:
dbt test
Create our own tests
- Some platforms, such as data lakes on Databricks, do not enforce constraints like unique. We can also check the relationship between two tables: whether a column in table A fully matches a column in table B:
version: 2

models:
  - name: dim_listings_cleaned
    columns:
      - name: host_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_listings_w_hosts')  # the related table to check against
              field: host_id                   # the column in the related table
- Check that the field only contains the specified values:
models:
  - name: dim_listings_cleaned
    columns:
      - name: room_type
        tests:
          - accepted_values:
              values: ['Entire home/apt', 'Private room', 'Shared room', 'Hotel room']
2.6.2 Singular tests
- You can put any kind of SQL statement in a singular test. Create t_dim_listings_minimum_nights.sql under the tests folder:
SELECT * FROM {{ ref("dim_listings_cleaned") }}
WHERE minimum_nights < 1
LIMIT 10
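A singular test fails when its query returns any rows, so this one flags every listing whose minimum_nights is below 1.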
- Run the tests:
dbt test
2.7 Macros & Custom Tests
2.7.1 Macros
- Create a macro that checks a table has no NULL values in any column; create no_nulls_in_columns.sql under the macros folder:
{% macro no_nulls_in_columns(model) %}
    SELECT * FROM {{ model }} WHERE
    {% for col in adapter.get_columns_in_relation(model) -%}
        {{ col.column }} IS NULL OR
    {% endfor %}
    FALSE
{% endmacro %}
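The trailing FALSE closes the final OR produced by the loop, so the compiled query selects any row containing a NULL in any column; the test passes only when nothing is returned.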
- To use the no_nulls_in_columns macro, create t_dim_listings_minum_nights.sql under the tests folder:
{{ no_nulls_in_columns(ref("dim_listings_cleaned")) }}
- Run the tests for a single model:
dbt test --select dim_listings_cleaned
2.7.2 Custom generic tests
- In 2.6.2 we defined a "< 1" test, but it only applies to one table. "< 1" really means "the value is not positive", which should be a reusable rule, so we turn it into a custom generic test.
- Create positive_value.sql under the macros folder:
{% test positive_value(model, column_name) %}
SELECT * FROM {{ model }}
WHERE {{ column_name }} < 1
{% endtest %}
- Edit schema.yml under the models folder and add our rule to the minimum_nights column:
version: 2

models:
  - name: dim_listings_cleaned
    columns:
      - name: minimum_nights
        tests:
          - positive_value
- Run the test:
dbt test --select dim_listings_cleaned
2.7.3 Install third-party packages
1. Create packages.yml in the project root and add the packages to install:
packages:
  - package: dbt-labs/dbt_utils
    version: 1.1.1
2. Install the dependencies:
dbt deps
3. Add a new review_id hash column to fct_reviews, generated from several columns. Edit fct_reviews.sql (see the dbt_utils documentation):
{{
    config(
        materialized = 'incremental',
        on_schema_change = 'fail'
    )
}}
WITH src_reviews AS (
    SELECT * FROM {{ ref("src_reviews") }}
)
SELECT
    {{ dbt_utils.generate_surrogate_key(['listing_id', 'review_date', 'reviewer_name', 'review_text']) }} AS review_id,
    *
FROM src_reviews
WHERE review_text IS NOT NULL
{% if is_incremental() %}
  AND review_date >= (SELECT MAX(review_date) FROM {{ this }})
{% endif %}
4. Run dbt:
dbt run --full-refresh --select fct_reviews
Note: we cannot simply use dbt run here, because fct_reviews is an incremental table and by default its schema is not allowed to change (we set on_schema_change = 'fail').
5. Once it succeeds, check in Snowflake: the table now has the new review_id column.
2.8 Documentation
2.8.1 Create simple docs
- Edit schema.yml and add descriptions for the models and columns
- Generate the docs:
dbt docs generate
- Run the built-in server; specify a port, because the default 8080 conflicts with Airflow:
dbt docs serve --port 8001
2.8.2 Add more detailed docs to table columns
1. Create docs.md under the models folder:
{% docs dim_listing_cleaned__minimum_nights %}
Minimum number of nights required to rent this property.
Keep in mind that old listings might have `minimum_nights` set
to 0 in the source tables. Our cleansing algorithm updates this to `1`.
{% enddocs %}
- Edit schema.yml:
- name: minimum_nights
  description: '{{ doc("dim_listing_cleaned__minimum_nights") }}'
  tests:
    - positive_value
3. Regenerate the docs and view the result.
2.8.3 Customize the overview page and add images
- Add overview.md under the models folder:
{% docs __overview__ %}
# Airbnb pipeline
Hey, welcome to our Airbnb pipeline documentation!
Here is the schema of our input data:
![input schema](https://dbtlearn.s3.us-east-2.amazonaws.com/input_schema.png)
{% enddocs %}
- Generate and serve again
- Sometimes we need to use local images instead of ones hosted on the web
- Create an assets folder and put the images into it
- Edit dbt_project.yml and register the folder so that dbt picks it up
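A sketch of the dbt_project.yml entry, assuming the folder is named assets:
asset-paths: ["assets"]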
- Generate, serve, done
2.9 Hooks
2.9.1 Create a hook
- Create a new user without table-level privileges, granting only warehouse, database, and schema usage:
USE ROLE ACCOUNTADMIN;
CREATE ROLE IF NOT EXISTS REPORTER;
CREATE USER IF NOT EXISTS PRESET
PASSWORD='presetPassword123'
LOGIN_NAME='preset'
MUST_CHANGE_PASSWORD=FALSE
DEFAULT_WAREHOUSE='COMPUTE_WH'
DEFAULT_ROLE='REPORTER'
DEFAULT_NAMESPACE='AIRBNB.DEV'
COMMENT='Preset user for creating reports';
GRANT ROLE REPORTER TO USER PRESET;
GRANT ROLE REPORTER TO ROLE ACCOUNTADMIN;
GRANT ALL ON WAREHOUSE COMPUTE_WH TO ROLE REPORTER;
GRANT USAGE ON DATABASE AIRBNB TO ROLE REPORTER;
GRANT USAGE ON SCHEMA AIRBNB.DEV TO ROLE REPORTER;
- Create a post-hook for every model under dbtlearn by editing dbt_project.yml:
models:
  dbtlearn:
    +materialized: view
    +post-hook:
      - "GRANT SELECT ON {{ this }} TO ROLE REPORTER"
    dim:
      +materialized: table
    src:
      +materialized: ephemeral
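The post-hook runs after each model is built, so every view or table dbt creates is immediately granted to the REPORTER role.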
- Run dbt; we can see that DEV now contains all of our project's tables, readable by the new user
2.10 Third-party tests: dbt-expectations
1. Add dbt_expectations to packages.yml:
packages:
  - package: calogica/dbt_expectations
    version: [">=0.9.0", "<0.10.0"]
- Install the packages for the project:
dbt deps
2.10.1 Test a table
Edit schema.yml and add a table-level test:
- name: dim_listings_w_hosts
  tests:
    - dbt_expectations.expect_table_column_count_to_be_between:
        max_value: 8  # (optional)
2.10.2 Test columns
columns:
  - name: price
    tests:
      - dbt_expectations.expect_column_quantile_values_to_be_between:
          quantile: .95
          min_value: 50
          max_value: 500
      - dbt_expectations.expect_column_max_to_be_between:
          max_value: 5000
          config:
            severity: warn