DAG文件处理

DAG文件处理指的是将DAGs文件夹中的Python文件转换为包含待调度任务的DAG对象的过程。

DAG文件处理涉及两个主要组件。DagFileProcessorManager是一个执行无限循环的进程,用于确定需要处理的文件;而DagFileProcessorProcess是一个独立进程,负责将单个文件转换为一个或多个DAG对象。

DagFileProcessorManager 负责运行用户代码。因此,您可以将其作为独立进程运行在与调度器进程不同的主机上。 如果决定将其作为独立进程运行,需要设置以下配置:AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR=True 并 执行 airflow dag-processor 命令行指令,否则启动调度器进程 (airflow scheduler) 时也会同时启动 DagFileProcessorManager

../_images/dag_file_processing_diagram.png

DagFileProcessorManager 包含以下步骤:

  1. 检查新文件:如果自DAG上次刷新后经过的时间大于 dag_dir_list_interval 则更新文件路径列表

  2. 排除最近处理过的文件:排除那些处理时间早于min_file_process_interval且未被修改的文件

  3. 队列文件路径:将发现的文件添加到文件路径队列中

  4. 处理文件:为每个文件启动一个新的DagFileProcessorProcess,最多不超过parsing_processes

  5. 收集结果:从任何已完成的DAG处理器中收集结果

  6. 日志统计:打印统计信息并发送dag_processing.total_parse_time

DagFileProcessorProcess 包含以下步骤:

  1. 处理文件:整个流程必须在 dag_file_processor_timeout 内完成

  2. DAG文件作为Python模块加载:必须在dagbag_import_timeout内完成

  3. 处理模块:在Python模块中查找DAG对象

  4. 返回DagBag:为DagFileProcessorManager提供已发现的DAG对象列表

这篇内容对您有帮助吗?