DAG 样例(执行 MySQL 存储过程)
Airflow 通过 MySqlOperator 执行 SQL 语句。项目中需要执行带参数的存储过程,具体的 DAG 样例如下:
from airflow import DAG
from airflow.operators import BashOperator, DummyOperator,MySqlOperator
from airflow.models import DAG
from datetime import datetime, timedelta
# Date helpers, all derived from ONE snapshot of "today at midnight" so the
# three values can never disagree when the file happens to be parsed right
# around midnight (the original sampled datetime.today() three times).
# NOTE: this is module-level code, so the values are fixed when the DAG file
# is parsed, not when an individual task instance runs.
_today_midnight = datetime.combine(datetime.today(), datetime.min.time())
seven_days_ago = _today_midnight - timedelta(7)
one_day_ago = _today_midnight - timedelta(1)
# Yesterday's date as a compact YYYYMMDD string, e.g. '20160314'.
deal_date = one_day_ago.strftime('%Y%m%d')
# Arguments handed to the DAG as `default_args`.
default_args = {
    # Ownership and scheduling baseline.
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 3, 15),

    # E-mail notification settings (both notifications switched off).
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,

    # Retry policy: one retry, five minutes after the failure.
    'retries': 1,
    'retry_delay': timedelta(minutes=5),

    # Optional settings, intentionally left disabled:
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'schedule_interval': timedelta(1),
    # 'end_date': datetime(2016, 1, 1),
}
# The DAG object: id 'dwd_rpt_m', scheduled with the cron expression
# '0 3 * * *' (minute 0, hour 3, every day).
dag = DAG(
    'dwd_rpt_m',
    schedule_interval='0 3 * * *',
    default_args=default_args,
)
# MySqlOperator task: call the stored procedure dw_dev.p_test_1 with a
# business date and an output return code.
#
# The date is injected through the Jinja macro `{{ ds_nodash }}` (the run's
# execution date formatted as YYYYMMDD) instead of a value computed while the
# DAG file is parsed. A parse-time date is wrong for retries, backfills and
# catch-up runs, which would all receive whatever "yesterday" was at parse
# time rather than the date of the run they belong to.
# NOTE(review): for scheduled daily runs the execution date is the day before
# the wall-clock run date, matching the old "yesterday" value; a manually
# triggered run will pass its own trigger date — confirm that is acceptable.
sql1 = [
    "set @v_txdate = {{ ds_nodash }}",
    "set @v_retcode = 0",
    "call dw_dev.p_test_1(@v_txdate, @v_retcode)",
]
# NOTE(review): the original note said `guoshu_dev` has to be configured in
# the web UI, but the connection id used here is 'dev' — confirm which one
# is correct and that it exists in the Airflow connections.
t1 = MySqlOperator(
    task_id='p_test_1',
    mysql_conn_id='dev',
    sql=sql1,
    dag=dag,
)
# MySqlOperator task: call the stored procedure dw_dev.p_test_2 with a
# business date and an output return code.
#
# The date comes from the Jinja macro `{{ ds_nodash }}` (the run's execution
# date formatted as YYYYMMDD) rather than from a value computed while the DAG
# file is parsed, so retries, backfills and catch-up runs each pass the date
# of the run they belong to.
# NOTE(review): a manually triggered run will pass its own trigger date
# instead of "yesterday" — confirm that is acceptable.
sql2 = [
    "set @v_txdate = {{ ds_nodash }}",
    "set @v_retcode = 0",
    "call dw_dev.p_test_2(@v_txdate, @v_retcode)",
]
t2 = MySqlOperator(
    task_id='p_test_2',
    mysql_conn_id='dev',
    sql=sql2,
    dag=dag,
)

# p_test_2 runs only after p_test_1 (t1 is upstream of t2).
t2.set_upstream(t1)