最佳实践¶
创建一个新的DAG需要三个步骤:
编写Python代码来创建DAG对象
测试代码是否符合您的预期,
配置运行DAG所需的环境依赖项
本教程将向您介绍这三个步骤的最佳实践。
编写DAG¶
在Airflow中创建新的DAG非常简单。但需要注意许多事项,以确保DAG运行或失败时不会产生意外结果。
创建自定义Operator/Hook¶
请遵循我们关于自定义操作器的指南。
创建任务¶
您应将Airflow中的任务视为数据库中的事务。这意味着您不应该从任务中产生不完整的结果。例如,在任务结束时不应在HDFS或S3中生成不完整的数据。
如果任务失败,Airflow可以重试。因此,每次重新运行时任务应产生相同的结果。以下是一些避免产生不同结果的方法 -
在任务重新运行时不要使用INSERT语句,INSERT语句可能会导致数据库中出现重复行。请用UPSERT替换它。
读取和写入特定分区。切勿在任务中读取最新的可用数据。在重新运行之间,可能会有人更新输入数据,从而导致不同的输出。更好的方法是从特定分区读取输入数据。你可以使用
data_interval_start作为分区。在向S3/HDFS写入数据时也应遵循这种分区方法。Python的
now()函数返回当前日期时间对象。在任务内部绝不应使用此函数,特别是用于关键计算,因为它会导致每次运行结果不同。但可以用于生成临时日志等场景。
提示
您应将重复性参数(如connection_id或S3路径)定义在default_args中,而不是为每个任务单独声明。
default_args有助于避免诸如拼写错误等问题。此外,大多数连接类型在任务中都有唯一的参数名称,
因此您只需在default_args中声明一次连接(例如gcp_conn_id),所有使用该连接类型的操作器都会自动使用它。
删除任务¶
从DAG中删除任务时请谨慎操作。删除后,您将无法在图形视图、网格视图等界面中看到该任务,这会使得从Web服务器查看该任务的日志变得困难。如果不希望出现这种情况,请创建一个新的DAG。
通信¶
Airflow在不同的服务器上执行DAG任务,前提是您使用了Kubernetes执行器或Celery执行器。
因此,您不应将任何文件或配置存储在本地文件系统中,因为下一个任务很可能在没有访问权限的不同服务器上运行——例如,一个任务下载数据文件供下一个任务处理。
在使用Local executor的情况下,
将文件存储在磁盘上可能会使重试变得更加困难,例如您的任务需要一个被DAG中另一个任务删除的配置文件。
如果可能的话,使用XCom在任务间传递小消息,而在任务间传递较大数据的好方法是使用像S3/HDFS这样的远程存储。
例如,如果我们有一个任务将处理后的数据存储在S3中,该任务可以将输出数据的S3路径推送到Xcom中,
下游任务可以从XCom中拉取该路径并使用它来读取数据。
任务也不应在内部存储任何认证参数,如密码或令牌。 尽可能使用Connections将数据安全地存储在Airflow后端,并通过唯一的连接ID来检索它们。
顶层Python代码¶
您应避免编写不必要的顶层代码来创建操作符并构建它们之间的DAG关系。这是由于Airflow调度器的设计决策以及顶层代码解析速度对Airflow性能和可扩展性的影响。
Airflow调度器以min_file_process_interval秒为最小间隔,在Operator的execute方法外部执行代码。这样做的目的是为了实现DAG的动态调度——调度和依赖关系可能会随时间变化,并影响DAG的下一次调度。Airflow调度器会持续确保DAG中的内容能正确反映在调度的任务中。
具体来说,您不应运行任何数据库访问、繁重计算和网络操作。
影响DAG加载时间的一个重要因素(Python开发者可能容易忽视)是顶层导入可能会耗费大量时间,产生显著的开销。通过将其转换为Python可调用对象内部的局部导入,可以轻松避免这一问题。
请看以下两个示例。在第一个示例中,DAG解析将额外花费1000秒,而功能相同的第二个示例中的DAG则是在任务上下文中执行expensive_api_call。
不避免顶层DAG代码:
import pendulum
from airflow import DAG
from airflow.decorators import task
def expensive_api_call():
print("Hello from Airflow!")
sleep(1000)
my_expensive_response = expensive_api_call()
with DAG(
dag_id="example_python_operator",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag:
@task()
def print_expensive_api_call():
print(my_expensive_response)
避免顶层DAG代码:
import pendulum
from airflow import DAG
from airflow.decorators import task
def expensive_api_call():
sleep(1000)
return "Hello from Airflow!"
with DAG(
dag_id="example_python_operator",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag:
@task()
def print_expensive_api_call():
my_expensive_response = expensive_api_call()
print(my_expensive_response)
在第一个示例中,每次解析DAG文件时都会执行expensive_api_call,这会导致DAG文件处理性能不佳。而在第二个示例中,expensive_api_call仅在任务运行时被调用,因此能够在不影响性能的情况下完成解析。要亲自测试这一点,可以实施第一个DAG,然后在调度器日志中看到"Hello from Airflow!"的打印信息!
请注意,导入语句也被视为顶层代码。因此,如果某个导入语句耗时较长,或者被导入的模块本身在顶层执行代码,这也会影响调度程序的性能。以下示例展示了如何处理耗时的导入。
# It's ok to import modules that are not expensive to load at top-level of a DAG file
import random
import pendulum
# Expensive imports should be avoided as top level imports, because DAG files are parsed frequently, resulting in top-level code being executed.
#
# import pandas
# import torch
# import tensorflow
#
...
@task()
def do_stuff_with_pandas_and_torch():
import pandas
import torch
# do some operations using pandas and torch
@task()
def do_stuff_with_tensorflow():
import tensorflow
# do some operations using tensorflow
如何检查我的代码是否为“顶层”代码¶
为了理解你的代码是否属于"顶层",你需要深入了解Python解析机制的许多复杂细节。一般来说,当Python解析.py文件时,它会执行所看到的代码,但(通常)不会执行方法内部的代码。
它有许多不明显的特殊情况 - 例如顶层代码也包含用于确定方法默认值的任何代码。
不过,有一种简单的方法可以检查您的代码是否为"顶层"代码。您只需要解析代码并查看该段代码是否被执行。
想象这段代码:
from airflow import DAG
from airflow.operators.python import PythonOperator
import pendulum
def get_task_id():
return "print_array_task" # <- is that code going to be executed?
def get_array():
return [1, 2, 3] # <- is that code going to be executed?
with DAG(
dag_id="example_python_operator",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag:
operator = PythonOperator(
task_id=get_task_id(),
python_callable=get_array,
dag=dag,
)
你可以通过添加一些打印语句到要检查的代码中来进行验证,然后运行
python 。
from airflow import DAG
from airflow.operators.python import PythonOperator
import pendulum
def get_task_id():
print("Executing 1")
return "print_array_task" # <- is that code going to be executed? YES
def get_array():
print("Executing 2")
return [1, 2, 3] # <- is that code going to be executed? NO
with DAG(
dag_id="example_python_operator",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag:
operator = PythonOperator(
task_id=get_task_id(),
python_callable=get_array,
dag=dag,
)
当你执行这段代码时,你会看到:
root@cf85ab34571e:/opt/airflow# python /files/test_python.py
Executing 1
这意味着get_array不会作为顶层代码执行,但get_task_id会执行。
动态DAG生成¶
有时手动编写DAG并不实际。 可能您有许多DAG,它们执行类似的操作,只是参数有所不同。 或者您可能需要一组DAG来加载表,但不想在表每次变更时手动更新DAG。 在这些及其他情况下,动态生成DAG会更加实用。
在前一章描述的顶层代码中避免过度处理尤为重要,特别是在动态DAG配置的情况下,可以通过以下方式之一进行配置:
通过 环境变量 (不要与 Airflow Variables 混淆)
通过外部提供的、生成的Python代码,包含DAG文件夹中的元数据
通过DAG文件夹中外部提供的生成配置元数据文件
动态DAG生成的一些案例在动态DAG生成章节中有所描述。
Airflow变量¶
使用Airflow Variables会产生网络调用和数据库访问,因此如前一章Top level Python Code所述,应尽量避免在DAG的顶层Python代码中使用它们。 如果必须在顶层DAG代码中使用Airflow Variables,则可以通过启用实验性缓存来减轻它们对DAG解析的影响,并配置合理的ttl。
您可以在操作符的execute()方法中自由使用Airflow变量,但也可以通过Jinja模板将Airflow变量传递给现有操作符,这将延迟到任务执行时才读取值。
实现这一点的模板语法是:
{{ var.value.<variable_name> }}
或者如果您需要从变量中反序列化一个json对象:
{{ var.json.<variable_name> }}
在顶层代码中,使用jinja模板的变量直到任务运行时才会产生请求,而如果未启用缓存,Variable.get()每次调度程序解析dag文件时都会产生请求。
在不启用缓存的情况下使用Variable.get()会导致dag文件处理性能不佳。
在某些情况下,这可能导致dag文件在完全解析之前超时。
错误示例:
from airflow.models import Variable
foo_var = Variable.get("foo") # AVOID THAT
bash_use_variable_bad_1 = BashOperator(
task_id="bash_use_variable_bad_1", bash_command="echo variable foo=${foo_env}", env={"foo_env": foo_var}
)
bash_use_variable_bad_2 = BashOperator(
task_id="bash_use_variable_bad_2",
bash_command=f"echo variable foo=${Variable.get('foo')}", # AVOID THAT
)
bash_use_variable_bad_3 = BashOperator(
task_id="bash_use_variable_bad_3",
bash_command="echo variable foo=${foo_env}",
env={"foo_env": Variable.get("foo")}, # AVOID THAT
)
好的示例:
bash_use_variable_good = BashOperator(
task_id="bash_use_variable_good",
bash_command="echo variable foo=${foo_env}",
env={"foo_env": "{{ var.value.get('foo') }}"},
)
@task
def my_task():
var = Variable.get("foo") # This is ok since my_task is called only during task run, not during DAG scan.
print(var)
出于安全考虑,建议您对包含敏感数据的任何变量使用Secrets Backend。
时间表¶
避免在时间表代码的顶层使用Airflow变量/连接或访问airflow数据库。 数据库访问应延迟到DAG执行时进行。这意味着您不应将变量/连接的获取作为时间表类初始化的参数,也不应在自定义时间表模块的顶层使用变量/连接。
错误示例:
from airflow.models.variable import Variable
from airflow.timetables.interval import CronDataIntervalTimetable
class CustomTimetable(CronDataIntervalTimetable):
def __init__(self, *args, something=Variable.get("something"), **kwargs):
self._something = something
super().__init__(*args, **kwargs)
好的示例:
from airflow.models.variable import Variable
from airflow.timetables.interval import CronDataIntervalTimetable
class CustomTimetable(CronDataIntervalTimetable):
def __init__(self, *args, something="something", **kwargs):
self._something = Variable.get(something)
super().__init__(*args, **kwargs)
变更后触发DAG¶
在修改DAG或DAG文件夹中的任何其他相关文件后,避免立即触发DAG运行。
您应该给系统足够的时间来处理更改的文件。这需要几个步骤。 首先,文件必须分发到调度程序 - 通常通过分布式文件系统或Git同步,然后 调度程序必须解析Python文件并将其存储在数据库中。根据您的配置、 分布式文件系统的速度、文件数量、DAG数量、文件中的更改数量、 文件大小、调度程序数量、CPU速度,这可能需要几秒到几分钟,在极端 情况下可能需要很多分钟。您应该等待您的DAG出现在UI中才能触发它。
如果您发现更新后到可以触发之间存在较长的延迟,可以查看以下配置参数并根据需求进行调整(点击链接查看每个参数的详细信息):
带触发规则的观察者模式示例¶
观察者模式是我们称呼这样一种DAG的方式:它包含一个"监视"其他任务状态的任务。 其主要目的是当任何其他任务失败时,使DAG运行失败。 这个需求源自Airflow系统测试,这些测试是由不同任务组成的DAG(类似于包含多个步骤的测试)。
通常情况下,当任何任务失败时,其他任务都不会被执行,整个DAG运行也会被标记为失败状态。但当我们使用触发规则时,可以打破任务运行的正常流程,整个DAG可能呈现出与我们预期不同的状态。例如,我们可以设置一个清理任务(其触发规则设为TriggerRule.ALL_DONE),无论其他任务状态如何都会执行(例如用于清理资源)。在这种情况下,DAG总会运行这个任务,且DAG运行将继承该特定任务的状态,因此我们可能会丢失关于失败任务的信息。如果我们想确保包含清理任务的DAG在任何任务失败时都会失败,就需要使用观察者模式。观察者任务是一个被触发时总会失败的任务,但它只需要在其他任务失败时被触发。它需要将触发规则设为TriggerRule.ONE_FAILED,并且必须成为DAG中所有其他任务的下游任务。这样,如果其他所有任务都成功,观察者任务会被跳过;但当有任务失败时,观察者任务将被执行并失败,从而导致整个DAG运行失败。
注意
请注意,触发规则仅依赖于直接上游(父级)任务,例如TriggerRule.ONE_FAILED
将忽略任何非参数化任务直接父级的失败(或upstream_failed)任务。
通过一个例子更容易理解这个概念。假设我们有以下DAG:
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.exceptions import AirflowException
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule
@task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
def watcher():
raise AirflowException("Failing task because one or more upstream tasks failed.")
with DAG(
dag_id="watcher_example",
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
) as dag:
failing_task = BashOperator(task_id="failing_task", bash_command="exit 1", retries=0)
passing_task = BashOperator(task_id="passing_task", bash_command="echo passing_task")
teardown = BashOperator(
task_id="teardown",
bash_command="echo teardown",
trigger_rule=TriggerRule.ALL_DONE,
)
failing_task >> passing_task >> teardown
list(dag.tasks) >> watcher()
执行后,该DAG的可视化呈现如下所示:
我们有多个任务服务于不同目的:
failing_task总是失败,passing_task总是成功(如果被执行)。teardown总是会被触发(无论其他任务的状态如何)并且它应该总是成功执行,watcher是其他每个任务的下游任务,也就是说当任何任务失败时它都会被触发,从而导致整个DAG运行失败,因为它是一个叶子任务。
需要注意的是,如果没有watcher任务,整个DAG运行将获得success状态,因为唯一失败的任务不是叶子任务,而teardown任务将以success状态完成。
如果我们希望watcher监控所有任务的状态,需要让它分别依赖于所有任务。这样,如果任何任务失败,我们就可以让DAG运行失败。请注意,watcher任务的触发规则设置为"one_failed"。
另一方面,如果没有teardown任务,就不需要watcher任务,因为failing_task会将其failed状态传播给下游任务passed_task,整个DAG运行也将获得failed状态。
在集群策略中使用AirflowClusterPolicySkipDag异常来跳过特定DAG¶
在2.7版本中新增。
Airflow DAG通常可以通过git-sync与Git仓库的特定分支一起部署和更新。
但是,当由于某些运维原因需要运行多个Airflow集群时,维护多个Git分支会非常繁琐。
特别是当您需要按照适当的分支策略定期同步两个独立分支(如prod和beta)时,会遇到一些困难。
维护Git仓库时使用cherry-pick操作过于繁琐。
不建议将hard-reset作为GitOps的实现方式
因此,您可以考虑将多个Airflow集群连接到同一个Git分支(如main),并通过不同的环境变量和连接配置来维护它们,同时保持相同的connection_id。
您还可以在集群策略上抛出AirflowClusterPolicySkipDag异常,以便仅在特定的Airflow部署中将特定的DAG加载到DagBag中(如果需要的话)。
def dag_policy(dag: DAG):
"""Skipping the DAG with `only_for_beta` tag."""
if "only_for_beta" in dag.tags:
raise AirflowClusterPolicySkipDag(
f"DAG {dag.dag_id} is not loaded on the production cluster, due to `only_for_beta` tag."
)
上面的示例展示了dag_policy代码片段,用于根据DAG的标签来决定是否跳过该DAG。
降低DAG复杂度¶
虽然Airflow擅长处理包含大量任务和依赖关系的DAG,但当您拥有许多复杂的DAG时,它们的复杂性可能会影响调度性能。要保持Airflow实例的高性能和良好利用率,您应尽可能简化和优化DAG——必须记住DAG解析和创建过程只是执行Python代码,而使其尽可能高效取决于您。没有让DAG"降低复杂性"的魔法配方——因为这是Python代码,DAG编写者才是控制代码复杂性的关键。
目前没有衡量DAG复杂度的"指标",特别是没有指标能告诉你你的DAG是否"足够简单"。不过,就像任何Python代码一样,当代码经过优化后,你肯定能看出DAG代码变得更"简单"或"更快"。如果你想优化DAG,可以采取以下措施:
让你的DAG加载更快。这是一个可以通过多种方式实现的单一改进建议,但它对调度器性能的影响最大。如果你的目标是提升性能,那么只要有机会让DAG加载更快——就去做吧。查看Top level Python Code获取相关技巧,同时参考DAG Loader Test了解如何评估你的DAG加载时间。
让你的DAG生成更简单的结构。每个任务依赖都会为调度和执行增加额外的处理开销。具有简单线性结构
A -> B -> C的DAG在任务调度中经历的延迟,会比具有深度嵌套树结构且依赖任务数量呈指数增长的DAG要少。如果你能让DAG更加线性化——在执行过程中每个时间点只有尽可能少的候选任务可运行,这将可能提高整体调度性能。减少每个文件中的DAG数量。虽然Airflow 2针对单个文件中包含多个DAG的情况进行了优化,但系统的某些部分可能会导致性能下降,或比将这些DAG分散到多个文件中引入更多延迟。例如,一个文件只能由一个FileProcessor解析这一事实就降低了可扩展性。如果您从一个文件生成许多DAG,并且观察到在Airflow用户界面中反映DAG文件更改需要很长时间,请考虑将它们拆分。
编写高效的Python代码。必须在上文提到的每个文件包含较少DAG与减少总体代码量之间取得平衡。创建描述DAG的Python文件时应遵循最佳编程实践,而不是像配置那样处理。如果您的DAG共享相似代码,您不应该将它们反复复制到大量几乎相同的源文件中,因为这会导致相同资源被不必要地重复导入多次。相反,您应该致力于最小化所有DAG中的重复代码,这样应用程序才能高效运行并易于调试。请参阅动态DAG生成了解如何使用相似代码创建多个DAG。
测试DAG¶
Airflow用户应将DAG视为生产级代码,并且DAG应配备多种关联测试以确保其产出预期结果。您可以为DAG编写多种测试。让我们来看看其中一些。
DAG加载器测试¶
该测试应确保您的DAG不包含在加载时会引发错误的代码片段。 用户无需编写额外代码即可运行此测试。
python your-dag-file.py
运行上述命令且无任何错误可确保您的DAG不包含任何未安装的依赖项、语法错误等。请确保在调度程序环境对应的环境中加载DAG - 具有相同的依赖项、环境变量以及DAG引用的公共代码。
这也是检查优化后DAG加载速度是否提升的好方法,如果您想尝试优化DAG加载时间。只需运行DAG并测量所需时间,但请再次确保您的DAG在相同的依赖项、环境变量和公共代码条件下运行。
有多种方法可以测量处理时间,在Linux环境中其中一种方法是使用内置的time命令。请确保连续多次运行以考虑缓存效应。在相同条件下(使用相同的机器、环境等)比较优化前后的结果,以评估优化的效果。
time python airflow/example_dags/example_python_operator.py
结果:
real 0m0.699s
user 0m0.590s
sys 0m0.108s
关键指标是"实时时间"——它告诉你处理DAG实际花费的时间。请注意,以这种方式加载文件时,会启动一个新的解释器,因此存在初始加载时间,而Airflow解析DAG时不会有这个时间。你可以通过运行以下命令来评估初始化时间:
time python -c ''
结果:
real 0m0.073s
user 0m0.037s
sys 0m0.039s
在这种情况下,解释器的初始启动时间约为0.07秒,这大约是解析上面example_python_operator.py文件所需时间的10%,因此对于示例DAG来说,实际解析时间约为0.62秒。
您可以查阅测试DAG了解如何测试单个操作器的详细信息。
单元测试¶
单元测试确保您的DAG中没有错误代码。您可以为任务和DAG编写单元测试。
加载DAG的单元测试:
import pytest
from airflow.models import DagBag
@pytest.fixture()
def dagbag():
return DagBag()
def test_dag_loaded(dagbag):
dag = dagbag.get_dag(dag_id="hello_world")
assert dagbag.import_errors == {}
assert dag is not None
assert len(dag.tasks) == 1
DAG结构单元测试: 这是一个示例测试,旨在验证代码生成的DAG结构与字典对象的匹配情况
def assert_dag_dict_equal(source, dag):
assert dag.task_dict.keys() == source.keys()
for task_id, downstream_list in source.items():
assert dag.has_task(task_id)
task = dag.get_task(task_id)
assert task.downstream_task_ids == set(downstream_list)
def test_dag():
assert_dag_dict_equal(
{
"DummyInstruction_0": ["DummyInstruction_1"],
"DummyInstruction_1": ["DummyInstruction_2"],
"DummyInstruction_2": ["DummyInstruction_3"],
"DummyInstruction_3": [],
},
dag,
)
自定义操作符的单元测试:
import datetime
import pendulum
import pytest
from airflow import DAG
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType
DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC")
DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1)
TEST_DAG_ID = "my_custom_operator_dag"
TEST_TASK_ID = "my_custom_operator_task"
@pytest.fixture()
def dag():
with DAG(
dag_id=TEST_DAG_ID,
schedule="@daily",
start_date=DATA_INTERVAL_START,
) as dag:
MyCustomOperator(
task_id=TEST_TASK_ID,
prefix="s3://bucket/some/prefix",
)
return dag
def test_my_custom_operator_execute_no_trigger(dag):
dagrun = dag.create_dagrun(
state=DagRunState.RUNNING,
execution_date=DATA_INTERVAL_START,
data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END),
start_date=DATA_INTERVAL_END,
run_type=DagRunType.MANUAL,
)
ti = dagrun.get_task_instance(task_id=TEST_TASK_ID)
ti.task = dag.get_task(task_id=TEST_TASK_ID)
ti.run(ignore_ti_state=True)
assert ti.state == TaskInstanceState.SUCCESS
# Assert something related to tasks results.
自检¶
您还可以在DAG中实现检查逻辑,以确保任务按预期生成结果。 例如,如果您有一个将数据推送到S3的任务,可以在下一个任务中实现检查逻辑。比如,该检查可以 确保分区已在S3中创建,并执行一些简单的检查来确定数据是否正确。
类似地,如果您有一个在Kubernetes或Mesos中启动微服务的任务,您应该使用airflow.providers.http.sensors.http.HttpSensor来检查服务是否已启动。
task = PushToS3(...)
check = S3KeySensor(
task_id="check_parquet_exists",
bucket_key="s3://bucket/key/foo.parquet",
poke_interval=0,
timeout=0,
)
task >> check
预发布环境¶
如果可能,在生产环境部署前保留一个测试环境来完整测试DAG运行。 确保您的DAG已参数化以更改变量,例如S3操作的输出路径或用于读取配置的数据库。 不要在DAG中硬编码值,然后根据环境手动更改它们。
您可以使用环境变量来参数化DAG。
import os
dest = os.environ.get("MY_DAG_DEST_PATH", "s3://default-target/path/")
模拟变量和连接¶
当你为使用变量或连接的代码编写测试时,必须确保这些变量或连接在运行测试时存在。显而易见的解决方案是将这些对象保存到数据库中,以便在代码执行时读取。然而,读写数据库对象会带来额外的时间开销。为了加快测试执行速度,值得在不将这些对象保存到数据库的情况下模拟它们的存在。为此,你可以使用unittest.mock.patch.dict()来模拟os.environ创建环境变量。
对于变量,请使用 AIRFLOW_VAR_{KEY}。
with mock.patch.dict("os.environ", AIRFLOW_VAR_KEY="env-value"):
assert "env-value" == Variable.get("key")
对于连接,请使用 AIRFLOW_CONN_{CONN_ID}。
conn = Connection(
conn_type="gcpssh",
login="cat",
host="conn-host",
)
conn_uri = conn.get_uri()
with mock.patch.dict("os.environ", AIRFLOW_CONN_MY_CONN=conn_uri):
assert "cat" == Connection.get_connection_from_secrets("my_conn").login
元数据库维护¶
随着时间的推移,随着更多DAG和任务运行以及事件日志的积累,元数据数据库的存储占用将会增加。
你可以使用Airflow CLI通过命令airflow db clean来清理旧数据。
更多详情请参阅db clean usage。
升级与降级¶
备份您的数据库¶
在执行任何修改数据库的操作之前,备份元数据数据库始终是一个明智的做法。
禁用调度器¶
在执行此类维护时,您可能需要考虑禁用Airflow集群。
一种实现方式是将参数[scheduler] > use_job_schedule设置为False,并等待所有正在运行的DAG完成;此后除非外部触发,否则不会创建新的DAG运行实例。
一个更好的方法(虽然稍微手动一些)是使用dags pause命令。您需要在此操作前记录哪些DAG处于暂停状态,以便在维护完成后知道需要恢复哪些DAG。首先运行airflow dags list并存储未暂停DAG的列表。然后使用相同的列表在维护前对每个DAG运行dags pause,并在维护后运行dags unpause。这样做的好处是,您可以在升级后仅恢复一两个DAG(可能是专用的test dags)来确认系统正常工作,然后再全面恢复所有DAG。
添加“集成测试”DAGs¶
添加几个使用生态系统中所有常见服务(如S3、Snowflake、Vault)但使用测试资源或"开发"账户的"集成测试"DAG会很有帮助。这些测试DAG可以是你升级后最先启用的,因为如果它们失败并不重要,你可以回滚到备份而不会产生负面影响。然而,如果它们成功运行,应该能证明你的集群能够使用你需要使用的库和服务来运行任务。
例如,如果您使用外部密钥后端,请确保有一个任务用于获取连接。如果使用KubernetesPodOperator,添加一个运行sleep 30; echo "hello"的任务。如果需要写入s3,请在测试任务中执行。如果需要访问数据库,添加一个从服务器执行select 1的任务。
升级前修剪数据¶
某些数据库迁移操作可能非常耗时。如果您的元数据库非常大,建议在执行升级前使用db clean命令清理部分旧数据。请谨慎使用。
处理冲突/复杂的Python依赖关系¶
Airflow有许多Python依赖项,有时Airflow的依赖项会与任务代码所需的依赖项发生冲突。默认情况下,Airflow环境只是一组Python依赖项和单一的Python环境,因此经常会出现某些任务需要与其他任务不同的依赖项,而这些依赖项之间基本上存在冲突的情况。
如果您使用预定义的Airflow Operators与外部服务通信,选择虽然不多,但这些操作符通常会有不冲突的基础Airflow依赖。Airflow采用约束机制,意味着社区保证您可以使用一组"固定"的依赖来安装Airflow(包括所有社区提供的扩展包)而不会引发冲突。不过,您可以独立升级这些扩展包,它们的约束不会限制您,因此依赖冲突的可能性较低(仍需测试这些依赖)。因此,当使用预定义操作符时,您遇到依赖冲突问题的可能性很小甚至没有。
然而,当你以更"现代的方式"使用Airflow时,比如使用TaskFlow API并且大部分操作符都是用自定义Python代码编写的,或者当你想编写自己的自定义操作符时,可能会遇到这样的情况:你的自定义代码所需的依赖项与Airflow的依赖项发生冲突,甚至你的多个自定义操作符的依赖项之间也会产生冲突。
有多种策略可以用来缓解这个问题。虽然在自定义操作器中处理依赖冲突很困难,但当涉及到使用airflow.operators.python.PythonVirtualenvOperator或airflow.operators.python.ExternalPythonOperator时,实际上会容易得多——无论是直接使用传统的"操作器"方法,还是通过使用带有@task.virtualenv或@task.external_python装饰器的任务(如果你使用TaskFlow的话)。
让我们从最容易实施的策略开始(虽然有一些限制和开销),然后逐步介绍那些需要对您的Airflow部署进行一些更改的策略。
使用PythonVirtualenvOperator¶
这是使用最简单但功能最有限的策略。PythonVirtualenvOperator允许你动态创建一个虚拟环境,你的Python可调用函数将在其中执行。在现代的TaskFlow方法中(如Working with TaskFlow所述),也可以通过使用@task.virtualenv装饰器来修饰你的可调用函数(这是使用该操作符的推荐方式)。每个airflow.operators.python.PythonVirtualenvOperator任务都可以拥有自己独立的Python虚拟环境(每次任务运行时动态创建),并且可以指定该任务执行所需的精细化的依赖包集合。
该操作员负责以下事项:
根据您的环境创建虚拟环境
将您的Python可调用对象序列化并通过virtualenv Python解释器执行
执行它并获取可调用对象的结果,如果指定则通过xcom推送结果
该操作器的优势包括:
无需提前准备虚拟环境。它将在任务运行前动态创建,并在完成后自动移除,因此除了需要在Airflow依赖中包含virtualenv包外,无需任何特殊操作即可使用多个虚拟环境
你可以在同一批工作节点上运行具有不同依赖集的任务 - 这样内存资源就能被重复利用(不过请参阅下面关于创建虚拟环境时涉及的CPU开销部分)。
在大型安装环境中,DAG作者无需请求他人为您创建虚拟环境。 作为DAG作者,您只需安装virtualenv依赖项,即可根据需求自由指定和修改环境配置。
部署需求无需变更 - 无论您使用本地虚拟环境、Docker还是Kubernetes,任务都能正常运行,无需为部署添加任何额外配置。
作为DAG作者,无需深入了解容器和Kubernetes。采用这种方式编写DAG仅需掌握Python相关知识即可。
该操作符存在一些限制和额外开销:
您的Python可调用对象必须可序列化。有许多Python对象无法使用标准
pickle库进行序列化。您可以通过使用dill库来缓解其中一些限制,但即使该库也无法解决所有序列化限制。所有在Airflow环境中不可用的依赖项必须在您使用的可调用对象中本地导入,并且您的DAG的顶层Python代码不应导入/使用这些库。
虚拟环境运行在同一个操作系统中,因此它们不能存在系统级别的依赖冲突(如
apt或yum可安装的软件包)。只有Python依赖可以独立安装在这些环境中。该操作符会为每个任务运行增加CPU、网络和耗时开销 - Airflow必须为每个任务从头开始重新创建虚拟环境
工作节点需要能够访问PyPI或私有代码库以安装依赖项
动态创建virtualenv容易发生临时性故障(例如当您的代码库不可用或存在访问存储库的网络问题时)
很容易陷入一个"过于"动态的环境 - 由于您安装的依赖项可能会升级,它们的传递依赖项可能会独立升级,最终可能导致您的任务停止工作,因为有人发布了依赖项的新版本,或者您可能成为"供应链"攻击的受害者,其中依赖项的新版本可能变得恶意
任务仅通过在不同环境中运行实现彼此隔离。这意味着运行中的任务仍可能相互干扰 - 例如在同一工作节点上执行的后续任务可能会受到前序任务创建/修改文件等操作的影响。
你可以在Taskflow Virtualenv example中查看使用airflow.operators.python.PythonVirtualenvOperator的详细示例
使用ExternalPythonOperator¶
在2.4版本中新增。
虽然稍微复杂一些,但使用airflow.operators.python.ExternalPythonOperator能显著减少开销、安全性和稳定性问题。在现代的TaskFlow方法中(如Working with TaskFlow所述),也可以通过使用@task.external_python装饰器来修饰可调用对象(这是推荐的使用方式)。不过,这要求您必须预先准备一个不可变的Python环境。与airflow.operators.python.PythonVirtualenvOperator不同,您不能向这种预先存在的环境添加新的依赖项。所有需要的依赖项都必须预先添加到您的环境中,并且如果您的Airflow运行在分布式环境中,这些依赖项必须在所有工作节点上都可用。
这种方式可以避免重新创建虚拟环境的开销和问题,但这些环境必须与Airflow安装一起准备和部署。通常需要Airflow安装管理人员参与,而在大型安装中,这些人通常与DAG作者(开发运维/系统管理员)不同。
这些虚拟环境可以通过多种方式准备 - 如果使用LocalExecutor,它们只需安装在运行调度程序的机器上;如果使用分布式Celery虚拟环境安装,应该建立一个跨多台机器安装这些虚拟环境的流水线;最后如果使用Docker镜像(例如通过Kubernetes),则应将虚拟环境创建添加到自定义镜像构建的流水线中。
该操作器的优势包括:
运行任务时无需设置开销。当您开始运行任务时,虚拟环境已准备就绪。
你可以在同一批工作节点上运行具有不同依赖集的任务——这样所有资源都能被重复利用。
无需让工作节点访问PyPI或私有代码仓库。降低因网络问题导致临时错误的概率。
依赖项可以由管理员和安全团队预先审核,不会动态添加意外的新代码。这对安全性和稳定性都有好处。
对您的部署影响有限 - 您无需切换到Docker容器或Kubernetes就能充分利用该操作器。
作为DAG编写者无需深入了解容器和Kubernetes。只需掌握Python和相关要求即可通过这种方式编写DAG。
缺点:
您的环境需要预先准备好虚拟环境。这通常意味着您无法即时更改它,添加新需求或更改需求至少需要重新部署Airflow,并且在开发新版本时的迭代时间可能会更长。
您的Python可调用对象必须可序列化。有许多Python对象无法使用标准
pickle库进行序列化。您可以通过使用dill库来缓解其中一些限制,但即使该库也无法解决所有序列化限制。所有在Airflow环境中不可用的依赖项,必须在您使用的可调用对象中本地导入,并且您的DAG的顶层Python代码不应导入/使用这些库。
虚拟环境运行在同一个操作系统中,因此它们不能存在系统级依赖冲突(
apt或yum可安装的软件包)。只有Python依赖项可以在这些环境中独立安装任务之间仅通过在不同环境中运行来实现隔离。这意味着运行中的任务仍可能相互干扰——例如,在同一工作节点上执行的后续任务可能会受到前序任务创建/修改文件等操作的影响。
你可以将PythonVirtualenvOperator和ExternalPythonOperator视为对应工具 -
它们能更顺畅地从开发阶段过渡到生产阶段。作为DAG作者,你通常会使用PythonVirtualenvOperator
(通过@task.virtualenv装饰器修饰任务)进行依赖项迭代和DAG开发,而在完成迭代和修改后,
你可能希望在生产环境中改用ExternalPythonOperator(和@task.external_python),
前提是你的DevOps/系统管理员团队已在生产环境的预存虚拟环境中部署了新依赖项。
这样做的好处是你可以随时切换回装饰器,继续使用PythonVirtualenvOperator进行"动态"开发。
您可以在Taskflow External Python example中查看使用airflow.operators.python.ExternalPythonOperator的详细示例
使用DockerOperator或Kubernetes Pod Operator¶
另一种策略是使用 airflow.providers.docker.operators.docker.DockerOperator
airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator
这些要求Airflow能够访问Docker引擎或Kubernetes集群。
与Python运算符的情况类似,如果您希望使用这些运算符来执行可调用的Python代码,任务流装饰器会非常方便。
然而,这种方式要复杂得多 - 如果你想采用这种方法,需要理解Docker/Kubernetes Pods的工作原理,但任务之间是完全隔离的,你甚至不受限于运行Python代码。你可以用任何编程语言编写任务。此外,你的依赖项与Airflow的依赖项(包括系统级依赖项)完全独立,因此如果你的任务需要完全不同的环境,这就是最佳选择。
在2.2版本中新增。
从Airflow 2.2版本开始,您可以使用@task.docker装饰器通过DockerOperator来运行您的函数。
在2.4版本中新增。
从Airflow 2.2版本开始,您可以使用@task.kubernetes装饰器通过KubernetesPodOperator来运行您的函数。
使用这些操作符的好处包括:
你可以运行具有不同Python和系统级依赖集的任务,甚至是用完全不同的语言编写的任务,或者不同的处理器架构(x86 vs arm)。
用于运行任务的环境利用了容器的优化和不可变性优势,其中一组相似的依赖项可以有效地重用镜像的多个缓存层,因此该环境针对拥有多个相似但不同的环境的情况进行了优化。
依赖项可以由管理员和安全团队预先审核,不会动态添加意外的新代码。这对安全性和稳定性都有好处。
任务之间的完全隔离。除了使用标准的Airflow XCom机制外,它们无法以其他方式相互影响。
缺点:
启动任务会有一定的开销。通常不像动态创建虚拟环境时那么大,但仍然很显著(尤其是对于
KubernetesPodOperator)。对于TaskFlow装饰器,需要将整个调用方法序列化并发送到Docker容器或Kubernetes Pod,而方法大小存在系统级限制。序列化、发送以及在远程端反序列化该方法也会增加额外开销。
运行这些操作符的任务会带来多进程的资源开销。至少需要两个进程——一个进程(运行在Docker容器或Kubernetes Pod中)执行任务,以及Airflow工作节点中的一个监督进程,该进程负责将作业提交到Docker/Kubernetes并监控执行过程。
您的环境需要预先准备好容器镜像。这通常意味着您无法实时更改它们。添加系统依赖项、修改或更改Python需求需要重新构建并发布镜像(通常在您的私有注册表中)。当您处理新依赖项时,迭代时间通常较长,如果开发人员在迭代过程中更改了依赖项,则需要构建并使用自己的镜像。在这里,一个适当的部署流水线对于可靠地维护您的部署至关重要。
如果你想通过装饰器运行你的python可调用对象,它必须是可序列化的。此外,在这种情况下,Airflow环境中不可用的所有依赖项必须在你使用的可调用对象中本地导入,并且你的DAG的顶层Python代码不应导入/使用这些库。
你需要更深入地了解Docker容器或Kubernetes的工作原理。这两种技术提供的抽象是"有漏洞的",因此你需要对资源、网络、容器等有更多了解,才能编写使用这些操作符的DAG。
你可以查看使用airflow.operators.providers.Docker的详细示例
Taskflow Docker示例
以及airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator
Taskflow Kubernetes示例
使用多个Docker镜像和Celery队列¶
有可能(虽然需要对Airflow部署有深入了解)通过使用多个独立的Docker镜像来运行Airflow任务。这可以通过将不同任务分配到不同队列,并配置Celery工作节点为不同队列使用不同镜像来实现。然而,这种方式(至少目前)需要大量手动部署配置,以及对Airflow、Celery和Kubernetes运作原理的深入理解。同时它会为任务运行引入相当多的开销——资源复用的机会更少,而且在不影响性能和稳定性的前提下,对这种部署方式进行资源成本优化也更为困难。
使其更有用的可能方式之一是 AIP-46 Runtime isolation for Airflow tasks and DAG parsing。 以及完成AIP-43 DAG Processor Separation。 在这些功能实现之前,使用此方法几乎没有好处,因此不建议使用。
当这些AIPs实现后,这将开启更灵活的多租户方案可能性,多个团队将能够拥有完全隔离的依赖项集合,这些依赖项将贯穿DAG的完整生命周期——从解析到执行。