Create a data processing pipeline

This section explains the following:

  • How to create a Kedro node from a Python function

  • How to construct a Kedro pipeline from a set of nodes

  • How to persist, or save, datasets output from the pipeline by registering them in the Data Catalog

  • How to run the pipeline

Introduction

The data processing pipeline prepares the data for model building by combining the datasets to create a model input table. The pipeline consists of the following:

  • Two Python files within src/spaceflights/pipelines/data_processing

    • nodes.py (the node functions that make up the data processing)

    • pipeline.py (to build the pipeline)

  • A yaml file: conf/base/parameters_data_processing.yml, which defines the parameters used when the pipeline runs

  • __init__.py files in the required folders, to ensure that Python can import the pipeline

Note

Kedro provides the kedro pipeline create command to add skeleton code for a new pipeline. If you are writing a project from scratch and want to add a new pipeline, run the following in the terminal: kedro pipeline create <pipeline_name>. You do not need to do this in the spaceflights example, as the starter project already provides the pipeline.
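
For example, to generate the same skeleton in a project of your own, you would run:

kedro pipeline create data_processing

This scaffolds nodes.py, pipeline.py, and the __init__.py files listed above, plus a matching parameters file under conf/base.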

Watch the video

This hands-on video course walks you step by step through data exploration and data processing for the spaceflights data.

Data preprocessing node functions

The first step is to preprocess two of the datasets, companies.csv and shuttles.xlsx. The preprocessing code for the nodes is in src/spaceflights/pipelines/data_processing/nodes.py as two functions (preprocess_companies and preprocess_shuttles). Each takes a raw DataFrame as input, converts the data in several columns to different types, and outputs a DataFrame containing the preprocessed data:

import pandas as pd


def _is_true(x: pd.Series) -> pd.Series:
    # Convert a column of "t"/"f" strings to booleans.
    return x == "t"


def _parse_percentage(x: pd.Series) -> pd.Series:
    # Strip the "%" sign and scale, e.g. "90%" -> 0.9.
    x = x.str.replace("%", "")
    x = x.astype(float) / 100
    return x


def _parse_money(x: pd.Series) -> pd.Series:
    # Strip "$" and thousands separators, e.g. "$1,325.0" -> 1325.0.
    x = x.str.replace("$", "").str.replace(",", "")
    x = x.astype(float)
    return x


def preprocess_companies(companies: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for companies.

    Args:
        companies: Raw data.
    Returns:
        Preprocessed data, with `company_rating` converted to a float and
        `iata_approved` converted to boolean.
    """
    companies["iata_approved"] = _is_true(companies["iata_approved"])
    companies["company_rating"] = _parse_percentage(companies["company_rating"])
    return companies


def preprocess_shuttles(shuttles: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for shuttles.

    Args:
        shuttles: Raw data.
    Returns:
        Preprocessed data, with `price` converted to a float and `d_check_complete`,
        `moon_clearance_complete` converted to boolean.
    """
    shuttles["d_check_complete"] = _is_true(shuttles["d_check_complete"])
    shuttles["moon_clearance_complete"] = _is_true(shuttles["moon_clearance_complete"])
    shuttles["price"] = _parse_money(shuttles["price"])
    return shuttles
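
To see the helpers in action, you can call preprocess_companies on a small, made-up DataFrame from any script or notebook where it is importable (the values below are illustrative, not from the real dataset):

import pandas as pd

raw = pd.DataFrame({"iata_approved": ["t", "f"], "company_rating": ["90%", "80%"]})
print(preprocess_companies(raw))
#   iata_approved  company_rating
# 0          True             0.9
# 1         False             0.8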

The data processing pipeline

Next, have a look at src/spaceflights/pipelines/data_processing/pipeline.py, which constructs a node for each of the functions defined above and creates a modular pipeline for data processing:

from kedro.pipeline import Pipeline, node, pipeline

from .nodes import preprocess_companies, preprocess_shuttles

...


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=preprocess_companies,
                inputs="companies",
                outputs="preprocessed_companies",
                name="preprocess_companies_node",
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles",
                outputs="preprocessed_shuttles",
                name="preprocess_shuttles_node",
            ),
            ...,
        ]
    )

Note that the inputs statements for companies and shuttles refer to the datasets defined in conf/base/catalog.yml. They are inputs to the preprocess_companies and preprocess_shuttles functions. Kedro uses these named node inputs (and outputs) to determine the dependencies between the nodes and their execution order.
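
Execution order is derived purely from these names: a node that consumes a dataset runs after the node that produces it, regardless of declaration order. Here is a minimal, self-contained sketch (the dataset and function names are made up for illustration, not part of the tutorial):

from kedro.pipeline import node, pipeline


def clean(df):
    return df


def enrich(df):
    return df


p = pipeline(
    [
        # Declared first, but it consumes "clean_data", so it runs second.
        node(enrich, inputs="clean_data", outputs="enriched_data", name="enrich_node"),
        node(clean, inputs="raw_data", outputs="clean_data", name="clean_node"),
    ]
)

# Pipeline.nodes returns the nodes in topological (execution) order.
print([n.name for n in p.nodes])  # ['clean_node', 'enrich_node']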

Test the example

Run the following command in your terminal window to test the node named preprocess_companies_node:

kedro run --nodes=preprocess_companies_node

You should see output similar to the following:

[08/09/22 16:43:11] INFO     Loading data from 'companies' (CSVDataset)...                   data_catalog.py:343
                    INFO     Running node: preprocess_companies_node:                                node.py:327
                             preprocess_companies([companies]) -> [preprocessed_companies]
                    INFO     Saving data to 'preprocessed_companies' (MemoryDataset)...      data_catalog.py:382
                    INFO     Completed 1 out of 1 tasks                                  sequential_runner.py:85
                    INFO     Pipeline execution completed successfully.                             runner.py:89
                    INFO     Loading data from 'preprocessed_companies' (MemoryDataset)...   data_catalog.py:343

You can run the preprocess_shuttles node similarly. To test both nodes together as the complete data processing pipeline:

kedro run

You can also run them by naming each node in turn, as follows:

kedro run --nodes=preprocess_companies_node,preprocess_shuttles_node
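
Beyond --nodes, the Kedro CLI can also slice the pipeline around a node; for example, using the standard kedro run flags:

kedro run --to-nodes=preprocess_shuttles_node

This runs preprocess_shuttles_node together with everything it depends on; --from-nodes works analogously in the downstream direction.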

You should see output similar to the following:

                    INFO     Loading data from 'companies' (CSVDataset)...                   data_catalog.py:343
                    INFO     Running node: preprocess_companies_node:                                node.py:327
                             preprocess_companies([companies]) -> [preprocessed_companies]
                    INFO     Saving data to 'preprocessed_companies' (MemoryDataset)...      data_catalog.py:382
                    INFO     Completed 1 out of 2 tasks                                  sequential_runner.py:85
                    INFO     Loading data from 'shuttles' (ExcelDataset)...                  data_catalog.py:343
[08/09/22 16:46:08] INFO     Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) node.py:327
                             -> [preprocessed_shuttles]
                    INFO     Saving data to 'preprocessed_shuttles' (MemoryDataset)...       data_catalog.py:382
                    INFO     Completed 2 out of 2 tasks                                  sequential_runner.py:85
                    INFO     Pipeline execution completed successfully.                             runner.py:89
                    INFO     Loading data from 'preprocessed_companies' (MemoryDataset)...   data_catalog.py:343
                    INFO     Loading data from 'preprocessed_shuttles' (MemoryDataset)...    data_catalog.py:343

Preprocessed data registration

Each of the nodes outputs a new dataset (preprocessed_companies and preprocessed_shuttles). Kedro saves these outputs in Parquet format as pandas.ParquetDataset because they are registered in the Data Catalog, as you can see in conf/base/catalog.yml:

preprocessed_companies:
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/preprocessed_companies.parquet

preprocessed_shuttles:
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/preprocessed_shuttles.parquet
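
Catalog entries can also pass arguments through to the underlying pandas writer. The save_args block below is an optional illustration, not part of the tutorial catalog:

preprocessed_companies:
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/preprocessed_companies.parquet
  save_args:
    compression: snappy  # forwarded to DataFrame.to_parquet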

If you remove the preprocessed_companies and preprocessed_shuttles entries from catalog.yml, Kedro still runs the pipeline successfully and automatically stores the preprocessed data in memory as temporary Python objects of the MemoryDataset class. Once all nodes that depend on a temporary dataset have executed, Kedro clears the dataset and the Python garbage collector releases the memory.
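
If you prefer to make that in-memory behaviour explicit rather than implicit, you can declare it in the catalog yourself; this entry is an optional illustration, not part of the tutorial:

preprocessed_companies:
  type: MemoryDataset  # explicit form of the default when no entry exists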

Create a table for model input

The next step adds another node, which combines three datasets (preprocessed_shuttles, preprocessed_companies, and reviews) into a single model input table, saved as model_input_table.

The code for the create_model_input_table() function is in src/spaceflights/pipelines/data_processing/nodes.py:

def create_model_input_table(
    shuttles: pd.DataFrame, companies: pd.DataFrame, reviews: pd.DataFrame
) -> pd.DataFrame:
    """Combines all data to create a model input table.

    Args:
        shuttles: Preprocessed data for shuttles.
        companies: Preprocessed data for companies.
        reviews: Raw data for reviews.
    Returns:
        model input table.

    """
    rated_shuttles = shuttles.merge(reviews, left_on="id", right_on="shuttle_id")
    rated_shuttles = rated_shuttles.drop("id", axis=1)
    model_input_table = rated_shuttles.merge(
        companies, left_on="company_id", right_on="id"
    )
    model_input_table = model_input_table.dropna()
    return model_input_table
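
To check the merge logic in isolation, you can call the function with a few made-up rows from anywhere it is importable (hypothetical values, for illustration only):

import pandas as pd

shuttles = pd.DataFrame({"id": [1, 2], "company_id": [10, 20], "price": [1.0, 2.0]})
companies = pd.DataFrame({"id": [10, 20], "company_rating": [0.9, 0.8]})
reviews = pd.DataFrame({"shuttle_id": [1, 2], "review_scores_rating": [95.0, None]})

# The second shuttle's missing review score makes dropna() remove its row.
print(create_model_input_table(shuttles, companies, reviews))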

The node is created in src/spaceflights/pipelines/data_processing/pipeline.py:

from kedro.pipeline import Pipeline, node, pipeline

from .nodes import create_model_input_table, preprocess_companies, preprocess_shuttles


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=preprocess_companies,
                inputs="companies",
                outputs="preprocessed_companies",
                name="preprocess_companies_node",
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles",
                outputs="preprocessed_shuttles",
                name="preprocess_shuttles_node",
            ),
            node(
                func=create_model_input_table,
                inputs=["preprocessed_shuttles", "preprocessed_companies", "reviews"],
                outputs="model_input_table",
                name="create_model_input_table_node",
            ),
        ]
    )
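
As with the other nodes, the inputs list maps positionally onto the function's parameters. If you prefer to make that mapping explicit, node also accepts a dict from parameter names to dataset names; the following is an equivalent, optional rewrite of the last node:

node(
    func=create_model_input_table,
    inputs={
        "shuttles": "preprocessed_shuttles",
        "companies": "preprocessed_companies",
        "reviews": "reviews",
    },
    outputs="model_input_table",
    name="create_model_input_table_node",
),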

Model input table registration

The following entry in conf/base/catalog.yml saves the model input table dataset to file (in the data/03_primary folder):

model_input_table:
  type: pandas.ParquetDataset
  filepath: data/03_primary/model_input_table.parquet
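
After a run, you can inspect the persisted table interactively. In a kedro ipython session, Kedro injects a ready-made catalog object, so the following sketch should work:

df = catalog.load("model_input_table")
df.head()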

Test the example again

To test the progress of the example:

kedro run

You should see output similar to the following:

[08/09/22 17:01:10] INFO     Reached after_catalog_created hook                                     plugin.py:17
                    INFO     Loading data from 'companies' (CSVDataset)...                   data_catalog.py:343
                    INFO     Running node: preprocess_companies_node:                                node.py:327
                             preprocess_companies([companies]) -> [preprocessed_companies]
                    INFO     Saving data to 'preprocessed_companies' (MemoryDataset)...      data_catalog.py:382
                    INFO     Completed 1 out of 3 tasks                                  sequential_runner.py:85
                    INFO     Loading data from 'shuttles' (ExcelDataset)...                  data_catalog.py:343
[08/09/22 17:01:25] INFO     Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) node.py:327
                             -> [preprocessed_shuttles]

                    INFO     Saving data to 'preprocessed_shuttles' (MemoryDataset)...       data_catalog.py:382
                    INFO     Completed 2 out of 3 tasks                                  sequential_runner.py:85
                    INFO     Loading data from 'preprocessed_shuttles' (MemoryDataset)...    data_catalog.py:343
                    INFO     Loading data from 'preprocessed_companies' (MemoryDataset)...   data_catalog.py:343
                    INFO     Loading data from 'reviews' (CSVDataset)...                     data_catalog.py:343
                    INFO     Running node: create_model_input_table_node:                            node.py:327
                             create_model_input_table([preprocessed_shuttles,preprocessed_companies,
                             reviews]) -> [model_input_table]
[08/09/22 17:01:28] INFO     Saving data to 'model_input_table' (MemoryDataset)...           data_catalog.py:382
[08/09/22 17:01:29] INFO     Completed 3 out of 3 tasks                                  sequential_runner.py:85
                    INFO     Pipeline execution completed successfully.                             runner.py:89
                    INFO     Loading data from 'model_input_table' (MemoryDataset)...        data_catalog.py:343

Visualise the project

This section introduces project visualisation using Kedro-Viz, which is a separate package from the standard Kedro installation. To install it into your virtual environment:

pip install kedro-viz

To start Kedro-Viz, enter the following command in your terminal:

kedro viz run

This command automatically opens a browser tab to serve the visualisation at http://127.0.0.1:4141/. Explore the visualisation at your leisure, and consult the Kedro-Viz documentation for more detail.
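
While iterating on the pipeline, you may not want to restart the server after each change. Kedro-Viz has an autoreload flag for this (check kedro viz run --help for your installed version):

kedro viz run --autoreload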

To exit the visualisation, close the browser tab. To regain control of the terminal, enter ^+c on Mac or Ctrl+c on Windows or Linux machines.


Checkpoint

This is an excellent moment to take a breath and summarise what you have seen in the example so far:


  • How to create a new Kedro project from a starter and install its dependencies

  • How to add three datasets to the project and set up the Kedro Data Catalog

  • How to create a data processing pipeline with three nodes to transform and merge the input datasets and create a model input table

  • How to persist the output from a pipeline by registering those datasets to the Data Catalog

  • How to visualise the project

The next step is to create the data science pipeline for spaceflight price prediction.