DAGs¶
一个DAG(有向无环图)是Airflow的核心概念,它将任务集合在一起,并通过依赖关系和关联关系来组织它们应该如何运行。
这是一个基础的DAG示例:
它定义了四个任务 - A、B、C和D - 并规定了它们的运行顺序以及任务之间的依赖关系。同时还会指定DAG的运行频率 - 可能是"从明天开始每5分钟运行一次",或者"自2020年1月1日起每天运行"。
DAG本身并不关心任务内部发生了什么;它只关注如何执行这些任务——它们的运行顺序、重试次数、是否超时等等。
声明一个DAG¶
有三种方式可以声明一个DAG - 你可以使用with语句(上下文管理器),它会隐式地将其中所有内容添加到DAG中:
import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id="my_dag_name",
start_date=datetime.datetime(2021, 1, 1),
schedule="@daily",
):
EmptyOperator(task_id="task")
或者,您可以使用标准构造函数,将DAG传递到您使用的任何操作符中:
import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
my_dag = DAG(
dag_id="my_dag_name",
start_date=datetime.datetime(2021, 1, 1),
schedule="@daily",
)
EmptyOperator(task_id="task", dag=my_dag)
或者,您可以使用@dag装饰器来将函数转换为DAG生成器:
import datetime
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
def generate_dag():
EmptyOperator(task_id="task")
generate_dag()
没有任务可运行的DAG毫无意义,这些任务通常以操作器、传感器或TaskFlow的形式存在。
任务依赖关系¶
一个任务/操作符通常不会孤立存在;它依赖于其他任务(即其上游任务),同时其他任务也依赖于它(即其下游任务)。声明这些任务之间的依赖关系就构成了DAG结构(即有向无环图的边)。
声明单个任务依赖关系主要有两种方式。推荐的方法是使用>>和<<操作符:
first_task >> [second_task, third_task]
third_task << fourth_task
或者,你也可以使用更明确的 set_upstream 和 set_downstream 方法:
first_task.set_downstream([second_task, third_task])
third_task.set_upstream(fourth_task)
还有一些快捷方式可以声明更复杂的依赖关系。如果你想让一组任务依赖于另一组任务,上述方法都无法使用,这时你需要使用cross_downstream:
from airflow.models.baseoperator import cross_downstream
# Replaces
# [op1, op2] >> op3
# [op1, op2] >> op4
cross_downstream([op1, op2], [op3, op4])
如果你想将依赖关系串联起来,可以使用 chain:
from airflow.models.baseoperator import chain
# Replaces op1 >> op2 >> op3 >> op4
chain(op1, op2, op3, op4)
# You can also do it dynamically
chain(*[EmptyOperator(task_id='op' + i) for i in range(1, 6)])
Chain 也可以为相同大小的列表执行成对依赖关系(这与cross_downstream创建的交叉依赖不同!):
from airflow.models.baseoperator import chain
# Replaces
# op1 >> op2 >> op4 >> op6
# op1 >> op3 >> op5 >> op6
chain(op1, [op2, op3], [op4, op5], op6)
加载DAGs¶
Airflow从Python源文件加载DAG,它会从配置的DAG_FOLDER目录中查找这些文件。对于每个文件,Airflow会执行该文件,然后从中加载所有的DAG对象。
这意味着您可以在每个Python文件中定义多个DAG,甚至可以通过导入将非常复杂的DAG分散到多个Python文件中。
请注意,当Airflow从Python文件加载DAG时,它只会提取顶层的DAG实例对象。例如,看这个DAG文件:
dag_1 = DAG('this_dag_will_be_discovered')
def my_function():
dag_2 = DAG('but_this_dag_will_not')
my_function()
当访问文件时,虽然两个DAG构造函数都会被调用,但只有dag_1位于顶层(在globals()中),因此只有它被添加到Airflow中。dag_2不会被加载。
注意
在DAG_FOLDER中搜索DAG时,Airflow作为优化措施,仅考虑包含字符串airflow和dag(不区分大小写)的Python文件。
如果要考虑所有Python文件,请禁用DAG_DISCOVERY_SAFE_MODE配置标志。
您还可以在DAG_FOLDER或其任何子文件夹中提供一个.airflowignore文件,该文件描述了加载器应忽略的文件模式。它涵盖所在目录及其下所有子文件夹。有关文件语法的详细信息,请参阅下方的.airflowignore。
当.airflowignore文件无法满足需求,且您需要更灵活的方式控制Python文件是否需要被airflow解析时,可以通过在配置文件中设置might_contain_dag_callable来插入您的可调用对象。
请注意,此可调用对象将替换Airflow的默认启发式方法(即检查Python文件中是否包含字符串airflow和dag,不区分大小写)。
def might_contain_dag(file_path: str, zip_file: zipfile.ZipFile | None = None) -> bool:
# Your logic to check if there are DAGs defined in the file_path
# Return True if the file_path needs to be parsed, otherwise False
运行DAGs¶
DAG将以以下两种方式之一运行:
当它们被触发时,无论是手动还是通过API
按照预定义的时间表执行,该时间表是DAG的一部分
DAGs 并不强制要求设置调度计划,但通常都会定义一个。你可以通过 schedule 参数来定义,如下所示:
with DAG("my_daily_dag", schedule="@daily"):
...
schedule 参数有多种有效值:
with DAG("my_daily_dag", schedule="0 0 * * *"):
...
with DAG("my_one_time_dag", schedule="@once"):
...
with DAG("my_continuous_dag", schedule="@continuous"):
...
提示
有关不同类型调度的更多信息,请参阅编写与调度。
每次运行DAG时,你都在创建一个新的DAG实例,Airflow称之为DAG Run。同一个DAG可以并行运行多个DAG Runs,每个运行都有定义的数据间隔,用于标识任务应处理的数据周期。
举个例子说明这为何有用,假设编写一个处理每日实验数据集的DAG。它被重写了,而您想针对过去3个月的数据运行它——没问题,因为Airflow可以回填该DAG,并为过去3个月中的每一天同时运行其副本。
这些DAG运行都将在同一天实际启动,但每个DAG运行将有一个数据间隔覆盖该3个月期间中的单一天,该数据间隔是DAG内部所有任务、操作符和传感器运行时查看的内容。
就像每次运行时DAG会实例化为一个DAG Run一样, 在DAG内部指定的任务也会随之实例化为 Task Instances。
一个DAG运行将在开始时有一个开始日期,在结束时有一个结束日期。这个时间段描述了DAG实际"运行"的时间。除了DAG运行的开始和结束日期外,还有一个称为逻辑日期(正式名称为执行日期)的日期,它描述了DAG运行被调度或触发的预期时间。之所以称之为逻辑,是因为它具有多重含义的抽象性质,具体取决于DAG运行本身的上下文。
例如,如果用户手动触发一个DAG运行,其逻辑日期将是触发DAG运行的日期和时间,该值应等于DAG运行的开始日期。然而,当DAG被自动调度时,在设置了特定调度间隔的情况下,逻辑日期将表示数据间隔开始的时间点,此时DAG运行的开始日期将是逻辑日期加上调度间隔。
提示
有关logical date的更多信息,请参阅数据间隔和
execution_date是什么意思?。
DAG 分配¶
请注意,每个Operator/Task都必须分配给一个DAG才能运行。Airflow提供了几种无需显式传递即可计算DAG的方式:
如果你在
with DAG代码块内声明你的Operator如果你在
@dag装饰器内部声明你的Operator如果您将您的Operator放置在有DAG的Operator的上游或下游
否则,您必须通过dag=将其传递给每个Operator。
默认参数¶
通常,DAG中的许多Operator都需要相同的默认参数集(例如它们的retries)。与其为每个Operator单独指定这些参数,您可以在创建DAG时传递default_args,它会自动将这些参数应用到与之绑定的任何Operator:
import pendulum
with DAG(
dag_id="my_dag",
start_date=pendulum.datetime(2016, 1, 1),
schedule="@daily",
default_args={"retries": 2},
):
op = BashOperator(task_id="hello_world", bash_command="Hello World!")
print(op.retries) # 2
DAG装饰器¶
在2.0版本中新增。
除了使用上下文管理器或DAG()构造函数这类传统方式来声明单个DAG外,您还可以用@dag装饰一个函数,将其转换为DAG生成器函数:
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
)
def example_dag_decorator(email: str = "example@example.com"):
"""
DAG to send server IP to email.
:param email: Email to send IP to. Defaults to example@example.com.
"""
get_ip = GetRequestOperator(task_id="get_ip", url="http://httpbin.org/get")
@task(multiple_outputs=True)
def prepare_email(raw_json: dict[str, Any]) -> dict[str, str]:
external_ip = raw_json["origin"]
return {
"subject": f"Server connected from {external_ip}",
"body": f"Seems like today your server executing Airflow is connected from IP {external_ip}<br>",
}
email_info = prepare_email(get_ip.output)
EmailOperator(
task_id="send_email", to=email, subject=email_info["subject"], html_content=email_info["body"]
)
example_dag = example_dag_decorator()
除了作为一种使DAG保持整洁的新方法外,该装饰器还会将函数中的任何参数设置为DAG参数,让您在触发DAG时设置这些参数。然后,您可以从Python代码中访问这些参数,或者在Jinja模板中通过{{ context.params }}访问它们。
注意
Airflow 只会加载出现在顶层的 DAG 文件。这意味着你不能仅仅用@dag声明一个函数——还必须至少在 DAG 文件中调用它一次,并将其分配给顶层对象,正如你在上面的示例中所见。
控制流¶
默认情况下,DAG只有在它所依赖的所有任务都成功时才会运行一个任务。但有几种方法可以修改此行为:
分支 - 根据条件选择要执行的任务
触发规则 - 设置DAG运行任务的条件
设置与拆卸 - 定义设置和拆卸关系
Latest Only - 一种特殊的分支形式,仅针对当前运行的DAG生效
依赖过去 - 任务可以依赖于它们自己从之前的运行
分支¶
你可以利用分支功能来指示DAG不要运行所有依赖任务,而是选择一条或多条路径继续执行。这正是@task.branch装饰器的用途所在。
@task.branch装饰器与@task非常相似,不同之处在于它期望被装饰的函数返回一个任务ID(或ID列表)。系统会执行指定的任务,同时跳过所有其他路径。它也可以返回None来跳过所有下游任务。
Python函数返回的task_id必须引用一个直接位于@task.branch装饰任务下游的任务。
注意
当一个任务既位于分支操作符下游,又位于一个或多个选定任务的下游时,该任务将不会被跳过:
分支任务的路径是 branch_a, join 和 branch_b。由于 join 是 branch_a 的下游任务,它仍会被运行,即使它没有被作为分支决策的一部分返回。
@task.branch 也可以与 XComs 一起使用,允许分支上下文根据上游任务动态决定要遵循的分支。例如:
@task.branch(task_id="branch_task")
def branch_func(ti=None):
xcom_value = int(ti.xcom_pull(task_ids="start_task"))
if xcom_value >= 5:
return "continue_task"
elif xcom_value >= 3:
return "stop_task"
else:
return None
start_op = BashOperator(
task_id="start_task",
bash_command="echo 5",
do_xcom_push=True,
dag=dag,
)
branch_op = branch_func()
continue_op = EmptyOperator(task_id="continue_task", dag=dag)
stop_op = EmptyOperator(task_id="stop_task", dag=dag)
start_op >> branch_op >> [continue_op, stop_op]
如果你想实现具有分支功能的自定义操作符,可以继承BaseBranchOperator,它的行为类似于@task.branch装饰器,但需要你提供choose_branch方法的实现。
注意
建议在DAG中使用@task.branch装饰器,而不是直接实例化BranchPythonOperator。后者通常仅应在实现自定义操作符时进行子类化。
与@task.branch的可调用对象一样,此方法可以返回下游任务的ID或任务ID列表,这些任务将被运行,其他所有任务将被跳过。它也可以返回None以跳过所有下游任务:
class MyBranchOperator(BaseBranchOperator):
def choose_branch(self, context):
"""
Run an extra branch on the first day of the month
"""
if context['data_interval_start'].day == 1:
return ['daily_task_id', 'monthly_task_id']
elif context['data_interval_start'].day == 2:
return 'daily_task_id'
else:
return None
类似于常规Python代码中的@task.branch装饰器,也存在使用虚拟环境的分支装饰器@task.branch_virtualenv,以及调用外部Python的分支装饰器@task.branch_external_python。
仅限最新¶
Airflow的DAG运行通常针对与当前日期不同的日期执行 - 例如,为上个月的每一天运行一个DAG副本来回填某些数据。
不过在某些情况下,您不希望让DAG的某些(或全部)部分为过去的日期运行;这时可以使用LatestOnlyOperator。
这个特殊的Operator会跳过其下游的所有任务,前提是您不在“最新”的DAG运行中(如果当前挂钟时间介于其execution_time和下一个计划execution_time之间,并且不是外部触发的运行)。
以下是一个示例:
import datetime
import pendulum
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule
with DAG(
dag_id="latest_only_with_trigger",
schedule=datetime.timedelta(hours=4),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example3"],
) as dag:
latest_only = LatestOnlyOperator(task_id="latest_only")
task1 = EmptyOperator(task_id="task1")
task2 = EmptyOperator(task_id="task2")
task3 = EmptyOperator(task_id="task3")
task4 = EmptyOperator(task_id="task4", trigger_rule=TriggerRule.ALL_DONE)
latest_only >> task1 >> [task3, task4]
task2 >> [task3, task4]
在这个DAG的情况下:
task1直接依赖于latest_only,并且除了最新的运行外,其他所有运行都将跳过该任务。task2完全独立于latest_only,将在所有计划周期中运行task3是task1和task2的下游任务,由于默认的 trigger rule 是all_success,因此会从task1接收到级联跳过。task4是task1和task2的下游任务,但它不会被跳过,因为它的trigger_rule被设置为all_done。
依赖过去¶
你也可以设置任务仅在前一次DAG运行中该任务成功执行后才能运行。要实现这一点,只需将任务的depends_on_past参数设置为True即可。
请注意,如果您在DAG的生命周期最初阶段运行它——具体来说,是其第一次自动运行——那么任务仍会执行,因为此时没有先前的运行可供依赖。
触发规则¶
默认情况下,Airflow会等待某个任务的所有上游(直接父级)任务成功完成后,才会运行该任务。
然而,这只是默认行为,您可以通过任务的trigger_rule参数来控制它。trigger_rule的选项包括:
all_success(默认): 所有上游任务都已成功all_failed: 所有上游任务都处于failed或upstream_failed状态all_done: 所有上游任务都已完成执行all_skipped: 所有上游任务都处于skipped状态one_failed: 至少有一个上游任务失败(不等待所有上游任务完成)one_success: 至少有一个上游任务已成功(不需要等待所有上游任务完成)one_done: 至少有一个上游任务成功或失败none_failed: 所有上游任务都没有failed或upstream_failed状态 - 即所有上游任务都已成功完成或被跳过none_failed_min_one_success: 所有上游任务都没有failed或upstream_failed状态,且至少有一个上游任务已成功。none_skipped: 没有上游任务处于skipped状态 - 即所有上游任务都处于success、failed或upstream_failed状态always: 完全不依赖任何条件,随时可以运行此任务
你也可以根据需要将其与Depends On Past功能结合使用。
注意
需要特别注意触发规则与被跳过任务之间的交互关系,尤其是作为分支操作一部分被跳过的任务。在分支操作的下游几乎从不应该使用all_success或all_failed。
跳过的任务将通过触发规则all_success和all_failed级联,并导致它们也被跳过。考虑以下DAG:
# dags/branch_without_trigger.py
import pendulum
from airflow.decorators import task
from airflow.models import DAG
from airflow.operators.empty import EmptyOperator
dag = DAG(
dag_id="branch_without_trigger",
schedule="@once",
start_date=pendulum.datetime(2019, 2, 28, tz="UTC"),
)
run_this_first = EmptyOperator(task_id="run_this_first", dag=dag)
@task.branch(task_id="branching")
def do_branching():
return "branch_a"
branching = do_branching()
branch_a = EmptyOperator(task_id="branch_a", dag=dag)
follow_branch_a = EmptyOperator(task_id="follow_branch_a", dag=dag)
branch_false = EmptyOperator(task_id="branch_false", dag=dag)
join = EmptyOperator(task_id="join", dag=dag)
run_this_first >> branching
branching >> branch_a >> follow_branch_a >> join
branching >> branch_false >> join
join 是 follow_branch_a 和 branch_false 的下游任务。join 任务将显示为跳过状态,因为其 trigger_rule 默认设置为 all_success,而由分支操作引起的跳过状态会级联传播,导致标记为 all_success 的任务也被跳过。
通过在join任务中将trigger_rule设置为none_failed_min_one_success,我们可以获得预期的行为:
设置与拆卸¶
在数据工作流中,通常会创建一个资源(如计算资源),用它完成一些工作,然后将其拆除。Airflow提供了设置和拆卸任务来满足这一需求。
详情请参阅主文章Setup and Teardown了解如何使用此功能。
动态DAG¶
由于DAG是通过Python代码定义的,因此它不需要是纯声明式的;你可以自由使用循环、函数等来定义你的DAG。
例如,这里有一个使用for循环定义若干任务的DAG:
with DAG("loop_example", ...):
first = EmptyOperator(task_id="first")
last = EmptyOperator(task_id="last")
options = ["branch_a", "branch_b", "branch_c", "branch_d"]
for option in options:
t = EmptyOperator(task_id=option)
first >> t >> last
一般来说,我们建议您尽量保持DAG任务的拓扑结构(布局)相对稳定;动态DAG通常更适合用于动态加载配置选项或更改操作器选项。
DAG可视化¶
如果你想查看DAG的可视化表示,你有两种选择:
你可以加载Airflow用户界面,导航到你的DAG,然后选择"图形视图"
你可以运行
airflow dags show命令,它会将DAG渲染成图像文件
我们通常建议您使用图形视图,因为它还会显示您所选择的任何DAG运行中所有任务实例的状态。
当然,随着您开发的DAG越来越复杂,我们提供了几种修改这些DAG视图的方法,使其更易于理解。
任务组¶
TaskGroup可用于在图视图中将任务组织成层级分组。它有助于创建重复模式并减少视觉杂乱。
与SubDAGs不同,TaskGroups纯粹是一个UI分组概念。TaskGroups中的任务位于同一个原始DAG上,并遵循所有DAG设置和池配置。
依赖关系可以通过>>和<<运算符应用于TaskGroup中的所有任务。例如,以下代码将task1和task2放入TaskGroup group1中,然后将这两个任务都设置为task3的上游:
from airflow.decorators import task_group
@task_group()
def group1():
task1 = EmptyOperator(task_id="task1")
task2 = EmptyOperator(task_id="task2")
task3 = EmptyOperator(task_id="task3")
group1() >> task3
TaskGroup 也支持像 DAG 一样的 default_args,它会覆盖 DAG 层级的 default_args:
import datetime
from airflow import DAG
from airflow.decorators import task_group
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id="dag1",
start_date=datetime.datetime(2016, 1, 1),
schedule="@daily",
default_args={"retries": 1},
):
@task_group(default_args={"retries": 3})
def group1():
"""This docstring will become the tooltip for the TaskGroup."""
task1 = EmptyOperator(task_id="task1")
task2 = BashOperator(task_id="task2", bash_command="echo Hello World!", retries=2)
print(task1.retries) # 3
print(task2.retries) # 2
如果想查看TaskGroup更高级的用法,可以参考Airflow自带的示例DAG文件example_task_group_decorator.py。
注意
默认情况下,子任务/TaskGroups的ID会以其父TaskGroup的group_id作为前缀。这有助于确保整个DAG中group_id和task_id的唯一性。
要禁用前缀功能,在创建TaskGroup时传入prefix_group_id=False,但请注意现在您需要自行确保每个任务和组都具有唯一的ID。
注意
使用@task_group装饰器时,被装饰函数的文档字符串将作为UI中TaskGroups的提示信息显示,除非显式提供了tooltip值。
边标签¶
除了将任务分组外,您还可以在图形视图中为不同任务之间的依赖边添加标签 - 这对于DAG中的分支区域特别有用,这样您就可以标记某些分支可能运行的条件。
要添加标签,可以直接使用内联的>>和<<操作符:
from airflow.utils.edgemodifier import Label
my_task >> Label("When empty") >> other_task
或者,你可以传递一个Label对象给set_upstream/set_downstream:
from airflow.utils.edgemodifier import Label
my_task.set_downstream(other_task, Label("When empty"))
这是一个示例DAG,展示了如何标记不同的分支:
with DAG(
"example_branch_labels",
schedule="@daily",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
) as dag:
ingest = EmptyOperator(task_id="ingest")
analyse = EmptyOperator(task_id="analyze")
check = EmptyOperator(task_id="check_integrity")
describe = EmptyOperator(task_id="describe_integrity")
error = EmptyOperator(task_id="email_error")
save = EmptyOperator(task_id="save")
report = EmptyOperator(task_id="report")
ingest >> analyse >> check
check >> Label("No errors") >> save >> report
check >> Label("Errors found") >> describe >> error >> report
DAG与任务文档¶
可以为您的DAGs和任务对象添加文档或注释,这些内容在Web界面中可见(DAGs的"Graph"和"Tree"视图,任务的"Task Instance Details"视图)。
如果定义了以下一组特殊任务属性,它们将被渲染为富文本内容:
属性 |
渲染为 |
|---|---|
文档 |
等宽字体 |
doc_json |
json |
文档_yaml |
yaml |
文档标记 |
markdown |
文档_rst |
重构文本 |
请注意,对于DAGs,doc_md是唯一被解析的属性。对于DAGs,它可以包含一个字符串或对markdown文件的引用。Markdown文件通过以.md结尾的字符串来识别。
如果提供的是相对路径,它将从Airflow调度器或DAG解析器启动的相对路径加载。如果markdown文件不存在,传递的文件名将被用作文本,不会显示异常。请注意,markdown文件在DAG解析期间加载,对markdown内容的更改需要一个DAG解析周期才能显示更改。
这在任务是从配置文件动态构建时特别有用,因为它允许您在Airflow中暴露导致相关任务的配置:
"""
### My great DAG
"""
import pendulum
dag = DAG(
"my_dag",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule="@daily",
catchup=False,
)
dag.doc_md = __doc__
t = BashOperator("foo", dag=dag)
t.doc_md = """\
#Title"
Here's a [url](www.airbnb.com)
"""
子DAG¶
注意
SubDAG已被弃用,因此TaskGroup始终是首选方案。
有时,您会发现需要定期向每个DAG添加完全相同的任务集,或者希望将大量任务分组为一个逻辑单元。这正是SubDAG的用途所在。
例如,这里有一个DAG,它在两个部分中包含大量并行任务:
我们可以将所有并行的task-*操作符合并为一个SubDAG,这样最终的DAG将类似于以下结构:
请注意,SubDAG操作符应包含一个返回DAG对象的工厂方法。这将防止SubDAG在主界面中被视为独立的DAG——记住,如果Airflow在Python文件的顶层看到一个DAG,它会将其作为自己的DAG加载。例如:
import pendulum
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
def subdag(parent_dag_name, child_dag_name, args) -> DAG:
"""
Generate a DAG to be used as a subdag.
:param str parent_dag_name: Id of the parent DAG
:param str child_dag_name: Id of the child DAG
:param dict args: Default arguments to provide to the subdag
:return: DAG to use as a subdag
"""
dag_subdag = DAG(
dag_id=f"{parent_dag_name}.{child_dag_name}",
default_args=args,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
schedule="@daily",
)
for i in range(5):
EmptyOperator(
task_id=f"{child_dag_name}-task-{i + 1}",
default_args=args,
dag=dag_subdag,
)
return dag_subdag
这个SubDAG可以在您的主DAG文件中引用:
import datetime
from airflow.example_dags.subdags.subdag import subdag
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.subdag import SubDagOperator
DAG_NAME = "example_subdag_operator"
with DAG(
dag_id=DAG_NAME,
default_args={"retries": 2},
start_date=datetime.datetime(2022, 1, 1),
schedule="@once",
tags=["example"],
) as dag:
start = EmptyOperator(
task_id="start",
)
section_1 = SubDagOperator(
task_id="section-1",
subdag=subdag(DAG_NAME, "section-1", dag.default_args),
)
some_other_task = EmptyOperator(
task_id="some-other-task",
)
section_2 = SubDagOperator(
task_id="section-2",
subdag=subdag(DAG_NAME, "section-2", dag.default_args),
)
end = EmptyOperator(
task_id="end",
)
start >> section_1 >> some_other_task >> section_2 >> end
您可以从主DAG的图形视图中放大查看SubDagOperator,以显示子DAG中包含的任务:
使用SubDAGs时的一些其他提示:
按照惯例,SubDAG的
dag_id应该以其父DAG的名称加点号作为前缀(parent.child)你应该在主DAG和子DAG之间通过向子DAG操作符传递参数来共享参数(如上所示)
子DAG必须设置调度计划并启用。如果子DAG的调度计划设置为
None或@once,该子DAG会在不执行任何操作的情况下直接成功。清除
SubDagOperator也会清除其中任务的状态。在
SubDagOperator上标记成功不会影响其中任务的状态。避免在SubDAG内的任务中使用Depends On Past,因为这可能会引起混淆。
您可以为SubDAG指定一个执行器。如果想在进程中运行SubDAG并将其并行度限制为1,通常使用SequentialExecutor。使用LocalExecutor可能会出现问题,因为它可能会过度占用您的工作节点,在单个槽中运行多个任务。
查看 airflow/example_dags 以获取演示。
注意
并行性不被SubDagOperator所遵循,因此SubdagOperators可能会消耗超出您设置限制的资源。
任务组 vs 子DAG¶
SubDAGs虽然与TaskGroups用途相似,但由于其实现方式,会带来性能和功能上的问题。
SubDagOperator会启动一个BackfillJob,该作业会忽略现有的并行度配置,可能导致工作环境资源过载。
子DAG拥有自己的DAG属性。当子DAG的属性与其父DAG不一致时,可能会出现意外行为。
由于SubDAGs本身就是一个完整的DAG,因此无法在一个视图中看到"完整"的DAG。
SubDAGs会引入各种边界情况和注意事项。这可能会影响用户体验和预期。
另一方面,TaskGroups是一个更好的选择,因为它纯粹是一个UI分组概念。TaskGroup内的所有任务仍然表现得与TaskGroup外的任何其他任务一样。
你可以看到这两种结构之间的核心差异。
任务组 |
子DAG |
|---|---|
作为同一DAG中的重复模式 |
作为独立DAG的重复模式 |
为DAG提供一组视图和统计数据 |
在父DAG和子DAG之间提供独立的视图和统计数据 |
一组DAG配置 |
多组DAG配置 |
通过现有的SchedulerJob遵守并行度配置 |
由于新生成的BackfillJob而不遵守并行度配置 |
使用上下文管理器的简单构造声明 |
具有命名限制的复杂DAG工厂 |
打包DAGs¶
虽然较简单的DAG通常只包含在单个Python文件中,但更复杂的DAG通常会分布在多个文件中,并且需要附带它们的依赖项("vendored")。
您可以选择在DAG_FOLDER目录内使用标准文件系统布局完成所有操作,也可以将DAG及其所有Python文件打包成单个zip文件。例如,您可以打包两个DAG及其所需的依赖项,形成一个包含以下内容的zip文件:
my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py
请注意,打包的DAG有以下注意事项:
如果启用了pickling进行序列化,则无法使用它们
它们不能包含已编译的库(例如
libz.so),只能是纯Python它们将被插入到Python的
sys.path中,并且可以被Airflow进程中的任何其他代码导入,因此请确保包名与系统中已安装的其他包不发生冲突。
一般来说,如果你有一组复杂的编译依赖项和模块,使用Python的virtualenv系统并通过pip在目标系统上安装必要的包可能会更好。
.airflowignore¶
.airflowignore 文件用于指定 Airflow 应故意忽略的 DAG_FOLDER
或 PLUGINS_FOLDER 中的目录或文件。Airflow 支持
文件中模式的两种语法风格,由 DAG_IGNORE_FILE_SYNTAX
配置参数指定(在 Airflow 2.3 中新增):regexp 和 glob。
注意
默认的DAG_IGNORE_FILE_SYNTAX是regexp以确保向后兼容性。
对于regexp模式语法(默认模式),.airflowignore文件中的每一行都指定一个正则表达式模式,任何名称(非DAG id)匹配这些模式的目录或文件将被忽略(底层使用Pattern.search()进行模式匹配)。使用#字符表示注释;所有以#开头的行上的字符都会被忽略。
与Airflow中的大多数正则表达式匹配一样,使用的正则表达式引擎是re2,该引擎明确不支持许多高级功能,更多信息请参阅其文档。
使用glob语法时,模式匹配的工作方式与.gitignore文件中的模式完全相同:
字符
*将匹配任意数量的字符,除了/字符
?将匹配除/之外的任何单个字符范围表示法,例如
[a-zA-Z],可用于匹配范围内的一个字符可以通过在模式前添加
!来否定该模式。模式按顺序评估,因此否定可以覆盖同一文件中先前定义的模式或父目录中定义的模式。双星号 (
**) 可用于跨目录匹配。例如,**/__pycache__/将忽略每个子目录中无限深度的__pycache__目录。如果模式开头或中间(或两者)有
/,则该模式相对于特定.airflowignore文件本身的目录级别。否则该模式也可能匹配.airflowignore级别以下的任何级别。
.airflowignore 文件应放置在您的 DAG_FOLDER 中。例如,您可以使用 regexp 语法准备一个内容如下的 .airflowignore 文件
project_a
tenant_[\d]
或者,等效地,使用glob语法
**/*project_a*
tenant_[0-9]*
那么像project_a_dag_1.py、TESTING_project_a.py、tenant_1.py、
project_a/dag_1.py和tenant_1/dag_1.py这些文件在你的DAG_FOLDER中会被忽略
(如果一个目录的名称匹配了任何模式,该目录及其所有子文件夹将完全不会被Airflow扫描。这提高了DAG查找的效率)。
.airflowignore文件的适用范围是其所在目录及其所有子目录。
您还可以在DAG_FOLDER的子文件夹中准备.airflowignore文件,
该文件将仅适用于该子文件夹。
DAG 依赖关系¶
Airflow 2.1 版本新增
虽然DAG中任务之间的依赖关系通过上下游关系明确定义,但DAG之间的依赖关系则稍微复杂一些。一般来说,一个DAG对另一个DAG的依赖有两种方式:
触发 -
TriggerDagRunOperator等待中 -
ExternalTaskSensor
额外的复杂性在于,一个DAG可能需要等待或触发另一个DAG在不同数据间隔下的多次运行。DAG依赖关系视图
Menu -> Browse -> DAG Dependencies可以帮助可视化DAG之间的依赖关系。这些依赖关系
由调度器在DAG序列化过程中计算得出,网络服务器则利用它们来构建依赖关系图。
依赖检测器是可配置的,因此您可以实现与DependencyDetector中默认逻辑不同的自定义逻辑
DAG 暂停、停用和删除¶
当DAG处于"非运行"状态时,可能有几种情况。DAG可以被暂停、停用,最终可以删除DAG的所有元数据。
当DAG存在于DAGS_FOLDER中,且调度程序已将其存储在数据库中时,用户可以通过界面暂停该DAG。用户可以通过界面和API执行"暂停"和"取消暂停"操作。被暂停的DAG不会被调度程序自动调度,但您可以通过界面手动触发运行。在界面中,您可以在Paused标签页查看被暂停的DAG,而取消暂停的DAG则位于Active标签页。当DAG被暂停时,正在运行的任务会被允许完成,所有下游任务会被置为"Scheduled"状态。当DAG取消暂停后,任何处于"scheduled"状态的任务将根据DAG逻辑开始运行。没有"scheduled"任务的DAG将按其预定计划开始运行。
DAG可以通过从DAGS_FOLDER中移除来停用(不要与UI中的Active标签混淆)。当调度程序解析DAGS_FOLDER时,如果发现之前见过并存储在数据库中的DAG缺失,会将其标记为停用状态。停用DAG的元数据和历史记录会被保留,当DAG被重新添加到DAGS_FOLDER时,它将再次激活且历史记录可见。您无法通过UI或API来激活/停用DAG,这只能通过从DAGS_FOLDER中移除文件来实现。再次强调——当调度程序停用DAG时,其历史运行数据不会丢失。请注意,Airflow UI中的Active选项卡指的是既Activated又Not paused的DAG,因此这最初可能会有点令人困惑。
在用户界面中无法查看已停用的DAG - 有时可以看到历史运行记录,但当尝试查看相关信息时,会显示DAG缺失的错误。
你也可以通过用户界面或API从元数据数据库中删除DAG的元数据,但这并不总是会导致DAG从界面中消失——起初这可能会让人感到困惑。如果你在删除元数据时DAG仍在DAGS_FOLDER中,DAG会重新出现,因为调度程序会解析该文件夹,只有DAG的历史运行信息会被移除。
这意味着如果你想实际删除一个DAG及其所有历史元数据,需要分三步操作:
暂停DAG
通过用户界面或API从数据库中删除历史元数据
从
DAGS_FOLDER中删除DAG文件,并等待其变为非活动状态
DAG自动暂停功能(实验性)¶
DAG也可以配置为自动暂停。
有一个Airflow配置允许在DAG连续失败N次后自动禁用。
我们也可以通过DAG参数提供和覆盖这些配置:
max_consecutive_failed_dag_runs: 覆盖 max_consecutive_failed_dag_runs_per_dag。