设置与拆卸

在数据工作流中,通常会创建一个资源(如计算资源),用它完成一些工作,然后将其拆除。Airflow提供了设置和拆卸任务来满足这一需求。

设置和拆卸任务的关键特性:

  • 如果清除一个任务,其设置和拆卸操作也将被清除。

  • 默认情况下,在评估DAG运行状态时会忽略拆卸任务。

  • 即使工作任务失败,如果其设置成功,拆卸任务仍会运行。但如果设置被跳过,它也会跳过。

  • 在设置任务组依赖关系时,Teardown任务会被忽略。

  • 如果DAG运行被手动设置为“失败”或“成功”,也会执行拆卸操作,以确保资源会被清理。

setup和teardown的工作原理

基本用法

假设您有一个dag,它会创建一个集群,运行查询,然后删除该集群。如果不使用setup和teardown任务,您可能会设置以下关系:

create_cluster >> run_query >> delete_cluster

要将create_cluster和delete_cluster作为设置和拆卸任务启用,我们将其标记为as_setupas_teardown方法,并在它们之间添加上游/下游关系:

create_cluster.as_setup() >> run_query >> delete_cluster.as_teardown()
create_cluster >> delete_cluster

为了方便,我们可以通过将create_cluster传递给as_teardown方法在一行中完成:

create_cluster >> run_query >> delete_cluster.as_teardown(setups=create_cluster)

这是该DAG的流程图:

../_images/setup-teardown-simple.png

观察结果:

  • 如果您清除 run_query 以再次运行它,那么 create_clusterdelete_cluster 都将被清除。

  • 如果run_query失败,delete_cluster仍会继续执行。

  • DAG运行的成功将取决于run_query的成功。

此外,如果我们需要包装多个任务,可以将teardown作为上下文管理器使用:

with delete_cluster().as_teardown(setups=create_cluster()):
    [RunQueryOne(), RunQueryTwo()] >> DoSomeOtherStuff()
    WorkOne() >> [do_this_stuff(), do_other_stuff()]

这将设置 create_cluster 在上下文中的任务之前运行,而 delete_cluster 在它们之后运行。

如图所示:

../_images/setup-teardown-complex.png

请注意,如果您尝试将已实例化的任务添加到设置上下文中,需要明确执行此操作:

with my_teardown_task as scope:
    scope.add_task(work_task)  # work_task was already instantiated elsewhere

设置“作用域”

设置任务与其拆卸任务之间的任务处于设置/拆卸对的"作用域"内。

让我们来看一个例子:

s1 >> w1 >> w2 >> t1.as_teardown(setups=s1) >> w3
w2 >> w4

以及图表:

../_images/setup-teardown-scope.png

在上面的例子中,w1w2位于s1t1之间,因此被假定为需要s1。所以如果w1w2被清除,那么s1t1也会被清除。但如果w3w4被清除,s1t1都不会被清除。

你可以将多个设置任务连接到一个单独的拆卸任务。只要至少有一个设置任务成功完成,拆卸任务就会运行。

您可以设置一个不包含拆卸操作的配置:

create_cluster >> run_query >> other_task

在这种情况下,假设 create_cluster 下游的所有任务都依赖它。因此,如果你清除了 other_task,它也会清除 create_cluster。假设我们在 run_query 之后为 create_cluster 添加一个拆卸操作:

create_cluster >> run_query >> other_task
run_query >> delete_cluster.as_teardown(setups=create_cluster)

现在,Airflow可以推断出other_task不需要create_cluster,因此如果我们清除other_task,create_cluster也不会被清除。

在那个示例中,我们(假设在文档场景中)实际上想要删除集群。但假设我们并不想删除,而只是想表达"other_task不需要create_cluster",那么我们可以使用EmptyOperator来限制设置的范围:

create_cluster >> run_query >> other_task
run_query >> EmptyOperator(task_id="cluster_teardown").as_teardown(setups=create_cluster)

隐式ALL_SUCCESS约束

在设置范围内的任何任务对其设置都有一个隐式的“all_success”约束。 这是为了确保如果一个具有间接设置的任务被清除,它将等待这些设置完成。如果设置失败或被跳过,依赖它们的工作任务将被标记为失败或跳过。我们还要求任何直接位于设置下游的非拆卸任务必须具有ALL_SUCCESS触发规则。

控制DAG运行状态

setup/teardown任务的另一个特性是,您可以选择teardown任务是否应该影响dag运行状态。也许您不关心teardown任务执行的"清理"工作是否失败,只有当"工作"任务失败时您才认为dag运行失败。默认情况下,teardown任务不会影响dag运行状态。

继续上面的例子,如果你想使运行的成功依赖于delete_cluster,那么在将delete_cluster设置为teardown时,设置on_failure_fail_dagrun=True。例如:

create_cluster >> run_query >> delete_cluster.as_teardown(setups=create_cluster, on_failure_fail_dagrun=True)

使用任务组进行创作

当从任务组指向任务组,或从任务组指向任务时,我们会忽略拆卸操作。这允许拆卸操作并行运行,并且即使拆卸任务失败,也能让DAG继续执行。

考虑以下示例:

with TaskGroup("my_group") as tg:
    s1 = s1()
    w1 = w1()
    t1 = t1()
    s1 >> w1 >> t1.as_teardown(setups=s1)
w2 = w2()
tg >> w2

图表:

../_images/setup-teardown-group.png

如果t1不是一个拆卸任务,那么这个dag实际上会是s1 >> w1 >> t1 >> w2。但由于我们将t1标记为拆卸任务,它在tg >> w2中被忽略。因此这个dag等同于以下内容:

s1 >> w1 >> [t1.as_teardown(setups=s1), w2]

现在让我们来看一个嵌套的例子:

with TaskGroup("my_group") as tg:
    s1 = s1()
    w1 = w1()
    t1 = t1()
    s1 >> w1 >> t1.as_teardown(setups=s1)
w2 = w2()
tg >> w2
dag_s1 = dag_s1()
dag_t1 = dag_t1()
dag_s1 >> [tg, w2] >> dag_t1.as_teardown(setups=dag_s1)

图表:

../_images/setup-teardown-nesting.png

在这个例子中,s1dag_s1 的下游任务,因此它必须等待 dag_s1 成功完成。但 t1dag_t1 可以并发运行,因为在表达式 tg >> dag_t1t1 被忽略了。如果你清除 w2,它将清除 dag_s1dag_t1,但不会清除任务组中的任何内容。

并行运行设置和拆卸

您可以并行运行设置任务:

(
    [create_cluster, create_bucket]
    >> run_query
    >> [delete_cluster.as_teardown(setups=create_cluster), delete_bucket.as_teardown(setups=create_bucket)]
)

图表:

../_images/setup-teardown-parallel.png

将它们放在一个组中视觉效果会更好:

with TaskGroup("setup") as tg_s:
    create_cluster = create_cluster()
    create_bucket = create_bucket()
run_query = run_query()
with TaskGroup("teardown") as tg_t:
    delete_cluster = delete_cluster().as_teardown(setups=create_cluster)
    delete_bucket = delete_bucket().as_teardown(setups=create_bucket)
tg_s >> run_query >> tg_t

以及图表:

../_images/setup-teardown-setup-group.png

拆卸操作的触发规则行为

拆卸操作使用一个名为ALL_DONE_SETUP_SUCCESS的(不可配置的)触发规则。根据该规则,只要所有上游任务完成且至少有一个直接连接的设置任务成功,拆卸任务就会运行。如果某个拆卸任务的所有设置任务都被跳过或失败,这些状态将传递给拆卸任务。

手动更改DAG状态时的副作用

由于拆解任务通常用于清理资源,即使DAG被手动终止,它们也需要运行。为了实现提前终止,用户可以手动将DAG运行标记为"成功"或"失败",这会在任务完成前终止所有任务。如果DAG包含拆解任务,这些任务仍会被执行。因此,作为允许调度拆解任务的副作用,即使用户请求立即终止,DAG也不会立即进入最终状态。

这篇内容对您有帮助吗?