使用Iceberg进行数据版本控制¶
Apache Iceberg 是一种用于分析数据集的开源表格式。Iceberg表提供了模式演进、隐藏分区、分区布局演进、时间旅行和版本回滚等功能。本指南将介绍如何在Kedro中使用Iceberg表。本教程我们将使用pyiceberg库,该库允许您使用Python与Iceberg表交互,而无需JVM环境。需要注意的是pyiceberg是一个快速发展的项目,目前不支持Iceberg表提供的全部功能。您可以将本教程作为灵感来源,通过创建自定义数据集来扩展功能,使用不同的计算引擎如Spark,或数据框架技术如Apache Arrow、DuckDB等。
先决条件¶
你需要创建一个名为spaceflights-pandas的初始项目,其中包含名为kedro-iceberg的示例流水线。如果尚未创建,可以使用以下命令新建一个Kedro项目:
kedro new --starter spaceflights-pandas --name kedro-iceberg
要与Iceberg表交互,您还需要安装pyiceberg包。您可以通过在requirements.txt中添加以下行来安装它:
pyiceberg[pyarrow]~=0.8.0
根据您选择的存储方式,您可能还需要安装可选依赖项。请参考PyIceberg的安装指南来更新上述命令,添加所需的可选依赖项。
现在您可以通过以下命令安装项目所需依赖:
pip install -r requirements.txt
设置Iceberg目录¶
Iceberg表由目录(catalog)管理,该目录负责处理表的元数据。在生产环境中,这可以是Hive、Glue或Apache Iceberg支持的其他目录。Iceberg还支持多种存储选项,如S3、HDFS等。通过参考配置指南,您可以通过多种方式配置目录、凭证和对象存储以满足需求。在本教程中,我们将使用SQLCatalog,它将元数据存储在本地sqlite数据库中,并使用本地文件系统进行存储。
通过运行以下命令为Iceberg表创建一个临时位置:
mkdir -p /tmp/warehouse
有多种方式可以配置目录,在本教程中,您可以使用~/.pyiceberg.yaml文件。默认情况下,pyiceberg会在您的主目录中查找.pyiceberg.yaml文件,也就是说它会查找~/.pyiceberg.yaml。您可以在主目录中创建或更新现有的.pyiceberg.yaml文件,内容如下:
catalog:
default:
type: sql
uri: sqlite:////tmp/warehouse/pyiceberg_catalog.db
warehouse: file:///tmp/warehouse/warehouse
您可以通过打开一个Python shell(使用ipython命令)并运行以下代码来检查配置是否正在加载:
from pyiceberg.catalog import load_catalog
catalog = load_catalog(name="default")
定义自定义数据集以使用Iceberg表¶
要在Kedro中使用Iceberg表,您需要定义一个自定义数据集,该数据集使用pyiceberg库。在项目的src/kedro_iceberg/目录下创建一个名为pyiceberg_dataset.py的新文件,并复制以下代码:
import pyarrow as pa
from kedro.io.core import AbstractDataset, DatasetError
from pyiceberg.catalog import load_catalog
from pyiceberg.exceptions import NoSuchTableError
DEFAULT_LOAD_ARGS = {"load_version": None}
DEFAULT_SAVE_ARGS = {"mode": "overwrite"}
class PyIcebergDataset(AbstractDataset):
def __init__(
self,
catalog,
namespace,
table_name,
load_args=DEFAULT_LOAD_ARGS,
scan_args=None,
save_args=DEFAULT_SAVE_ARGS,
):
self.table_name = table_name
self.namespace = namespace
self.catalog = load_catalog(catalog)
self.load_args = load_args
self.table = self._load_table(namespace, table_name)
self.save_args = save_args
self.scan_args = scan_args
def load(self):
self.table = self.catalog.load_table((self.namespace, self.table_name))
if self.scan_args:
scan = self.table.scan(**self.scan_args)
else:
scan = self.table.scan()
return scan.to_pandas()
def _load_table(self, namespace, table_name):
try:
return self.catalog.load_table((namespace, table_name))
except NoSuchTableError:
return None
def save(self, data) -> None:
arrow = pa.Table.from_pandas(data)
if not self.table:
self.catalog.create_namespace_if_not_exists(self.namespace)
self.table = self.catalog.create_table((self.namespace, self.table_name), schema=arrow.schema)
if self.save_args.get("mode") == "overwrite":
self.table.overwrite(arrow)
elif self.save_args.get("mode") == "append":
self.table.append(arrow)
else:
raise DatasetError("Mode not supported")
def _describe(self) -> dict:
return {}
def exists(self):
return self.catalog.table_exists((self.namespace, self.table_name))
def inspect(self):
return self.table.inspect
该数据集允许您将Iceberg表加载为pandas数据框。您还可以通过向数据集传递scan_args参数来加载表的子集。save()方法允许您将数据框保存为Iceberg表。通过向数据集传递save_args参数,您可以指定表的保存模式。inspect()方法返回一个包含表元数据的InspectTable对象。
您还可以更新代码以扩展数据集功能,支持更多Iceberg表特性。请参阅Iceberg API文档了解您可以使用Table对象实现的功能。
在目录中使用Iceberg表¶
将数据集保存为Iceberg表¶
现在更新你的目录文件 conf/base/catalog.yml,将数据集 model_input_table 修改为使用自定义的 PyIcebergDataset:
model_input_table:
type: kedro_iceberg.pyiceberg_dataset.PyIcebergDataset
catalog: default
namespace: default
table_name: model_input_table
现在使用以下命令运行您的Kedro项目:
kedro run
你可以通过运行以下命令来检查创建的Iceberg表model_input_table数据集:
tree /tmp/warehouse
输出结果应类似于:
/tmp/warehouse
├── pyiceberg_catalog.db
└── warehouse
└── default.db
└── model_input_table
├── data
│ ├── 00000-0-a3d0f3e6-a9b4-44e4-8dac-95d4b5c14b29.parquet
│ └── 00000-0-baa30a43-2cad-4507-967e-84c744d69c9b.parquet
└── metadata
├── 00000-e66a465e-cdfa-458e-aaf3-aed48ac49157.metadata.json
├── 00001-d1fa7797-ef6f-438e-83c0-bdaabf1bd8de.metadata.json
├── 00002-f0b5294e-a0be-450c-95fe-4cee96c9a311.metadata.json
├── 836ecf2e-e339-44d8-933a-bd978991ea3e-m0.avro
├── a3d0f3e6-a9b4-44e4-8dac-95d4b5c14b29-m0.avro
├── baa30a43-2cad-4507-967e-84c744d69c9b-m0.avro
├── snap-3087457244520966174-0-836ecf2e-e339-44d8-933a-bd978991ea3e.avro
├── snap-3885749350984242152-0-a3d0f3e6-a9b4-44e4-8dac-95d4b5c14b29.avro
└── snap-7387825159950300388-0-baa30a43-2cad-4507-967e-84c744d69c9b.avro
假设上游数据集 companies、shuttles 或 reviews 已更新。您可以运行以下命令来生成 model_input_table 数据集的新版本:
kedro run --to-outputs=model_input_table
你可以使用find /tmp/warehouse/命令来检查更新后的数据集和日志。
加载特定数据集版本¶
要加载数据集的特定版本,您可以在配置中的表scan_args里指定所需版本的snapshot_id。您可以从表的历史记录中获取snapshot_id。以下部分将说明如何在交互模式下检查表的历史记录。
model_input_table:
type: kedro_iceberg.pyiceberg_dataset.PyIcebergDataset
catalog: default
namespace: default
table_name: model_input_table
table_type: pandas
scan_args:
snapshot_id: <snapshot_id>
在交互模式下检查数据集¶
你可以在交互式Python会话中检查Iceberg表的历史记录、元数据、模式等。要启动已加载Kedro组件的IPython会话,请运行:
kedro ipython
使用catalog.datasets属性加载PyIcebergDataset实例:
In [1]: model_input_table = catalog.datasets['model_input_table']
您可以通过使用inspect()方法访问InspectTable对象来检查Delta表的历史记录:
In [2]: inspect_table = model_input_table.inspect()
现在你可以在InspectTable对象上调用history()方法来获取该表的历史记录:
In [3]: inspect_table.history()
Out [3]:
pyarrow.Table
made_current_at: timestamp[ms] not null
snapshot_id: int64 not null
parent_id: int64
is_current_ancestor: bool not null
----
made_current_at: [[2025-02-26 11:42:36.871,2025-02-26 12:08:38.826,2025-02-26 12:08:38.848]]
snapshot_id: [[9089827653240705573,5091346767047746426,7107920212859354452]]
parent_id: [[null,9089827653240705573,5091346767047746426]]
is_current_ancestor: [[true,true,true]]
或者,你也可以直接从pyiceberg.table.Table对象调用history()方法,这会显示更简洁的输出:
In [4]: model_input_table.table.history()
Out [4]:
[
SnapshotLogEntry(snapshot_id=7387825159950300388, timestamp_ms=1741190825900),
SnapshotLogEntry(snapshot_id=3087457244520966174, timestamp_ms=1741190833531),
SnapshotLogEntry(snapshot_id=3885749350984242152, timestamp_ms=1741190833554)
]
同样,你可以调用其他方法在InspectTable对象上来获取关于表的更多信息,例如snapshots()、schema()、partitions()等。