数据目录YAML示例

本页面提供了一系列示例,帮助您在conf/base/catalog.ymlconf/local/catalog.yml中构建YAML配置文件。

警告

从Kedro 0.19.0版本开始,数据集不再包含在核心Kedro包中。请改为从kedro-datasets包中导入它们。 从kedro-datasets2.0.0版本开始,所有数据集名称都已更改,将"DataSet"中的大写字母"S"替换为小写字母"s"。例如,CSVDataSet现在改为CSVDataset

使用utf-8编码从本地二进制文件加载数据

open_args_loadopen_args_save 参数会被传递给文件系统的 open 方法,用于分别在加载或保存操作期间配置数据集文件(位于特定文件系统上)的打开方式。

test_dataset:
  type: ...
  fs_args:
    open_args_load:
      mode: "rb"
      encoding: "utf-8"

load_argssave_args 用于配置第三方库(例如 CSVDataset 使用的 pandas)如何从文件加载数据或将数据保存到文件。

使用utf-8编码将数据保存为不带行名(索引)的CSV文件

test_dataset:
  type: pandas.CSVDataset
  ...
  save_args:
    index: False
    encoding: "utf-8"

从/向本地文件系统加载/保存CSV文件

bikes:
  type: pandas.CSVDataset
  filepath: data/01_raw/bikes.csv

使用指定的加载/保存参数,在本地文件系统上加载/保存CSV文件

cars:
  type: pandas.CSVDataset
  filepath: data/01_raw/company/cars.csv
  load_args:
    sep: ','
  save_args:
    index: False
    date_format: '%Y-%m-%d %H:%M'
    decimal: .

在本地文件系统上加载/保存压缩的CSV文件

boats:
  type: pandas.CSVDataset
  filepath: data/01_raw/company/boats.csv.gz
  load_args:
    sep: ','
    compression: 'gzip'
  fs_args:
    open_args_load:
      mode: 'rb'

从特定S3存储桶加载CSV文件,使用凭证和加载参数

motorbikes:
  type: pandas.CSVDataset
  filepath: s3://your_bucket/data/02_intermediate/company/motorbikes.csv
  credentials: dev_s3
  load_args:
    sep: ','
    skiprows: 5
    skipfooter: 1
    na_values: ['#NA', NA]

从/向本地文件系统加载/保存pickle文件

airplanes:
  type: pickle.PickleDataset
  filepath: data/06_models/airplanes.pkl
  backend: pickle

从Google云存储加载Excel文件

示例中包含Google Cloud Storage (GCS)中底层文件系统类(GCSFileSystem)的project

rockets:
  type: pandas.ExcelDataset
  filepath: gcs://your_bucket/data/02_intermediate/company/motorbikes.xlsx
  fs_args:
    project: my-project
  credentials: my_gcp_credentials
  save_args:
    sheet_name: Sheet1

从本地文件系统加载多工作表Excel文件

trains:
  type: pandas.ExcelDataset
  filepath: data/02_intermediate/company/trains.xlsx
  load_args:
    sheet_name: [Sheet1, Sheet2, Sheet3]

在Google云存储上保存使用Matplotlib创建的图像

results_plot:
  type: matplotlib.MatplotlibWriter
  filepath: gcs://your_bucket/data/08_results/plots/output_1.jpeg
  fs_args:
    project: my-project
  credentials: my_gcp_credentials

在本地文件系统存储上加载/保存HDF文件,使用指定的加载/保存参数

skateboards:
  type: pandas.HDFDataset
  filepath: data/02_intermediate/skateboards.hdf
  key: name
  load_args:
    columns: [brand, length]
  save_args:
    mode: w  # Overwrite even when the file already exists
    dropna: True

使用指定的加载/保存参数,在本地文件系统存储上加载/保存parquet文件

trucks:
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/trucks.parquet
  load_args:
    columns: [name, gear, disp, wt]
    categories: list
    index: name
  save_args:
    compression: GZIP
    file_scheme: hive
    has_nulls: False
    partition_on: [name]

在S3上加载/保存Spark表,使用指定的加载/保存参数

weather:
  type: spark.SparkDataset
  filepath: s3a://your_bucket/data/01_raw/weather*
  credentials: dev_s3
  file_format: csv
  load_args:
    header: True
    inferSchema: True
  save_args:
    sep: '|'
    header: True

使用凭据、数据库连接及指定的加载/保存参数来加载/保存SQL表

scooters:
  type: pandas.SQLTableDataset
  credentials: scooters_credentials
  table_name: scooters
  load_args:
    index_col: [name]
    columns: [name, gear]
  save_args:
    if_exists: replace

使用凭据和数据库连接加载SQL表,并对该表应用SQL查询

scooters_query:
  type: pandas.SQLQueryDataset
  credentials: scooters_credentials
  sql: select * from cars where gear=4
  load_args:
    index_col: [name]

当你使用pandas.SQLTableDatasetpandas.SQLQueryDataset时,必须提供数据库连接字符串。在上面的示例中,我们通过凭据中的scooters_credentials键来传递它。

scooters_credentials 必须包含一个顶级键 con,其中包含一个 SQLAlchemy兼容 的连接字符串。作为凭证的替代方案,您可以显式地将 con 放入 load_argssave_args 中(仅适用于 pandas.SQLTableDataset)。

从API端点加载数据

本示例使用美国农业部(USDA)提供的玉米产量数据。

us_corn_yield_data:
  type: api.APIDataset
  url: https://quickstats.nass.usda.gov
  credentials: usda_credentials
  params:
    key: SOME_TOKEN
    format: JSON
    commodity_desc: CORN
    statisticcat_des: YIELD
    agg_level_desc: STATE
    year: 2000

usda_credentials 将作为 auth 参数传递给 requests 库。请在您的 credentials.yml 文件中按以下方式将用户名和密码指定为列表:

usda_credentials:
  - username
  - password

从MinIO(兼容S3的存储)加载数据

test:
  type: pandas.CSVDataset
  filepath: s3://your_bucket/test.csv # assume `test.csv` is uploaded to the MinIO server.
  credentials: dev_minio

credentials.yml中,按如下方式定义keysecretendpoint_url

dev_minio:
  key: token
  secret: key
  client_kwargs:
    endpoint_url : 'http://localhost:9000'

注意

设置MinIO最简单的方法是运行一个Docker镜像。执行以下命令后,您可以通过http://localhost:9000访问MinIO服务器,并像在S3上一样创建存储桶和添加文件。

docker run -p 9000:9000 -e "MINIO_ACCESS_KEY=token" -e "MINIO_SECRET_KEY=key" minio/minio server /data

从Azure Blob Storage加载保存为pickle的模型

ml_model:
  type: pickle.PickleDataset
  filepath: "abfs://models/ml_models.pickle"
  versioned: True
  credentials: dev_abs

credentials.yml文件中,定义account_nameaccount_key

dev_abs:
  account_name: accountname
  account_key: key

通过SSH加载远程位置存储的CSV文件

注意

此示例需要安装Paramikopip install paramiko)。

cool_dataset:
  type: pandas.CSVDataset
  filepath: "sftp:///path/to/remote_cluster/cool_data.csv"
  credentials: cluster_credentials

建立SFTP连接所需的所有参数可以通过fs_args或在credentials.yml文件中定义如下:

cluster_credentials:
  username: my_username
  host: host_address
  port: 22
  password: password

所有可用参数的列表可在Paramiko文档中查阅。

使用YAML锚点加载具有相似配置的多个数据集

不同的数据集可能使用相同的文件格式,共享相同的加载和保存参数,并存储在同一个文件夹中。YAML内置了一种语法用于分解YAML文件的部分内容,这意味着您可以决定哪些内容在数据集之间具有通用性,从而无需花费时间在catalog.yml文件中复制粘贴数据集配置。

您可以在以下示例中看到这一点:

_csv: &csv
  type: spark.SparkDataset
  file_format: csv
  load_args:
    sep: ','
    na_values: ['#NA', NA]
    header: True
    inferSchema: False

cars:
  <<: *csv
  filepath: s3a://data/01_raw/cars.csv

trucks:
  <<: *csv
  filepath: s3a://data/01_raw/trucks.csv

bikes:
  <<: *csv
  filepath: s3a://data/01_raw/bikes.csv
  load_args:
    header: False

语法 &csv 将后续代码块命名为 csv,而语法 <<: *csv 则插入名为 csv 的代码块内容。如 bikes 所示,本地声明的键会完全覆盖插入的键。

注意

模板条目名称以_开头很重要,这样Kedro就知道不要尝试将其实例化为数据集。

你也可以嵌套可重用的YAML语法:

_csv: &csv
  type: spark.SparkDataset
  file_format: csv
  load_args: &csv_load_args
    header: True
    inferSchema: False

airplanes:
  <<: *csv
  filepath: s3a://data/01_raw/airplanes.csv
  load_args:
    <<: *csv_load_args
    sep: ;

在这个示例中,默认的csv配置被插入到airplanes中,然后load_args块被覆盖。通常情况下,这会替换整个字典。为了扩展load_args,该块的默认值随后会被重新插入。

使用不同数据集通过转码读取同一文件

您可能会遇到需要使用两种不同的Dataset实现读取同一文件的情况。通过使用转码技术,您可以定义指向相同filepath的多个DataCatalog条目来实现这一需求。

如何使用转码功能

考虑一个使用Parquet文件的示例。Parquet文件可以直接通过pandas.ParquetDatasetspark.SparkDataset加载。这种转换在协调Sparkpandas的工作流时很典型。

要将同一个文件同时加载为pandas.ParquetDatasetspark.SparkDataset,请在您的conf/base/catalog.yml中为同一个数据集定义两个DataCatalog条目:

my_dataframe@spark:
  type: spark.SparkDataset
  filepath: data/02_intermediate/data.parquet
  file_format: parquet

my_dataframe@pandas:
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/data.parquet

使用转码功能时,必须确保为每个目录条目定义的文件路径采用相同的格式(例如:CSV、JSON、Parquet)。然后可以在管道中按以下方式使用这些条目:

pipeline(
    [
        node(name="my_func1_node", func=my_func1, inputs="spark_input", outputs="my_dataframe@spark"),
        node(name="my_func2_node", func=my_func2, inputs="my_dataframe@pandas", outputs="pipeline_output"),
    ]
)

在这个示例中,Kedro能够识别my_dataframe是同一个数据集的不同格式(spark.SparkDatasetpandas.ParquetDataset),并据此解析节点的执行顺序。

在流水线中,Kedro使用spark.SparkDataset实现进行保存,并使用pandas.ParquetDataset进行加载,因此第一个节点输出的是pyspark.sql.DataFrame,而第二个节点接收的是pandas.Dataframe

如何使用转码

Kedro管道会自动解析节点执行顺序并检查以确保管道中没有循环依赖。正是在这个过程中,转码数据集被解析,转码标记@...会被移除。这意味着在管道内部,数据集my_dataframe@sparkmy_dataframe@pandas被视为同一个my_dataframe数据集。然而,DataCatalog将转码条目视为独立的数据集,因为它们仅在管道解析过程中被解析。这会导致您在pipeline.py中定义的管道与Kedro运行的解析后管道之间存在差异,这些差异可能会导致意外行为。因此,在使用转码时需要注意这一点。

注意

以下是一些转码可能产生意外副作用并引发错误的示例。

定义具有相同输入和输出的节点

考虑以下流水线:

pipeline(
    [
        node(name="my_func1_node", func=my_func1, inputs="my_dataframe@pandas", outputs="my_dataframe@spark"),
    ]
)

在流水线解析过程中,上述节点被定义为将数据集my_dataset同时作为其输入和输出。由于节点不能拥有相同的输入和输出,尝试运行此流水线将失败并显示以下错误:

ValueError: Failed to create node my_func1([my_dataframe@pandas]) -> [my_dataframe@spark].
A node cannot have the same inputs and outputs even if they are transcoded: {'my_dataframe'}

定义多个共享相同输出的节点

考虑以下流水线:

pipeline(
    [
        node(name="my_func1_node", func=my_func1, inputs="spark_input", outputs="my_dataframe@spark"),
        node(name="my_func2_node", func=my_func2, inputs="pandas_input", outputs="my_dataframe@pandas"),
    ]
)

当此管道解析时,两个节点都被定义为返回相同的输出my_dataset,这是不允许的。运行管道将失败并显示以下错误:

kedro.pipeline.pipeline.OutputNotUniqueError: Output(s) ['my_dataframe'] are returned by more than one nodes. Node outputs must be unique.

创建具有隐藏依赖关系的管道

考虑以下流水线:

pipeline(
    [
        node(name="my_func1_node", func=my_func1, inputs="my_dataframe@spark", outputs="spark_output"),
        node(name="my_func2_node", func=my_func2, inputs="pandas_input", outputs="my_dataframe@pandas"),
        node(name="my_func3_node", func=my_func3, inputs="my_dataframe@pandas", outputs="pandas_output"),
    ]
)

在这个示例中,节点my_func3_nodemy_func2_node之间存在单一依赖关系。然而,当解析这个流水线时,存在一些隐藏的依赖关系会限制节点的执行顺序。我们可以通过移除转码标记来暴露这些隐藏依赖:

resolved_pipeline(
    [
        node(name="my_func1_node", func=my_func1, inputs="my_dataframe", outputs="spark_output"),
        node(name="my_func2_node", func=my_func2, inputs="pandas_input", outputs="my_dataframe"),
        node(name="my_func3_node", func=my_func3, inputs="my_dataframe", outputs="pandas_output"),
    ]
)

当节点顺序确定后,我们可以看到节点my_func1_node被视为依赖于节点my_func2_node。这个管道仍然可以无错误运行,但应当注意避免创建隐藏依赖关系,因为它们可能会降低性能,例如在使用ParallelRunner时。

通过CLI创建数据目录YAML配置文件

你可以使用kedro catalog create命令来创建数据目录YAML配置文件

这会创建一个//catalog/.yml配置文件,其中包含MemoryDataset数据集,用于注册管道中每个数据集(如果该数据集在DataCatalog中缺失)。

# <conf_root>/<env>/catalog/<pipeline_name>.yml
rockets:
  type: MemoryDataset
scooters:
  type: MemoryDataset