设置数据库后端

Airflow 设计用于通过 SqlAlchemy 与其元数据进行交互。

以下文档描述了数据库引擎的配置、为与Airflow配合使用所需对其配置进行的必要更改,以及修改Airflow配置以连接这些数据库的相关内容。

选择数据库后端

如果您想真正试用Airflow,应考虑将数据库后端设置为PostgreSQLMySQL。 默认情况下,Airflow使用SQLite,这仅适用于开发目的。

Airflow支持以下数据库引擎版本,请确认您所使用的版本。旧版本可能不支持所有SQL语句。

  • PostgreSQL: 12, 13, 14, 15, 16

  • MySQL: 8.0版本, Innovation

  • SQLite: 3.15.0+

如果您计划运行多个调度器,必须满足额外要求。详情请参阅Scheduler HA Database Requirements

警告

尽管MariaDB和MySQL之间有很多相似之处,但我们不支持将MariaDB作为Airflow的后端数据库。MariaDB和MySQL之间存在已知问题(例如索引处理),我们既没有在MariaDB上测试过迁移脚本,也没有测试过应用程序执行。我们知道有些人曾将MariaDB用于Airflow,这给他们带来了很多运维难题,因此我们强烈不建议尝试使用MariaDB作为后端数据库,用户也不能期望获得任何社区支持,因为尝试将MariaDB用于Airflow的用户数量非常少。

数据库URI

Airflow使用SQLAlchemy连接数据库,这需要您配置数据库URL。 您可以在[database]部分的sql_alchemy_conn选项中配置此项。通常也可以通过 AIRFLOW__DATABASE__SQL_ALCHEMY_CONN环境变量来配置此选项。

注意

有关设置配置的更多信息,请参阅设置配置选项

如果想查看当前值,可以使用airflow config get-value database sql_alchemy_conn命令,如下例所示。

$ airflow config get-value database sql_alchemy_conn
sqlite:////tmp/airflow/airflow.db

确切的格式描述在SQLAlchemy文档中有详细说明,请参阅Database Urls。我们下面也会展示一些示例。

设置SQLite数据库

SQLite数据库可用于运行Airflow进行开发,因为它不需要任何数据库服务器(数据库存储在本地文件中)。使用SQLite数据库存在许多限制(例如它只能与Sequential Executor一起使用),并且绝不应该用于生产环境。

运行Airflow 2.0+需要最低版本的sqlite3 - 最低要求版本为3.15.0。某些旧系统默认安装的sqlite版本较旧,这些系统需要手动升级SQLite至3.15.0以上版本。请注意,这里指的不是python library版本,而是需要升级系统级的SQLite应用程序。SQLite的安装方式有多种,您可以在SQLite官方网站及操作系统发行版的特定文档中找到相关信息。

故障排除

有时即使你将SQLite升级到更高版本,并且本地Python报告了更高版本, Airflow使用的Python解释器可能仍然使用启动Airflow时设置的LD_LIBRARY_PATH中可用的旧版本。

你可以通过运行以下检查来确认解释器使用的是哪个版本:

root@b8a8e73caa2c:/opt/airflow# python
Python 3.8.10 (default, Mar 15 2022, 12:22:08)
[GCC 8.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import sqlite3
>>> sqlite3.sqlite_version
'3.27.2'
>>>

但请注意,为您的Airflow部署设置环境变量可能会改变系统首先找到的SQLite库,因此您可能需要确保系统中安装的唯一SQLite版本是"足够高"的版本。

SQLite数据库的示例URI:

sqlite:////home/airflow/airflow.db

在AmazonLinux AMI或容器镜像上升级SQLite

AmazonLinux上的SQLite只能通过源码仓库升级到v3.7版本。Airflow要求v3.15或更高版本。请按照以下说明使用最新的SQLite3设置基础镜像(或AMI)

前提条件:升级过程需要安装wgettargzipgccmakeexpect这些工具。

yum -y install wget tar gzip gcc make expect

https://sqlite.org/下载源码,在本地进行编译和安装。

wget https://www.sqlite.org/src/tarball/sqlite.tar.gz
tar xzf sqlite.tar.gz
cd sqlite/
export CFLAGS="-DSQLITE_ENABLE_FTS3 \
    -DSQLITE_ENABLE_FTS3_PARENTHESIS \
    -DSQLITE_ENABLE_FTS4 \
    -DSQLITE_ENABLE_FTS5 \
    -DSQLITE_ENABLE_JSON1 \
    -DSQLITE_ENABLE_LOAD_EXTENSION \
    -DSQLITE_ENABLE_RTREE \
    -DSQLITE_ENABLE_STAT4 \
    -DSQLITE_ENABLE_UPDATE_DELETE_LIMIT \
    -DSQLITE_SOUNDEX \
    -DSQLITE_TEMP_STORE=3 \
    -DSQLITE_USE_URI \
    -O2 \
    -fPIC"
export PREFIX="/usr/local"
LIBS="-lm" ./configure --disable-tcl --enable-shared --enable-tempstore=always --prefix="$PREFIX"
make
make install

安装后请将/usr/local/lib添加到库路径中

export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH

设置PostgreSQL数据库

您需要创建一个数据库和一个数据库用户,供Airflow用来访问该数据库。 在下面的示例中,将创建一个名为airflow_db的数据库,以及用户名为airflow_user、密码为airflow_pass的用户

CREATE DATABASE airflow_db;
CREATE USER airflow_user WITH PASSWORD 'airflow_pass';
GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;
-- PostgreSQL 15 requires additional privileges:
GRANT ALL ON SCHEMA public TO airflow_user;

注意

数据库必须使用UTF-8字符集

您可能需要更新Postgres的pg_hba.conf文件,将 airflow用户添加到数据库访问控制列表中;并重新加载 数据库配置以使更改生效。详情请参阅Postgres文档中的 The pg_hba.conf File 部分。

警告

当您使用SQLAlchemy 1.4.0及以上版本时,需要在sql_alchemy_conn中使用postgresql://作为数据库连接字符串。 在早期版本的SQLAlchemy中可以使用postgres://,但在SQLAlchemy 1.4.0+中使用会导致以下错误:

>       raise exc.NoSuchModuleError(
            "Can't load plugin: %s:%s" % (self.group, name)
        )
E       sqlalchemy.exc.NoSuchModuleError: Can't load plugin: sqlalchemy.dialects:postgres

如果您无法立即更改URL前缀,Airflow仍可与SQLAlchemy 1.3版本兼容工作,您可以降级SQLAlchemy,但我们建议更新前缀。

详情请参阅SQLAlchemy变更日志

我们推荐使用psycopg2驱动并在您的SqlAlchemy连接字符串中指定它。

postgresql+psycopg2://<user>:<password>@<host>/<db>

另外请注意,由于SqlAlchemy没有提供在数据库URI中指定特定schema的方法,您需要确保schema public在Postgres用户的search_path中。

如果您为Airflow创建了一个新的Postgres账户:

  • 新建Postgres用户的默认search_path为:"$user", public,无需修改。

如果您使用当前具有自定义search_path的Postgres用户,可以通过以下命令更改search_path:

ALTER USER airflow_user SET search_path = public;

有关PostgreSQL连接设置的更多信息,请参阅SQLAlchemy文档中的PostgreSQL dialect

注意

众所周知,Airflow(尤其是在高性能配置中)会与元数据数据库建立大量连接。这可能会对Postgres的资源使用造成问题,因为在Postgres中,每个连接都会创建一个新进程,当打开大量连接时会导致Postgres资源消耗激增。因此我们建议在所有Postgres生产环境部署中使用PGBouncer作为数据库代理。PGBouncer不仅可以处理来自多个组件的连接池,而且在您使用可能存在连接不稳定的远程数据库时,它能使您的数据库连接对临时网络问题具有更强的恢复能力。

您可以在Apache Airflow的Helm Chart中找到PGBouncer部署的示例实现,通过切换一个布尔标志即可启用预配置的PGBouncer实例。即使您不使用官方Helm Chart,在准备自己的部署时,也可以参考我们采用的方案并从中获取灵感。

另请参阅 Helm Chart 生产指南

注意

对于托管Postgres服务,如Azure Postgresql、CloudSQL、Amazon RDS,您应该在连接参数中使用 keepalives_idle并将其设置为小于空闲时间,因为这些 服务会在不活动一段时间后(通常为300秒)关闭空闲连接, 这会导致错误The error: psycopg2.operationalerror: SSL SYSCALL error: EOF detected。 可以通过sql_alchemy_connect_args配置参数更改keepalive设置, 参考Configuration Reference中的[database]部分。您可以在 local_settings.py中配置这些参数,sql_alchemy_connect_args应该是存储配置参数字典的完整导入路径。您可以阅读 Postgres Keepalives。 一个观察到可以解决问题的keepalives示例设置可能是:

keepalive_kwargs = {
    "keepalives": 1,
    "keepalives_idle": 30,
    "keepalives_interval": 5,
    "keepalives_count": 5,
}

然后,如果将其放置在 airflow_local_settings.py 中,配置导入路径将是:

sql_alchemy_connect_args = airflow_local_settings.keepalive_kwargs

有关如何配置本地设置的详细信息,请参阅配置本地设置

设置MySQL数据库

您需要创建一个数据库和一个数据库用户,供Airflow用来访问该数据库。 在下面的示例中,将创建一个名为airflow_db的数据库,以及用户名为airflow_user、密码为airflow_pass的用户

CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE USER 'airflow_user' IDENTIFIED BY 'airflow_pass';
GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';

注意

数据库必须使用UTF-8字符集。需要注意的一个小细节是,新版本MySQL中的utf8实际上是utf8mb4,这会导致Airflow索引变得过大(参见https://github.com/apache/airflow/pull/17603#issuecomment-901121618)。因此从Airflow 2.2开始,所有MySQL数据库都会自动将sql_engine_collation_for_ids设置为utf8mb3_bin(除非您手动覆盖此设置)。这可能导致Airflow数据库中ID字段的排序规则不一致,但由于Airflow中所有相关ID仅使用ASCII字符,因此不会产生负面影响。

我们为MySQL采用了更严格的ANSI SQL设置以确保合理的默认配置。 请确保在my.cnf文件的[mysqld]章节下指定了explicit_defaults_for_timestamp=1选项。 您也可以通过向mysqld可执行文件传递--explicit-defaults-for-timestamp参数来激活这些选项

我们推荐使用mysqlclient驱动,并在您的SqlAlchemy连接字符串中指定它。

mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>

重要

MySQL后端的集成仅在Apache Airflow的持续集成(CI)过程中使用mysqlclient驱动进行过验证。

如需使用其他驱动程序,请参阅SQLAlchemy文档中的MySQL Dialect获取有关下载和设置SqlAlchemy连接的更多信息。

此外,您还需要特别注意MySQL的编码设置。虽然utf8mb4字符集在MySQL中越来越流行(实际上MySQL8.0已默认采用utf8mb4),但在Airflow 2+中使用utf8mb4编码需要额外配置(详见#7570)。若采用utf8mb4字符集,您还需设置sql_engine_collation_for_ids=utf8mb3_bin参数。

注意

在严格模式下,MySQL不允许将0000-00-00作为有效日期。因此在某些情况下(部分Airflow表使用0000-00-00 00:00:00作为时间戳字段的默认值),您可能会遇到类似"Invalid default value for 'end_date'"的错误。 为避免此错误,您可以在MySQL服务器上禁用NO_ZERO_DATE模式。 关于如何禁用它,请参阅https://stackoverflow.com/questions/9192027/invalid-default-value-for-create-date-timestamp-field。 更多信息请参见SQL Mode - NO_ZERO_DATE

MsSQL 数据库

警告

经过讨论投票流程, Airflow的PMC成员和Committer们达成决议,不再将MsSQL作为受支持的数据库后端进行维护。

自Airflow 2.9.0版本起,已取消对MsSQL作为Airflow数据库后端的支持。 这不会影响现有的provider包(操作器和钩子),DAG仍然可以访问和处理MsSQL中的数据。 但继续使用可能会导致错误,使Airflow的核心功能无法使用。

从MsSQL Server迁移

从Airflow 2.9.0版本开始,已终止对MSSQL的支持,迁移脚本可帮助将Airflow 2.7.x或2.8.x版本从SQL-Server迁移出去。该迁移脚本可在Github上的airflow-mssql-migration代码库中获取。

请注意,迁移脚本的提供不附带支持或担保。

其他配置选项

还有更多用于配置SQLAlchemy行为的配置选项。详情请参阅参考文档中关于[database]部分sqlalchemy_*选项的内容。

例如,您可以指定一个数据库模式,Airflow将在其中创建所需的表。如果您希望Airflow将其表安装在PostgreSQL数据库的airflow模式中,请指定以下环境变量:

export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql://postgres@localhost:5432/my_database?options=-csearch_path%3Dairflow"
export AIRFLOW__DATABASE__SQL_ALCHEMY_SCHEMA="airflow"

注意SQL_ALCHEMY_CONN数据库URL末尾的search_path

初始化数据库

在配置数据库并在Airflow配置中连接后,您应该创建数据库模式。

airflow db migrate

Airflow中的数据库监控与维护

Airflow 广泛利用关系型元数据数据库进行任务调度和执行。监控并正确配置该数据库对于优化 Airflow 性能至关重要。

关键问题

  1. 性能影响: 长时间或过多的查询会显著影响Airflow的功能。这些问题可能由于工作流特性、缺乏优化或代码错误引起。

  2. 数据库统计信息: 数据库引擎做出的错误优化决策(通常由于数据统计信息过时)可能导致性能下降。

职责

在Airflow环境中,数据库监控和维护的职责根据您使用的是自托管数据库和Airflow实例还是选择托管服务而有所不同。

自托管环境:

在数据库和Airflow均为自我管理的设置中,部署管理器负责设置、配置和维护数据库。这包括监控其性能、管理备份、定期清理以及确保其与Airflow的最佳运行状态。

托管服务:

  • 托管数据库服务:当使用托管数据库服务时,许多维护任务(如备份、补丁更新和基础监控)将由服务提供商处理。但部署管理员仍需负责监督Airflow的配置,针对特定工作流优化性能设置,管理定期清理工作,并监控数据库以确保与Airflow协同运行时达到最佳操作状态。

  • 托管Airflow服务:使用托管Airflow服务时,这些服务提供商负责配置和维护Airflow及其数据库。然而,部署管理员需要与服务配置协作,以确保资源规模和工作流需求与托管服务的规模和配置相匹配。

监控方面

常规监控应包括:

  • CPU、I/O 和内存使用情况。

  • 查询频率和数量。

  • 识别并记录慢查询或长时间运行的查询。

  • 检测低效查询执行计划。

  • 分析磁盘交换与内存使用情况及缓存交换频率。

工具与策略

  • Airflow 不提供直接的数据库监控工具。

  • 使用服务器端监控和日志记录来获取指标。

  • 根据定义的阈值启用对长时间运行查询的跟踪。

  • 定期运行维护任务(如ANALYZE SQL命令)进行维护。

数据库清理工具

  • Airflow DB清理命令: 使用airflow db clean命令来帮助管理和清理您的数据库。

  • airflow.utils.db_cleanup中的Python方法: 该模块提供了额外的Python方法用于数据库清理和维护,为特定需求提供更细粒度的控制和自定义功能。

推荐建议

  • 主动监控: 在生产环境中实施监控和日志记录,同时不会显著影响性能。

  • 数据库特定指南:查阅所选数据库的文档以获取具体的监控设置说明。

  • 托管数据库服务: 检查您的数据库提供商是否提供自动维护任务。

SQLAlchemy 日志记录

如需进行详细的查询分析,请启用SQLAlchemy客户端日志记录(在SQLAlchemy引擎配置中设置echo=True)。

  • 该方法侵入性更强,可能会影响Airflow的客户端性能。

  • 它会生成大量日志,尤其是在繁忙的Airflow环境中。

  • 适用于非生产环境,如预发布系统。

你可以通过echo=True作为sqlalchemy引擎配置来实现,如SQLAlchemy日志文档中所述。

使用 sql_alchemy_engine_args 配置参数将 echo 参数设置为 True。

注意

  • 在启用大量日志记录时,请注意对Airflow性能和系统资源的影响。

  • 在生产环境中,优先采用服务端监控而非客户端日志记录,以最大限度减少性能干扰。

接下来是什么?

默认情况下,Airflow使用SequentialExecutor,它不提供并行处理能力。为了获得更好的性能,您应该考虑配置不同的executor

这篇内容对您有帮助吗?