高级:在代码中访问数据目录

您可以通过两种方式定义数据目录。大多数用例可以通过YAML配置文件实现,如前所述,但也可以通过DataCatalog以编程方式访问数据目录,使用API允许您在代码中配置数据源,并在笔记本中使用IO模块。

警告

从Kedro 0.19.0版本开始,数据集不再包含在核心Kedro包中。请改为从kedro-datasets包中导入它们。 从kedro-datasets2.0.0版本开始,所有数据集名称都已更改,将"DataSet"中的大写字母"S"替换为小写字母"s"。例如,CSVDataSet现在改为CSVDataset

如何配置数据目录

要使用DataCatalog API,可以在类似catalog.py的文件中以编程方式构建DataCatalog对象。

在以下代码中,我们使用了kedro-datasets documentation中记录的多个预构建数据加载器。

from kedro.io import DataCatalog
from kedro_datasets.pandas import (
    CSVDataset,
    SQLTableDataset,
    SQLQueryDataset,
    ParquetDataset,
)

catalog =  DataCatalog(
    {
        "bikes": CSVDataset(filepath="../data/01_raw/bikes.csv"),
        "cars": CSVDataset(filepath="../data/01_raw/cars.csv", load_args=dict(sep=",")),
        "cars_table": SQLTableDataset(
            table_name="cars", credentials=dict(con="sqlite:///kedro.db")
        ),
        "scooters_query": SQLQueryDataset(
            sql="select * from cars where gear=4",
            credentials=dict(con="sqlite:///kedro.db"),
        ),
        "ranked": ParquetDataset(filepath="ranked.parquet"),
    }
)

使用SQLTableDatasetSQLQueryDataset时,必须提供一个包含SQLAlchemy兼容数据库连接字符串的con键。在上面的示例中,我们将其作为credentials参数的一部分传递。除了credentials之外,还可以将con放入load_argssave_args中(仅适用于SQLTableDataset)。

如何查看可用的数据源

查看DataCatalog

catalog.list()

如何以编程方式加载数据集

通过名称访问每个数据集:

cars = catalog.load("cars")  # data is now loaded as a DataFrame in 'cars'
gear = cars["gear"].values

当调用load时,后台发生了以下步骤:

  • cars 位于数据目录中

  • 对应的 AbstractDataset 对象已获取

  • 该数据集的load方法被调用

  • 这个load方法将加载操作委托给了底层的pandasread_csv函数

如何以编程方式保存数据

警告

除非您正在使用平台笔记本环境(如Sagemaker、Databricks等)或为Kedro管道编写单元/集成测试,否则不建议使用此模式。优先考虑使用YAML方法。

如何将数据保存到内存

使用与加载数据类似的API来保存数据:

from kedro.io import MemoryDataset

memory = MemoryDataset(data=None)
catalog.add("cars_cache", memory)
catalog.save("cars_cache", "Memory can store anything.")
catalog.load("cars_cache")

如何将数据保存到SQL数据库以供查询

将数据存入SQLite数据库:

import os

# This cleans up the database in case it exists at this point
try:
    os.remove("kedro.db")
except FileNotFoundError:
    pass

catalog.save("cars_table", cars)

# rank scooters by their mpg
ranked = catalog.load("scooters_query")[["brand", "mpg"]]

如何以Parquet格式保存数据

将处理后的数据以Parquet格式保存:

catalog.save("ranked", ranked)

警告

不允许将None保存到数据集!

如何使用凭据访问数据集

在实例化DataCatalog之前,Kedro会首先尝试从项目配置中读取凭证。然后将生成的字典作为credentials参数传递给DataCatalog.from_config()

假设项目中包含文件 conf/local/credentials.yml,其内容如下:

dev_s3:
  client_kwargs:
    aws_access_key_id: key
    aws_secret_access_key: secret

scooters_credentials:
  con: sqlite:///kedro.db

my_gcp_credentials:
  id_token: key

您的代码将如下所示:

CSVDataset(
    filepath="s3://test_bucket/data/02_intermediate/company/motorbikes.csv",
    load_args=dict(sep=",", skiprows=5, skipfooter=1, na_values=["#NA", "NA"]),
    credentials=dict(key="token", secret="key"),
)

如何使用Code API对数据集进行版本控制

在文档的前面部分,我们描述了Kedro如何实现数据集和机器学习模型版本控制

如果需要以编程方式控制特定数据集的加载和保存版本,可以实例化Version并将其作为参数传递给数据集初始化:

from kedro.io import DataCatalog, Version
from kedro_datasets.pandas import CSVDataset
import pandas as pd

data1 = pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
data2 = pd.DataFrame({"col1": [7], "col2": [8], "col3": [9]})
version = Version(
    load=None,  # load the latest available version
    save=None,  # generate save version automatically on each save operation
)

test_dataset = CSVDataset(
    filepath="data/01_raw/test.csv", save_args={"index": False}, version=version
)
catalog =  DataCatalog({"test_dataset": test_dataset})

# save the dataset to data/01_raw/test.csv/<version>/test.csv
catalog.save("test_dataset", data1)
# save the dataset into a new file data/01_raw/test.csv/<version>/test.csv
catalog.save("test_dataset", data2)

# load the latest version from data/test.csv/*/test.csv
reloaded = catalog.load("test_dataset")
assert data2.equals(reloaded)

在上面的示例中,我们没有固定任何版本。当我们设置版本时,加载和保存操作的行为会略有不同:

version = Version(
    load="my_exact_version",  # load exact version
    save="my_exact_version",  # save to exact version
)

test_dataset = CSVDataset(
    filepath="data/01_raw/test.csv", save_args={"index": False}, version=version
)
catalog =  DataCatalog({"test_dataset": test_dataset})

# save the dataset to data/01_raw/test.csv/my_exact_version/test.csv
catalog.save("test_dataset", data1)
# load from data/01_raw/test.csv/my_exact_version/test.csv
reloaded = catalog.load("test_dataset")
assert data1.equals(reloaded)

# raises DatasetError since the path
# data/01_raw/test.csv/my_exact_version/test.csv already exists
catalog.save("test_dataset", data2)

我们不建议传递确切的加载或保存版本号,因为这可能导致操作间出现不一致。例如,如果加载和保存操作的版本号不匹配,保存操作将产生UserWarning警告。

想象一个包含两个节点的简单管道,其中B节点接收来自A节点的输出。如果你为B节点指定加载的数据版本为my_data_2023_08_16.csv,那么A节点生成的数据(my_data_20230818.csv)将不会被使用。

Node_A -> my_data_20230818.csv
my_data_2023_08_16.csv -> Node B

在代码中:

version = Version(
    load="my_data_2023_08_16.csv",  # load exact version
    save="my_data_20230818.csv",  # save to exact version
)

test_dataset = CSVDataset(
    filepath="data/01_raw/test.csv", save_args={"index": False}, version=version
)
catalog =  DataCatalog({"test_dataset": test_dataset})

catalog.save("test_dataset", data1)  # emits a UserWarning due to version inconsistency

# raises DatasetError since the data/01_raw/test.csv/exact_load_version/test.csv
# file does not exist
reloaded = catalog.load("test_dataset")