高级:在代码中访问数据目录¶
您可以通过两种方式定义数据目录。大多数用例可以通过YAML配置文件实现,如前所述,但也可以通过DataCatalog以编程方式访问数据目录,使用API允许您在代码中配置数据源,并在笔记本中使用IO模块。
警告
从Kedro 0.19.0版本开始,数据集不再包含在核心Kedro包中。请改为从kedro-datasets包中导入它们。
从kedro-datasets的2.0.0版本开始,所有数据集名称都已更改,将"DataSet"中的大写字母"S"替换为小写字母"s"。例如,CSVDataSet现在改为CSVDataset。
如何配置数据目录¶
要使用DataCatalog API,可以在类似catalog.py的文件中以编程方式构建DataCatalog对象。
在以下代码中,我们使用了kedro-datasets documentation中记录的多个预构建数据加载器。
from kedro.io import DataCatalog
from kedro_datasets.pandas import (
CSVDataset,
SQLTableDataset,
SQLQueryDataset,
ParquetDataset,
)
catalog = DataCatalog(
{
"bikes": CSVDataset(filepath="../data/01_raw/bikes.csv"),
"cars": CSVDataset(filepath="../data/01_raw/cars.csv", load_args=dict(sep=",")),
"cars_table": SQLTableDataset(
table_name="cars", credentials=dict(con="sqlite:///kedro.db")
),
"scooters_query": SQLQueryDataset(
sql="select * from cars where gear=4",
credentials=dict(con="sqlite:///kedro.db"),
),
"ranked": ParquetDataset(filepath="ranked.parquet"),
}
)
使用SQLTableDataset或SQLQueryDataset时,必须提供一个包含SQLAlchemy兼容数据库连接字符串的con键。在上面的示例中,我们将其作为credentials参数的一部分传递。除了credentials之外,还可以将con放入load_args和save_args中(仅适用于SQLTableDataset)。
如何查看可用的数据源¶
查看DataCatalog:
catalog.list()
如何以编程方式加载数据集¶
通过名称访问每个数据集:
cars = catalog.load("cars") # data is now loaded as a DataFrame in 'cars'
gear = cars["gear"].values
当调用load时,后台发生了以下步骤:
值
cars位于数据目录中对应的
AbstractDataset对象已获取该数据集的
load方法被调用这个
load方法将加载操作委托给了底层的pandasread_csv函数
如何以编程方式保存数据¶
警告
除非您正在使用平台笔记本环境(如Sagemaker、Databricks等)或为Kedro管道编写单元/集成测试,否则不建议使用此模式。优先考虑使用YAML方法。
如何将数据保存到内存¶
使用与加载数据类似的API来保存数据:
from kedro.io import MemoryDataset
memory = MemoryDataset(data=None)
catalog.add("cars_cache", memory)
catalog.save("cars_cache", "Memory can store anything.")
catalog.load("cars_cache")
如何将数据保存到SQL数据库以供查询¶
将数据存入SQLite数据库:
import os
# This cleans up the database in case it exists at this point
try:
os.remove("kedro.db")
except FileNotFoundError:
pass
catalog.save("cars_table", cars)
# rank scooters by their mpg
ranked = catalog.load("scooters_query")[["brand", "mpg"]]
如何以Parquet格式保存数据¶
将处理后的数据以Parquet格式保存:
catalog.save("ranked", ranked)
警告
不允许将None保存到数据集!
如何使用凭据访问数据集¶
在实例化DataCatalog之前,Kedro会首先尝试从项目配置中读取凭证。然后将生成的字典作为credentials参数传递给DataCatalog.from_config()。
假设项目中包含文件 conf/local/credentials.yml,其内容如下:
dev_s3:
client_kwargs:
aws_access_key_id: key
aws_secret_access_key: secret
scooters_credentials:
con: sqlite:///kedro.db
my_gcp_credentials:
id_token: key
您的代码将如下所示:
CSVDataset(
filepath="s3://test_bucket/data/02_intermediate/company/motorbikes.csv",
load_args=dict(sep=",", skiprows=5, skipfooter=1, na_values=["#NA", "NA"]),
credentials=dict(key="token", secret="key"),
)
如何使用Code API对数据集进行版本控制¶
在文档的前面部分,我们描述了Kedro如何实现数据集和机器学习模型版本控制。
如果需要以编程方式控制特定数据集的加载和保存版本,可以实例化Version并将其作为参数传递给数据集初始化:
from kedro.io import DataCatalog, Version
from kedro_datasets.pandas import CSVDataset
import pandas as pd
data1 = pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
data2 = pd.DataFrame({"col1": [7], "col2": [8], "col3": [9]})
version = Version(
load=None, # load the latest available version
save=None, # generate save version automatically on each save operation
)
test_dataset = CSVDataset(
filepath="data/01_raw/test.csv", save_args={"index": False}, version=version
)
catalog = DataCatalog({"test_dataset": test_dataset})
# save the dataset to data/01_raw/test.csv/<version>/test.csv
catalog.save("test_dataset", data1)
# save the dataset into a new file data/01_raw/test.csv/<version>/test.csv
catalog.save("test_dataset", data2)
# load the latest version from data/test.csv/*/test.csv
reloaded = catalog.load("test_dataset")
assert data2.equals(reloaded)
在上面的示例中,我们没有固定任何版本。当我们设置版本时,加载和保存操作的行为会略有不同:
version = Version(
load="my_exact_version", # load exact version
save="my_exact_version", # save to exact version
)
test_dataset = CSVDataset(
filepath="data/01_raw/test.csv", save_args={"index": False}, version=version
)
catalog = DataCatalog({"test_dataset": test_dataset})
# save the dataset to data/01_raw/test.csv/my_exact_version/test.csv
catalog.save("test_dataset", data1)
# load from data/01_raw/test.csv/my_exact_version/test.csv
reloaded = catalog.load("test_dataset")
assert data1.equals(reloaded)
# raises DatasetError since the path
# data/01_raw/test.csv/my_exact_version/test.csv already exists
catalog.save("test_dataset", data2)
我们不建议传递确切的加载或保存版本号,因为这可能导致操作间出现不一致。例如,如果加载和保存操作的版本号不匹配,保存操作将产生UserWarning警告。
想象一个包含两个节点的简单管道,其中B节点接收来自A节点的输出。如果你为B节点指定加载的数据版本为my_data_2023_08_16.csv,那么A节点生成的数据(my_data_20230818.csv)将不会被使用。
Node_A -> my_data_20230818.csv
my_data_2023_08_16.csv -> Node B
在代码中:
version = Version(
load="my_data_2023_08_16.csv", # load exact version
save="my_data_20230818.csv", # save to exact version
)
test_dataset = CSVDataset(
filepath="data/01_raw/test.csv", save_args={"index": False}, version=version
)
catalog = DataCatalog({"test_dataset": test_dataset})
catalog.save("test_dataset", data1) # emits a UserWarning due to version inconsistency
# raises DatasetError since the data/01_raw/test.csv/exact_load_version/test.csv
# file does not exist
reloaded = catalog.load("test_dataset")