模板参考¶
变量、宏和过滤器可以在模板中使用(请参阅Jinja模板部分)
以下功能在Airflow中开箱即用。
额外的自定义宏可以通过Plugins全局添加,或者通过
DAG.user_defined_macros参数在DAG级别添加。
变量¶
Airflow引擎默认传递一些变量,这些变量在所有模板中都可访问
变量 |
类型 |
描述 |
|---|---|---|
|
数据区间的开始时间。在2.2版本中新增。 |
|
|
数据区间的结束时间。在2.2版本中新增。 |
|
|
一个逻辑上标识当前DAG运行的日期时间。该值不包含任何语义,仅作为识别用途。
Use
data_interval_start and data_interval_end instead if you want a value that has real-world semantics,例如根据时间戳从数据库中获取一部分行数据。
|
|
|
字符串 |
The DAG run’s logical date as
YYYY-MM-DD.Same as
{{ logical_date | ds }}. |
|
字符串 |
与 |
|
无 | 字符串 | 异常 键盘中断 |
运行任务实例时发生错误。
|
|
字符串 |
Same as
{{ logical_date | ts }}.Example:
2018-01-01T00:00:00+00:00. |
|
字符串 |
Same as
{{ logical_date | ts_nodash_with_tz }}.Example:
20180101T000000+0000. |
|
字符串 |
Same as
{{ logical_date | ts_nodash }}.Example:
20180101T000000. |
|
pendulum.DateTime
| |
Start of the data interval of the prior successful
DagRun.在2.2版本中新增。
|
|
pendulum.DateTime
| |
End of the data interval of the prior successful
DagRun.在2.2版本中新增。
|
|
pendulum.DateTime
| |
从先前成功的 |
|
pendulum.DateTime
| |
从前一次成功的 |
|
列表 |
任务上声明的入口列表。 |
|
字典[str, …] |
访问入口数据集的历史事件。参见数据集。该功能在2.10版本中新增。 |
|
列表 |
任务上声明的输出列表。 |
|
字典[str, …] |
访问器用于将信息附加到当前任务将发出的数据集事件上。
See Datasets. Added in version 2.10.
|
|
DAG |
|
|
BaseOperator |
The currently running
BaseOperator. You can read more about Tasks in Operators |
|
A reference to the macros package. See Macros below.
|
|
|
任务实例 |
当前正在运行的 |
|
任务实例 |
同 |
|
字典[字符串, 任意类型] |
用户定义的参数。这可以通过映射来覆盖
passed to
trigger_dag -c if dag_run_conf_overrides_paramsis enabled in
airflow.cfg. |
|
Airflow变量。请参阅下方的模板中的Airflow变量。 |
|
|
Airflow变量。请参阅下方的模板中的Airflow变量。 |
|
|
Airflow连接。请参阅下方的模板中的Airflow连接。 |
|
|
字符串 |
任务实例的唯一人类可读键。格式为
{dag_id}__{task_id}__{ds_nodash}. |
|
字符串 |
当前正在运行的 |
|
DagRun |
当前正在运行的 |
|
布尔值 |
任务实例是否由 |
|
无 | 字符串 |
用于渲染映射任务扩展任务实例的模板。设置此值将在渲染结果中体现。 |
|
int | |
映射任务扩展成的任务实例数量。如果
the current task is not mapped, this should be
None.添加于版本2.5。
|
|
dict[str, list[DatasetEvent]] |
If in a Dataset Scheduled DAG, a map of Dataset URI to a list of triggering
DatasetEvent(可能有多个,如果有多个不同频率的数据集)。
Read more here Datasets.
在2.4版本中新增。
|
注意
DAG运行逻辑日期及其派生值,如ds和ts,不应被视为DAG中的唯一标识。请改用run_id。
从TaskFlow任务访问Airflow上下文变量¶
虽然使用@task装饰的任务不支持渲染作为参数传递的jinja模板,但上述列出的所有变量都可以直接从任务中访问。以下代码块展示了如何从其任务中访问task_instance对象的示例:
from airflow.models.taskinstance import TaskInstance from airflow.models.dagrun import DagRun @task def print_ti_info(task_instance: TaskInstance | None = None, dag_run: DagRun | None = None): print(f"Run ID: {task_instance.run_id}") # Run ID: scheduled__2023-08-09T00:00:00+00:00 print(f"Duration: {task_instance.duration}") # Duration: 0.972019 print(f"DAG Run queued at: {dag_run.queued_at}") # 2023-08-10 00:00:01+02:20
已弃用的变量¶
以下变量已被弃用。它们被保留以保持向后兼容性,但您应该将现有代码转换为使用其他变量。
已弃用的变量 |
描述 |
|---|---|
|
执行日期(逻辑日期),与 |
|
下一次计划运行的逻辑日期(如适用);你可以改用 |
|
下一个执行日期,格式为 |
|
如果存在,则下一个执行日期为 |
|
上一次计划运行的逻辑日期(如果适用) |
|
如果存在,则返回上一次执行日期,格式为 |
|
如果存在,则返回上一次执行日期,格式为 |
|
执行日期的前一天,格式为 |
|
执行日期的前一天,格式为 |
|
执行日期后的第二天,格式为 |
|
执行日期后的第二天,格式为 |
|
从前一次成功的DAG运行中获取执行日期;
如果您为DAG使用的时间表/调度定义了与旧版 |
|
表示您 |
请注意,您可以通过简单的点符号访问对象的属性和方法。以下是一些可能的示例:
{{ task.owner }}, {{ task.task_id }}, {{ ti.hostname }}, ...
有关对象属性和方法的更多信息,请参考模型文档。
模板中的Airflow变量¶
var模板变量允许您访问Airflow变量。您可以将它们作为纯文本或JSON格式访问。如果使用JSON,您还可以遍历嵌套结构,例如字典:{{ var.json.my_dict_var.key1 }}。
如果需要(例如变量键包含点号),也可以通过字符串获取变量,使用
{{ var.value.get('my.var', 'fallback') }} 或
{{ var.json.get('my.dict.var', {'key1': 'val1'}) }}。如果变量不存在,可以提供默认值。
模板中的Airflow连接¶
同样地,Airflow Connections数据可以通过conn模板变量访问。例如,您可以在模板中使用类似{{ conn.my_conn_id.login }}、
{{ conn.my_conn_id.password }}等表达式。
就像使用var一样,可以通过字符串获取连接(例如{{ conn.get('my_conn_id_'+index).host }}),或者提供默认值(例如{{ conn.get('my_conn_id', {"host": "host1", "login": "user1"}).host }})。
此外,连接的extras字段可以通过extra_dejson字段作为Python字典获取,例如
conn.my_aws_conn_id.extra_dejson.region_name会从extras中提取region_name。
这样也可以提供extras中的默认值(例如{{ conn.my_aws_conn_id.extra_dejson.get('region_name', 'Europe (Frankfurt)') }})。
过滤器¶
Airflow定义了一些可用于格式化值的Jinja过滤器。
例如,使用{{ logical_date | ds }}将以YYYY-MM-DD格式输出logical_date。
过滤器 |
操作对象 |
描述 |
|---|---|---|
|
日期时间 |
将日期时间格式化为 |
|
日期时间 |
将日期时间格式化为 |
|
日期时间 |
与 |
|
日期时间 |
与 |
|
日期时间 |
作为不带 |
宏(Macros)¶
宏是一种将对象暴露给模板的方式,它们位于模板中的macros命名空间下。
一些常用的库和方法已提供。
变量 |
描述 |
|---|---|
|
标准库中的 |
|
标准库中的 |
|
对 |
|
标准库中的 |
|
标准库中的 |
|
标准库中的 |
还定义了一些airflow特定的宏:
- airflow.macros.datetime_diff_for_humans(dt, since=None)[source]¶
返回两个日期时间之间人类可读的近似差异。
当只提供一个日期时间时,比较将基于当前时间。
- Parameters:
dt (任意类型) – 用于显示差异的日期时间
since (DateTime | None) – 从何时开始显示日期。如果为
None,则时间差是dt和当前时间之间的差值。
- airflow.macros.ds_add(ds, days)[来源]¶
在YYYY-MM-DD格式的日期上增加或减少天数。
>>> ds_add("2015-01-01", 5) '2015-01-06' >>> ds_add("2015-01-06", -5) '2015-01-01'
- airflow.macros.ds_format(ds, input_format, output_format)[来源]¶
以指定格式输出日期时间字符串。
- Parameters:
>>> ds_format("2015-01-01", "%Y-%m-%d", "%m-%d-%y") '01-01-15' >>> ds_format("1/5/2015", "%m/%d/%Y", "%Y-%m-%d") '2015-01-05' >>> ds_format("12/07/2024", "%d/%m/%Y", "%A %d %B %Y", "en_US") 'Friday 12 July 2024'
- airflow.macros.ds_format_locale(ds, input_format, output_format, locale=None)[source]¶
以给定的Babel格式输出本地化的日期时间字符串。
- Parameters:
>>> ds_format("2015-01-01", "%Y-%m-%d", "MM-dd-yy") '01-01-15' >>> ds_format("1/5/2015", "%m/%d/%Y", "yyyy-MM-dd") '2015-01-05' >>> ds_format("12/07/2024", "%d/%m/%Y", "EEEE dd MMMM yyyy", "en_US") 'Friday 12 July 2024'
在2.10.0版本中新增。
- airflow.macros.random() x in the interval [0, 1).¶