数据感知调度

在2.4版本中新增。

快速入门

除了基于时间调度DAG外,您还可以根据任务更新数据集的时间来调度DAG运行。

from airflow.datasets import Dataset

with DAG(...):
    MyOperator(
        # this task updates example.csv
        outlets=[Dataset("s3://dataset-bucket/example.csv")],
        ...,
    )


with DAG(
    # this DAG should be run when example.csv is updated (by dag1)
    schedule=[Dataset("s3://dataset-bucket/example.csv")],
    ...,
):
    ...
../_images/dataset-scheduled-dags.png

什么是“数据集”?

Airflow数据集是数据的逻辑分组。上游生产者任务可以更新数据集,而数据集更新有助于调度下游消费者DAG。

Uniform Resource Identifier (URI) 定义数据集:

from airflow.datasets import Dataset

example_dataset = Dataset("s3://dataset-bucket/example.csv")

Airflow不会对URI所表示的数据内容或位置做任何假设,而是将URI视为字符串处理。这意味着Airflow会将任何正则表达式(如input_\d+.csv)或文件通配符模式(例如input_2022*.csv)视为尝试从一个声明创建多个数据集的操作,因此这些模式将不会生效。

您必须使用有效的URI创建数据集。Airflow核心和提供者定义了各种可用的URI方案,例如file(核心)、postgres(由Postgres提供者提供)和s3(由Amazon提供者提供)。第三方提供者和插件也可能提供自己的方案。这些预定义的方案具有各自需要遵循的语义。

什么是有效的URI?

从技术上讲,URI必须符合RFC 3986标准中的有效字符集,主要包括ASCII字母数字字符,以及%-_.~。若要标识无法用URI安全字符表示的资源,请使用百分号编码对资源名称进行编码。

URI也是区分大小写的,因此s3://example/datasets3://Example/Dataset被视为不同。请注意,URI的主机部分也区分大小写,这与RFC 3986不同。

不要使用airflow方案,该方案是为Airflow内部保留的。

Airflow 始终倾向于在方案中使用小写字母,而在 URI 的主机部分需要区分大小写以正确区分资源。

# invalid datasets:
reserved = Dataset("airflow://example_dataset")
not_ascii = Dataset("èxample_datašet")

如果您想定义不包含额外语义约束的数据集方案,请使用前缀为x-的方案。Airflow会跳过对这些URI方案的所有语义验证。

# valid dataset, treated as a plain string
my_ds = Dataset("x-my-thing://foobarbaz")

标识符不必是绝对的;它可以是一个无方案(scheme-less)的相对URI,甚至只是一个简单的路径或字符串:

# valid datasets:
schemeless = Dataset("//example/dataset")
csv_file = Dataset("example_dataset")

非绝对标识符被视为普通字符串,对Airflow不携带任何语义含义。

数据集上的额外信息

如果需要,您可以在数据集中包含一个额外的字典:

example_dataset = Dataset(
    "s3://dataset/example.csv",
    extra={"team": "trainees"},
)

这可用于为数据集提供自定义描述,例如谁拥有目标文件的所有权,或文件的用途。额外信息不会影响数据集的身份。这意味着即使额外字典不同,具有相同URI的数据集仍会触发DAG:

with DAG(
    dag_id="consumer",
    schedule=[Dataset("s3://dataset/example.csv", extra={"different": "extras"})],
):
    ...

with DAG(dag_id="producer", ...):
    MyOperator(
        # triggers "consumer" with the given extra!
        outlets=[Dataset("s3://dataset/example.csv", extra={"team": "trainees"})],
        ...,
    )

注意

安全提示: Dataset URI和额外字段未经加密,它们以明文形式存储在Airflow的元数据数据库中。请勿在任何数据集URI或额外键值中存储敏感信息,尤其是凭据!

如何在你的DAG中使用数据集

您可以使用数据集来指定DAG中的数据依赖关系。以下示例展示了在producer DAG中的producer任务成功完成后,Airflow会调度consumer DAG。Airflow仅在任务成功完成时将数据集标记为updated。如果任务失败或被跳过,则不会发生更新,Airflow也不会调度consumer DAG。

example_dataset = Dataset("s3://dataset/example.csv")

with DAG(dag_id="producer", ...):
    BashOperator(task_id="producer", outlets=[example_dataset], ...)

with DAG(dag_id="consumer", schedule=[example_dataset], ...):
    ...

您可以在数据集视图中查看数据集与DAG之间的关联关系列表

多数据集

由于schedule参数是一个列表,DAG可以依赖多个数据集。Airflow会在该DAG所消费的所有数据集自上次运行后至少更新过一次时,才会调度该DAG:

with DAG(
    dag_id="multiple_datasets_example",
    schedule=[
        example_dataset_1,
        example_dataset_2,
        example_dataset_3,
    ],
    ...,
):
    ...

如果在所有被消费的数据集更新之前,某个数据集被多次更新,下游DAG仍然只会运行一次,如下图所示:

graph dataset_event_timeline { graph [layout=neato] { node [margin=0 fontcolor=blue width=0.1 shape=point label=""] e1 [pos="1,2.5!"] e2 [pos="2,2.5!"] e3 [pos="2.5,2!"] e4 [pos="4,2.5!"] e5 [pos="5,2!"] e6 [pos="6,2.5!"] e7 [pos="7,1.5!"] r7 [pos="7,1!" shape=star width=0.25 height=0.25 fixedsize=shape] e8 [pos="8,2!"] e9 [pos="9,1.5!"] e10 [pos="10,2!"] e11 [pos="11,1.5!"] e12 [pos="12,2!"] e13 [pos="13,2.5!"] r13 [pos="13,1!" shape=star width=0.25 height=0.25 fixedsize=shape] } { node [shape=none label="" width=0] end_ds1 [pos="14,2.5!"] end_ds2 [pos="14,2!"] end_ds3 [pos="14,1.5!"] } { node [shape=none margin=0.25 fontname="roboto,sans-serif"] example_dataset_1 [ pos="-0.5,2.5!"] example_dataset_2 [ pos="-0.5,2!"] example_dataset_3 [ pos="-0.5,1.5!"] dag_runs [label="创建的DagRuns" pos="-0.5,1!"] } edge [color=lightgrey] example_dataset_1 -- e1 -- e2 -- e4 -- e6 -- e13 -- end_ds1 example_dataset_2 -- e3 -- e5 -- e8 -- e10 -- e12 -- end_ds2 example_dataset_3 -- e7 -- e9 -- e11 -- end_ds3 }

为数据集发射事件附加额外信息

在2.10.0版本中新增。

一个带有数据集出口的任务可以选择在发出数据集事件之前附加额外信息。这与数据集上的额外信息不同。数据集上的额外信息静态描述了数据集URI指向的实体;而数据集事件上的额外信息应该用于注释触发数据变更,例如更新改变了数据库中的多少行,或它所覆盖的日期范围。

将额外信息附加到数据集事件的最简单方法是通过从任务中yield一个Metadata对象来实现:

from airflow.datasets import Dataset
from airflow.datasets.metadata import Metadata

example_s3_dataset = Dataset("s3://dataset/example.csv")


@task(outlets=[example_s3_dataset])
def write_to_s3():
    df = ...  # Get a Pandas DataFrame to write.
    # Write df to dataset...
    yield Metadata(example_s3_dataset, {"row_count": len(df)})

Airflow 自动收集所有生成的元数据,并为对应的元数据对象填充带有额外信息的数据集事件。

这也可以通过经典操作符实现。最佳方式是继承操作符并重写execute方法。另外,也可以在任务的pre_executepost_execute钩子中添加额外信息。但请注意,如果选择使用钩子,当任务重试时它们不会重新运行,在某些情况下可能导致额外信息与实际数据不匹配。

另一种实现相同效果的方法是直接在任务的执行上下文中访问outlet_events

@task(outlets=[example_s3_dataset])
def write_to_s3(*, outlet_events):
    outlet_events[example_s3_dataset].extra = {"row_count": len(df)}

这里几乎没有魔法——Airflow只是将生成的值写入完全相同的访问器。这在经典操作符中也适用,包括executepre_executepost_execute

从先前发出的数据集事件中获取信息

在2.10.0版本中新增。

在任务outlets中定义的数据集事件(如前一节所述),可以被在其inlets中声明相同数据集的任务读取。数据集事件条目包含extra(详见前一节)、表示事件从任务发出时间的timestamp,以及将事件链接回其来源的source_task_instance

入口数据集事件可以通过执行上下文中的inlet_events访问器读取。继续上一节中的write_to_s3任务:

@task(inlets=[example_s3_dataset])
def post_process_s3_file(*, inlet_events):
    events = inlet_events[example_s3_dataset]
    last_row_count = events[-1].extra["row_count"]

inlet_events映射中的每个值都是一个类似序列的对象,它按timestamp从早到晚排序给定数据集的过去事件。它支持Python列表的大部分接口,因此你可以使用[-1]访问最后一个事件,[-2:]访问最后两个事件,以此类推。该访问器是惰性的,只有当你访问其中的项目时才会查询数据库。

从触发数据集事件中获取信息

触发的DAG可以通过triggering_dataset_events模板或参数从触发它的数据集中获取信息。更多信息请参阅Templates reference

示例:

example_snowflake_dataset = Dataset("snowflake://my_db/my_schema/my_table")

with DAG(dag_id="load_snowflake_data", schedule="@hourly", ...):
    SQLExecuteQueryOperator(
        task_id="load", conn_id="snowflake_default", outlets=[example_snowflake_dataset], ...
    )

with DAG(dag_id="query_snowflake_data", schedule=[example_snowflake_dataset], ...):
    SQLExecuteQueryOperator(
        task_id="query",
        conn_id="snowflake_default",
        sql="""
          SELECT *
          FROM my_db.my_schema.my_table
          WHERE "updated_at" >= '{{ (triggering_dataset_events.values() | first | first).source_dag_run.data_interval_start }}'
          AND "updated_at" < '{{ (triggering_dataset_events.values() | first | first).source_dag_run.data_interval_end }}';
        """,
    )

    @task
    def print_triggering_dataset_events(triggering_dataset_events=None):
        for dataset, dataset_list in triggering_dataset_events.items():
            print(dataset, dataset_list)
            print(dataset_list[0].source_dag_run.dag_id)

    print_triggering_dataset_events()

请注意,此示例使用(.values() | first | first)来获取传递给DAG的第一个数据集中的第一个DatasetEvent。如果您有多个数据集(可能包含多个DatasetEvent),实现可能会相当复杂。

通过REST API操作队列中的数据集事件

在2.9版本中新增。

在这个示例中,当任务更新了数据集"dataset-1"和"dataset-2"时,DAG waiting_for_dataset_1_and_2 将被触发。一旦"dataset-1"被更新,Airflow会创建一条记录。这确保了当"dataset-2"被更新时,Airflow知道要触发该DAG。我们称此类记录为排队数据集事件。

with DAG(
    dag_id="waiting_for_dataset_1_and_2",
    schedule=[Dataset("dataset-1"), Dataset("dataset-2")],
    ...,
):
    ...

queuedEvent API端点被引入用于操作此类记录。

  • 获取DAG的排队数据集事件: /datasets/queuedEvent/{uri}

  • 获取DAG的排队数据集事件: /dags/{dag_id}/datasets/queuedEvent

  • 删除DAG的排队数据集事件: /datasets/queuedEvent/{uri}

  • 删除DAG的排队数据集事件: /dags/{dag_id}/datasets/queuedEvent

  • 获取数据集排队的Dataset事件: /dags/{dag_id}/datasets/queuedEvent/{uri}

  • 删除数据集排队的Dataset事件: DELETE /dags/{dag_id}/datasets/queuedEvent/{uri}

关于如何使用REST API以及这些端点所需的参数,请参考Airflow API

使用条件表达式进行高级数据集调度

Apache Airflow包含利用数据集条件表达式的高级调度功能。该特性允许您基于数据集更新定义DAG执行的复杂依赖关系,通过逻辑运算符实现对工作流触发机制的更精细控制。

数据集逻辑运算符

Airflow支持两种用于组合数据集条件的逻辑运算符:

  • AND (``&``): 指定DAG只有在所有指定的数据集都更新后才能被触发。

  • OR (``|``): 指定当任意指定的数据集更新时,应触发DAG。

这些操作符使您能够配置Airflow工作流以使用更复杂的数据集更新条件,使其更加动态和灵活。

使用示例

基于多个数据集更新的调度

要安排一个DAG仅在两个特定数据集都更新后才运行,请使用AND运算符(&):

dag1_dataset = Dataset("s3://dag1/output_1.txt")
dag2_dataset = Dataset("s3://dag2/output_1.txt")

with DAG(
    # Consume dataset 1 and 2 with dataset expressions
    schedule=(dag1_dataset & dag2_dataset),
    ...,
):
    ...

基于任何数据集更新的调度

要在两个数据集中的任意一个更新时触发DAG执行,请使用OR运算符(|):

with DAG(
    # Consume dataset 1 or 2 with dataset expressions
    schedule=(dag1_dataset | dag2_dataset),
    ...,
):
    ...

复杂条件逻辑

对于需要更复杂条件的场景,例如当一个数据集更新时触发DAG,或者当另外两个数据集都更新时触发DAG,可以结合使用OR和AND运算符:

dag3_dataset = Dataset("s3://dag3/output_3.txt")

with DAG(
    # Consume dataset 1 or both 2 and 3 with dataset expressions
    schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)),
    ...,
):
    ...

通过DatasetAlias动态数据事件发射与数据集创建

数据集别名可用于触发与别名关联的数据集事件。下游可以依赖于已解析的数据集。此功能允许您基于数据集更新定义DAG执行的复杂依赖关系。

如何使用DatasetAlias

DatasetAlias 只有一个参数 name 用于唯一标识数据集。任务必须首先将该别名声明为输出,并使用 outlet_events 或 yield Metadata 来向其添加事件。

以下示例针对S3 URI f"s3://bucket/my-task" 创建一个数据集事件,并包含可选的额外信息 extra。如果数据集不存在,Airflow将动态创建它并记录一条警告消息。

在任务执行期间通过outlet_events触发数据集事件

from airflow.datasets import DatasetAlias


@task(outlets=[DatasetAlias("my-task-outputs")])
def my_task_with_outlet_events(*, outlet_events):
    outlet_events[DatasetAlias("my-task-outputs")].add(Dataset("s3://bucket/my-task"), extra={"k": "v"})

在任务执行期间通过生成元数据来发出数据集事件

from airflow.datasets.metadata import Metadata


@task(outlets=[DatasetAlias("my-task-outputs")])
def my_task_with_metadata():
    s3_dataset = Dataset("s3://bucket/my-task")
    yield Metadata(s3_dataset, extra={"k": "v"}, alias="my-task-outputs")

对于新增的数据集,即使被多次添加到别名中,或添加到多个别名,也只会触发一个数据集事件。然而,如果传入不同的extra值,则可能触发多个数据集事件。在以下示例中,将会触发两个数据集事件。

from airflow.datasets import DatasetAlias


@task(
    outlets=[
        DatasetAlias("my-task-outputs-1"),
        DatasetAlias("my-task-outputs-2"),
        DatasetAlias("my-task-outputs-3"),
    ]
)
def my_task_with_outlet_events(*, outlet_events):
    outlet_events[DatasetAlias("my-task-outputs-1")].add(Dataset("s3://bucket/my-task"), extra={"k": "v"})
    # This line won't emit an additional dataset event as the dataset and extra are the same as the previous line.
    outlet_events[DatasetAlias("my-task-outputs-2")].add(Dataset("s3://bucket/my-task"), extra={"k": "v"})
    # This line will emit an additional dataset event as the extra is different.
    outlet_events[DatasetAlias("my-task-outputs-3")].add(Dataset("s3://bucket/my-task"), extra={"k2": "v2"})

基于数据集别名的调度

由于添加到别名中的数据集事件只是简单的数据集事件,依赖实际数据集的下游DAG可以正常读取其数据集事件,而无需考虑关联的别名。下游DAG也可以依赖于数据集别名。编写语法是通过名称引用DatasetAlias,并选取关联的数据集事件进行调度。请注意,当且仅当别名解析为Dataset("s3://bucket/my-task")时,具有outlets=DatasetAlias("xxx")的任务才能触发DAG。当具有出口DatasetAlias("out")的任务在运行时与至少一个数据集关联时,无论数据集的身份如何,DAG都会运行。如果特定任务运行中没有数据集与别名关联,则不会触发下游DAG。这也意味着我们可以进行条件性数据集触发。

数据集别名在DAG解析期间会被解析为具体的数据集。因此,如果"min_file_process_interval"配置项被设置为较高的值,可能会导致数据集别名无法被解析。要解决这个问题,您可以触发DAG解析。

with DAG(dag_id="dataset-producer"):

    @task(outlets=[Dataset("example-alias")])
    def produce_dataset_events():
        pass


with DAG(dag_id="dataset-alias-producer"):

    @task(outlets=[DatasetAlias("example-alias")])
    def produce_dataset_events(*, outlet_events):
        outlet_events[DatasetAlias("example-alias")].add(Dataset("s3://bucket/my-task"))


with DAG(dag_id="dataset-consumer", schedule=Dataset("s3://bucket/my-task")):
    ...

with DAG(dag_id="dataset-alias-consumer", schedule=DatasetAlias("example-alias")):
    ...

在提供的示例中,一旦DAG dataset-alias-producer执行完毕,数据集别名DatasetAlias("example-alias")将被解析为Dataset("s3://bucket/my-task")。然而,DAG dataset-alias-consumer需要等待下一次DAG重新解析才能更新其调度计划。为了解决这个问题,当数据集别名DatasetAlias("example-alias")被解析为这些DAG之前未依赖的数据集时,Airflow将重新解析依赖该别名的DAG。因此,在DAG dataset-alias-producer执行后,"dataset-consumer"和"dataset-alias-consumer"这两个DAG都将被触发。

通过已解析的数据集别名从先前发出的数据集事件中获取信息

从先前发出的数据集事件中获取信息所述,可以通过执行上下文中的inlet_events访问器读取入口数据集事件,您还可以使用数据集别名来访问由它们触发的数据集事件。

with DAG(dag_id="dataset-alias-producer"):

    @task(outlets=[DatasetAlias("example-alias")])
    def produce_dataset_events(*, outlet_events):
        outlet_events[DatasetAlias("example-alias")].add(
            Dataset("s3://bucket/my-task"), extra={"row_count": 1}
        )


with DAG(dag_id="dataset-alias-consumer", schedule=None):

    @task(inlets=[DatasetAlias("example-alias")])
    def consume_dataset_alias_events(*, inlet_events):
        events = inlet_events[DatasetAlias("example-alias")]
        last_row_count = events[-1].extra["row_count"]

结合数据集与基于时间的调度

DatasetTimetable 集成

您可以使用DatasetOrTimeSchedule基于数据集事件和时间计划来调度DAG。这使您能够创建既需要由数据更新触发又需要根据固定时间表定期运行的工作流。

有关DatasetOrTimeSchedule的更多详细信息,请参阅DatasetOrTimeSchedule中的相应章节。

这篇内容对您有帮助吗?