打造Windows环境下的docker:airflow_dbt_snowflake

方法1:直接airflow环境里安装dbt,映射本地dbt文件到airflow环境下

1. 首次加载官方的airflow镜像

1.官网下载最新版本的airflow的docker-compose文件

https://airflow.apache.org/docs/apache-airflow/2.8.4/docker-compose.yaml

2.新建文件夹airflow_dbt,创建docker-compose.yml文件并且复制1的内容进去

注意:将example改为false
3.运行docker compose up,第一次执行,会自动下载官方的images。运行成功后,我们可以看到8080端口的airflow webservice已经启动了

image.png

  1. 使用bash进入容器内部
docker exec -it ③的容器id /bin/bash
image.png

2.配置自己的dockerfile,安装dbt

1.根目录下创建Dockerfile文件


image.png

2.编写Dockerfile文件

# 基础镜像
FROM apache/airflow:2.8.4 
#将当前的文件(前)根目录的re.txt 拷贝到项目的跟目录下
COPY requirements.txt requirements.txt
#项目内更新pip install保持最新
RUN pip install --user --upgrade pip
#运行项目内的requirements.txt
RUN pip install -r requirements.txt
#

3.根目录创建requirement.txt,以aiflow为基础,安装本次项目所需要的所有包,例如dbt以及所有的airflow的provider,本次只演示将dbt-snowflake与airflow相结合

dbt-core
dbt-snowflake

3.配置docker-compose.yml文件,映射本地dbt环境

1.在windows的环境下,我们使用的是.venv的虚拟环境,所以我们创建一个新的dbt项目,这里就省略拆dbt创建的过程,总之,我们将得到一个dbt的项目,以及profiles.yml文件,存放着链接snowflake的信息

image.png

2.在airflow文件夹下创建my_dbt文件夹,用来将①的项目文件复制到该文件夹
image.png

  1. 设置docker-compose.yml文件,将dbt所需要的文件映射到docker容器内,主要修改的是airflow的volumn配置
  volumes:
    - ./dags:/opt/airflow/dags #
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - ./test:/opt/airflow/test
    - ./my_dbt:/opt/airflow/my_dbt
    - ./root/.dbt:/home/airflow/.dbt
image.png
  1. 本机的项目根目录下创建root/.dbt/profiles.yml用来存放snowflake的配置参数,该文件就是windows下,c盘下,用户的profiles.yml文件,非常重要
    image.png
  • 注意:这里填写的database,role,schema,应该是我们实际项目的数据库名称,而不是被迁移的数据库名称,

4. 运行docker build,创建我们新配置的images镜像

docker build . --tag airflow_dbt_snowflake:latest
image.png
  • 查看我们新建的镜像


    image.png

5. 修改docker-compose文件,替换我们自己的镜像

image.png
  • 运行compose-up 启动我们的服务

5. 编写第一个测试dag,测试当前环境是否正确

  • 1.编辑test.py测试dbt是否配置正确
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

_default_args = {
    'max_active_runs': 1,
    'catchup': False,
    'start_date': datetime(year=2020, month=9, day=1)
}


with DAG(
    dag_id = 'test1',
    default_args= _default_args,
    start_date = datetime(2024,2,3,1),
    schedule_interval = None
) as dag:
    task1 = BashOperator(
        task_id='first_dbt_dag',
        bash_command='dbt run'
    )
task1
    1. 运行dag,报错原因,运行的环境是airflow的tmp文件里,并不是dbt的项目文件


      image.png
  • 3.修改test.py,将bash_command里添加cd /my_dbt,意思是直接当前文件进入到my_dbt文件夹下

    task1 = BashOperator(
        task_id='first_dbt_dag',
        bash_command='cd /my_dbt && dbt run'
    )
    1. 再次运行报错为:没有找到my_dbt文件夹
      image.png

      -5. 修改docker-compose.yml文件的volume,将本地的文件,映射到虚拟机环境的根目录下
      image.png
    1. 再次trigger我们的dag,运行成功


      image.png

6.编写自己的DbtOperator

  1. 文件结构


    image.png
  2. 编写自己的Operatordbt_operator.py

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException
import subprocess
from subprocess import run, PIPE

class DbtOperator(BaseOperator):
    """
    自定义的 DbtOperator,用于执行 dbt 命令
    """

    @apply_defaults
    def __init__(self, dbt_command, dbt_project_dir='/my_dbt', *args, **kwargs):
        """
        初始化方法,用于设置 Operator 的参数
        :param dbt_command: 要执行的 dbt 命令
        :param dbt_project_dir: dbt 项目目录路径
        """
        super(DbtOperator, self).__init__(*args, **kwargs)
        self.dbt_command = dbt_command
        self.dbt_project_dir = dbt_project_dir

    def execute(self, context):
        """
        执行 dbt 命令的逻辑
        """
        self.log.info(f'Executing DbtOperator with dbt_command={self.dbt_command}')

        # 构建要执行的 bash 命令
        bash_command = f'cd {self.dbt_project_dir} && dbt {self.dbt_command}'
           # 使用 subprocess 模块执行 bash 命令,并实时输出到标准输出
        process = subprocess.Popen(
            bash_command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=True,
            universal_newlines=True
        )

        # 读取并打印命令输出
        while True:
            output = process.stdout.readline()
            if output == '' and process.poll() is not None:
                break
            if output:
                self.log.info(output.strip())

        # 检查命令执行结果
        if process.returncode != 0:
            # 如果命令返回值不为 0,则表示命令执行失败
            raise AirflowException(f'Dbt command execution failed with return code {process.returncode}')
        else:
            # 如果命令返回值为 0,则表示命令执行成功
            self.log.info('Dbt command execution successful')

3.Dag里使用

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from dbtOp.dbt_operator import DbtOperator

_default_args = {
    'max_active_runs': 1,
    'catchup': False,
    'start_date': datetime(year=2020, month=9, day=1)
}


with DAG(
    dag_id = 'test1',
    default_args= _default_args,
    start_date = datetime(2024,2,3,1),
    schedule_interval = None
) as dag:
    task1 = DbtOperator(
        task_id='test_my_DbtOperator',
        dbt_command='test'
    )
task1

7. 安装dbt项目所需要的包

在dbt项目的根目录下创建packages.yml将所需要的包全部安装

packages:
  - package: dbt-labs/dbt_utils
    version: 1.1.1
  - package: dbt-labs/codegen
    version: 0.12.1
  - package: calogica/dbt_expectations
    version: 0.10.3

在dbt 根目录下运行

dbt deps
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容