时间表

对于具有基于时间调度(而非事件驱动)的DAG,其内部"时间表"会驱动调度过程。该时间表还决定了为DAG创建的每次运行的数据间隔和逻辑日期。

使用cron表达式或timedelta对象调度的DAG会在内部转换为始终使用时间表。

如果cron表达式或timedelta已经满足您的使用场景,您无需担心编写自定义时间表,因为Airflow已内置了处理这些情况的默认时间表。但对于更复杂的调度需求,您可以创建自己的时间表类并将其传递给DAG的schedule参数。

自定义时间表实现有用的几个例子:

  • 每天在不同时间运行的任务。例如,天文学家可能会发现,在黎明时分运行一个任务来处理前一天夜间收集的数据很有用。

  • 不遵循公历的调度计划。例如,为农历中的每个月创建一个运行任务。这在概念上与日出案例类似,但适用于不同的时间尺度。

  • 滚动窗口或重叠数据区间。例如,您可能希望每天运行一次,但每次运行覆盖前七天的周期。虽然可以通过cron表达式实现,但自定义数据区间能提供更自然的表达方式。

  • 数据间隔之间存在"空洞"而非连续间隔,因为无论是cron表达式还是timedelta调度都表示连续间隔。详见数据间隔

Airflow允许您在插件中编写自定义时间表,并由DAG使用。您可以在使用时间表自定义DAG调度操作指南中找到演示自定义时间表的示例。

注意

作为一般规则,应尽可能在代码的后期阶段访问变量、连接或任何其他需要访问数据库的内容。有关更多应遵循的最佳实践,请参阅时间表

内置时间表

Airflow内置了多种常见的时间表,覆盖大多数典型使用场景。更多时间表可能通过插件提供。

CronTriggerTimetable

一个接受cron表达式的时间表,并根据它触发DAG运行。

from airflow.timetables.trigger import CronTriggerTimetable


@dag(schedule=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), ...)  # At 01:00 on Wednesday
def example_dag():
    pass

你也可以为时间表提供一个静态的数据间隔。可选的interval参数必须是一个datetime.timedeltadateutil.relativedelta.relativedelta。当使用这些参数时,触发的DAG运行的数据间隔将跨越指定的持续时间,并以触发时间结束

from datetime import timedelta

from airflow.timetables.trigger import CronTriggerTimetable


@dag(
    # Runs every Friday at 18:00 to cover the work week (9:00 Monday to 18:00 Friday).
    schedule=CronTriggerTimetable(
        "0 18 * * 5",
        timezone="UTC",
        interval=timedelta(days=4, hours=9),
    ),
    ...,
)
def example_dag():
    pass

DeltaDataIntervalTimetable

一种基于时间差安排数据间隔的时间表。您可以通过向DAG的schedule参数提供datetime.timedeltadateutil.relativedelta.relativedelta来选择它。

该时间表侧重于数据间隔值,不一定将执行日期与任意边界(如一天的开始或整点)对齐。

@dag(schedule=datetime.timedelta(minutes=30))
def example_dag():
    pass

CronDataIntervalTimetable

一个接受cron表达式的时间表,根据每个cron触发点之间的间隔创建数据间隔,并在每个数据间隔结束时触发DAG运行。

通过向DAG的schedule参数提供一个有效的cron表达式字符串来选择此时间表,如DAGs文档中所述。

@dag(schedule="0 1 * * 3")  # At 01:00 on Wednesday.
def example_dag():
    pass

事件时间表

传递一个datetime列表,表示DAG将在这些时间之后运行。这对于基于体育赛事、计划中的传播活动以及其他任意且不规则但可预测的时间安排非常有用。

事件列表必须是有限的且大小合理,因为每次解析DAG时都需要加载它。可选地,使用restrict_to_events标志来强制手动运行DAG,使用最近或最早事件的时间作为数据间隔。否则,手动运行将以data_interval_startdata_interval_end等于手动运行开始的时间开始。您还可以使用description参数为事件集合命名,该名称将在Airflow UI中显示。

from airflow.timetables.events import EventsTimetable


@dag(
    schedule=EventsTimetable(
        event_dates=[
            pendulum.datetime(2022, 4, 5, 8, 27, tz="America/Chicago"),
            pendulum.datetime(2022, 4, 17, 8, 27, tz="America/Chicago"),
            pendulum.datetime(2022, 4, 22, 20, 50, tz="America/Chicago"),
        ],
        description="My Team's Baseball Games",
        restrict_to_events=False,
    ),
    ...,
)
def example_dag():
    pass

基于数据集事件与时间调度的调度机制

将条件数据集表达式与基于时间的调度相结合,可以增强调度的灵活性。

DatasetOrTimeSchedule 是一种专门的时间表,允许基于时间计划和数据集事件来调度DAG。它既支持按传统时间表创建计划运行,也支持独立运行的数据集触发运行。

该特性在需要数据集更新时以及定期运行DAG的场景中特别有用。它能确保工作流对数据变化保持响应,并持续执行定期检查或更新。

这是一个使用 DatasetOrTimeSchedule 的 DAG 示例:

from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable


@dag(
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=(dag1_dataset & dag2_dataset)
    )
    # Additional arguments here, replace this comment with actual arguments
)
def example_dag():
    # DAG tasks go here
    pass

时间表对比

两种cron时间表的差异

Airflow有两个时间表CronTriggerTimetableCronDataIntervalTimetable,它们接受cron表达式。

然而,两者之间存在以下差异: - CronTriggerTimetable 不处理数据间隔,而 CronDataIntervalTimetable 会处理。 - 在CronTriggerTimetableCronDataIntervalTimetable中,run_id的时间戳和logical_date的定义方式不同,这取决于它们如何处理数据间隔,具体如触发DAG运行的时间所述。

是否处理数据间隔

CronTriggerTimetable 不包含 数据间隔。这意味着 data_interval_startdata_interval_end (以及遗留的 execution_date) 的值相同;即DAG运行被触发的时间。

然而,CronDataIntervalTimetable 确实包含数据间隔。这意味着 data_interval_startdata_interval_end(以及遗留的execution_date)的值是不同的。data_interval_start是DAG运行被触发的时间,而data_interval_end是间隔的结束时间。

追赶 行为

无论您使用的是CronTriggerTimetable还是CronDataIntervalTimetable,当catchupTrue时都没有区别。

在某些场景下,您可能希望将catchup设为False以避免运行不必要的DAG: - 当您创建一个起始日期在过去的新DAG时,若不想执行过去时段的DAG。如果catchup设为True,Airflow会执行该时间段内所有本应运行的DAG。 - 当您暂停现有DAG后,在较晚日期重新启动时,若不想... 如果catchup设为True

在这些场景中,logical_daterun_id 中的取值取决于 CronTriggerTimetableCronDataIntervalTimetable 如何处理数据间隔。

有关使用catchup时如何触发DAG运行的更多信息,请参阅Catchup

DAG运行被触发的时间

CronTriggerTimetableCronDataIntervalTimetable 会在相同时间触发DAG运行。但是,每个的run_id时间戳是不同的。

例如,假设有一个cron表达式@daily0 0 * * *,它被设定为每天午夜12点运行。如果您在1月31日下午3点启用这两个时间表的DAG: - CronTriggerTimetable会在2月1日午夜12点触发新的DAG运行。run_id的时间戳是2月1日午夜。 - CronDataIntervalTimetable会立即触发新的DAG运行,因为从1月31日午夜12点开始的每日时间间隔的DAG运行尚未发生。run_id的时间戳是1月31日午夜,因为这是数据间隔的开始时间。

这是另一个展示跳过DAG运行情况下差异的示例。

假设有两个正在运行的DAG,它们使用不同的时间表,并带有cron表达式@daily0 0 * * *。如果您在1月31日下午3点暂停这些DAG,并在2月2日下午3点重新启用它们, - CronTriggerTimetable会跳过本应在2月1日和2日触发的DAG运行。下一次DAG运行将在2月3日凌晨12点触发。 - CronDataIntervalTimetable仅跳过本应在2月1日触发的DAG运行。在您重新启用DAG后,会立即触发2月2日的DAG运行。

在这些示例中,您可以看到CronTriggerTimetable触发DAG运行的方式比CronDataIntervalTimetable更直观,更符合人们对cron行为的预期。

cron与delta数据间隔时间表的区别:

DeltaDataIntervalTimetableCronDataIntervalTimetable之间选择取决于您的使用场景。 如果您在2月1日01:05启用一个DAG,下表总结了创建的DAG运行及其覆盖的数据间隔,这取决于3个参数:schedulestart_datecatchup

schedule

start_date

catchup

覆盖的时间间隔

备注

*/30 * * * *

year-02-01

True

  • 00:00 - 00:30

  • 00:30 - 01:00

与使用 timedelta 对象的行为相同。

*/30 * * * *

year-02-01

False

  • 00:30 - 01:00

*/30 * * * *

year-02-01 00:10

True

  • 00:30 - 01:00

间隔00:00 - 00:30不在开始日期之后,因此被跳过。

*/30 * * * *

year-02-01 00:10

False

  • 00:30 - 01:00

无论开始日期如何,数据间隔都与小时/天/等边界对齐。

datetime.timedelta(minutes=30)

year-02-01

True

  • 00:00 - 00:30

  • 00:30 - 01:00

与使用cron表达式相同的行为。

datetime.timedelta(minutes=30)

year-02-01

False

  • 00:35 - 01:05

间隔时间未与开始日期对齐,而是与当前时间对齐。

datetime.timedelta(minutes=30)

year-02-01 00:10

True

  • 00:10 - 00:40

间隔与开始日期对齐。下一个将在5分钟后触发,覆盖00:40 - 01:10的时间段。

datetime.timedelta(minutes=30)

year-02-01 00:10

False

  • 00:35 - 01:05

间隔时间与当前时间对齐。下一次运行将在30分钟后触发。

这篇内容对您有帮助吗?