方法1:直接airflow环境里安装dbt,映射本地dbt文件到airflow环境下
1. 首次加载官方的airflow镜像
1.官网下载最新版本的airflow的docker-compose文件
https://airflow.apache.org/docs/apache-airflow/2.8.4/docker-compose.yaml
2.新建文件夹airflow_dbt
,创建docker-compose.yml
文件并且复制1的内容进去
略
注意:将example改为false
3.运行docker compose up
,第一次执行,会自动下载官方的images。运行成功后,我们可以看到8080端口的airflow webservice已经启动了
- 使用bash进入容器内部
docker exec -it ③的容器id /bin/bash
2.配置自己的dockerfile,安装dbt
1.根目录下创建Dockerfile文件
2.编写Dockerfile文件
# 基础镜像
FROM apache/airflow:2.8.4
#将当前的文件(前)根目录的re.txt 拷贝到项目的跟目录下
COPY requirements.txt requirements.txt
#项目内更新pip install保持最新
RUN pip install --user --upgrade pip
#运行项目内的requirements.txt
RUN pip install -r requirements.txt
#
3.根目录创建requirement.txt
,以aiflow为基础,安装本次项目所需要的所有包,例如dbt以及所有的airflow的provider,本次只演示将dbt-snowflake与airflow相结合
dbt-core
dbt-snowflake
3.配置docker-compose.yml
文件,映射本地dbt环境
1.在windows的环境下,我们使用的是.venv
的虚拟环境,所以我们创建一个新的dbt项目,这里就省略拆dbt创建的过程,总之,我们将得到一个dbt的项目,以及profiles.yml
文件,存放着链接snowflake的信息
2.在airflow文件夹下创建
my_dbt
文件夹,用来将①的项目文件复制到该文件夹- 设置
docker-compose.yml
文件,将dbt所需要的文件映射到docker容器内,主要修改的是airflow的volumn配置
volumes:
- ./dags:/opt/airflow/dags #
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
- ./test:/opt/airflow/test
- ./my_dbt:/opt/airflow/my_dbt
- ./root/.dbt:/home/airflow/.dbt
- 本机的项目根目录下创建
root/.dbt/profiles.yml
用来存放snowflake的配置参数,该文件就是windows下,c盘下,用户的profiles.yml文件,非常重要
- 注意:这里填写的database,role,schema,应该是我们实际项目的数据库名称,而不是被迁移的数据库名称,
4. 运行docker build,创建我们新配置的images镜像
docker build . --tag airflow_dbt_snowflake:latest
-
查看我们新建的镜像
5. 修改docker-compose文件,替换我们自己的镜像
- 运行compose-up 启动我们的服务
5. 编写第一个测试dag,测试当前环境是否正确
- 1.编辑
test.py
测试dbt是否配置正确
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
_default_args = {
'max_active_runs': 1,
'catchup': False,
'start_date': datetime(year=2020, month=9, day=1)
}
with DAG(
dag_id = 'test1',
default_args= _default_args,
start_date = datetime(2024,2,3,1),
schedule_interval = None
) as dag:
task1 = BashOperator(
task_id='first_dbt_dag',
bash_command='dbt run'
)
task1
-
运行dag,报错原因,运行的环境是airflow的tmp文件里,并不是dbt的项目文件
-
3.修改
test.py
,将bash_command里添加cd /my_dbt
,意思是直接当前文件进入到my_dbt
文件夹下
task1 = BashOperator(
task_id='first_dbt_dag',
bash_command='cd /my_dbt && dbt run'
)
- 再次运行报错为:没有找到
my_dbt
文件夹
-5. 修改docker-compose.yml
文件的volume,将本地的文件,映射到虚拟机环境的根目录下
- 再次运行报错为:没有找到
-
再次trigger我们的dag,运行成功
-
6.编写自己的DbtOperator
-
文件结构
编写自己的Operator
dbt_operator.py
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException
import subprocess
from subprocess import run, PIPE
class DbtOperator(BaseOperator):
"""
自定义的 DbtOperator,用于执行 dbt 命令
"""
@apply_defaults
def __init__(self, dbt_command, dbt_project_dir='/my_dbt', *args, **kwargs):
"""
初始化方法,用于设置 Operator 的参数
:param dbt_command: 要执行的 dbt 命令
:param dbt_project_dir: dbt 项目目录路径
"""
super(DbtOperator, self).__init__(*args, **kwargs)
self.dbt_command = dbt_command
self.dbt_project_dir = dbt_project_dir
def execute(self, context):
"""
执行 dbt 命令的逻辑
"""
self.log.info(f'Executing DbtOperator with dbt_command={self.dbt_command}')
# 构建要执行的 bash 命令
bash_command = f'cd {self.dbt_project_dir} && dbt {self.dbt_command}'
# 使用 subprocess 模块执行 bash 命令,并实时输出到标准输出
process = subprocess.Popen(
bash_command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=True,
universal_newlines=True
)
# 读取并打印命令输出
while True:
output = process.stdout.readline()
if output == '' and process.poll() is not None:
break
if output:
self.log.info(output.strip())
# 检查命令执行结果
if process.returncode != 0:
# 如果命令返回值不为 0,则表示命令执行失败
raise AirflowException(f'Dbt command execution failed with return code {process.returncode}')
else:
# 如果命令返回值为 0,则表示命令执行成功
self.log.info('Dbt command execution successful')
3.Dag里使用
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from dbtOp.dbt_operator import DbtOperator
_default_args = {
'max_active_runs': 1,
'catchup': False,
'start_date': datetime(year=2020, month=9, day=1)
}
with DAG(
dag_id = 'test1',
default_args= _default_args,
start_date = datetime(2024,2,3,1),
schedule_interval = None
) as dag:
task1 = DbtOperator(
task_id='test_my_DbtOperator',
dbt_command='test'
)
task1
7. 安装dbt项目所需要的包
在dbt项目的根目录下创建packages.yml
将所需要的包全部安装
packages:
- package: dbt-labs/dbt_utils
version: 1.1.1
- package: dbt-labs/codegen
version: 0.12.1
- package: calogica/dbt_expectations
version: 0.10.3
在dbt 根目录下运行
dbt deps