本文介绍如何使用Pycharm对Airflow进行Debug。
首先准备一个项目,目录如下:
├── airflow-project
│ ├── dags
│ │ └── debug_demo.py
其中debug_demo.py,包含如下内容,作为我们的调试对象
"""
## 演示debug
"""
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow import DAG
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 10, 8),
}
dag = DAG('debug_demo', catchup=False, default_args=default_args, schedule_interval="*/5 * * * *")
dag.doc_md = __doc__
def foo(**context):
print(len(context))
raise Exception("故意抛出异常")
t1 = PythonOperator(python_callable=foo, task_id="foo", dag=dag)
t2 = DummyOperator(task_id="dummy", dag=dag)
t1.doc_md = """
### 一个task
"""
t2.doc_md = """
### 另一个task
"""
t1 >> t2
通过上面的程序,我们创建了一个dag_id为debug_demo
的工作流,其中有两个顺序执行的节点,task_id依次是foo
、dummy
。其中taskfoo
对应了python方法def foo(**context)
,如果方法foo
由复杂的代码组成,我们需要在其中打断点进行单步调试。
参照下图中的步骤,添加一个Run /Debug Configurations,在script path中选择airflow命令文件,Parameters中填写test {dag_id} {task_id} {execute_date}
,这需要注意的是{execute_date}
一定要精确到毫秒,否则会运行失败。
参考资料: