使用时间表自定义DAG调度

在我们的示例中,假设一家公司希望在每个工作日结束后运行一个作业来处理当天收集的数据。对此最直观的答案可能是schedule="0 0 * * 1-5"(周一至周五午夜),但这意味着周五收集的数据不会在周五结束后立即处理,而是要等到下周一,而该运行的间隔时间是从周五午夜到周一午夜。此外,上述调度字符串无法跳过节假日处理。我们真正需要的是:

  • 为每周一、周二、周三、周四和周五安排运行。运行的数据间隔将覆盖从每天的午夜到第二天的午夜(例如2021-01-01 00:00:00到2021-01-02 00:00:00)。

  • 每次运行将在数据间隔结束后立即创建。覆盖星期一的运行将在星期二午夜进行,以此类推。覆盖星期五的运行将在星期六午夜进行。星期日和星期一的午夜不会进行任何运行。

  • 不在定义的节假日安排运行。

为简单起见,本示例中我们仅处理UTC时间。

注意

自定义时间表返回的所有日期时间值必须是"感知型"的,即包含时区信息。此外,它们必须使用pendulum的日期时间和时区类型。

时间表注册

时间表必须是Timetable的子类, 并作为plugin的一部分进行注册。 以下是实现新时间表的框架:

from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import Timetable


class AfterWorkdayTimetable(Timetable):
    pass


class WorkdayTimetablePlugin(AirflowPlugin):
    name = "workday_timetable_plugin"
    timetables = [AfterWorkdayTimetable]

接下来,我们将开始向AfterWorkdayTimetable中添加代码。实现完成后,我们应该能够在DAG文件中使用这个时间表:

import pendulum

from airflow import DAG
from airflow.example_dags.plugins.workday import AfterWorkdayTimetable


with DAG(
    dag_id="example_after_workday_timetable_dag",
    start_date=pendulum.datetime(2021, 3, 10, tz="UTC"),
    schedule=AfterWorkdayTimetable(),
    tags=["example", "timetable"],
):
    ...

定义调度逻辑

当Airflow的调度器遇到一个DAG时,它会调用以下两种方法之一来确定何时调度该DAG的下一次运行。

  • next_dagrun_info: 调度器使用此参数来了解时间表的常规调度计划,例如我们示例中的"每个工作日结束时运行一次"部分。

  • infer_manual_data_interval: 当手动触发DAG运行时(例如从网页界面),调度器会使用此方法来了解如何反向推断计划外运行的数据间隔。

我们将从infer_manual_data_interval开始,因为它是两者中较为简单的一个:

airflow/example_dags/plugins/workday.py[source]

def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
    start = DateTime.combine((run_after - timedelta(days=1)).date(), Time.min).replace(tzinfo=UTC)
    # Skip backwards over weekends and holidays to find last run
    start = self.get_next_workday(start, incr=-1)
    return DataInterval(start=start, end=(start + timedelta(days=1)))

该方法接受一个参数run_after,这是一个pendulum.DateTime对象,用于表示DAG被外部触发的时间。由于我们的时间表为每个完整工作日创建一个数据间隔,这里推断的数据间隔通常应从run_after前一天的午夜开始,但如果run_after落在周日或周一(即前一天是周六或周日),则应进一步回溯到前一个周五。一旦我们确定了间隔的开始时间,结束时间就是开始时间后的一整天。然后我们创建一个DataInterval对象来描述这个间隔。

接下来是 next_dagrun_info 的实现:

airflow/example_dags/plugins/workday.py[source]

def next_dagrun_info(
    self,
    *,
    last_automated_data_interval: DataInterval | None,
    restriction: TimeRestriction,
) -> DagRunInfo | None:
    if last_automated_data_interval is not None:  # There was a previous run on the regular schedule.
        last_start = last_automated_data_interval.start
        next_start = DateTime.combine((last_start + timedelta(days=1)).date(), Time.min)
    # Otherwise this is the first ever run on the regular schedule...
    elif (earliest := restriction.earliest) is None:
        return None  # No start_date. Don't schedule.
    elif not restriction.catchup:
        # If the DAG has catchup=False, today is the earliest to consider.
        next_start = max(earliest, DateTime.combine(Date.today(), Time.min))
    elif earliest.time() != Time.min:
        # If earliest does not fall on midnight, skip to the next day.
        next_start = DateTime.combine(earliest.date() + timedelta(days=1), Time.min)
    else:
        next_start = earliest
    # Skip weekends and holidays
    next_start = self.get_next_workday(next_start.replace(tzinfo=UTC))

    if restriction.latest is not None and next_start > restriction.latest:
        return None  # Over the DAG's scheduled end; don't schedule.
    return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))

该方法接受两个参数。last_automated_data_interval是一个 DataInterval实例,表示该DAG上一次非手动触发运行的数据间隔, 如果这是DAG首次被调度则为Nonerestriction封装了 DAG及其任务如何指定调度计划,并包含三个属性:

  • earliest: DAG可被调度的最早时间。这是一个根据DAG及其任务中所有start_date参数计算得出的pendulum.DateTime值,如果未找到任何start_date参数则为None

  • latest: 类似于earliest,这是DAG可能被调度的最晚时间,根据end_date参数计算得出。

  • catchup: 一个布尔值,反映DAG的catchup参数。

注意

无论是earliest还是latest都适用于DAG运行的逻辑日期 (数据区间的开始时间),而非运行将被调度的时间 (通常在数据区间结束之后)。

如果之前有预定运行,我们现在应该通过循环遍历后续日期来找到非周六、周日或美国节假日的下一个工作日进行调度。然而,如果没有之前的预定运行,我们则选择restriction.earliest之后的下一个非节假日工作日的午夜时间。 restriction.catchup也需要被考虑——如果它是False,即使start_date值在过去,我们也不能在当前时间之前调度。最后,如果我们计算的数据间隔晚于restriction.latest,我们必须遵守它并通过返回None来不调度运行。

如果我们决定调度一个运行,需要用DagRunInfo来描述它。该类型有两个参数和属性:

  • data_interval: 一个DataInterval实例,描述下一次运行的数据间隔。

  • run_after: 一个pendulum.DateTime实例,用于告知调度器何时可以调度DAG运行。

可以像这样创建一个 DagRunInfo

info = DagRunInfo(
    data_interval=DataInterval(start=start, end=end),
    run_after=run_after,
)

由于我们通常希望在数据间隔结束后立即调度运行,上述的endrun_after通常是相同的。因此DagRunInfo为此提供了一个快捷方式:

info = DagRunInfo.interval(start=start, end=end)
assert info.data_interval.end == info.run_after  # Always True.

作为参考,以下是我们的插件和DAG文件的完整内容:

airflow/example_dags/plugins/workday.py[source]

from pendulum import UTC, Date, DateTime, Time

from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable

if TYPE_CHECKING:
    from airflow.timetables.base import TimeRestriction

log = logging.getLogger(__name__)
try:
    from pandas.tseries.holiday import USFederalHolidayCalendar

    holiday_calendar = USFederalHolidayCalendar()
except ImportError:
    log.warning("Could not import pandas. Holidays will not be considered.")
    holiday_calendar = None  # type: ignore[assignment]


class AfterWorkdayTimetable(Timetable):
    def get_next_workday(self, d: DateTime, incr=1) -> DateTime:
        next_start = d
        while True:
            if next_start.weekday() not in (5, 6):  # not on weekend
                if holiday_calendar is None:
                    holidays = set()
                else:
                    holidays = holiday_calendar.holidays(start=next_start, end=next_start).to_pydatetime()
                if next_start not in holidays:
                    break
            next_start = next_start.add(days=incr)
        return next_start
    def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
        start = DateTime.combine((run_after - timedelta(days=1)).date(), Time.min).replace(tzinfo=UTC)
        # Skip backwards over weekends and holidays to find last run
        start = self.get_next_workday(start, incr=-1)
        return DataInterval(start=start, end=(start + timedelta(days=1)))
    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        if last_automated_data_interval is not None:  # There was a previous run on the regular schedule.
            last_start = last_automated_data_interval.start
            next_start = DateTime.combine((last_start + timedelta(days=1)).date(), Time.min)
        # Otherwise this is the first ever run on the regular schedule...
        elif (earliest := restriction.earliest) is None:
            return None  # No start_date. Don't schedule.
        elif not restriction.catchup:
            # If the DAG has catchup=False, today is the earliest to consider.
            next_start = max(earliest, DateTime.combine(Date.today(), Time.min))
        elif earliest.time() != Time.min:
            # If earliest does not fall on midnight, skip to the next day.
            next_start = DateTime.combine(earliest.date() + timedelta(days=1), Time.min)
        else:
            next_start = earliest
        # Skip weekends and holidays
        next_start = self.get_next_workday(next_start.replace(tzinfo=UTC))

        if restriction.latest is not None and next_start > restriction.latest:
            return None  # Over the DAG's scheduled end; don't schedule.
        return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))


class WorkdayTimetablePlugin(AirflowPlugin):
    name = "workday_timetable_plugin"
    timetables = [AfterWorkdayTimetable]


import pendulum

from airflow import DAG
from airflow.example_dags.plugins.workday import AfterWorkdayTimetable
from airflow.operators.empty import EmptyOperator


with DAG(
    dag_id="example_workday_timetable",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=AfterWorkdayTimetable(),
    tags=["example", "timetable"],
):
    EmptyOperator(task_id="run_this")

参数化时间表

有时我们需要向时间表传递一些运行时参数。继续以我们的AfterWorkdayTimetable为例,可能我们的DAG运行在不同的时区,我们希望将一些DAG安排在第二天早上8点运行,而不是午夜。与其为每个目的创建单独的时间表,我们更希望这样做:

class SometimeAfterWorkdayTimetable(Timetable):
    def __init__(self, schedule_at: Time) -> None:
        self._schedule_at = schedule_at

    def next_dagrun_info(self, last_automated_dagrun, restriction):
        ...
        end = start + timedelta(days=1)
        return DagRunInfo(
            data_interval=DataInterval(start=start, end=end),
            run_after=DateTime.combine(end.date(), self._schedule_at).replace(tzinfo=UTC),
        )

然而,由于时间表是DAG的一部分,我们需要告诉Airflow如何通过我们在__init__中提供的上下文来序列化它。这需要在我们时间表类上实现两个额外的方法:

class SometimeAfterWorkdayTimetable(Timetable):
    ...

    def serialize(self) -> dict[str, Any]:
        return {"schedule_at": self._schedule_at.isoformat()}

    @classmethod
    def deserialize(cls, value: dict[str, Any]) -> Timetable:
        return cls(Time.fromisoformat(value["schedule_at"]))

当DAG被序列化时,会调用serialize来获取一个可JSON序列化的值。该值会在调度器访问序列化后的DAG时传递给deserialize,以重新构建时间表。

UI中的时间表展示

默认情况下,自定义时间表在用户界面中会显示其类名(例如"DAGs"表格中的Schedule列)。可以通过重写summary属性来自定义显示内容。这对于参数化时间表特别有用,可以包含在__init__中提供的参数。以我们的SometimeAfterWorkdayTimetable类为例,可以这样实现:

@property
def summary(self) -> str:
    return f"after each workday, at {self._schedule_at}"

因此,对于这样声明的DAG:

with DAG(
    schedule=SometimeAfterWorkdayTimetable(Time(8)),  # 8am.
    ...,
):
    ...

计划列会显示after each workday, at 08:00:00

另请参阅

Module airflow.timetables.base

公共接口有详尽的文档说明,解释子类应该实现哪些内容。

UI中的时间表描述显示

您还可以通过重写description属性来为您的Timetable Implementation提供描述。 这在UI中为您的实现提供全面描述时特别有用。 以我们的SometimeAfterWorkdayTimetable类为例,我们可以这样描述:

description = "Schedule: after each workday"

如果你想派生描述,也可以将其封装在__init__内部。

def __init__(self) -> None:
    self.description = "Schedule: after each workday, at f{self._schedule_at}"

当您希望提供与summary属性不同的全面描述时,这特别有用。

因此,对于这样声明的DAG:

with DAG(
    schedule=SometimeAfterWorkdayTimetable(Time(8)),  # 8am.
    ...,
):
    ...

i 图标会显示 Schedule: after each workday, at 08:00:00

另请参阅

Module airflow.timetables.interval

检查CronDataIntervalTimetable描述实现,该实现在用户界面中提供全面的cron描述。

修改生成的run_id

在2.4版本中新增。

自Airflow 2.4起,时间表(Timetables)还负责为DagRuns生成run_id

例如,要让运行ID显示运行开始的"人性化"日期(即数据间隔的结束时间,而非当前使用的开始时间),您可以向自定义时间表添加如下方法:

def generate_run_id(
    self,
    *,
    run_type: DagRunType,
    logical_date: DateTime,
    data_interval: DataInterval | None,
    **extra,
) -> str:
    if run_type == DagRunType.SCHEDULED and data_interval:
        return data_interval.end.format("YYYY-MM-DD dddd")
    return super().generate_run_id(
        run_type=run_type, logical_date=logical_date, data_interval=data_interval, **extra
    )

请记住,RunID限制为250个字符,并且在DAG中必须是唯一的。

这篇内容对您有帮助吗?