对象存储¶
添加于版本2.8.0。
这是一个实验性功能。
所有主流云服务提供商都提供对象存储形式的持久化数据存储。这些并非传统的"POSIX"文件系统。为了存储数百PB级别的数据且不出现单点故障,对象存储用更简单的"对象名=>数据"模型取代了经典的文件系统目录树结构。为实现远程访问,对象操作通常通过(较慢的)HTTP REST接口提供。
Airflow在对象存储(如s3、gcs和azure blob storage)之上提供了一个通用抽象层。
这种抽象允许您在DAG中使用各种对象存储系统,而无需
更改代码来处理每个不同的对象存储系统。此外,它还允许您使用
大多数标准Python模块,比如shutil,这些模块可以处理类文件对象。
对特定对象存储系统的支持取决于已安装的提供程序。例如,如果安装了apache-airflow-providers-google提供程序,就能使用gcs方案进行对象存储。Airflow默认提供对file方案的支持。
注意
支持s3需要您安装apache-airflow-providers-amazon[s3fs]。这是因为
它依赖于aiobotocore,该组件默认不会安装,因为它可能与
botocore产生依赖冲突。
云对象存储并非真正的文件系统¶
对象存储并非真正的文件系统,尽管它们可能看起来像。它们不支持真实文件系统提供的所有操作。主要差异包括:
不保证原子性的重命名操作。这意味着如果你将文件从一个位置移动到另一个位置,它会被复制然后删除。如果复制失败,你将丢失该文件。
目录是模拟的,可能会导致操作变慢。例如,列出目录可能需要列出存储桶中的所有对象并按前缀进行过滤。
在文件中进行查找可能需要大量调用开销,影响性能,或者可能完全不被支持。
Airflow 依赖 fsspec 来提供跨不同对象存储系统的一致体验。它实现了本地文件缓存以加速访问。但在设计DAG时,您应该注意对象存储的限制。
基础用法¶
要使用对象存储,您需要使用要交互对象的URI实例化一个Path对象(如下所示)。例如,要指向s3中的存储桶,您可以执行以下操作:
from airflow.io.path import ObjectStoragePath
base = ObjectStoragePath("s3://aws_default@my-bucket/")
URI中的用户名部分代表Airflow连接ID,是可选的。它也可以作为单独的关键字参数传递:
# Equivalent to the previous example.
base = ObjectStoragePath("s3://my-bucket/", conn_id="aws_default")
列出文件对象:
@task
def list_files() -> list[ObjectStoragePath]:
files = [f for f in base.iterdir() if f.is_file()]
return files
在目录树中导航:
base = ObjectStoragePath("s3://my-bucket/")
subdir = base / "subdir"
# prints ObjectStoragePath("s3://my-bucket/subdir")
print(subdir)
打开文件:
@task
def read_file(path: ObjectStoragePath) -> str:
with path.open() as f:
return f.read()
利用XCOM,您可以在任务之间传递路径:
@task
def create(path: ObjectStoragePath) -> ObjectStoragePath:
return path / "new_file.txt"
@task
def write_file(path: ObjectStoragePath, content: str):
with path.open("wb") as f:
f.write(content)
new_file = create(base)
write = write_file(new_file, b"data")
read >> write
配置¶
在基本使用中,对象存储抽象不需要太多配置,它依赖于标准的Airflow连接机制。这意味着您可以使用conn_id参数来指定要使用的连接。连接的任何设置都会被传递到底层实现。例如,如果您使用s3,可以指定aws_access_key_id和aws_secret_access_key,还可以添加额外的参数如endpoint_url来指定自定义端点。
替代后端¶
可以为某个方案或协议配置替代的后端。这是通过将backend附加到方案上来实现的。例如,要为dbfs方案启用databricks后端,您可以执行以下操作:
from airflow.io.path import ObjectStoragePath
from airflow.io.store import attach
from fsspec.implementations.dbfs import DBFSFileSystem
attach(protocol="dbfs", fs=DBFSFileSystem(instance="myinstance", token="mytoken"))
base = ObjectStoragePath("dbfs://my-location/")
注意
要在多个任务间复用注册信息,请确保将后端附加到DAG的顶层。 否则,该后端将无法在多个任务间共享使用。
路径API¶
对象存储抽象是通过Path API实现的,并基于Universal Pathlib构建。这意味着您基本上可以使用相同的API与对象存储交互,就像操作本地文件系统一样。本节我们仅列出两种API之间的差异。超出标准Path API的扩展操作(如复制和移动)将在下一节中列出。有关每个操作的详细信息(例如它们接受哪些参数),请参阅ObjectStoragePath类的文档。
mkdir¶
在指定路径或存储桶/容器内创建目录条目。对于没有真正目录的系统,它可能仅为此实例创建目录条目,而不会影响实际的文件系统。
如果 parents 参数为 True,则会根据需要自动创建该路径中所有缺失的父目录。
touch¶
在指定路径创建文件或更新时间戳。如果truncate参数为True(默认值),文件将被截断。若文件已存在且exists_ok为真,则函数执行成功(并更新修改时间为当前时间),否则将抛出FileExistsError异常。
统计¶
返回一个类似stat_result的对象,支持以下属性:st_size、st_mtime、st_mode,
但同时表现得像一个字典,可以提供关于对象的额外元数据。例如,对于s3,它将返回额外的键,如:['ETag', 'ContentType']。如果您的代码需要在不同对象存储之间保持可移植性,请不要依赖扩展的元数据。
扩展¶
以下操作不属于标准路径API的一部分,但对象存储抽象层支持这些操作。
存储桶¶
返回存储桶名称。
校验和¶
返回文件的校验和。
容器¶
存储桶的别名
文件系统¶
便捷属性,用于访问已实例化的文件系统
密钥¶
返回对象键。
命名空间¶
返回对象的命名空间。通常这是协议,例如s3://加上存储桶名称。
路径¶
与文件系统实例兼容的fsspec路径
协议¶
文件系统规范协议。
read_block¶
从给定路径的文件中读取一个字节块。
从文件的偏移量开始,读取指定长度的字节。如果设置了分隔符,则确保读取操作在分隔符边界处开始和结束,这些边界遵循偏移量和偏移量加长度的位置。如果偏移量为零,则从零开始读取。返回的字节串将包含结束分隔符字符串。
如果偏移量+长度超出文件末尾,则读取到文件末尾。
签名¶
创建一个表示给定路径的签名URL。某些实现允许生成临时URL,作为一种委托凭据的方式。
大小¶
返回给定路径下文件的大小(以字节为单位)。
storage_options¶
用于实例化底层文件系统的存储选项。
ukey¶
文件属性的哈希值,用于判断文件是否发生更改。
复制与移动¶
本文档记录了copy和move操作的预期行为,特别是跨对象存储(例如文件 -> s3)的行为。每个方法将文件或目录从source复制或移动到target位置。预期行为与fsspec中规定的相同。对于跨对象存储的目录复制,Airflow需要遍历目录树并逐个复制每个文件。这是通过将每个文件从源位置流式传输到目标位置来实现的。
外部集成¶
许多其他项目,如DuckDB、Apache Iceberg等,都可以利用对象存储抽象。通常这是通过传递底层的fsspec实现来完成的。为此,ObjectStoragePath暴露了fs属性。例如,以下代码与duckdb配合使用时,会使用Airflow的连接详情连接到s3,并读取由ObjectStoragePath指定的parquet文件:
import duckdb
from airflow.io.path import ObjectStoragePath
path = ObjectStoragePath("s3://my-bucket/my-table.parquet", conn_id="aws_default")
conn = duckdb.connect(database=":memory:")
conn.register_filesystem(path.fs)
conn.execute(f"CREATE OR REPLACE TABLE my_table AS SELECT * FROM read_parquet('{path}');")