优先级权重

priority_weight 定义了执行器队列中的优先级。默认的 priority_weight1,可以调整为任何整数。此外,每个任务都有一个真实的 priority_weight,该值基于其 weight_rule 计算得出,weight_rule 定义了用于计算任务有效总优先级权重的加权方法。

以下是权重计算方法。默认情况下,Airflow的权重计算方法是downstream

downstream

任务的有效权重是所有下游子任务的累计总和。因此,当使用正权重值时,上游任务将具有更高的权重,并且会被更积极地调度。这在您有多个DAG运行实例时非常有用,因为您希望在所有运行的上游任务都完成后,每个DAG才能继续处理下游任务。

upstream

有效权重是所有上游祖先任务的聚合总和。 这与下游任务具有更高权重的情况相反, 当使用正权重值时,下游任务会被更积极地调度。 这在您有多个DAG运行实例时非常有用, 并且更倾向于让每个DAG先完成, 然后再启动其他DAG运行的上游任务。

absolute

有效权重是指精确指定的priority_weight值,不包含额外加权。当您明确知道每个任务应具有的优先级权重时,可能需要这样做。此外,当设置为absolute时,还能显著加快大型DAG的任务创建过程。

priority_weight参数可以与Pools结合使用。

注意

由于大多数数据库引擎使用32位整数,任何计算或定义的priority_weight最大值为2,147,483,647,最小值为-2,147,483,648。

自定义权重规则

在2.9.0版本中新增。

您可以通过扩展PriorityWeightStrategy类并在插件中注册来实现自己的自定义权重方法。

airflow/example_dags/plugins/decreasing_priority_weight_strategy.py[source]

class DecreasingPriorityStrategy(PriorityWeightStrategy):
    """A priority weight strategy that decreases the priority weight with each attempt of the DAG task."""

    def get_weight(self, ti: TaskInstance):
        return max(3 - ti.try_number + 1, 1)


class DecreasingPriorityWeightStrategyPlugin(AirflowPlugin):
    name = "decreasing_priority_weight_strategy_plugin"
    priority_weight_strategies = [DecreasingPriorityStrategy]


要使用它,您可以创建自定义类的实例,并在任务的weight_rule参数中提供该实例,或者提供自定义类的路径:

from custom_weight_rule_module import CustomPriorityWeightStrategy

# provide the class instance
task1 = BashOperator(task_id="task", bash_command="echo 1", weight_rule=CustomPriorityWeightStrategy())

# or provide the path of the class
task1 = BashOperator(
    task_id="task",
    bash_command="echo 1",
    weight_rule="custom_weight_rule_module.CustomPriorityWeightStrategy",
)

这是一个实验性功能

这篇内容对您有帮助吗?