import airflow
import MySQLdb
from impala.util import as_pandas
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
# todo 优化脚本
def get_datas():
db = MySQLdb.connect(host="xxx.xx.xx.xx", port=4313, user="xxxx", password="xxxx",
database="xxxx", charset='utf8')
cur = db.cursor()
cur.execute(
"select * from xxx ")
return as_pandas(cur).fillna(0).to_dict(orient='records')
def multitasking_task(xx, dag):
python_command = "python3"
py_path = "/data/airflow/dag_scripts/xxxx.py"
return BashOperator(
task_id='handle_task_with_id_{}'.format(xx['id']),
bash_command="{} {} {} {}".format(python_command, py_path, "{{ ds }}", xx['id']),
dag=dag,
)
def running():
default_args = {
'owner': 'xx',
'start_date': airflow.utils.dates.days_ago(1),
'email': ['xx@xx.cn'],
'email_on_failure': False,
'email_on_retry': False,
}
dag = DAG(dag_id='xx',
default_args=default_args,
schedule_interval='10 05 * * *', )
start = DummyOperator(
task_id="start",
dag=dag
)
end = DummyOperator(
task_id="end",
dag=dag
)
# todo 优化task过多情况,可以动态创建dag任务
for data in get_datas():
start >> multitasking_task(data , dag) >> end
return dag
dag = running()
Airflow 动态创建Task
最后编辑于 :
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 在Spring Boot项目中,通过@EnableScheduling可启用Spring自带的定时任务支持,在通过...
- 注:从别人博客借鉴过来的,纯属自学用 http://blog.sina.com.cn/s/blog_6262a50...
- 1.Gradle的Project从本质上说只是含有多个Task的容器,一个Task与Ant的Target相似,表示...
- 2.4为什么学节点操作 获取元素通常使用两种方式: 两种方式都可以获取元素节点,我们后面都会使用,但是节点操作更简...