DAG运行

DAG运行是一个表示DAG在时间上实例化的对象。每当DAG被执行时,就会创建一个DAG运行,并执行其中的所有任务。DAG运行的状态取决于任务的状态。每个DAG运行都是相互独立运行的,这意味着您可以同时拥有多个DAG的运行实例。

DAG运行状态

DAG运行状态在其执行完成时确定。DAG的执行取决于其包含的任务及其依赖关系。当所有任务都处于终端状态(即无法再转换到其他状态)时,如success(成功)、failed(失败)或skipped(跳过),该状态会被赋予DAG运行。DAG运行的状态是基于所谓的"叶节点"或简称为"叶子"来确定的。叶节点是指没有子任务的任务。

DAG运行有两种可能的终止状态:

  • success 如果所有叶子节点的状态都是 successskipped

  • failed 如果任意叶子节点的状态是 failedupstream_failed

注意

请注意,如果你的某些任务定义了特定的触发规则。这可能导致一些意外行为,例如,如果你有一个叶子任务的触发规则是“all_done”,那么无论其他任务的状态如何,它都会被执行;如果它成功了,那么整个DAG运行也会被标记为success,即使中间有任务失败了。

Airflow 2.7版本新增

当前正在运行的DAG运行实例可以在UI仪表板的"运行中"标签页查看。同样,最新DAG运行被标记为失败的可以在"失败"标签页找到。

数据间隔

Airflow中的每个DAG运行都有一个指定的“数据间隔”,代表其运行的时间范围。例如,对于使用@daily调度的DAG,其每个数据间隔将从每天午夜(00:00)开始,并在午夜(24:00)结束。

DAG运行通常在其关联的数据间隔结束后被调度,以确保运行能够收集该时间段内的所有数据。换句话说,覆盖2020-01-01数据周期的运行通常要到2020-01-01结束后才开始执行,即在2020-01-02 00:00:00之后。

Airflow中的所有日期都以某种方式与数据区间概念相关联。例如,DAG运行的"逻辑日期"(在2.2版本之前的Airflow中也称为execution_date)表示数据区间的开始时间,而非DAG实际执行的时间。

类似地,由于DAG及其任务的start_date参数指向相同的逻辑日期,它标记的是DAG第一个数据间隔的开始时间,而非DAG中任务开始运行的时间。换句话说,DAG运行只会在start_date之后的一个间隔被调度。

提示

If a cron expression or timedelta object is not enough to express your DAG’s schedule, logical date, or data interval, see Timetables. For more information on logical date, see Running DAGs and What does execution_date mean?

重新运行DAG

在某些情况下,您可能需要重新执行DAG。例如当预定的DAG运行失败时。

追赶

一个定义了start_date(可能还有end_date)以及非数据集调度的Airflow DAG,会定义一系列时间间隔,调度器将这些间隔转换为单独的DAG运行并执行。 默认情况下,调度器会为自上一个数据间隔以来未运行(或已被清除)的任何数据间隔触发DAG运行。这个概念被称为追赶(Catchup)。

如果你的DAG没有编写处理追赶逻辑(例如,不限于时间间隔,而是基于Now等情况),那么你可能需要关闭追赶功能。这可以通过在DAG中设置catchup=False或在配置文件中设置catchup_by_default=False来实现。当关闭时,调度器只会为最新时间间隔创建DAG运行。

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/main/airflow/example_dags/tutorial.py
"""

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

import datetime
import pendulum

dag = DAG(
    "tutorial",
    default_args={
        "depends_on_past": True,
        "retries": 1,
        "retry_delay": datetime.timedelta(minutes=3),
    },
    start_date=pendulum.datetime(2015, 12, 1, tz="UTC"),
    description="A simple tutorial DAG",
    schedule="@daily",
    catchup=False,
)

在上面的例子中,如果DAG在2016-01-02早上6点被调度守护进程获取(或通过命令行),将会创建一个数据区间在2016-01-01和2016-01-02之间的DAG运行实例,下一个实例将在2016-01-03凌晨午夜后立即创建,其数据区间为2016-01-02至2016-01-03。

请注意,使用datetime.timedelta对象作为调度间隔可能导致不同的行为。 在这种情况下,创建的单个DAG运行将覆盖2016-01-01 06:00至2016-01-02 06:00之间的数据(一个调度间隔结束于当前时间)。如需更详细地了解基于cron和基于时间增量的调度之间的区别,请参阅timetables comparison

如果dag.catchup的值是True,调度器会为2015-12-01到2016-01-02之间每个已完成的时间间隔创建一个DAG运行(但不会为2016-01-02创建,因为该时间间隔尚未完成),然后调度器会按顺序执行它们。

当您关闭DAG一段时间后重新启用它时,也会触发Catchup。

这种行为非常适合可以轻松按时间段分割的原子数据集。如果您的DAG在内部执行追赶操作,关闭catchup功能会非常有效。

回填

在某些情况下,您可能需要为指定的历史时间段运行DAG,例如: 一个数据填充DAG创建时设置了start_date2019-11-21,但另一个用户需要一个月前的输出数据,即2019-10-21。 这个过程被称为回填(Backfill)。

即使在禁用追赶(catchup)的情况下,您可能仍希望回填数据。这可以通过CLI完成。 运行以下命令

airflow dags backfill \
    --start-date START_DATE \
    --end-date END_DATE \
    dag_id

回填命令将重新运行dag_id在开始日期和结束日期之间的所有时间间隔内的所有实例。

重新运行任务

在计划运行期间,某些任务可能会失败。在查看日志并修复错误后,您可以通过清除计划日期的任务来重新运行它们。清除任务实例会创建该任务实例的记录。当前任务实例的try_number会增加,max_tries设置为0,状态设置为None,这将导致任务重新运行。

在树状图或图表视图中点击失败的任务,然后点击清除。 执行器会重新运行该任务。

您可以选择多个选项来重新运行 -

  • 过去 - DAG最近数据间隔之前的所有任务运行实例

  • Future - 该DAG最近数据间隔之后的所有运行中的任务实例

  • 上游 - 当前DAG中的上游任务

  • 下游 - 当前DAG中的下游任务

  • 递归 - 子DAG和父DAG中的所有任务

  • 失败 - 仅包含DAG最近一次运行中失败的任务

您也可以通过CLI使用以下命令清除任务:

airflow tasks clear dag_id \
    --task-regex task_regex \
    --start-date START_DATE \
    --end-date END_DATE

对于指定的dag_id和时间间隔,该命令会清除所有匹配正则表达式的任务实例。 如需更多选项,可查看clear命令的帮助文档:

airflow tasks clear --help

任务实例历史记录

当任务实例重试或被清除时,任务实例的历史记录会被保留。您可以通过在网格视图中点击任务实例来查看此历史记录。

../_images/task_instance_history.png

注意

上面显示的尝试选择器仅适用于已重试或清除的任务。

历史记录显示特定运行结束时任务实例属性的值。在日志页面,您还可以查看每个任务实例尝试的日志。这对于调试非常有用。

../_images/task_instance_history_log.png

注意

与任务实例相关的对象,如XComs、渲染的模板字段等,不会在历史记录中保留。只有任务实例属性(包括日志)会被保留。

外部触发器

请注意,DAG运行也可以通过CLI手动创建。只需运行命令 -

airflow dags trigger --exec-date logical_date run_id

由调度器外部创建的DAG运行会与触发时间戳关联,并在用户界面中与计划DAG运行一起显示。可以通过-e参数指定DAG内部传递的逻辑日期。默认值为UTC时区的当前日期。

此外,您也可以通过网页界面手动触发DAG运行(标签DAGs -> 列Links -> 按钮Trigger Dag

触发DAG时传递参数

当从CLI、REST API或UI触发DAG时,可以将DAG运行的配置作为JSON数据块传递。

参数化DAG的示例:

import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    "example_parameterized_dag",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
)

parameterized_task = BashOperator(
    task_id="parameterized_task",
    bash_command="echo value: {{ dag_run.conf['conf1'] }}",
    dag=dag,
)

注意: dag_run.conf 中的参数只能在操作器的模板字段中使用。

使用CLI

airflow dags trigger --conf '{"conf1": "value1"}' example_parameterized_dag

使用用户界面

在用户界面中,触发DAG的参数可以通过params定义更好地呈现,如Params文档所述。通过定义params,系统会渲染出合适的数值输入表单。

如果DAG没有定义params参数,通常会跳过表单显示,但通过配置选项show_trigger_form_if_no_params可以强制显示经典表单,以仅传递字典形式的配置选项。

../_images/example_passing_conf.png

请考虑将此类用法转换为params,因为这是更便捷的方式,同时也能对用户输入进行验证。

注意事项

  • 可以通过用户界面将任务实例标记为失败。这可用于停止正在运行的任务实例。

  • 可以通过用户界面将任务实例标记为成功。这主要用于修复误报情况,或者当修复已在Airflow外部实施时。

这篇内容对您有帮助吗?