模块管理

Airflow允许您在DAG和Airflow配置中使用自定义的Python模块。本文将介绍如何创建自定义模块以确保Airflow能正确加载,并诊断模块加载失败时的问题。

通常您会希望在Airflow部署中使用自己的Python代码,例如公共代码、库,您可能希望通过共享的Python代码生成DAG并拥有多个DAG Python文件。

您可以通过以下任一方式实现:

  • 将您的模块添加到Airflow自动添加到PYTHONPATH的文件夹之一中

  • 将存放代码的额外文件夹添加到PYTHONPATH

  • 将您的代码打包成一个Python包,并与Airflow一起安装。

下一章节将概述Python如何加载包和模块,并深入探讨上述三种可能性的具体细节。

Python中包/模块加载的工作原理

Python尝试加载模块的目录列表由变量sys.path给出。Python会根据操作系统、安装方式以及使用的Python版本,智能地确定该变量的内容

您可以通过运行如下示例中的交互式终端来检查当前Python环境中该变量的内容:

>>> import sys
>>> from pprint import pprint
>>> pprint(sys.path)
['',
 '/home/arch/.pyenv/versions/3.8.4/lib/python37.zip',
 '/home/arch/.pyenv/versions/3.8.4/lib/python3.8',
 '/home/arch/.pyenv/versions/3.8.4/lib/python3.8/lib-dynload',
 '/home/arch/venvs/airflow/lib/python3.8/site-packages']

sys.path在程序启动时初始化。最高优先级是当前目录,即path[0]表示包含当前执行脚本的目录(如果是交互式shell则为空字符串)。次优先级是PYTHONPATH环境变量(如果提供的话),最后是由site模块管理的安装依赖默认路径。

sys.path 也可以在Python会话期间通过简单的append操作进行修改(例如 sys.path.append("/path/to/custom/package"))。一旦添加了新路径,Python就会开始在这些新路径中搜索包。Airflow利用了这一特性,具体描述见章节Adding directories to the PYTHONPATH

在变量sys.path中有一个名为site-packages的目录,它包含了已安装的外部包,这意味着你可以使用pipanaconda安装包,并在Airflow中使用它们。在下一节中,你将学习如何创建自己的简单可安装包,以及如何使用环境变量PYTHONPATH来指定要添加到sys.path的额外目录。

同时确保在您的文件夹中添加初始化文件

包的典型结构

这是一个示例结构,您可能在dags文件夹中拥有:

<DIRECTORY ON PYTHONPATH>
| .airflowignore  -- only needed in ``dags`` folder, see below
| -- my_company
              | __init__.py
              | common_package
              |              |  __init__.py
              |              | common_module.py
              |              | subpackage
              |                         | __init__.py
              |                         | subpackaged_util_module.py
              |
              | my_custom_dags
                              | __init__.py
                              | my_dag1.py
                              | my_dag2.py
                              | base_dag.py

在上述情况下,以下是导入python文件的方式:

from my_company.common_package.common_module import SomeClass
from my_company.common_package.subpackage.subpackaged_util_module import AnotherClass
from my_company.my_custom_dags.base_dag import BaseDag

你可以在文件夹根目录看到.airflowignore文件。这个文件可以放在你的dags文件夹中,用于告诉Airflow当调度器查找DAG时应该忽略该文件夹中的哪些文件。文件中应包含要忽略路径的正则表达式(默认)或通配符表达式。你不需要在PYTHONPATH的其他文件夹中放置该文件(而且其他文件夹中只能存放共享代码,不能存放实际的DAG文件)。

在上面的例子中,DAGs仅位于my_custom_dags文件夹中,调度器在搜索DAGs时不应扫描common_package,因此我们应该忽略common_package文件夹。如果您在那里保留了一个基础DAG,而my_dag1.pymy_dag2.py都继承自它,您还需要忽略base_dag.py。那么您的.airflowignore文件应该如下所示:

my_company/common_package/.*
my_company/my_custom_dags/base_dag\.py

如果将 DAG_IGNORE_FILE_SYNTAX 设置为 glob,等效的 .airflowignore 文件将是:

my_company/common_package/
my_company/my_custom_dags/base_dag.py

Airflow 内置的 PYTHONPATH 条目

Airflow在运行时动态地将三个目录添加到sys.path中:

  • dags 文件夹:该路径在 [core] 部分的 dags_folder 选项中配置。

  • config 文件夹:默认通过设置 AIRFLOW_HOME 变量({AIRFLOW_HOME}/config)来配置。

  • plugins 文件夹:该路径通过 [core] 部分的 plugins_folder 选项进行配置。

注意

Airflow 2中的DAGS文件夹不应与webserver共享。虽然可以这样做,但与Airflow 1.10不同,Airflow并不要求webserver中存在DAGS文件夹。实际上,将dags文件夹与webserver共享存在一定的安全风险,因为这意味着编写DAG的人可以编写webserver能够执行的代码(理想情况下,webserver不应运行可由编写DAG的用户修改的代码)。因此,如果需要与webserver共享某些代码,强烈建议通过configplugins文件夹,或通过已安装的Airflow包(见下文)进行共享。这些文件夹通常由不同的用户(管理员/DevOps)管理并访问,而不是DAG文件夹(通常由数据科学家使用),因此它们被认为是安全的,因为它们是Airflow安装配置的一部分,并由管理安装的人员控制。

代码命名最佳实践

在导入代码时,有几个需要注意的陷阱。

有时,你可能会看到从Airflow或其他使用的库代码中抛出module 'X' has no attribute 'Y'异常。这通常是由于你在PYTHONPATH顶层有一个名为'X'的模块或包,导致系统导入的是这个模块而非原始代码期望的模块。

您应该始终为您的包和模块使用唯一名称,以下将介绍如何确保强制执行这种唯一性的方法。

使用唯一的顶层包名

最重要的是,避免对直接添加到PYTHONPATH顶层的任何内容使用通用名称。例如,如果将带有__init__.pyairflow文件夹添加到DAGS_FOLDER中,它会与Airflow包冲突,导致无法从Airflow包中导入任何内容。同样,不要直接在那里添加airflow.py文件。此外,标准库包(如multiprocessinglogging等)常用的名称也不应作为顶层使用——无论是作为包(即带有__init__.py的文件夹)还是作为模块(即.py文件)。

同样适用于configplugins文件夹,它们也位于PYTHONPATH中,以及您手动添加到PYTHONPATH的任何内容(详见后续章节)。

建议您始终将DAGs/通用文件放在部署特有的子包中(如下例中的my_company)。使用通用名称作为文件夹名称很容易与系统中已存在的其他包发生冲突。例如,如果您创建airflow/operators子文件夹,它将无法访问,因为Airflow已经有一个名为airflow.operators的包,当导入from airflow.operators时会优先查找该包。

不要使用相对导入

切勿使用Python 3中添加的相对导入(以.开头)。

这样做很诱人,比如在my_dag1.py中:

from .base_dag import BaseDag  # NEVER DO THAT!!!!

您应该使用完整路径导入此类共享DAG(从已添加到PYTHONPATH的目录开始):

from my_company.my_custom_dags.base_dag import BaseDag  # This is cool

相对导入方式往往违反直觉,并且根据Python代码的启动方式不同,其行为也可能有所差异。在Airflow中,同一个DAG文件可能会在不同上下文中被解析(例如由调度器、工作节点或在测试期间),在这些情况下,相对导入的行为可能不一致。因此在Airflow DAG中进行任何导入时,请始终使用完整的Python包路径,这将为您避免许多麻烦。您可以在这个Stack Overflow讨论串中了解更多关于相对导入的注意事项。

在包文件夹中添加__init__.py

创建文件夹时,您应该在文件夹中添加__init__.py文件作为空文件。虽然在Python 3中存在隐式命名空间的概念,您不必将这些文件添加到文件夹中,但Airflow期望您将所有添加的包中都包含这些文件。

检查你的PYTHONPATH加载配置

你也可以使用airflow info命令查看具体路径, 并像使用环境变量PYTHONPATH指定的目录一样使用它们。 该命令指定的sys.path变量内容示例如下:

Python PATH: [/home/rootcss/venvs/airflow/bin:/usr/lib/python38.zip:/usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/home/rootcss/venvs/airflow/lib/python3.8/site-packages:/home/rootcss/airflow/dags:/home/rootcss/airflow/config:/home/rootcss/airflow/plugins]

以下是airflow info命令的示例输出:

Apache Airflow: 2.0.0b3

System info
OS              | Linux
architecture    | x86_64
uname           | uname_result(system='Linux', node='85cd7ab7018e', release='4.19.76-linuxkit', version='#1 SMP Tue May 26 11:42:35 UTC 2020', machine='x86_64', processor='')
locale          | ('en_US', 'UTF-8')
python_version  | 3.8.6 (default, Nov 25 2020, 02:47:44)  [GCC 8.3.0]
python_location | /usr/local/bin/python

Tools info
git             | git version 2.20.1
ssh             | OpenSSH_7.9p1 Debian-10+deb10u2, OpenSSL 1.1.1d  10 Sep 2019
kubectl         | NOT AVAILABLE
gcloud          | NOT AVAILABLE
cloud_sql_proxy | NOT AVAILABLE
mysql           | mysql  Ver 8.0.22 for Linux on x86_64 (MySQL Community Server - GPL)
sqlite3         | 3.27.2 2019-02-25 16:06:06 bd49a8271d650fa89e446b42e513b595a717b9212c91dd384aab871fc1d0alt1
psql            | psql (PostgreSQL) 11.9 (Debian 11.9-0+deb10u1)

Paths info
airflow_home    | /root/airflow
system_path     | /usr/local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
python_path     | /usr/local/bin:/opt/airflow:/files/plugins:/usr/local/lib/python38.zip:/usr/local/lib/python3.8:/usr/
                | local/lib/python3.8/lib-dynload:/usr/local/lib/python3.8/site-packages:/files/dags:/root/airflow/conf
                | ig:/root/airflow/plugins
airflow_on_path | True

Config info
executor             | LocalExecutor
task_logging_handler | airflow.utils.log.file_task_handler.FileTaskHandler
sql_alchemy_conn     | postgresql+psycopg2://postgres:airflow@postgres/airflow
dags_folder          | /files/dags
plugins_folder       | /root/airflow/plugins
base_log_folder      | /root/airflow/logs

Providers info
apache-airflow-providers-amazon           | 1.0.0b2
apache-airflow-providers-apache-cassandra | 1.0.0b2
apache-airflow-providers-apache-druid     | 1.0.0b2
apache-airflow-providers-apache-hdfs      | 1.0.0b2
apache-airflow-providers-apache-hive      | 1.0.0b2

将目录添加到 PYTHONPATH

你可以使用环境变量PYTHONPATH来指定需要添加到sys.path的额外目录。通过以下命令启动python shell时,请提供你项目的根目录路径:

PYTHONPATH=/home/arch/projects/airflow_operators python

sys.path 变量的内容如下所示:

>>> import sys
>>> from pprint import pprint
>>> pprint(sys.path)
['',
 '/home/arch/projects/airflow_operators'
 '/home/arch/.pyenv/versions/3.8.4/lib/python37.zip',
 '/home/arch/.pyenv/versions/3.8.4/lib/python3.8',
 '/home/arch/.pyenv/versions/3.8.4/lib/python3.8/lib-dynload',
 '/home/arch/venvs/airflow/lib/python3.8/site-packages']

可以看到我们提供的目录现在已添加到路径中,让我们尝试导入这个包:

>>> import airflow_operators
Hello from airflow_operators
>>>

我们也可以在使用airflow命令时配合PYTHONPATH环境变量。 例如,当我们运行以下airflow命令时:

PYTHONPATH=/home/arch/projects/airflow_operators airflow info

我们将看到Python PATH已更新为我们提到的PYTHONPATH值,如下所示:

Python PATH: [/home/arch/venv/bin:/home/arch/projects/airflow_operators:/usr/lib/python38.zip:/usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/home/arch/venv/lib/python3.8/site-packages:/home/arch/airflow/dags:/home/arch/airflow/config:/home/arch/airflow/plugins]

在Python中创建包

这是添加自定义代码最有条理的方式。得益于使用包,您可以组织版本控制方法,控制共享代码的安装版本,并以受控方式将代码部署到所有实例和容器中——所有这些都由系统管理员/DevOps而非DAG编写者完成。当您有专门团队管理这些共享代码时,这种方式通常很合适,但如果您熟悉Python方法,也可以在小规模部署中以这种方式分发代码。您还可以将插件Provider包作为Python包安装,因此学习如何构建自己的包非常实用。

以下是创建您的软件包的方法:

1. 开始之前,请选择并安装您将使用的构建/打包工具,理想情况下它应符合PEP-621标准,以便能够轻松切换到其他工具。 常见的选择包括setuptools、poetry、hatch和flit。

  1. 在创建自己的包时决定。创建包目录 - 在我们的例子中,我们将称之为airflow_operators

mkdir airflow_operators
  1. 在包内创建文件 __init__.py 并添加以下代码:

print("Hello from airflow_operators")

当我们导入这个包时,它应该会打印上述消息。

4. 创建 pyproject.toml 并填入您选择的构建工具配置 参见 The pyproject.toml specification

  1. 使用您选择的工具构建项目。例如对于hatch可以是:

hatch build -t wheel

这将在你的dist文件夹中创建.whl文件

  1. 使用pip安装.whl文件:

pip install dist/airflow_operators-0.0.0-py3-none-any.whl
  1. 该软件包现在可以使用了!

>>> import airflow_operators
Hello from airflow_operators
>>>

可以使用pip命令移除该包:

pip uninstall airflow_operators

有关如何创建和发布Python包的更多详细信息,请参阅Packaging Python Projects

这篇内容对您有帮助吗?