打造Windows环境下的docker:airflow_dbt_snowflake

方法1:直接airflow环境里安装dbt,映射本地dbt文件到airflow环境下

1. 首次加载官方的airflow镜像

1.官网下载最新版本的airflow的docker-compose文件

https://airflow.apache.org/docs/apache-airflow/2.8.4/docker-compose.yaml

2.新建文件夹airflow_dbt,创建docker-compose.yml文件并且复制1的内容进去

注意:将example改为false
3.运行docker compose up,第一次执行,会自动下载官方的images。运行成功后,我们可以看到8080端口的airflow webservice已经启动了

image.png

  1. 使用bash进入容器内部
docker exec -it ③的容器id /bin/bash
image.png

2.配置自己的dockerfile,安装dbt

1.根目录下创建Dockerfile文件


image.png

2.编写Dockerfile文件

# 基础镜像
FROM apache/airflow:2.8.4 
#将当前的文件(前)根目录的re.txt 拷贝到项目的跟目录下
COPY requirements.txt requirements.txt
#项目内更新pip install保持最新
RUN pip install --user --upgrade pip
#运行项目内的requirements.txt
RUN pip install -r requirements.txt
#

3.根目录创建requirement.txt,以aiflow为基础,安装本次项目所需要的所有包,例如dbt以及所有的airflow的provider,本次只演示将dbt-snowflake与airflow相结合

dbt-core
dbt-snowflake

3.配置docker-compose.yml文件,映射本地dbt环境

1.在windows的环境下,我们使用的是.venv的虚拟环境,所以我们创建一个新的dbt项目,这里就省略拆dbt创建的过程,总之,我们将得到一个dbt的项目,以及profiles.yml文件,存放着链接snowflake的信息

image.png

2.在airflow文件夹下创建my_dbt文件夹,用来将①的项目文件复制到该文件夹
image.png

  1. 设置docker-compose.yml文件,将dbt所需要的文件映射到docker容器内,主要修改的是airflow的volumn配置
  volumes:
    - ./dags:/opt/airflow/dags #
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - ./test:/opt/airflow/test
    - ./my_dbt:/opt/airflow/my_dbt
    - ./root/.dbt:/home/airflow/.dbt
image.png
  1. 本机的项目根目录下创建root/.dbt/profiles.yml用来存放snowflake的配置参数,该文件就是windows下,c盘下,用户的profiles.yml文件,非常重要
    image.png
  • 注意:这里填写的database,role,schema,应该是我们实际项目的数据库名称,而不是被迁移的数据库名称,

4. 运行docker build,创建我们新配置的images镜像

docker build . --tag airflow_dbt_snowflake:latest
image.png
  • 查看我们新建的镜像


    image.png

5. 修改docker-compose文件,替换我们自己的镜像

image.png
  • 运行compose-up 启动我们的服务

5. 编写第一个测试dag,测试当前环境是否正确

  • 1.编辑test.py测试dbt是否配置正确
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

_default_args = {
    'max_active_runs': 1,
    'catchup': False,
    'start_date': datetime(year=2020, month=9, day=1)
}


with DAG(
    dag_id = 'test1',
    default_args= _default_args,
    start_date = datetime(2024,2,3,1),
    schedule_interval = None
) as dag:
    task1 = BashOperator(
        task_id='first_dbt_dag',
        bash_command='dbt run'
    )
task1
    1. 运行dag,报错原因,运行的环境是airflow的tmp文件里,并不是dbt的项目文件


      image.png
  • 3.修改test.py,将bash_command里添加cd /my_dbt,意思是直接当前文件进入到my_dbt文件夹下

    task1 = BashOperator(
        task_id='first_dbt_dag',
        bash_command='cd /my_dbt && dbt run'
    )
    1. 再次运行报错为:没有找到my_dbt文件夹
      image.png

      -5. 修改docker-compose.yml文件的volume,将本地的文件,映射到虚拟机环境的根目录下
      image.png
    1. 再次trigger我们的dag,运行成功


      image.png

6.编写自己的DbtOperator

  1. 文件结构


    image.png
  2. 编写自己的Operatordbt_operator.py

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException
import subprocess
from subprocess import run, PIPE

class DbtOperator(BaseOperator):
    """
    自定义的 DbtOperator,用于执行 dbt 命令
    """

    @apply_defaults
    def __init__(self, dbt_command, dbt_project_dir='/my_dbt', *args, **kwargs):
        """
        初始化方法,用于设置 Operator 的参数
        :param dbt_command: 要执行的 dbt 命令
        :param dbt_project_dir: dbt 项目目录路径
        """
        super(DbtOperator, self).__init__(*args, **kwargs)
        self.dbt_command = dbt_command
        self.dbt_project_dir = dbt_project_dir

    def execute(self, context):
        """
        执行 dbt 命令的逻辑
        """
        self.log.info(f'Executing DbtOperator with dbt_command={self.dbt_command}')

        # 构建要执行的 bash 命令
        bash_command = f'cd {self.dbt_project_dir} && dbt {self.dbt_command}'
           # 使用 subprocess 模块执行 bash 命令,并实时输出到标准输出
        process = subprocess.Popen(
            bash_command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=True,
            universal_newlines=True
        )

        # 读取并打印命令输出
        while True:
            output = process.stdout.readline()
            if output == '' and process.poll() is not None:
                break
            if output:
                self.log.info(output.strip())

        # 检查命令执行结果
        if process.returncode != 0:
            # 如果命令返回值不为 0,则表示命令执行失败
            raise AirflowException(f'Dbt command execution failed with return code {process.returncode}')
        else:
            # 如果命令返回值为 0,则表示命令执行成功
            self.log.info('Dbt command execution successful')

3.Dag里使用

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from dbtOp.dbt_operator import DbtOperator

_default_args = {
    'max_active_runs': 1,
    'catchup': False,
    'start_date': datetime(year=2020, month=9, day=1)
}


with DAG(
    dag_id = 'test1',
    default_args= _default_args,
    start_date = datetime(2024,2,3,1),
    schedule_interval = None
) as dag:
    task1 = DbtOperator(
        task_id='test_my_DbtOperator',
        dbt_command='test'
    )
task1

7. 安装dbt项目所需要的包

在dbt项目的根目录下创建packages.yml将所需要的包全部安装

packages:
  - package: dbt-labs/dbt_utils
    version: 1.1.1
  - package: dbt-labs/codegen
    version: 0.12.1
  - package: calogica/dbt_expectations
    version: 0.10.3

在dbt 根目录下运行

dbt deps
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容