XComs¶
XComs("跨通信"的缩写)是一种让任务相互通信的机制,因为默认情况下任务是完全隔离的,可能运行在完全不同的机器上。
XCom由key(本质上是其名称)以及来源的task_id和dag_id标识。它们可以包含任何可序列化的值(包括用@dataclass或@attr.define装饰的对象,参见TaskFlow arguments),但仅设计用于传递少量数据;请勿用于传递大型数值,如数据框。
XComs通过任务实例上的xcom_push和xcom_pull方法显式地"推送"和"拉取"到/从其存储中。
要在名为“task-1”的任务中推送一个值供另一个任务使用:
# pushes data in any_serializable_value into xcom with key "identifier as string"
task_instance.xcom_push(key="identifier as a string", value=any_serializable_value)
要在另一个任务中获取上述代码中推送的值:
# pulls the xcom variable with key "identifier as string" that was pushed from within task-1
task_instance.xcom_pull(key="identifier as string", task_ids="task-1")
许多操作符会自动将其结果推送到名为return_value的XCom键中,前提是do_xcom_push参数设置为True(默认值),@task函数也是如此。如果未传递键值,xcom_pull默认使用return_value作为键,这意味着可以编写如下代码:
# Pulls the return_value XCOM from "pushing_task"
value = task_instance.xcom_pull(task_ids='pushing_task')
你也可以在templates中使用XComs:
SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}
XComs是Variables的近亲,主要区别在于XComs是基于任务实例的,专为DAG运行中的通信设计,而Variables是全局的,专为整体配置和值共享设计。
如果你想一次性推送多个XCom或者重命名推送的XCom键,可以将do_xcom_push和multiple_outputs参数设置为True,然后返回一个值字典。
注意
如果首次任务运行未成功,则每次重试时任务XComs将被清除,以确保任务运行具有幂等性。
对象存储XCom后端¶
默认的XCom后端是BaseXCom类,它将XCom存储在Airflow数据库中。这对于小值来说没问题,但对于大值或大量XCom可能会出现问题。
要将XComs存储在对象存储中,您可以将xcom_backend配置选项设置为airflow.providers.common.io.xcom.backend.XComObjectStorageBackend。
您还需要将xcom_objectstorage_path设置为所需位置。连接ID将从您提供的URL的用户部分获取,例如xcom_objectstorage_path = s3://conn_id@mybucket/key。此外,xcom_objectstorage_threshold必须
设置为大于-1的值。任何小于该阈值(以字节为单位)的对象将存储在数据库中,大于该阈值的对象将
放入对象存储。这将允许混合设置。如果xcom存储在对象存储上,数据库中将保存一个引用。最后,您可以将xcom_objectstorage_compression设置为fsspec支持的压缩方法,如zip或snappy以
在将数据存储到对象存储之前对其进行压缩。
例如,以下配置会将超过1MB的内容存储在S3中,并使用gzip进行压缩:
[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend
[common.io]
xcom_objectstorage_path = s3://conn_id@mybucket/key
xcom_objectstorage_threshold = 1048576
xcom_objectstorage_compression = gzip
注意
压缩功能需要你的Python环境中已安装相关支持库。例如,要使用snappy压缩,你需要安装python-snappy。Zip、gzip和bz2则无需额外安装即可直接使用。
自定义XCom后端¶
XCom系统支持可互换的后端存储,您可以通过xcom_backend配置选项设置使用哪个后端。
如果你想实现自己的后端,应该继承BaseXCom类,并重写serialize_value和deserialize_value方法。
此外还有一个orm_deserialize_value方法,每当XCom对象被渲染用于用户界面或报告用途时都会调用;如果你的XCom中包含大型或检索成本高的值,你应该重写此方法以避免调用该代码(而是返回一个更轻量、不完整的表示形式),从而保持用户界面的响应性。
你也可以重写clear方法,在清除指定DAG和任务的结果时使用它。这能让自定义XCom后端更轻松地处理数据生命周期。
验证容器中自定义XCom后端的使用¶
根据Airflow部署的位置(例如本地环境、Docker或K8s等),确认自定义XCom后端是否实际被初始化会很有帮助。举例来说,容器环境的复杂性可能使得在容器部署期间更难判断您的后端是否正确加载。幸运的是,以下指南可以帮助您增强对自定义XCom实现的信心。
如果你能通过终端进入一个Airflow容器,就可以打印出实际使用的XCom类:
from airflow.models.xcom import XCom
print(XCom.__name__)