import airflow
import MySQLdb
from impala.util import as_pandas
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
# todo 优化脚本
def get_datas():
db = MySQLdb.connect(host="xxx.xx.xx.xx", port=4313, user="xxxx", password="xxxx",
database="xxxx", charset='utf8')
cur = db.cursor()
cur.execute(
"select * from xxx ")
return as_pandas(cur).fillna(0).to_dict(orient='records')
def multitasking_task(xx, dag):
python_command = "python3"
py_path = "/data/airflow/dag_scripts/xxxx.py"
return BashOperator(
task_id='handle_task_with_id_{}'.format(xx['id']),
bash_command="{} {} {} {}".format(python_command, py_path, "{{ ds }}", xx['id']),
dag=dag,
)
def running():
default_args = {
'owner': 'xx',
'start_date': airflow.utils.dates.days_ago(1),
'email': ['xx@xx.cn'],
'email_on_failure': False,
'email_on_retry': False,
}
dag = DAG(dag_id='xx',
default_args=default_args,
schedule_interval='10 05 * * *', )
start = DummyOperator(
task_id="start",
dag=dag
)
end = DummyOperator(
task_id="end",
dag=dag
)
# todo 优化task过多情况,可以动态创建dag任务
for data in get_datas():
start >> multitasking_task(data , dag) >> end
return dag
dag = running()
Airflow 动态创建Task
最后编辑于 :
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
推荐阅读更多精彩内容
- 在Spring Boot项目中,通过@EnableScheduling可启用Spring自带的定时任务支持,在通过...
- 注:从别人博客借鉴过来的,纯属自学用 http://blog.sina.com.cn/s/blog_6262a50...
- 1.Gradle的Project从本质上说只是含有多个Task的容器,一个Task与Ant的Target相似,表示...
- 2.4为什么学节点操作 获取元素通常使用两种方式: 两种方式都可以获取元素节点,我们后面都会使用,但是节点操作更简...