管理连接

另请参阅

有关钩子和连接的概述,请参阅Connections & Hooks

Airflow的Connection对象用于存储凭证和连接到外部服务所需的其他信息。

连接可以通过以下方式定义:

将连接信息存储在环境变量中

Airflow连接可以通过环境变量来定义。

命名规范为 AIRFLOW_CONN_{CONN_ID},全部大写(注意CONN两侧的单下划线)。因此,如果您的连接ID是my_prod_db,那么变量名应为AIRFLOW_CONN_MY_PROD_DB

该值可以是JSON格式或Airflow的URI格式。

JSON格式示例

在2.3.0版本中新增。

如果使用JSON序列化:

export AIRFLOW_CONN_MY_PROD_DATABASE='{
    "conn_type": "my-conn-type",
    "login": "my-login",
    "password": "my-password",
    "host": "my-host",
    "port": 1234,
    "schema": "my-schema",
    "extra": {
        "param1": "val1",
        "param2": "val2"
    }
}'

生成JSON连接表示

添加于版本2.8.0。

为了简化连接JSON的生成,Connection类提供了一个便捷属性as_json()。可以像这样使用:

>>> from airflow.models.connection import Connection
>>> c = Connection(
...     conn_id="some_conn",
...     conn_type="mysql",
...     description="connection description",
...     host="myhost.com",
...     login="myname",
...     password="mypassword",
...     extra={"this_param": "some val", "that_param": "other val*"},
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.as_json()}'")
AIRFLOW_CONN_SOME_CONN='{"conn_type": "mysql", "description": "connection description", "host": "myhost.com", "login": "myname", "password": "mypassword", "extra": {"this_param": "some val", "that_param": "other val*"}}'

此外,同样的方法可用于将Connection从URI格式转换为JSON格式

>>> from airflow.models.connection import Connection
>>> c = Connection(
...     conn_id="awesome_conn",
...     description="Example Connection",
...     uri="aws://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY@/?__extra__=%7B%22region_name%22%3A+%22eu-central-1%22%2C+%22config_kwargs%22%3A+%7B%22retries%22%3A+%7B%22mode%22%3A+%22standard%22%2C+%22max_attempts%22%3A+10%7D%7D%7D",
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.as_json()}'")
AIRFLOW_CONN_AWESOME_CONN='{"conn_type": "aws", "description": "Example Connection", "host": "", "login": "AKIAIOSFODNN7EXAMPLE", "password": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", "schema": "", "extra": {"region_name": "eu-central-1", "config_kwargs": {"retries": {"mode": "standard", "max_attempts": 10}}}}'

URI格式示例

如果使用Airflow URI进行序列化:

export AIRFLOW_CONN_MY_PROD_DATABASE='my-conn-type://login:password@host:port/schema?param1=val1&param2=val2'

有关如何生成有效URI的更多详情,请参阅连接URI格式

注意

在环境变量中定义的连接不会显示在Airflow UI中,也无法通过airflow connections list命令查看。

在Secrets Backend中存储连接

您可以将Airflow连接存储在外部密钥后端中,例如HashiCorp Vault、AWS SSM参数存储等服务。更多详情请参阅Secrets Backend

在数据库中存储连接

另请参阅

连接也可以存储在环境变量外部密钥后端中,例如HashiCorp Vault、AWS SSM参数存储等。

当在数据库中存储连接时,您可以通过网页界面或Airflow CLI来管理它们。

通过用户界面创建连接

打开UI中的Admin->Connections部分。点击Create链接来创建新连接。

../_images/connection_create.png
  1. Connection Id字段中填写所需的连接ID。建议使用小写字母并用下划线分隔单词。

  2. 通过Connection Type字段选择连接类型。

  3. 填写剩余字段。有关不同连接类型所属字段的说明,请参阅 处理extra中的任意字典

  4. 点击Save按钮创建连接。

通过用户界面编辑连接

打开UI中的Admin->Connections部分。在连接列表中点击您想要编辑的连接旁边的铅笔图标。

../_images/connection_edit.png

修改连接属性并点击Save按钮保存您的更改。

通过CLI创建连接

您可以通过CLI添加数据库连接。

您可以使用JSON格式添加连接(自2.3.0版本起):

airflow connections add 'my_prod_db' \
    --conn-json '{
        "conn_type": "my-conn-type",
        "login": "my-login",
        "password": "my-password",
        "host": "my-host",
        "port": 1234,
        "schema": "my-schema",
        "extra": {
            "param1": "val1",
            "param2": "val2"
        }
    }'

或者您也可以使用Airflow的连接URI格式(参见生成连接URI)。

airflow connections add 'my_prod_db' \
    --conn-uri '<conn-type>://<login>:<password>@<host>:<port>/<schema>?param1=val1&param2=val2&...'

最后,您也可以单独指定每个参数:

airflow connections add 'my_prod_db' \
    --conn-type 'my-conn-type' \
    --conn-login 'login' \
    --conn-password 'password' \
    --conn-host 'host' \
    --conn-port 'port' \
    --conn-schema 'schema' \
    ...

将连接导出到文件

您可以将数据库中存储的连接导出到文件(例如,用于将连接从一个环境迁移到另一个环境)。有关用法,请参阅导出连接

数据库连接的安全性

对于存储在Airflow元数据库中的连接,Airflow使用Fernet来加密密码和其他潜在敏感数据。它确保在没有加密密钥的情况下,连接密码无法被篡改或读取。有关配置Fernet的信息,请参阅Fernet

测试连接

出于安全考虑,Airflow UI、API和CLI中的测试连接功能默认处于禁用状态。

For more information on capabilities of users, see the documentation: https://airflow.apache.org/docs/apache-airflow/stable/security/security_model.html#capabilities-of-authenticated-ui-users. It is strongly advised to not enable the feature until you make sure that only highly trusted UI/API users have “edit connection” permissions.

该功能的可用性可以通过Airflow配置核心部分(airflow.cfg)中的test_connection标志来控制。 也可以通过环境变量AIRFLOW__CORE__TEST_CONNECTION来控制。

该配置参数接受以下值:

  • 禁用:禁用测试连接功能,并在用户界面中禁用"测试连接"按钮。这也是Airflow配置中设置的默认值。

  • 启用:开启测试连接功能,并在用户界面中激活"测试连接"按钮。

  • 隐藏:禁用测试连接功能,并在用户界面中隐藏"测试连接"按钮。

启用测试连接后,可以通过UI中的创建编辑连接页面使用,也可以通过调用Connections REST API,或者运行airflow connections test CLI命令来使用。

警告

当使用Airflow UI或REST API时,此功能将不可用于存储在外部密钥后端中的连接。

要测试连接,Airflow会调用关联钩子类中的test_connection方法并报告结果。可能会出现连接类型没有关联钩子或钩子未实现test_connection方法的情况,这两种情况下都会显示错误消息或在用户界面中禁用该功能(如果您在UI中进行测试)。

注意

在Airflow UI中进行测试时,测试会从网络服务器执行,因此此功能受限于您为网络服务器设置的网络出口规则。

注意

如果Web服务器和工作机器(如果通过Airflow UI测试)或机器/Pod(如果通过Airflow CLI测试)安装了不同的库或提供者包,测试结果可能会有所不同。

自定义连接类型

Airflow允许定义自定义连接类型——包括修改连接的添加/编辑表单。自定义连接类型由社区维护的提供者定义,但您也可以添加提供自定义连接类型的自定义提供者。有关如何添加自定义提供者的描述,请参阅Provider packages

自定义连接类型由提供者交付的Hooks定义。Hooks可以实现协议类DiscoverableHook中定义的方法。请注意,您的自定义Hook不应继承自该类,该类是一个示例,用于记录关于您的Hook可能定义的类字段和方法的期望。另一个很好的例子是JdbcHook

通过在您的hooks中实现这些方法,并通过provider元数据中的connection-types数组(以及已弃用的hook-class-names)暴露它们,您可以通过以下方式自定义Airflow:

  • 添加自定义连接类型

  • 根据连接类型自动创建Hook

  • 添加自定义表单控件以显示和编辑连接URL中的自定义"额外"参数

  • 隐藏连接中未使用的字段

  • 添加占位符,展示字段应如何格式化的示例

您可以在Provider packages中阅读更多关于如何添加自定义provider包的详细信息

自定义连接字段

可以在Airflow网页服务器的连接添加/编辑视图中添加自定义表单字段。自定义字段以JSON格式存储在Connection.extra字段中。要添加自定义字段,需实现方法get_connection_form_widgets()。该方法应返回一个字典,其中键应为字段的字符串名称(该名称将存储在extra字典中),值应为wtforms.fields.core.Field的继承类。

以下是一个示例:

@staticmethod
def get_connection_form_widgets() -> dict[str, Any]:
    """Returns connection widgets to add to connection form"""
    from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
    from flask_babel import lazy_gettext
    from wtforms import StringField

    return {
        "workspace": StringField(lazy_gettext("Workspace"), widget=BS3TextFieldWidget()),
        "project": StringField(lazy_gettext("Project"), widget=BS3TextFieldWidget()),
    }

注意

自定义字段不再需要extra__ type>__前缀

在Airflow 2.3之前,如果你想在用户界面中添加自定义字段,必须为其添加extra__ type>__前缀,其值会以这种方式存储在extra字典中。从2.3版本开始,你不再需要这样做了。

方法 get_ui_field_behaviour() 允许您自定义两者的行为。例如,您可以隐藏或重新标记字段(例如,如果它未被使用或重新用途),并且可以添加占位文本。

一个示例:

@staticmethod
def get_ui_field_behaviour() -> dict[str, Any]:
    """Returns custom field behaviour"""
    return {
        "hidden_fields": ["port", "host", "login", "schema"],
        "relabeling": {},
        "placeholders": {
            "password": "Asana personal access token",
            "workspace": "My workspace gid",
            "project": "My project gid",
        },
    }

注意

如果你想为某个extra字段添加表单占位符,而该字段名称与标准连接属性(如login、password、host、scheme、port、extra)冲突,那么你必须为其添加前缀extra__ type>__。例如extra__myservice__password

查看提供者以了解您可以实现的功能示例,例如JdbcHookAsanaHook都利用了此特性。

注意

已弃用 hook-class-names

在 Airflow 2.2.0 之前,提供者中的连接通过提供者元数据中的 hook-class-names 数组暴露。然而,事实证明在 worker 中使用单独的 hook 时效率低下,因此现在用 connection-types 数组取代了 hook-class-names 数组。在提供者支持 2.2.0 以下版本的 Airflow 之前,应同时保留 connection-typeshook-class-names。CI 构建期间的自动化检查将验证这两个数组的一致性。

URI格式

注意

从2.3.0版本开始,您可以使用JSON来序列化连接。参见example

由于历史原因,Airflow采用了一种特殊的URI格式,可用于将Connection对象序列化为字符串值。

通常,Airflow的URI格式如下所示:

my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1&param2=val2

上述URI将生成一个等同于以下内容的Connection对象:

Connection(
    conn_id="",
    conn_type="my_conn_type",
    description=None,
    login="my-login",
    password="my-password",
    host="my-host",
    port=5432,
    schema="my-schema",
    extra=json.dumps(dict(param1="val1", param2="val2")),
)

生成连接URI

为了简化连接URI的生成过程,Connection类提供了一个便捷方法get_uri()。使用方法如下:

>>> import json
>>> from airflow.models.connection import Connection
>>> c = Connection(
...     conn_id="some_conn",
...     conn_type="mysql",
...     description="connection description",
...     host="myhost.com",
...     login="myname",
...     password="mypassword",
...     extra=json.dumps(dict(this_param="some val", that_param="other val*")),
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.get_uri()}'")
AIRFLOW_CONN_SOME_CONN='mysql://myname:mypassword@myhost.com?this_param=some+val&that_param=other+val%2A'

此外,如果您已创建连接,可以使用airflow connections get命令。

$ airflow connections get sqlite_default
Id: 40
Connection Id: sqlite_default
Connection Type: sqlite
Host: /tmp/sqlite_default.db
Schema: null
Login: null
Password: null
Port: null
Is Encrypted: false
Is Extra Encrypted: false
Extra: {}
URI: sqlite://%2Ftmp%2Fsqlite_default.db

处理extra中的任意字典

某些JSON结构无法在不丢失信息的情况下进行URL编码。对于这类JSON,get_uri会将整个字符串存储在URL查询参数__extra__下。

例如:

>>> extra_dict = {"my_val": ["list", "of", "values"], "extra": {"nested": {"json": "val"}}}
>>> c = Connection(
...     conn_type="scheme",
...     host="host/location",
...     schema="schema",
...     login="user",
...     password="password",
...     port=1234,
...     extra=json.dumps(extra_dict),
... )
>>> uri = c.get_uri()
>>> uri
'scheme://user:password@host%2Flocation:1234/schema?__extra__=%7B%22my_val%22%3A+%5B%22list%22%2C+%22of%22%2C+%22values%22%5D%2C+%22extra%22%3A+%7B%22nested%22%3A+%7B%22json%22%3A+%22val%22%7D%7D%7D'

我们可以验证它返回的是同一个字典:

>>> new_c = Connection(uri=uri)
>>> new_c.extra_dejson == extra_dict
True

但对于最常见的仅存储键值对的情况,会使用普通的URL编码。

你可以像这样验证URI是否被正确解析:

>>> from airflow.models.connection import Connection

>>> c = Connection(uri="my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1&param2=val2")
>>> print(c.login)
my-login
>>> print(c.password)
my-password

连接参数中特殊字符的处理

注意

在生成连接时,可以使用便捷方法Connection.get_uri,如章节Generating a Connection URI所述。 本节内容仅供参考。

手动构建URI时,某些字符需要特殊处理。

例如,如果您的密码包含 /,则会失败:

>>> c = Connection(uri="my-conn-type://my-login:my-pa/ssword@my-host:5432/my-schema?param1=val1&param2=val2")
ValueError: invalid literal for int() with base 10: 'my-pa'

要解决这个问题,你可以使用quote_plus()进行编码:

>>> c = Connection(uri="my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2")
>>> print(c.password)
my-pa/ssword

这篇内容对您有帮助吗?