模板参考

变量、宏和过滤器可以在模板中使用(请参阅Jinja模板部分)

以下功能在Airflow中开箱即用。 额外的自定义宏可以通过Plugins全局添加,或者通过 DAG.user_defined_macros参数在DAG级别添加。

变量

Airflow引擎默认传递一些变量,这些变量在所有模板中都可访问

变量

类型

描述

{{ data_interval_start }}

pendulum.DateTime

数据区间的开始时间。在2.2版本中新增。

{{ data_interval_end }}

pendulum.DateTime

数据区间的结束时间。在2.2版本中新增。

{{ logical_date }}

pendulum.DateTime

一个逻辑上标识当前DAG运行的日期时间。该值不包含任何语义,仅作为识别用途。
Use data_interval_start and data_interval_end instead if you want a value that has real-world semantics,
例如根据时间戳从数据库中获取一部分行数据。

{{ ds }}

字符串

The DAG run’s logical date as YYYY-MM-DD.
Same as {{ logical_date | ds }}.

{{ ds_nodash }}

字符串

{{ logical_date | ds_nodash }} 相同。

{{ exception }}

无 | 字符串 | 异常 键盘中断

运行任务实例时发生错误。


{{ ts }}

字符串

Same as {{ logical_date | ts }}.
Example: 2018-01-01T00:00:00+00:00.

{{ ts_nodash_with_tz }}

字符串

Same as {{ logical_date | ts_nodash_with_tz }}.
Example: 20180101T000000+0000.

{{ ts_nodash }}

字符串

Same as {{ logical_date | ts_nodash }}.
Example: 20180101T000000.

{{ prev_data_interval_start_success }}

pendulum.DateTime | None

Start of the data interval of the prior successful DagRun.
在2.2版本中新增。

{{ prev_data_interval_end_success }}

pendulum.DateTime | None

End of the data interval of the prior successful DagRun.
在2.2版本中新增。

{{ prev_start_date_success }}

pendulum.DateTime | None

从先前成功的DagRun开始日期(如果可用)。

{{ prev_end_date_success }}

pendulum.DateTime | None

从前一次成功的DagRun中获取的结束日期(如果可用)。

{{ inlets }}

列表

任务上声明的入口列表。

{{ inlet_events }}

字典[str, …]

访问入口数据集的历史事件。参见数据集。该功能在2.10版本中新增。

{{ outlets }}

列表

任务上声明的输出列表。

{{ outlet_events }}

字典[str, …]

访问器用于将信息附加到当前任务将发出的数据集事件上。
See Datasets. Added in version 2.10.

{{ dag }}

DAG

当前正在运行的DAG。您可以在DAGs中了解更多关于DAG的信息。

{{ task }}

BaseOperator

The currently running BaseOperator. You can read more about Tasks in Operators

{{ macros }}

A reference to the macros package. See Macros below.

{{ task_instance }}

任务实例

当前正在运行的TaskInstance

{{ ti }}

任务实例

{{ task_instance }}

{{ params }}

字典[字符串, 任意类型]

用户定义的参数。这可以通过映射来覆盖
passed to trigger_dag -c if dag_run_conf_overrides_params
is enabled in airflow.cfg.

{{ var.value }}

Airflow变量。请参阅下方的模板中的Airflow变量

{{ var.json }}

Airflow变量。请参阅下方的模板中的Airflow变量

{{ conn }}

Airflow连接。请参阅下方的模板中的Airflow连接

{{ task_instance_key_str }}

字符串

任务实例的唯一人类可读键。格式为
{dag_id}__{task_id}__{ds_nodash}.

{{ run_id }}

字符串

当前正在运行的DagRun运行ID。

{{ dag_run }}

DagRun

当前正在运行的DagRun

{{ test_mode }}

布尔值

任务实例是否由airflow test命令行工具运行。

{{ map_index_template }}

无 | 字符串

用于渲染映射任务扩展任务实例的模板。设置此值将在渲染结果中体现。

{{ expanded_ti_count }}

int | None

映射任务扩展成的任务实例数量。如果
the current task is not mapped, this should be None.
添加于版本2.5。

{{ triggering_dataset_events }}

dict[str, list[DatasetEvent]]

If in a Dataset Scheduled DAG, a map of Dataset URI to a list of triggering DatasetEvent
(可能有多个,如果有多个不同频率的数据集)。
Read more here Datasets.
在2.4版本中新增。

注意

DAG运行逻辑日期及其派生值,如dsts不应被视为DAG中的唯一标识。请改用run_id

从TaskFlow任务访问Airflow上下文变量

虽然使用@task装饰的任务不支持渲染作为参数传递的jinja模板,但上述列出的所有变量都可以直接从任务中访问。以下代码块展示了如何从其任务中访问task_instance对象的示例:

from airflow.models.taskinstance import TaskInstance
from airflow.models.dagrun import DagRun


@task
def print_ti_info(task_instance: TaskInstance | None = None, dag_run: DagRun | None = None):
    print(f"Run ID: {task_instance.run_id}")  # Run ID: scheduled__2023-08-09T00:00:00+00:00
    print(f"Duration: {task_instance.duration}")  # Duration: 0.972019
    print(f"DAG Run queued at: {dag_run.queued_at}")  # 2023-08-10 00:00:01+02:20

已弃用的变量

以下变量已被弃用。它们被保留以保持向后兼容性,但您应该将现有代码转换为使用其他变量。

已弃用的变量

描述

{{ execution_date }}

执行日期(逻辑日期),与 logical_date 相同

{{ next_execution_date }}

下一次计划运行的逻辑日期(如适用);你可以改用data_interval_end

{{ next_ds }}

下一个执行日期,格式为YYYY-MM-DD(如果存在),否则为None

{{ next_ds_nodash }}

如果存在,则下一个执行日期为YYYYMMDD格式,否则为None

{{ prev_execution_date }}

上一次计划运行的逻辑日期(如果适用)

{{ prev_ds }}

如果存在,则返回上一次执行日期,格式为YYYY-MM-DD,否则返回None

{{ prev_ds_nodash }}

如果存在,则返回上一次执行日期,格式为YYYYMMDD;否则返回None

{{ yesterday_ds }}

执行日期的前一天,格式为YYYY-MM-DD

{{ yesterday_ds_nodash }}

执行日期的前一天,格式为YYYYMMDD

{{ tomorrow_ds }}

执行日期后的第二天,格式为YYYY-MM-DD

{{ tomorrow_ds_nodash }}

执行日期后的第二天,格式为YYYYMMDD

{{ prev_execution_date_success }}

从前一次成功的DAG运行中获取执行日期; 如果您为DAG使用的时间表/调度定义了与旧版execution_date兼容的data_interval_start, 则可以考虑使用prev_data_interval_start_success来代替。

{{ conf }}

表示您airflow.cfg完整配置内容的对象。参见airflow.configuration.conf

请注意,您可以通过简单的点符号访问对象的属性和方法。以下是一些可能的示例: {{ task.owner }}, {{ task.task_id }}, {{ ti.hostname }}, ... 有关对象属性和方法的更多信息,请参考模型文档。

模板中的Airflow变量

var模板变量允许您访问Airflow变量。您可以将它们作为纯文本或JSON格式访问。如果使用JSON,您还可以遍历嵌套结构,例如字典:{{ var.json.my_dict_var.key1 }}

如果需要(例如变量键包含点号),也可以通过字符串获取变量,使用 {{ var.value.get('my.var', 'fallback') }}{{ var.json.get('my.dict.var', {'key1': 'val1'}) }}。如果变量不存在,可以提供默认值。

模板中的Airflow连接

同样地,Airflow Connections数据可以通过conn模板变量访问。例如,您可以在模板中使用类似{{ conn.my_conn_id.login }}{{ conn.my_conn_id.password }}等表达式。

就像使用var一样,可以通过字符串获取连接(例如{{ conn.get('my_conn_id_'+index).host }}),或者提供默认值(例如{{ conn.get('my_conn_id', {"host": "host1", "login": "user1"}).host }})。

此外,连接的extras字段可以通过extra_dejson字段作为Python字典获取,例如 conn.my_aws_conn_id.extra_dejson.region_name会从extras中提取region_name。 这样也可以提供extras中的默认值(例如{{ conn.my_aws_conn_id.extra_dejson.get('region_name', 'Europe (Frankfurt)') }})。

过滤器

Airflow定义了一些可用于格式化值的Jinja过滤器。

例如,使用{{ logical_date | ds }}将以YYYY-MM-DD格式输出logical_date。

过滤器

操作对象

描述

ds

日期时间

将日期时间格式化为 YYYY-MM-DD

ds_nodash

日期时间

将日期时间格式化为 YYYYMMDD

ts

日期时间

.isoformat() 相同,示例:2018-01-01T00:00:00+00:00

ts_nodash

日期时间

ts过滤器相同,但不包含-:或时区信息。 示例:20180101T000000

ts_nodash_with_tz

日期时间

作为不带-:ts过滤器。示例 20180101T000000+0000

宏(Macros)

宏是一种将对象暴露给模板的方式,它们位于模板中的macros命名空间下。

一些常用的库和方法已提供。

变量

描述

macros.datetime

标准库中的 datetime.datetime

macros.timedelta

标准库中的datetime.timedelta

macros.dateutil

dateutil包的引用

macros.time

标准库中的 time

macros.uuid

标准库中的 uuid

macros.random

标准库中的 random.random

还定义了一些airflow特定的宏:

airflow.macros.datetime_diff_for_humans(dt, since=None)[source]

返回两个日期时间之间人类可读的近似差异。

当只提供一个日期时间时,比较将基于当前时间。

Parameters:
  • dt (任意类型) – 用于显示差异的日期时间

  • since (DateTime | None) – 从何时开始显示日期。如果为None,则时间差是dt和当前时间之间的差值。

airflow.macros.ds_add(ds, days)[来源]

在YYYY-MM-DD格式的日期上增加或减少天数。

Parameters:
  • ds (str) – 要添加的基准日期,格式为YYYY-MM-DD

  • days (int) - 要添加到ds的天数,可以使用负值

>>> ds_add("2015-01-01", 5)
'2015-01-06'
>>> ds_add("2015-01-06", -5)
'2015-01-01'
airflow.macros.ds_format(ds, input_format, output_format)[来源]

以指定格式输出日期时间字符串。

Parameters:
  • ds (str) – 包含日期的输入字符串。

  • input_format (str) – 输入字符串格式(例如:‘%Y-%m-%d’)。

  • output_format (str) – 输出字符串格式(例如:'%Y-%m-%d')。

>>> ds_format("2015-01-01", "%Y-%m-%d", "%m-%d-%y")
'01-01-15'
>>> ds_format("1/5/2015", "%m/%d/%Y", "%Y-%m-%d")
'2015-01-05'
>>> ds_format("12/07/2024", "%d/%m/%Y", "%A %d %B %Y", "en_US")
'Friday 12 July 2024'
airflow.macros.ds_format_locale(ds, input_format, output_format, locale=None)[source]

以给定的Babel格式输出本地化的日期时间字符串。

Parameters:
  • ds (str) – 包含日期的输入字符串。

  • input_format (str) – 输入字符串格式(例如:'%Y-%m-%d')。

  • output_format (str) – 输出字符串的Babel格式(例如,yyyy-MM-dd)。

  • locale (Locale | str | None) – 用于格式化输出字符串的区域设置(例如'en_US')。如果未指定区域设置,将使用默认的LC_TIME,如果该设置也不可用,则将使用'en_US'。

>>> ds_format("2015-01-01", "%Y-%m-%d", "MM-dd-yy")
'01-01-15'
>>> ds_format("1/5/2015", "%m/%d/%Y", "yyyy-MM-dd")
'2015-01-05'
>>> ds_format("12/07/2024", "%d/%m/%Y", "EEEE dd MMMM yyyy", "en_US")
'Friday 12 July 2024'

在2.10.0版本中新增。

airflow.macros.random() x in the interval [0, 1).

这篇内容对您有帮助吗?