airflow scheduler 执行过程

image.png

0、airflow scheduler命令执行,创建schedulerJob并开始执行

1、根据conf初始化Executor,执行器启动 (executor.start())
初始化DAG处理管理器代理(DagFileProcessorAgent)并启动
2、创建一个新的进程,DagFileProcessorManager初始化并启动

self._process = multiprocessing.Process(
            target=type(self)._run_processor_manager,
            args=(
                self._dag_directory,
                self._file_paths,
                self._max_runs,
                self._processor_factory,
                self._processor_timeout,
                child_signal_conn,
                self._async_mode,
            )
        )
        self._process.start()

4、遍历Dags文件一个文件创建一个进程
DagFileProcessor.start() →DagFileProcessor._run_file_processor()
5、定时任务类(SchedulerJob)处理单个DAG文件,判断是否满足执行条件;创建DAGRun和TaskInstance,修改DAGRun状态

SchedulerJob.process_file(file_path, pickle_dags)
                     self._process_dags(dagbag, dags, ti_keys_to_schedule)
                     self._process_task_instances(dag, tis_out)
                         -> run.verify_integrity(session=session) 创建及校验TaskInstance,
                            run.update_state(session=session)   根据TI修改DAGRun的状态
       遍历DAGRun中TaskInstance将满足依赖条件修改TI运行状态,将已处理完的DAG发送到队列中

查询可执行TI

for ti in tis:
    task = dag.get_task(ti.task_id)
 
    # fixme: ti.task is transient but needs to be set
    ti.task = task
 
    if ti.are_dependencies_met(  ##判断是否满足依赖条件
            dep_context=DepContext(flag_upstream_failed=True),
            session=session):
        self.log.debug('Queuing task: %s', ti)
        task_instances_list.append(ti.key)

发送已处理完成的DAG

result = scheduler_job.process_file(file_path, pickle_dags)
result_channel.send(result)

7、SchedulerJob,循环收集各个DAG文件处理器进程中处理完成的DAG
simple_dags = self.processor_agent.harvest_simple_dags()
根据已处理完的DAG,根据pool的大小和Task的权重,执行器将TaskInstance发送到队列中
self._execute_task_instances(simple_dag_bag,(State.SCHEDULED,session))
发送TaskInstance到队列中

# actually enqueue them
       for simple_task_instance in simple_task_instances:
           simple_dag = simple_dag_bag.get_dag(simple_task_instance.dag_id)
           command = TI.generate_command(
               simple_task_instance.dag_id,
               simple_task_instance.task_id,
               simple_task_instance.execution_date,
               local=True,
               mark_success=False,
               ignore_all_deps=False,
               ignore_depends_on_past=False,
               ignore_task_deps=False,
               ignore_ti_state=False,
               pool=simple_task_instance.pool,
               file_path=simple_dag.full_filepath,
               pickle_id=simple_dag.pickle_id)
 
           priority = simple_task_instance.priority_weight
           queue = simple_task_instance.queue
           self.log.info(
               "Sending %s to executor with priority %s and queue %s",
               simple_task_instance.key, priority, queue
           )
 
           self.executor.queue_command(
               simple_task_instance,
               command,
               priority=priority,
               queue=queue)
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 又来到了一个老生常谈的问题,应用层软件开发的程序员要不要了解和深入学习操作系统呢? 今天就这个问题开始,来谈谈操...
    tangsl阅读 9,658评论 0 23
  • 必备的理论基础 1.操作系统作用: 隐藏丑陋复杂的硬件接口,提供良好的抽象接口。 管理调度进程,并将多个进程对硬件...
    drfung阅读 8,981评论 0 5
  • Apache Spark是一个围绕速度、易用性和复杂分析构建的大数据处理框架,最初在2009年由加州大学伯克利分校...
    达微阅读 3,762评论 0 0
  • 本文全部手写原创,请勿复制粘贴、转载请注明出处,谢谢配合! Spark术语 Application:用户编写的Sp...
    胖滚猪学编程阅读 4,603评论 0 1
  • 午夜的街道,街灯、穿梭的汽车和雾气笼罩的街道。这是11月底的一天,她又是最晚离开公司的那一个。风很大,北方冬天的大...
    旖宾阅读 3,081评论 0 0

友情链接更多精彩内容