动态DAG生成

本文档描述了如何创建具有动态生成结构的DAG,但这些DAG在不同运行周期中的任务数量保持不变。如果您需要实现任务数量(或自Airflow 2.6版本起的任务组数量)能根据前置任务输出/结果而变化的DAG,请参阅动态任务映射

注意

生成任务和任务组的一致顺序

在所有动态生成DAG的情况下,您应确保每次生成DAG时任务和任务组都以一致的顺序生成,否则您可能会遇到每次刷新页面时任务和任务组在网格视图中改变顺序的情况。这可以通过例如在数据库查询中使用稳定的排序机制,或使用Python中的sorted()函数来实现。

使用环境变量实现动态DAG

如果你想使用变量来配置代码,应该始终在顶层代码中使用 环境变量而非Airflow Variables。在顶层代码中使用Airflow Variables 会创建与Airflow元数据库的连接以获取值,这可能会减慢解析速度并给数据库带来额外负载。参阅Airflow Variables最佳实践 了解如何通过Jinja模板在DAG中最佳使用Airflow Variables。

例如,您可以为生产环境和开发环境设置不同的DEPLOYMENT变量。在生产环境中,可以将DEPLOYMENT变量设置为PROD,而在开发环境中设置为DEV。然后根据环境变量的值,您可以在生产环境和开发环境中构建不同的DAG。

deployment = os.environ.get("DEPLOYMENT", "PROD")
if deployment == "PROD":
    task = Operator(param="prod-param")
elif deployment == "DEV":
    task = Operator(param="dev-param")

生成带有嵌入式元数据的Python代码

您可以在外部生成包含元数据的Python代码,这些元数据以可导入常量的形式存在。 这些常量随后可以直接被您的DAG导入,并用于构建对象和建立依赖关系。这使得从多个DAG中导入此类代码变得容易,无需查找、 加载和解析存储在常量中的元数据——这些操作会在Python解释器处理"import"语句时自动完成。初听起来可能有些奇怪,但生成这样的代码 并确保它是可以从您的DAG中导入的有效Python代码,实际上出人意料地简单。

例如假设你动态生成(在你的DAG文件夹中)my_company_utils/common.py文件:

# This file is generated automatically !
ALL_TASKS = ["task1", "task2", "task3"]

然后你可以在所有DAG中像这样导入并使用ALL_TASKS常量:

from my_company_utils.common import ALL_TASKS

with DAG(
    dag_id="my_dag",
    schedule=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
):
    for task in ALL_TASKS:
        # create your operators and relations here
        ...

请注意,在这种情况下,您需要在my_company_utils文件夹中添加一个空的__init__.py文件, 并且应该将my_company_utils/.*这行添加到.airflowignore文件中(如果使用正则表达式忽略语法), 这样调度器在查找DAG时就会忽略整个文件夹。

基于结构化数据文件外部配置的动态DAG

如果您需要使用更复杂的元数据来准备DAG结构,并且更倾向于将数据保存在结构化的非Python格式中,您应该将数据导出到DAG文件夹中的文件并推送至该文件夹,而不是尝试通过DAG的顶层代码拉取数据——原因已在父级Top level Python Code中说明。

元数据应当被导出并以方便的文件格式(JSON、YAML格式是不错的选择)与DAGs一起存储在DAG文件夹中。理想情况下,元数据应当发布在与加载它的DAG文件模块相同的包/文件夹中,因为这样您可以轻松地在DAG中找到元数据文件的位置。要读取的文件位置可以通过包含DAG的模块的__file__属性找到:

my_dir = os.path.dirname(os.path.abspath(__file__))
configuration_file_path = os.path.join(my_dir, "config.yaml")
with open(configuration_file_path) as yaml_file:
    configuration = yaml.safe_load(yaml_file)
# Configuration dict is available here

注册动态DAG

在使用@dag装饰器或with DAG(..)上下文管理器时,您可以动态生成DAGs,Airflow会自动注册它们。

from datetime import datetime
from airflow.decorators import dag, task

configs = {
    "config1": {"message": "first DAG will receive this message"},
    "config2": {"message": "second DAG will receive this message"},
}

for config_name, config in configs.items():
    dag_id = f"dynamic_generated_dag_{config_name}"

    @dag(dag_id=dag_id, start_date=datetime(2022, 2, 1))
    def dynamic_generated_dag():
        @task
        def print_message(message):
            print(message)

        print_message(config["message"])

    dynamic_generated_dag()

以下代码将为每个配置生成一个DAG:dynamic_generated_dag_config1dynamic_generated_dag_config2。 每个DAG都可以根据相关配置独立运行。

如果不希望自动注册DAG,可以通过在DAG上设置auto_register=False来禁用此行为。

版本2.4变更:从2.4版本开始,通过调用@dag装饰函数创建的DAG(或在with DAG(...)上下文管理器中使用的DAG)会自动注册,不再需要存储在全局变量中。

优化执行期间DAG解析延迟

在2.4版本中新增。

这是一个实验性功能

有时当你从一个DAG文件生成大量动态DAG时,在任务执行期间解析该DAG文件可能会导致不必要的延迟。其影响表现为任务启动前的延迟。

为什么会发生这种情况?你可能没有意识到,但在任务执行之前,Airflow会解析DAG所在的Python文件。

Airflow调度器(更准确地说,是DAG文件处理器)需要加载完整的DAG文件来处理所有元数据。然而,任务执行仅需单个DAG对象即可运行任务。基于此认知,我们可以在任务执行时跳过生成不必要的DAG对象,从而缩短解析时间。当生成的DAG数量较多时,此优化效果最为显著。

有一种实验性方法可以用来优化这一行为。需要注意的是,并非所有情况都适用(例如当后续DAG的生成依赖于前序DAG时),或者当您的DAG生成存在某些副作用时。此外,下面的代码片段相当复杂,虽然我们已进行测试且在大多数情况下有效,但在某些情况下可能无法正确检测当前解析的DAG,从而导致回退到创建所有DAG或失败。请谨慎使用此解决方案,并彻底测试。

一个展示性能提升的绝佳例子可以在Airflow的魔法循环博客文章中找到,该文描述了任务执行期间的解析时间如何从120秒减少到200毫秒。(这个例子是在Airflow 2.4之前编写的,因此它使用了Airflow未记录的行为。)

在Airflow 2.4中,您可以使用get_parsing_context()方法 以文档化和可预测的方式检索当前上下文。

在遍历需要生成DAG的集合时,您可以使用上下文来确定是需要生成所有DAG对象(在DAG文件处理器中解析时),还是仅生成单个DAG对象(在执行任务时)。

get_parsing_context() 返回当前的解析上下文。该上下文属于 AirflowParsingContext 类型, 当只需要单个DAG/任务时,它包含已设置的 dag_idtask_id 字段。 当需要"完整"解析时(例如在DAG文件处理器中),上下文的 dag_idtask_id 会被设为 None

from airflow.models.dag import DAG
from airflow.utils.dag_parsing_context import get_parsing_context

current_dag_id = get_parsing_context().dag_id

for thing in list_of_things:
    dag_id = f"generated_dag_{thing}"
    if current_dag_id is not None and current_dag_id != dag_id:
        continue  # skip generation of non-selected DAG

    with DAG(dag_id=dag_id, ...):
        ...

这篇内容对您有帮助吗?