Kedro数据集工厂

您可以使用数据集工厂加载具有相似配置的多个数据集,该功能在Kedro 0.18.12版本中引入。

警告

从Kedro 0.19.0版本开始,数据集不再包含在核心Kedro包中。请改为从kedro-datasets包中导入它们。 从kedro-datasets2.0.0版本开始,所有数据集名称都已更改,将"DataSet"中的大写字母"S"替换为小写字母"s"。例如,CSVDataSet现在改为CSVDataset

数据集工厂引入了一种语法,通过将项目中管道使用的数据集与数据集工厂模式进行匹配,可以泛化您的配置并减少类似目录条目的数量。

例如:

factory_data:
  type: pandas.CSVDataset
  filepath: data/01_raw/factory_data.csv

process_data:
  type: pandas.CSVDataset
  filepath: data/01_raw/process_data.csv

使用数据集工厂,可以将其重写为:

"{name}_data":
  type: pandas.CSVDataset
  filepath: data/01_raw/{name}_data.csv

在运行时,该模式将与inputsoutputs中定义的数据集名称进行匹配。

node(
    func=process_factory,
    inputs="factory_data",
    outputs="process_data",
),

...

注意

工厂模式必须始终用引号括起来,以避免YAML解析错误。

数据集工厂类似于正则表达式,您可以将其视为反向的f-string。在这种情况下,输入数据集factory_data的名称匹配带有_data后缀的模式{name}_data,因此它将name解析为factory。同样地,对于输出数据集process_data,它将name解析为process

这允许您使用一个数据集工厂模式来替换多个数据集条目。它保持您的目录简洁,并且您可以使用相似的名称、类型或命名空间来泛化数据集。

如何泛化同一类型的数据集

您还可以将所有具有相同类型和配置细节的数据集合并。例如,考虑以下包含三个数据集的目录,这些数据集分别名为boatscarsplanes,类型均为pandas.CSVDataset

boats:
  type: pandas.CSVDataset
  filepath: data/01_raw/shuttles.csv

cars:
  type: pandas.CSVDataset
  filepath: data/01_raw/reviews.csv

planes:
  type: pandas.CSVDataset
  filepath: data/01_raw/companies.csv

这些数据集可以组合成以下数据集工厂:

"{dataset_name}#csv":
  type: pandas.CSVDataset
  filepath: data/01_raw/{dataset_name}.csv

接下来您需要更新项目中位于src///pipeline.py的管道文件,将这些数据集引用为boats#csvcars#csvplanes#csv。在数据集名称和数据集工厂模式中添加后缀或前缀(例如这里的#csv)可以确保数据集名称与预期模式匹配。

from .nodes import create_model_input_table, preprocess_companies, preprocess_shuttles


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=preprocess_boats,
                inputs="boats#csv",
                outputs="preprocessed_boats",
                name="preprocess_boats_node",
            ),
            node(
                func=preprocess_cars,
                inputs="cars#csv",
                outputs="preprocessed_cars",
                name="preprocess_cars_node",
            ),
            node(
                func=preprocess_planes,
                inputs="planes#csv",
                outputs="preprocessed_planes",
                name="preprocess_planes_node",
            ),
            node(
                func=create_model_input_table,
                inputs=[
                    "preprocessed_boats",
                    "preprocessed_planes",
                    "preprocessed_cars",
                ],
                outputs="model_input_table",
                name="create_model_input_table_node",
            ),
        ]
    )

如何使用命名空间泛化数据集

您还可以为属于命名空间模块化流水线的数据集泛化目录条目。考虑以下流水线,它接收一个model_input_table并输出属于active_modelling_pipelinecandidate_modelling_pipeline命名空间的两个回归器:

from kedro.pipeline import Pipeline, node, pipeline

from .nodes import evaluate_model, split_data, train_model


def create_pipeline(**kwargs) -> Pipeline:
    pipeline_instance = pipeline(
        [
            node(
                func=split_data,
                inputs=["model_input_table", "params:model_options"],
                outputs=["X_train", "y_train"],
                name="split_data_node",
            ),
            node(
                func=train_model,
                inputs=["X_train", "y_train"],
                outputs="regressor",
                name="train_model_node",
            ),
        ]
    )
    ds_pipeline_1 = pipeline(
        pipe=pipeline_instance,
        inputs="model_input_table",
        namespace="active_modelling_pipeline",
    )
    ds_pipeline_2 = pipeline(
        pipe=pipeline_instance,
        inputs="model_input_table",
        namespace="candidate_modelling_pipeline",
    )

    return ds_pipeline_1 + ds_pipeline_2

现在您可以在目录中使用单一数据集工厂模式,而无需为active_modelling_pipeline.regressorcandidate_modelling_pipeline.regressor分别创建两个独立条目,如下所示:

"{namespace}.regressor":
  type: pickle.PickleDataset
  filepath: data/06_models/regressor_{namespace}.pkl
  versioned: true

如何在不同层级中泛化相同类型的数据集

您可以在同一模式中使用多个占位符。例如,考虑以下目录,其中数据集条目共享typefile_formatsave_args

processing.factory_data:
  type: spark.SparkDataset
  filepath: data/processing/factory_data.parquet
  file_format: parquet
  save_args:
    mode: overwrite

processing.process_data:
  type: spark.SparkDataset
  filepath: data/processing/process_data.parquet
  file_format: parquet
  save_args:
    mode: overwrite

modelling.metrics:
  type: spark.SparkDataset
  filepath: data/modelling/factory_data.parquet
  file_format: parquet
  save_args:
    mode: overwrite

这可以概括为以下模式:

"{layer}.{dataset_name}":
  type: spark.SparkDataset
  filepath: data/{layer}/{dataset_name}.parquet
  file_format: parquet
  save_args:
    mode: overwrite

目录条目主体中使用的所有占位符必须存在于工厂模式名称中。

如何使用多个数据集工厂泛化数据集

您可以在目录中拥有多个数据集工厂。例如:

"{namespace}.{dataset_name}@spark":
  type: spark.SparkDataset
  filepath: data/{namespace}/{dataset_name}.parquet
  file_format: parquet

"{dataset_name}@csv":
  type: pandas.CSVDataset
  filepath: data/01_raw/{dataset_name}.csv

在您的目录中拥有多个数据集工厂可能会导致管道中的数据集名称匹配多个模式的情况。为了解决这个问题,Kedro会对管道中数据集名称的所有潜在匹配项进行排序,并选择最佳匹配项。匹配项根据以下标准进行排名:

  1. 数据集名称与工厂模式之间的精确字符匹配数量。例如,名为factory_data$csv的数据集会优先匹配{dataset}_data$csv而非{dataset_name}$csv

  2. 占位符的数量。例如,数据集 preprocessing.shuttles+csv 会优先匹配 {namespace}.{dataset}+csv 而不是 {dataset}+csv

  3. 按字母顺序排列

如何使用数据集工厂覆盖默认数据集创建

您可以使用数据集工厂定义一个通配模式,这将覆盖默认的MemoryDataset创建方式。

"{default_dataset}":
  type: pandas.CSVDataset
  filepath: data/{default_dataset}.csv

Kedro现在会将项目中所有在流水线中提及但未在目录中以特定模式或显式条目出现的数据集视为pandas.CSVDataset

数据集工厂的CLI命令

为了管理您的数据集工厂,Kedro CLI新增了两个命令:kedro catalog rank (0.18.12) 和 kedro catalog resolve (0.18.13)。

如何使用 kedro catalog rank

该命令会输出目录中所有数据集工厂的列表,按照流水线数据集与它们匹配的顺序进行排序。排序依据以下标准确定:

  1. 模式中非占位符字符的数量

  2. 模式中的占位符数量

  3. 字母顺序排列

考虑一个包含以下模式的目录文件:

Click to expand
"{layer}.{dataset_name}":
  type: pandas.CSVDataset
  filepath: data/{layer}/{dataset_name}.csv

"preprocessed_{dataset_name}":
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/preprocessed_{dataset_name}.parquet

"processed_{dataset_name}":
  type: pandas.ParquetDataset
  filepath: data/03_primary/processed_{dataset_name}.parquet

"{dataset_name}_csv":
  type: pandas.CSVDataset
  filepath: data/03_primary/{dataset_name}.csv

"{namespace}.{dataset_name}_pq":
  type: pandas.ParquetDataset
  filepath: data/03_primary/{dataset_name}_{namespace}.parquet

"{default_dataset}":
  type: pickle.PickleDataset
  filepath: data/01_raw/{default_dataset}.pickle

运行 kedro catalog rank 将产生以下输出:

- 'preprocessed_{dataset_name}'
- 'processed_{dataset_name}'
- '{namespace}.{dataset_name}_pq'
- '{dataset_name}_csv'
- '{layer}.{dataset_name}'
- '{default_dataset}'

我们可以看到,条目首先按照模式中包含多少非占位符进行降序排序。当两个条目具有相同数量的非占位符字符时(例如{namespace}.{dataset_name}_pq{dataset_name}_csv各有四个),它们会再按占位符数量进行降序排序。{default_dataset}是最不具体的模式,总是最后进行匹配。

如何使用 kedro catalog resolve

该命令将解析目录中的数据集模式与项目流水线中的任何显式数据集条目进行匹配。最终输出包含目录中的所有显式数据集条目,以及默认流水线中解析某些数据集模式的任何数据集。 此命令与运行器无关,因此不会考虑运行器中定义的任何默认数据集创建。

为了说明这一点,请考虑以下目录文件:

Click to expand
companies:
  type: pandas.CSVDataset
  filepath: data/01_raw/companies.csv

reviews:
  type: pandas.CSVDataset
  filepath: data/01_raw/reviews.csv

shuttles:
  type: pandas.ExcelDataset
  filepath: data/01_raw/shuttles.xlsx
  load_args:
    engine: openpyxl # Use modern Excel engine, it is the default since Kedro 0.18.0

"preprocessed_{name}":
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/preprocessed_{name}.parquet

"{default}":
  type: pandas.ParquetDataset
  filepath: data/03_primary/{default}.parquet

以及pipeline.py中的以下流水线:

Click to expand
def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=preprocess_companies,
                inputs="companies",
                outputs="preprocessed_companies",
                name="preprocess_companies_node",
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles",
                outputs="preprocessed_shuttles",
                name="preprocess_shuttles_node",
            ),
            node(
                func=create_model_input_table,
                inputs=["preprocessed_shuttles", "preprocessed_companies", "reviews"],
                outputs="model_input_table",
                name="create_model_input_table_node",
            ),
        ]
    )

该命令解析后的目录输出如下:

Click to expand
companies:
  filepath: data/01_raw/companies.csv
  type: pandas.CSVDataset
model_input_table:
  filepath: data/03_primary/model_input_table.parquet
  type: pandas.ParquetDataset
preprocessed_companies:
  filepath: data/02_intermediate/preprocessed_companies.parquet
  type: pandas.ParquetDataset
preprocessed_shuttles:
  filepath: data/02_intermediate/preprocessed_shuttles.parquet
  type: pandas.ParquetDataset
reviews:
  filepath: data/01_raw/reviews.csv
  type: pandas.CSVDataset
shuttles:
  filepath: data/01_raw/shuttles.xlsx
  load_args:
    engine: openpyxl
  type: pandas.ExcelDataset

默认情况下,输出会显示在终端。但如果您希望将解析后的目录输出到指定文件,可以使用重定向操作符 >

kedro catalog resolve > output_file.yaml