数据目录YAML示例¶
本页面提供了一系列示例,帮助您在conf/base/catalog.yml或conf/local/catalog.yml中构建YAML配置文件。
警告
从Kedro 0.19.0版本开始,数据集不再包含在核心Kedro包中。请改为从kedro-datasets包中导入它们。
从kedro-datasets的2.0.0版本开始,所有数据集名称都已更改,将"DataSet"中的大写字母"S"替换为小写字母"s"。例如,CSVDataSet现在改为CSVDataset。
使用utf-8编码从本地二进制文件加载数据¶
open_args_load 和 open_args_save 参数会被传递给文件系统的 open 方法,用于分别在加载或保存操作期间配置数据集文件(位于特定文件系统上)的打开方式。
test_dataset:
type: ...
fs_args:
open_args_load:
mode: "rb"
encoding: "utf-8"
load_args 和 save_args 用于配置第三方库(例如 CSVDataset 使用的 pandas)如何从文件加载数据或将数据保存到文件。
使用utf-8编码将数据保存为不带行名(索引)的CSV文件¶
test_dataset:
type: pandas.CSVDataset
...
save_args:
index: False
encoding: "utf-8"
从/向本地文件系统加载/保存CSV文件¶
bikes:
type: pandas.CSVDataset
filepath: data/01_raw/bikes.csv
使用指定的加载/保存参数,在本地文件系统上加载/保存CSV文件¶
cars:
type: pandas.CSVDataset
filepath: data/01_raw/company/cars.csv
load_args:
sep: ','
save_args:
index: False
date_format: '%Y-%m-%d %H:%M'
decimal: .
在本地文件系统上加载/保存压缩的CSV文件¶
boats:
type: pandas.CSVDataset
filepath: data/01_raw/company/boats.csv.gz
load_args:
sep: ','
compression: 'gzip'
fs_args:
open_args_load:
mode: 'rb'
从特定S3存储桶加载CSV文件,使用凭证和加载参数¶
motorbikes:
type: pandas.CSVDataset
filepath: s3://your_bucket/data/02_intermediate/company/motorbikes.csv
credentials: dev_s3
load_args:
sep: ','
skiprows: 5
skipfooter: 1
na_values: ['#NA', NA]
从/向本地文件系统加载/保存pickle文件¶
airplanes:
type: pickle.PickleDataset
filepath: data/06_models/airplanes.pkl
backend: pickle
从Google云存储加载Excel文件¶
示例中包含Google Cloud Storage (GCS)中底层文件系统类(GCSFileSystem)的project值
rockets:
type: pandas.ExcelDataset
filepath: gcs://your_bucket/data/02_intermediate/company/motorbikes.xlsx
fs_args:
project: my-project
credentials: my_gcp_credentials
save_args:
sheet_name: Sheet1
从本地文件系统加载多工作表Excel文件¶
trains:
type: pandas.ExcelDataset
filepath: data/02_intermediate/company/trains.xlsx
load_args:
sheet_name: [Sheet1, Sheet2, Sheet3]
在Google云存储上保存使用Matplotlib创建的图像¶
results_plot:
type: matplotlib.MatplotlibWriter
filepath: gcs://your_bucket/data/08_results/plots/output_1.jpeg
fs_args:
project: my-project
credentials: my_gcp_credentials
在本地文件系统存储上加载/保存HDF文件,使用指定的加载/保存参数¶
skateboards:
type: pandas.HDFDataset
filepath: data/02_intermediate/skateboards.hdf
key: name
load_args:
columns: [brand, length]
save_args:
mode: w # Overwrite even when the file already exists
dropna: True
使用指定的加载/保存参数,在本地文件系统存储上加载/保存parquet文件¶
trucks:
type: pandas.ParquetDataset
filepath: data/02_intermediate/trucks.parquet
load_args:
columns: [name, gear, disp, wt]
categories: list
index: name
save_args:
compression: GZIP
file_scheme: hive
has_nulls: False
partition_on: [name]
在S3上加载/保存Spark表,使用指定的加载/保存参数¶
weather:
type: spark.SparkDataset
filepath: s3a://your_bucket/data/01_raw/weather*
credentials: dev_s3
file_format: csv
load_args:
header: True
inferSchema: True
save_args:
sep: '|'
header: True
使用凭据、数据库连接及指定的加载/保存参数来加载/保存SQL表¶
scooters:
type: pandas.SQLTableDataset
credentials: scooters_credentials
table_name: scooters
load_args:
index_col: [name]
columns: [name, gear]
save_args:
if_exists: replace
使用凭据和数据库连接加载SQL表,并对该表应用SQL查询¶
scooters_query:
type: pandas.SQLQueryDataset
credentials: scooters_credentials
sql: select * from cars where gear=4
load_args:
index_col: [name]
当你使用pandas.SQLTableDataset或pandas.SQLQueryDataset时,必须提供数据库连接字符串。在上面的示例中,我们通过凭据中的scooters_credentials键来传递它。
scooters_credentials 必须包含一个顶级键 con,其中包含一个 SQLAlchemy兼容 的连接字符串。作为凭证的替代方案,您可以显式地将 con 放入 load_args 和 save_args 中(仅适用于 pandas.SQLTableDataset)。
从API端点加载数据¶
本示例使用美国农业部(USDA)提供的玉米产量数据。
us_corn_yield_data:
type: api.APIDataset
url: https://quickstats.nass.usda.gov
credentials: usda_credentials
params:
key: SOME_TOKEN
format: JSON
commodity_desc: CORN
statisticcat_des: YIELD
agg_level_desc: STATE
year: 2000
usda_credentials 将作为 auth 参数传递给 requests 库。请在您的 credentials.yml 文件中按以下方式将用户名和密码指定为列表:
usda_credentials:
- username
- password
从MinIO(兼容S3的存储)加载数据¶
test:
type: pandas.CSVDataset
filepath: s3://your_bucket/test.csv # assume `test.csv` is uploaded to the MinIO server.
credentials: dev_minio
在credentials.yml中,按如下方式定义key、secret和endpoint_url:
dev_minio:
key: token
secret: key
client_kwargs:
endpoint_url : 'http://localhost:9000'
注意
设置MinIO最简单的方法是运行一个Docker镜像。执行以下命令后,您可以通过http://localhost:9000访问MinIO服务器,并像在S3上一样创建存储桶和添加文件。
docker run -p 9000:9000 -e "MINIO_ACCESS_KEY=token" -e "MINIO_SECRET_KEY=key" minio/minio server /data
从Azure Blob Storage加载保存为pickle的模型¶
ml_model:
type: pickle.PickleDataset
filepath: "abfs://models/ml_models.pickle"
versioned: True
credentials: dev_abs
在credentials.yml文件中,定义account_name和account_key:
dev_abs:
account_name: accountname
account_key: key
通过SSH加载远程位置存储的CSV文件¶
注意
此示例需要安装Paramiko(pip install paramiko)。
cool_dataset:
type: pandas.CSVDataset
filepath: "sftp:///path/to/remote_cluster/cool_data.csv"
credentials: cluster_credentials
建立SFTP连接所需的所有参数可以通过fs_args或在credentials.yml文件中定义如下:
cluster_credentials:
username: my_username
host: host_address
port: 22
password: password
所有可用参数的列表可在Paramiko文档中查阅。
使用YAML锚点加载具有相似配置的多个数据集¶
不同的数据集可能使用相同的文件格式,共享相同的加载和保存参数,并存储在同一个文件夹中。YAML内置了一种语法用于分解YAML文件的部分内容,这意味着您可以决定哪些内容在数据集之间具有通用性,从而无需花费时间在catalog.yml文件中复制粘贴数据集配置。
您可以在以下示例中看到这一点:
_csv: &csv
type: spark.SparkDataset
file_format: csv
load_args:
sep: ','
na_values: ['#NA', NA]
header: True
inferSchema: False
cars:
<<: *csv
filepath: s3a://data/01_raw/cars.csv
trucks:
<<: *csv
filepath: s3a://data/01_raw/trucks.csv
bikes:
<<: *csv
filepath: s3a://data/01_raw/bikes.csv
load_args:
header: False
语法 &csv 将后续代码块命名为 csv,而语法 <<: *csv 则插入名为 csv 的代码块内容。如 bikes 所示,本地声明的键会完全覆盖插入的键。
注意
模板条目名称以_开头很重要,这样Kedro就知道不要尝试将其实例化为数据集。
你也可以嵌套可重用的YAML语法:
_csv: &csv
type: spark.SparkDataset
file_format: csv
load_args: &csv_load_args
header: True
inferSchema: False
airplanes:
<<: *csv
filepath: s3a://data/01_raw/airplanes.csv
load_args:
<<: *csv_load_args
sep: ;
在这个示例中,默认的csv配置被插入到airplanes中,然后load_args块被覆盖。通常情况下,这会替换整个字典。为了扩展load_args,该块的默认值随后会被重新插入。
使用不同数据集通过转码读取同一文件¶
您可能会遇到需要使用两种不同的Dataset实现读取同一文件的情况。通过使用转码技术,您可以定义指向相同filepath的多个DataCatalog条目来实现这一需求。
如何使用转码功能¶
考虑一个使用Parquet文件的示例。Parquet文件可以直接通过pandas.ParquetDataset和spark.SparkDataset加载。这种转换在协调Spark到pandas的工作流时很典型。
要将同一个文件同时加载为pandas.ParquetDataset和spark.SparkDataset,请在您的conf/base/catalog.yml中为同一个数据集定义两个DataCatalog条目:
my_dataframe@spark:
type: spark.SparkDataset
filepath: data/02_intermediate/data.parquet
file_format: parquet
my_dataframe@pandas:
type: pandas.ParquetDataset
filepath: data/02_intermediate/data.parquet
使用转码功能时,必须确保为每个目录条目定义的文件路径采用相同的格式(例如:CSV、JSON、Parquet)。然后可以在管道中按以下方式使用这些条目:
pipeline(
[
node(name="my_func1_node", func=my_func1, inputs="spark_input", outputs="my_dataframe@spark"),
node(name="my_func2_node", func=my_func2, inputs="my_dataframe@pandas", outputs="pipeline_output"),
]
)
在这个示例中,Kedro能够识别my_dataframe是同一个数据集的不同格式(spark.SparkDataset和pandas.ParquetDataset),并据此解析节点的执行顺序。
在流水线中,Kedro使用spark.SparkDataset实现进行保存,并使用pandas.ParquetDataset进行加载,因此第一个节点输出的是pyspark.sql.DataFrame,而第二个节点接收的是pandas.Dataframe。
如何不使用转码¶
Kedro管道会自动解析节点执行顺序并检查以确保管道中没有循环依赖。正是在这个过程中,转码数据集被解析,转码标记@...会被移除。这意味着在管道内部,数据集my_dataframe@spark和my_dataframe@pandas被视为同一个my_dataframe数据集。然而,DataCatalog将转码条目视为独立的数据集,因为它们仅在管道解析过程中被解析。这会导致您在pipeline.py中定义的管道与Kedro运行的解析后管道之间存在差异,这些差异可能会导致意外行为。因此,在使用转码时需要注意这一点。
注意
以下是一些转码可能产生意外副作用并引发错误的示例。
定义具有相同输入和输出的节点¶
考虑以下流水线:
pipeline(
[
node(name="my_func1_node", func=my_func1, inputs="my_dataframe@pandas", outputs="my_dataframe@spark"),
]
)
在流水线解析过程中,上述节点被定义为将数据集my_dataset同时作为其输入和输出。由于节点不能拥有相同的输入和输出,尝试运行此流水线将失败并显示以下错误:
ValueError: Failed to create node my_func1([my_dataframe@pandas]) -> [my_dataframe@spark].
A node cannot have the same inputs and outputs even if they are transcoded: {'my_dataframe'}
通过CLI创建数据目录YAML配置文件¶
你可以使用kedro catalog create命令来创建数据目录YAML配置文件。
这会创建一个配置文件,其中包含MemoryDataset数据集,用于注册管道中每个数据集(如果该数据集在DataCatalog中缺失)。
# <conf_root>/<env>/catalog/<pipeline_name>.yml
rockets:
type: MemoryDataset
scooters:
type: MemoryDataset