Pycharm调试Airflow

本文介绍如何使用Pycharm对Airflow进行Debug。
首先准备一个项目,目录如下:

├── airflow-project
│   ├── dags
│   │   └── debug_demo.py

其中debug_demo.py,包含如下内容,作为我们的调试对象

"""
## 演示debug
"""
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

from airflow import DAG

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 10, 8),
}
dag = DAG('debug_demo', catchup=False, default_args=default_args, schedule_interval="*/5 * * * *")
dag.doc_md = __doc__


def foo(**context):
    print(len(context))
    raise Exception("故意抛出异常")


t1 = PythonOperator(python_callable=foo, task_id="foo", dag=dag)
t2 = DummyOperator(task_id="dummy", dag=dag)
t1.doc_md = """
### 一个task
"""
t2.doc_md = """
### 另一个task
"""
t1 >> t2

通过上面的程序,我们创建了一个dag_iddebug_demo的工作流,其中有两个顺序执行的节点,task_id依次是foodummy。其中taskfoo对应了python方法def foo(**context),如果方法foo由复杂的代码组成,我们需要在其中打断点进行单步调试。
参照下图中的步骤,添加一个Run /Debug Configurations,在script path中选择airflow命令文件,Parameters中填写test {dag_id} {task_id} {execute_date},这需要注意的是{execute_date}一定要精确到毫秒,否则会运行失败。

Pycharm配置

参考资料:

  1. http://michal.karzynski.pl/blog/2017/03/19/developing-workflows-with-apache-airflow/
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容