使用命令行界面

本文档旨在概述使用CLI时的所有常见任务。

注意

有关CLI命令的更多信息,请参阅命令行界面与环境变量参考

设置 Bash/Zsh 自动补全

当使用bash(或zsh)作为终端时,airflow可以利用 argcomplete实现自动补全功能。

要为所有启用argcomplete的Python应用程序全局激活,请运行:

sudo activate-global-python-argcomplete

要永久(但非全局)激活airflow,请使用:

register-python-argcomplete airflow >> ~/.bashrc

要仅为Airflow一次性激活argcomplete,请使用:

eval "$(register-python-argcomplete airflow)"
../_images/cli_completion.gif

如果您正在使用 zsh,请将以下内容添加到您的 .zshrc 文件中:

autoload bashcompinit
bashcompinit
eval "$(register-python-argcomplete airflow)"

创建连接

有关使用CLI创建连接的信息,请参阅Creating a Connection from the CLI

将DAG结构导出为图片

Airflow允许您将DAG结构打印或保存为图像。这对于记录或分享DAG结构非常有用。您需要安装Graphviz

例如,要在终端打印example_complex DAG:

airflow dags show example_complex

这将以DOT格式在屏幕上打印渲染后的DAG结构(类似于Graph)。

支持多种文件格式。要使用它们,请添加参数--save [filename].[format]

example_complex DAG保存为PNG文件:

airflow dags show example_complex --save example_complex.png

这将把以下图像保存为文件:

../_images/usage_cli_export.png

示例DAG表示

支持以下文件格式:

  • bmp

  • canon, dot, gv, xdot, xdot1.2, xdot1.4

  • cgimage

  • cmap

  • eps

  • exr

  • fig

  • gd, gd2

  • gif

  • gtk

  • ico

  • imap, cmapx

  • imap_np, cmapx_np

  • ismap

  • jp2

  • jpg, jpeg, jpe

  • json, json0, dot_json, xdot_json

  • pct, pict

  • pdf

  • pic

  • plain, plain-ext

  • png

  • pov

  • ps

  • ps2

  • psd

  • sgi

  • svg, svgz

  • tga

  • tif, tiff

  • tk

  • vml, vmlz

  • vrml

  • wbmp

  • webp

  • xlib

  • x11

默认情况下,Airflow会在airflow.cfg文件[core]部分中dags_folder选项指定的目录中查找DAG。您可以使用--subdir参数选择新目录。

展示DAG结构

有时您会处理包含复杂依赖关系的DAG。这时预览DAG以检查其是否正确会很有帮助。

如果你使用的是macOS系统,可以将iTerm2终端与imgcat脚本配合使用,在控制台中显示DAG结构图。同时需要确保已安装Graphviz软件。

其他终端不支持高质量图形的显示。您可以将图像转换为文本形式,但由于分辨率限制,您将无法阅读它。

要实现这一点,你应该在--imgcat命令中使用airflow dags show开关。例如,如果你想显示example_bash_operator这个DAG,可以使用以下命令:

airflow dags show example_bash_operator --imgcat

您将看到与下方截图类似的结果。

../_images/usage_cli_imgcat.png

在iTerm2中预览DAG

格式化命令输出

一些Airflow命令如airflow dags listairflow tasks states-for-dag-run支持--output标志, 允许用户更改命令输出的格式。可选选项:

  • table - 将信息渲染为纯文本表格

  • simple - 将信息渲染为简单表格,可通过标准Linux工具解析

  • json - 以json字符串的形式呈现信息

  • yaml - 以有效的yaml格式呈现信息

无论是json还是yaml格式,都能更方便地使用命令行工具如jqyq来操作数据。例如:

airflow tasks states-for-dag-run example_complex 2020-11-13T00:00:00+00:00 --output json | jq ".[] | {sd: .start_date, ed: .end_date}"
{
  "sd": "2020-11-29T14:53:46.811030+00:00",
  "ed": "2020-11-29T14:53:46.974545+00:00"
}
{
  "sd": "2020-11-29T14:53:56.926441+00:00",
  "ed": "2020-11-29T14:53:57.118781+00:00"
}
{
  "sd": "2020-11-29T14:53:56.915802+00:00",
  "ed": "2020-11-29T14:53:57.125230+00:00"
}
{
  "sd": "2020-11-29T14:53:56.922131+00:00",
  "ed": "2020-11-29T14:53:57.129091+00:00"
}
{
  "sd": "2020-11-29T14:53:56.931243+00:00",
  "ed": "2020-11-29T14:53:57.126306+00:00"
}

从元数据数据库中清除历史记录

注意

强烈建议在运行db clean命令前备份元数据库。

db clean 命令的工作原理是从每个表中删除早于指定 --clean-before-timestamp 的记录。

您可以选择提供一个表列表来执行删除操作。如果没有提供表列表,则将包含所有表。

你可以使用--dry-run选项来打印待清理主表中的行数。

默认情况下,db clean会将清除的行存档到形如_airflow_deleted__

__
的表中。如果您不希望以这种方式保留数据,可以添加参数--skip-archive

从归档表中导出已清除的记录

db export-archived 命令将归档表的内容导出为指定格式,默认为CSV文件。这些归档表是由 db clean 命令创建的。导出的文件将包含在 db clean 过程中从主表中清除的记录。

您可以使用--export-format选项指定导出格式。默认格式为csv,目前也是唯一支持的格式。

您还必须使用--output-path选项指定要导出数据的路径位置。该位置必须已存在。

其他选项包括:--tables 用于指定要导出的表,--drop-archives 用于在导出后删除归档表。

删除归档表

如果在db clean过程中,您没有使用会删除归档表的--skip-archive选项,您仍然可以使用db drop-archived命令删除归档表。此操作不可逆,建议在删除表之前使用db export-archived命令将表备份到磁盘。

您可以通过--tables选项指定要删除的表。如果未指定任何表,则将删除所有归档表。

注意级联删除

请注意,部分表定义了外键关系使用ON DELETE CASCADE级联删除机制,因此删除一个表中的记录可能会级联删除其他表中的记录。例如,task_instance表关联到dag_run表,如果删除一条DagRun记录,其关联的所有任务实例也将被删除。

DAG运行的特殊处理

通常情况下,Airflow通过查找最新的DagRun来确定接下来要运行哪个DagRun。如果删除所有DAG运行记录,Airflow可能会重新调度一个已经完成的旧DAG运行,例如当设置了catchup=True时。因此db clean命令会保留最新的非手动触发的DAG运行,以保持调度连续性。

可回填DAG的注意事项

并非所有DAG都设计用于与Airflow的回填命令配合使用。但对于那些确实适用的DAG,需要特别小心。如果您删除DAG运行记录,然后在包含已删除DAG运行记录的日期范围内执行回填操作,这些运行记录将被重新创建并再次执行。因此,如果您的DAG属于此类情况,建议避免删除DAG运行记录,仅清理其他大型表(如任务实例和日志等)。

升级Airflow

运行 airflow db migrate --help 查看使用详情。

手动运行迁移

如果需要,您可以生成升级所需的SQL语句,并手动逐个应用每个升级迁移。为此,您可以使用--range(针对Airflow版本)或--revision-range(针对Alembic修订版本)选项配合db migrate命令。切勿跳过运行Alembic修订ID更新命令;这是Airflow下次需要升级时识别您当前升级起点的方式。有关修订版本与版本之间的映射关系,请参阅Reference for Database Migrations

降级Airflow

注意

建议在运行db downgrade或其他数据库操作前备份您的数据库。

您可以使用db downgrade命令降级到特定的Airflow版本。或者,您也可以指定一个Alembic修订ID来进行降级。

如果想预览命令但不执行它们,请使用选项 --show-sql-only

选项 --from-revision--from-version 只能与 --show-sql-only 选项一起使用,因为在实际运行迁移时,我们应该始终从当前版本进行降级。

有关Airflow版本与Alembic修订版本之间的映射,请参阅数据库迁移参考

导出连接

您可以通过CLI从数据库导出连接。支持的文件格式包括jsonyamlenv

您可以将目标文件指定为参数:

airflow connections export connections.json

或者,您可以指定file-format参数来覆盖文件格式:

airflow connections export /tmp/connections --file-format yaml

你也可以为STDOUT指定-

airflow connections export -

JSON格式包含一个对象,其中键包含连接ID,值包含连接的定义。在此格式中,连接被定义为JSON对象。以下是示例JSON文件。

{
  "airflow_db": {
    "conn_type": "mysql",
    "host": "mysql",
    "login": "root",
    "password": "plainpassword",
    "schema": "airflow",
    "port": null,
    "extra": null
  },
  "druid_broker_default": {
    "conn_type": "druid",
    "host": "druid-broker",
    "login": null,
    "password": null,
    "schema": null,
    "port": 8082,
    "extra": "{\"endpoint\": \"druid/v2/sql\"}"
  }
}

YAML文件结构与JSON类似。包含连接ID的键值对以及一个或多个连接的定义。在此格式中,连接被定义为YAML对象。以下是示例YAML文件。

airflow_db:
  conn_type: mysql
  extra: null
  host: mysql
  login: root
  password: plainpassword
  port: null
  schema: airflow
druid_broker_default:
  conn_type: druid
  extra: '{"endpoint": "druid/v2/sql"}'
  host: druid-broker
  login: null
  password: null
  port: 8082
  schema: null

您也可以以.env格式导出连接。键是连接ID,值是连接的序列化表示,使用Airflow的连接URI格式或JSON。要使用JSON,请提供选项--serialization-format=json,否则将使用Airflow连接URI格式。以下是使用这两种格式的示例.env文件。

URI 示例:

airflow_db=mysql://root:plainpassword@mysql/airflow
druid_broker_default=druid://druid-broker:8082?endpoint=druid%2Fv2%2Fsql

JSON示例输出:

airflow_db={"conn_type": "mysql", "login": "root", "password": "plainpassword", "host": "mysql", "schema": "airflow"}
druid_broker_default={"conn_type": "druid", "host": "druid-broker", "port": 8082, "extra": "{\"endpoint\": \"druid/v2/sql\"}"}

测试DAG导入错误

可以通过list-import-errors子命令使用CLI检查是否有发现的DAG存在导入错误。通过检查命令输出(特别是与--output一起使用生成标准文件格式时),可以创建一个自动化步骤,当任何DAG无法导入时该步骤会失败。 例如,当没有错误时默认输出是No data found,而json输出是[]。然后可以在CI或预提交中运行此检查,以加快审查过程和测试。

示例命令,如果出现任何错误将失败,使用jq来解析输出:

airflow dags list-import-errors --output=json | jq -e 'select(type=="array" and length == 0)'

这行可以直接添加到自动化流程中,或者如果你想打印输出,可以使用tee

airflow dags list-import-errors | tee import_errors.txt && jq -e 'select(type=="array" and length == 0)' import_errors.txt

Jenkins流水线中的示例:

stage('All DAGs are loadable') {
    steps {
        sh 'airflow dags list-import-errors | tee import_errors.txt && jq -e \'select(type=="array" and length == 0)\' import_errors.txt'
    }
}

注意

为了确保准确运行,您必须确保Airflow不会向标准输出记录任何额外文本。例如,您可能需要修复所有弃用警告,在命令中添加2>/dev/null,或者在Airflow配置中设置lazy_load_plugins = True(如果您有插件在加载时生成日志)。

这篇内容对您有帮助吗?