集群策略

如果你想在集群范围内检查或修改DAGs或Tasks,那么集群策略(Cluster Policy)可以实现这一需求。它们主要有三个用途:

  • 检查DAGs/Tasks是否符合特定标准

  • 在DAGs/任务上设置默认参数

  • 执行自定义路由逻辑

主要有三种集群策略类型:

  • dag_policy: 接收一个名为dagDAG参数。在从DagBag加载DAG时运行。

  • task_policy: 接收一个名为taskBaseOperator参数。该策略会在从DagBag加载任务时,在解析任务创建期间执行。这意味着可以在任务策略中修改整个任务定义。它不涉及在DagRun中运行的特定任务。定义的task_policy将应用于未来所有要执行的任务实例。

  • task_instance_mutation_hook: 接收一个名为task_instanceTaskInstance参数。该task_instance_mutation_hook不作用于任务本身,而是作用于与特定DagRun相关联的任务实例。它会在任务实例执行前,在"worker"中(而非dag文件处理器中)执行。该策略仅应用于当前正在执行的任务运行(即实例)。

DAG和任务集群策略可以通过抛出AirflowClusterPolicyViolation异常来表明传入的dag/任务不符合规范,不应被加载。

当需要有意跳过该DAG时,他们也可以抛出AirflowClusterPolicySkipDag异常。 与AirflowClusterPolicyViolation不同,该异常不会显示在Airflow网页界面上(内部来说,它不会被记录到元数据库的import_error表中)。

集群策略设置的任何额外属性优先级高于DAG文件中定义的属性;例如,如果在DAG文件中为任务设置了sla,而集群策略也设置了sla,则以集群策略设置的值为准。

如何定义策略函数

有两种方式可以配置集群策略:

  1. 在Python搜索路径中的某个位置创建一个airflow_local_settings.py文件($AIRFLOW_HOME下的config/文件夹是个不错的"默认"位置),然后向该文件添加与上述一个或多个集群策略名称匹配的可调用函数(例如dag_policy)。

有关如何配置本地设置的详细信息,请参阅配置本地设置

  1. 通过在自定义模块中使用setuptools入口点,并配合Pluggy接口实现。

    自版本2.6新增。

    该方法更为高级,适合已经熟悉python打包的人员使用。

    首先在模块中创建策略函数:

    from airflow.policies import hookimpl
    
    
    @hookimpl
    def task_policy(task) -> None:
        # 原地修改任务对象
        # ...
        print(f"Hello from {__file__}")
    

    然后将入口点添加到项目配置中。例如使用pyproject.tomlsetuptools

    [build-system]
    requires = ["setuptools", "wheel"]
    build-backend = "setuptools.build_meta"
    
    [project]
    name = "my-airflow-plugin"
    version = "0.0.1"
    # ...
    
    dependencies = ["apache-airflow>=2.6"]
    [project.entry-points.'airflow.policy']
    _ = 'my_airflow_plugin.policies'
    

    入口点组必须为airflow.policy,名称可忽略。值应为使用@hookimpl装饰器标记的模块(或类)。

    完成上述操作并将分发包安装到Airflow环境后,各Airflow组件将调用这些策略函数。(具体调用顺序未定义,因此若有多个插件时不要依赖特定调用顺序)。

需要注意的一个重要事项(无论采用哪种方式定义策略函数)是参数名称必须与下面文档中记录的完全一致。

可用策略函数

airflow.policies.task_policy(task)[来源]

允许在任务加载到DagBag后进行修改。

它允许管理员重新配置某些任务的参数。或者,您也可以抛出AirflowClusterPolicyViolation异常来阻止DAG执行。

以下是一些说明其用途的示例:

  • 你可以强制为使用SparkOperator的任务指定特定队列(例如spark队列),以确保这些任务被分配到正确的执行器上

  • 您可以强制执行任务超时策略,确保没有任务运行超过48小时

Parameters:

task (airflow.models.baseoperator.BaseOperator) – 待修改的任务

airflow.policies.dag_policy(dag)[来源]

允许在DAG加载到DagBag后对其进行修改。

它允许管理员重新配置某些DAG的参数。 或者您可以抛出AirflowClusterPolicyViolation异常 来阻止DAG被执行。

以下是一些说明其用法的示例:

  • 你可以为DAG强制执行默认用户

  • 检查每个DAG是否配置了标签

Parameters:

dag (airflow.models.dag.DAG) – 待变异的dag

airflow.policies.task_instance_mutation_hook(task_instance)[来源]

允许在任务实例被Airflow调度器排队之前对其进行修改。

例如,这可以用于在重试期间修改任务实例。

Parameters:

task_instance (airflow.models.taskinstance.TaskInstance) – 待修改的任务实例

airflow.policies.pod_mutation_hook(pod)[来源]

在调度前修改pod。

该设置允许在将kubernetes.client.models.V1Pod对象传递给Kubernetes客户端进行调度之前对其进行修改。

例如,这可以用于向KubernetesExecutor或KubernetesPodOperator启动的每个工作pod添加sidecar或init容器。

airflow.policies.get_airflow_context_vars(context)[来源]

将airflow上下文变量注入默认的airflow上下文变量中。

此设置允许获取airflow上下文变量,这些变量是键值对。它们随后会被注入到默认的airflow上下文变量中,最终在运行任务时作为环境变量可用。dag_id、task_id、execution_date、dag_run_id、try_number是保留键。

Parameters:

context – 感兴趣任务实例的上下文。

示例

DAG策略

该策略检查每个DAG是否至少定义了一个标签:

def dag_policy(dag: DAG):
    """Ensure that DAG has at least one tag and skip the DAG with `only_for_beta` tag."""
    if not dag.tags:
        raise AirflowClusterPolicyViolation(
            f"DAG {dag.dag_id} has no tags. At least one tag required. File path: {dag.fileloc}"
        )

    if "only_for_beta" in dag.tags:
        raise AirflowClusterPolicySkipDag(
            f"DAG {dag.dag_id} is not loaded on the production cluster, due to `only_for_beta` tag."
        )


注意

为了避免导入循环,如果在集群策略的类型注解中使用DAG,请确保从airflow.models导入,而不是从airflow导入。

注意

DAG策略在DAG完全加载后应用,因此覆盖default_args参数无效。如需覆盖默认操作器设置,请改用任务策略。

任务策略

以下是一个对每个任务强制执行最大超时策略的示例:

class TimedOperator(BaseOperator, ABC):
    timeout: timedelta


def task_policy(task: TimedOperator):
    if task.task_type == "HivePartitionSensor":
        task.queue = "sensor_queue"
    if task.timeout > timedelta(hours=48):
        task.timeout = timedelta(hours=48)


你也可以实施一些措施来防范常见错误,而不仅仅是作为技术安全控制。例如,不要在没有airflow所有者的情况下运行任务:

def task_must_have_owners(task: BaseOperator):
    if task.owner and not isinstance(task.owner, str):
        raise AirflowClusterPolicyViolation(f"""owner should be a string. Current value: {task.owner!r}""")

    if not task.owner or task.owner.lower() == conf.get("operators", "default_owner"):
        raise AirflowClusterPolicyViolation(
            f"""Task must have non-None non-default owner. Current value: {task.owner}"""
        )


如果您需要应用多项检查,最佳实践是将这些规则整理到一个单独的Python模块中,并通过单一的策略/任务变更钩子来执行这些自定义检查并汇总各种错误信息,以便在用户界面(以及数据库中的导入错误表)中报告单个AirflowClusterPolicyViolation

例如,您的 airflow_local_settings.py 可能遵循以下模式:

TASK_RULES: list[Callable[[BaseOperator], None]] = [
    task_must_have_owners,
]


def _check_task_rules(current_task: BaseOperator):
    """Check task rules for given task."""
    notices = []
    for rule in TASK_RULES:
        try:
            rule(current_task)
        except AirflowClusterPolicyViolation as ex:
            notices.append(str(ex))
    if notices:
        notices_list = " * " + "\n * ".join(notices)
        raise AirflowClusterPolicyViolation(
            f"DAG policy violation (DAG ID: {current_task.dag_id}, Path: {current_task.dag.fileloc}):\n"
            f"Notices:\n"
            f"{notices_list}"
        )


def example_task_policy(task: BaseOperator):
    """Ensure Tasks have non-default owners."""
    _check_task_rules(task)


有关如何配置本地设置的详细信息,请参阅配置本地设置

任务实例变更

以下是一个将处于第二次(或更多次)重试的任务重新路由到不同队列的示例:

def task_instance_mutation_hook(task_instance: TaskInstance):
    if task_instance.try_number >= 1:
        task_instance.queue = "retry_queue"


请注意,由于优先级权重是通过权重规则动态确定的,您无法在mutation钩子中修改任务实例的priority_weight

这篇内容对您有帮助吗?