设置与拆卸¶
在数据工作流中,通常会创建一个资源(如计算资源),用它完成一些工作,然后将其拆除。Airflow提供了设置和拆卸任务来满足这一需求。
设置和拆卸任务的关键特性:
如果清除一个任务,其设置和拆卸操作也将被清除。
默认情况下,在评估DAG运行状态时会忽略拆卸任务。
即使工作任务失败,如果其设置成功,拆卸任务仍会运行。但如果设置被跳过,它也会跳过。
在设置任务组依赖关系时,Teardown任务会被忽略。
如果DAG运行被手动设置为“失败”或“成功”,也会执行拆卸操作,以确保资源会被清理。
setup和teardown的工作原理¶
基本用法¶
假设您有一个dag,它会创建一个集群,运行查询,然后删除该集群。如果不使用setup和teardown任务,您可能会设置以下关系:
create_cluster >> run_query >> delete_cluster
要将create_cluster和delete_cluster作为设置和拆卸任务启用,我们将其标记为as_setup和as_teardown方法,并在它们之间添加上游/下游关系:
create_cluster.as_setup() >> run_query >> delete_cluster.as_teardown()
create_cluster >> delete_cluster
为了方便,我们可以通过将create_cluster传递给as_teardown方法在一行中完成:
create_cluster >> run_query >> delete_cluster.as_teardown(setups=create_cluster)
这是该DAG的流程图:
观察结果:
如果您清除
run_query以再次运行它,那么create_cluster和delete_cluster都将被清除。如果
run_query失败,delete_cluster仍会继续执行。DAG运行的成功将仅取决于
run_query的成功。
此外,如果我们需要包装多个任务,可以将teardown作为上下文管理器使用:
with delete_cluster().as_teardown(setups=create_cluster()):
[RunQueryOne(), RunQueryTwo()] >> DoSomeOtherStuff()
WorkOne() >> [do_this_stuff(), do_other_stuff()]
这将设置 create_cluster 在上下文中的任务之前运行,而 delete_cluster 在它们之后运行。
如图所示:
请注意,如果您尝试将已实例化的任务添加到设置上下文中,需要明确执行此操作:
with my_teardown_task as scope:
scope.add_task(work_task) # work_task was already instantiated elsewhere
设置“作用域”¶
设置任务与其拆卸任务之间的任务处于设置/拆卸对的"作用域"内。
让我们来看一个例子:
s1 >> w1 >> w2 >> t1.as_teardown(setups=s1) >> w3
w2 >> w4
以及图表:
在上面的例子中,w1和w2位于s1和t1之间,因此被假定为需要s1。所以如果w1或w2被清除,那么s1和t1也会被清除。但如果w3或w4被清除,s1和t1都不会被清除。
你可以将多个设置任务连接到一个单独的拆卸任务。只要至少有一个设置任务成功完成,拆卸任务就会运行。
您可以设置一个不包含拆卸操作的配置:
create_cluster >> run_query >> other_task
在这种情况下,假设 create_cluster 下游的所有任务都依赖它。因此,如果你清除了 other_task,它也会清除 create_cluster。假设我们在 run_query 之后为 create_cluster 添加一个拆卸操作:
create_cluster >> run_query >> other_task
run_query >> delete_cluster.as_teardown(setups=create_cluster)
现在,Airflow可以推断出other_task不需要create_cluster,因此如果我们清除other_task,create_cluster也不会被清除。
在那个示例中,我们(假设在文档场景中)实际上想要删除集群。但假设我们并不想删除,而只是想表达"other_task不需要create_cluster",那么我们可以使用EmptyOperator来限制设置的范围:
create_cluster >> run_query >> other_task
run_query >> EmptyOperator(task_id="cluster_teardown").as_teardown(setups=create_cluster)
隐式ALL_SUCCESS约束¶
在设置范围内的任何任务对其设置都有一个隐式的“all_success”约束。 这是为了确保如果一个具有间接设置的任务被清除,它将等待这些设置完成。如果设置失败或被跳过,依赖它们的工作任务将被标记为失败或跳过。我们还要求任何直接位于设置下游的非拆卸任务必须具有ALL_SUCCESS触发规则。
控制DAG运行状态¶
setup/teardown任务的另一个特性是,您可以选择teardown任务是否应该影响dag运行状态。也许您不关心teardown任务执行的"清理"工作是否失败,只有当"工作"任务失败时您才认为dag运行失败。默认情况下,teardown任务不会影响dag运行状态。
继续上面的例子,如果你想使运行的成功依赖于delete_cluster,那么在将delete_cluster设置为teardown时,设置on_failure_fail_dagrun=True。例如:
create_cluster >> run_query >> delete_cluster.as_teardown(setups=create_cluster, on_failure_fail_dagrun=True)
并行运行设置和拆卸¶
您可以并行运行设置任务:
(
[create_cluster, create_bucket]
>> run_query
>> [delete_cluster.as_teardown(setups=create_cluster), delete_bucket.as_teardown(setups=create_bucket)]
)
图表:
将它们放在一个组中视觉效果会更好:
with TaskGroup("setup") as tg_s:
create_cluster = create_cluster()
create_bucket = create_bucket()
run_query = run_query()
with TaskGroup("teardown") as tg_t:
delete_cluster = delete_cluster().as_teardown(setups=create_cluster)
delete_bucket = delete_bucket().as_teardown(setups=create_bucket)
tg_s >> run_query >> tg_t
以及图表:
拆卸操作的触发规则行为¶
拆卸操作使用一个名为ALL_DONE_SETUP_SUCCESS的(不可配置的)触发规则。根据该规则,只要所有上游任务完成且至少有一个直接连接的设置任务成功,拆卸任务就会运行。如果某个拆卸任务的所有设置任务都被跳过或失败,这些状态将传递给拆卸任务。
手动更改DAG状态时的副作用¶
由于拆解任务通常用于清理资源,即使DAG被手动终止,它们也需要运行。为了实现提前终止,用户可以手动将DAG运行标记为"成功"或"失败",这会在任务完成前终止所有任务。如果DAG包含拆解任务,这些任务仍会被执行。因此,作为允许调度拆解任务的副作用,即使用户请求立即终止,DAG也不会立即进入最终状态。