Create a data processing pipeline
This section explains the following:
How to create Kedro nodes from Python functions
How to construct a Kedro pipeline from a set of nodes
How to persist, or save, datasets output from the pipeline by registering them in the Data Catalog
How to run the pipeline
Introduction
The data processing pipeline prepares the data for model building by combining the datasets to create a model input table. The data processing pipeline is made up of the following:
Two Python files within src/spaceflights/pipelines/data_processing:
nodes.py (the node functions that form the data processing)
pipeline.py (which builds the pipeline)
A YAML file, conf/base/parameters_data_processing.yml, to define the parameters used when running the pipeline (a sketch follows below)
__init__.py files in the required folders, to ensure that Python can import the pipeline
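Parameters files are plain YAML, and each top-level entry becomes available to nodes as a params:<name> input. In the starter this particular file may contain no entries yet, so the one below is purely hypothetical:

# conf/base/parameters_data_processing.yml
# Hypothetical example entry, not part of the starter.
min_review_score: 50

A node could then declare inputs=["shuttles", "params:min_review_score"] to receive the value.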
Note
Kedro provides the kedro pipeline create command to add the skeleton code for a new pipeline. If you are writing a project from scratch and want to add a new pipeline, run the following in the terminal: kedro pipeline create <pipeline_name>. You do not need to do this in the spaceflights example, as the starter project already provides the pipeline.
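For instance, a skeleton for a hypothetical new pipeline could be generated like this (data_engineering is just an example name, and the exact set of generated files may vary by Kedro version):

kedro pipeline create data_engineering
# Generates, among other files:
#   src/spaceflights/pipelines/data_engineering/nodes.py
#   src/spaceflights/pipelines/data_engineering/pipeline.py
#   conf/base/parameters_data_engineering.yml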
Watch the video
This hands-on video course, split across several videos in a playlist, walks you step by step through data exploration and data processing for the spaceflights data.
Data preprocessing node functions
The first step is to preprocess two of the datasets, companies.csv and shuttles.xlsx. The preprocessing code for the nodes is in src/spaceflights/pipelines/data_processing/nodes.py as two functions (preprocess_companies and preprocess_shuttles). Each takes a raw DataFrame as input, converts a number of columns to different types, and outputs a DataFrame containing the preprocessed data:
import pandas as pd


def _is_true(x: pd.Series) -> pd.Series:
    # Convert "t"/"f" flag strings to booleans.
    return x == "t"


def _parse_percentage(x: pd.Series) -> pd.Series:
    # Turn strings such as "95%" into floats such as 0.95.
    x = x.str.replace("%", "")
    x = x.astype(float) / 100
    return x


def _parse_money(x: pd.Series) -> pd.Series:
    # Strip "$" and "," from money strings and convert to float.
    x = x.str.replace("$", "").str.replace(",", "")
    x = x.astype(float)
    return x


def preprocess_companies(companies: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for companies.

    Args:
        companies: Raw data.
    Returns:
        Preprocessed data, with `company_rating` converted to a float and
        `iata_approved` converted to boolean.
    """
    companies["iata_approved"] = _is_true(companies["iata_approved"])
    companies["company_rating"] = _parse_percentage(companies["company_rating"])
    return companies


def preprocess_shuttles(shuttles: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for shuttles.

    Args:
        shuttles: Raw data.
    Returns:
        Preprocessed data, with `price` converted to a float and `d_check_complete`,
        `moon_clearance_complete` converted to boolean.
    """
    shuttles["d_check_complete"] = _is_true(shuttles["d_check_complete"])
    shuttles["moon_clearance_complete"] = _is_true(shuttles["moon_clearance_complete"])
    shuttles["price"] = _parse_money(shuttles["price"])
    return shuttles
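As a quick sanity check, the helper parsers can be exercised on tiny hand-made Series. The values below are made up, and the sketch assumes pandas 2.x, where Series.str.replace treats the pattern as a literal string by default:

import pandas as pd

# Made-up sample values, purely illustrative.
print(_is_true(pd.Series(["t", "f"])))                # True, False
print(_parse_percentage(pd.Series(["95%", "7%"])))    # 0.95, 0.07
print(_parse_money(pd.Series(["$1,325.0", "$200"])))  # 1325.0, 200.0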
The data processing pipeline
Next, take a look at src/spaceflights/pipelines/data_processing/pipeline.py, which constructs a node for each of the functions defined above and creates a modular pipeline for data processing:
from kedro.pipeline import Pipeline, node, pipeline

from .nodes import preprocess_companies, preprocess_shuttles

...


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=preprocess_companies,
                inputs="companies",
                outputs="preprocessed_companies",
                name="preprocess_companies_node",
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles",
                outputs="preprocessed_shuttles",
                name="preprocess_shuttles_node",
            ),
            ...,
        ]
    )
Note that the inputs references to companies and shuttles refer to the datasets defined in conf/base/catalog.yml. They are the inputs to the preprocess_companies and preprocess_shuttles functions. Kedro uses these named node inputs (and outputs) to determine the dependencies between nodes and their execution order.
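For reference, the raw-dataset entries they point at in conf/base/catalog.yml look roughly like this (a sketch following the spaceflights starter; check your own catalog for the exact paths and load arguments):

companies:
  type: pandas.CSVDataset
  filepath: data/01_raw/companies.csv

shuttles:
  type: pandas.ExcelDataset
  filepath: data/01_raw/shuttles.xlsx
  load_args:
    engine: openpyxl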
Test the example
Run the following command in your terminal window to test the node named preprocess_companies_node:
kedro run --nodes=preprocess_companies_node
You should see output similar to the following:
[08/09/22 16:43:11] INFO Loading data from 'companies' (CSVDataset)... data_catalog.py:343
INFO Running node: preprocess_companies_node: node.py:327
preprocess_companies([companies]) -> [preprocessed_companies]
INFO Saving data to 'preprocessed_companies' (MemoryDataset)... data_catalog.py:382
INFO Completed 1 out of 1 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully. runner.py:89
INFO Loading data from 'preprocessed_companies' (MemoryDataset)... data_catalog.py:343
You can run the preprocess_shuttles node similarly. To test both nodes together as the complete data processing pipeline:
kedro run
You can also run both nodes by naming each in turn, as follows:
kedro run --nodes=preprocess_companies_node,preprocess_shuttles_node
You should see output similar to the following:
INFO Loading data from 'companies' (CSVDataset)... data_catalog.py:343
INFO Running node: preprocess_companies_node: node.py:327
preprocess_companies([companies]) -> [preprocessed_companies]
INFO Saving data to 'preprocessed_companies' (MemoryDataset)... data_catalog.py:382
INFO Completed 1 out of 2 tasks sequential_runner.py:85
INFO Loading data from 'shuttles' (ExcelDataset)... data_catalog.py:343
[08/09/22 16:46:08] INFO Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) node.py:327
-> [preprocessed_shuttles]
INFO Saving data to 'preprocessed_shuttles' (MemoryDataset)... data_catalog.py:382
INFO Completed 2 out of 2 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully. runner.py:89
INFO Loading data from 'preprocessed_companies' (MemoryDataset)... data_catalog.py:343
INFO Loading data from 'preprocessed_shuttles' (MemoryDataset)... data_catalog.py:343
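Assuming the starter's default pipeline registry has auto-discovered this pipeline under the name data_processing, you can also run it in isolation by name:

kedro run --pipeline=data_processing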
Preprocessed data registration
Each of the nodes outputs a new dataset (preprocessed_companies and preprocessed_shuttles). Kedro saves these outputs in Parquet format as pandas.ParquetDataset because they are registered within the Data Catalog, as you can see in conf/base/catalog.yml:
preprocessed_companies:
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/preprocessed_companies.parquet

preprocessed_shuttles:
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/preprocessed_shuttles.parquet
If you remove these lines from catalog.yml, Kedro still runs the pipeline successfully and automatically stores the preprocessed data in memory as temporary Python objects of the MemoryDataset class. Once all nodes that depend on a temporary dataset have executed, Kedro clears the dataset and the Python garbage collector releases the memory.
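Leaving a dataset out of the catalog is, in effect, the same as declaring it explicitly as an in-memory dataset, roughly like this sketch:

preprocessed_companies:
  type: MemoryDataset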
Create a table for model input
The next step adds another node that joins together the three datasets (preprocessed_shuttles, preprocessed_companies, and reviews) into a single model input table, saved as model_input_table.
The code for the create_model_input_table() function is in src/spaceflights/pipelines/data_processing/nodes.py:
def create_model_input_table(
    shuttles: pd.DataFrame, companies: pd.DataFrame, reviews: pd.DataFrame
) -> pd.DataFrame:
    """Combines all data to create a model input table.

    Args:
        shuttles: Preprocessed data for shuttles.
        companies: Preprocessed data for companies.
        reviews: Raw data for reviews.
    Returns:
        Model input table.
    """
    # Join each shuttle to its reviews on the shuttle id.
    rated_shuttles = shuttles.merge(reviews, left_on="id", right_on="shuttle_id")
    rated_shuttles = rated_shuttles.drop("id", axis=1)
    # Join the rated shuttles to the companies that operate them.
    model_input_table = rated_shuttles.merge(
        companies, left_on="company_id", right_on="id"
    )
    # Discard rows with missing values.
    model_input_table = model_input_table.dropna()
    return model_input_table
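To see what the two joins produce, here is a minimal sketch with made-up miniature inputs (only the join keys plus one value column each; the real datasets have many more columns):

import pandas as pd

shuttles = pd.DataFrame({"id": [1, 2], "company_id": [10, 11], "price": [100.0, 250.0]})
reviews = pd.DataFrame({"shuttle_id": [1, 2], "review_scores_rating": [90.0, 75.0]})
companies = pd.DataFrame({"id": [10, 11], "company_rating": [0.95, 0.80]})

# Each resulting row combines a shuttle's price, its review score,
# and the rating of the company that operates it.
print(create_model_input_table(shuttles, companies, reviews))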
The node is created in src/spaceflights/pipelines/data_processing/pipeline.py:
from kedro.pipeline import Pipeline, node, pipeline

from .nodes import create_model_input_table, preprocess_companies, preprocess_shuttles


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=preprocess_companies,
                inputs="companies",
                outputs="preprocessed_companies",
                name="preprocess_companies_node",
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles",
                outputs="preprocessed_shuttles",
                name="preprocess_shuttles_node",
            ),
            node(
                func=create_model_input_table,
                inputs=["preprocessed_shuttles", "preprocessed_companies", "reviews"],
                outputs="model_input_table",
                name="create_model_input_table_node",
            ),
        ]
    )
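As an aside on why a bare kedro run picks this pipeline up automatically: the starter generates a src/spaceflights/pipeline_registry.py that discovers pipelines by convention, roughly like this sketch (check the generated file in your own project):

from kedro.framework.project import find_pipelines
from kedro.pipeline import Pipeline


def register_pipelines() -> dict[str, Pipeline]:
    """Register the project's pipelines."""
    # find_pipelines() imports each create_pipeline() under src/spaceflights/pipelines/.
    pipelines = find_pipelines()
    # A bare `kedro run` executes "__default__", the sum of all discovered pipelines.
    pipelines["__default__"] = sum(pipelines.values())
    return pipelines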
Model input table registration
The following entry in conf/base/catalog.yml saves the model input table dataset to file (in data/03_primary):
model_input_table:
  type: pandas.ParquetDataset
  filepath: data/03_primary/model_input_table.parquet
Test the example again
Run the following command to test the progress of the example:
kedro run
You should see output similar to the following:
[08/09/22 17:01:10] INFO Reached after_catalog_created hook plugin.py:17
INFO Loading data from 'companies' (CSVDataset)... data_catalog.py:343
INFO Running node: preprocess_companies_node: node.py:327
preprocess_companies([companies]) -> [preprocessed_companies]
INFO Saving data to 'preprocessed_companies' (MemoryDataset)... data_catalog.py:382
INFO Completed 1 out of 3 tasks sequential_runner.py:85
INFO Loading data from 'shuttles' (ExcelDataset)... data_catalog.py:343
[08/09/22 17:01:25] INFO Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) node.py:327
-> [preprocessed_shuttles]
INFO Saving data to 'preprocessed_shuttles' (MemoryDataset)... data_catalog.py:382
INFO Completed 2 out of 3 tasks sequential_runner.py:85
INFO Loading data from 'preprocessed_shuttles' (MemoryDataset)... data_catalog.py:343
INFO Loading data from 'preprocessed_companies' (MemoryDataset)... data_catalog.py:343
INFO Loading data from 'reviews' (CSVDataset)... data_catalog.py:343
INFO Running node: create_model_input_table_node: node.py:327
create_model_input_table([preprocessed_shuttles,preprocessed_companies,
reviews]) -> [model_input_table]
[08/09/22 17:01:28] INFO Saving data to 'model_input_table' (MemoryDataset)... data_catalog.py:382
[08/09/22 17:01:29] INFO Completed 3 out of 3 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully. runner.py:89
INFO Loading data from 'model_input_table' (MemoryDataset)... data_catalog.py:343
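Once a run has completed with model_input_table registered in the catalog as above, the persisted file can be sanity-checked outside Kedro with plain pandas:

import pandas as pd

# Load the table the pipeline persisted via the catalog entry shown earlier.
df = pd.read_parquet("data/03_primary/model_input_table.parquet")
print(df.head())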
Visualise the project
This section introduces project visualisation using Kedro-Viz, which is a separate package from the standard Kedro installation. To install it into your virtual environment:
pip install kedro-viz
To start Kedro-Viz, enter the following command in the terminal:
kedro viz run
This command automatically opens a browser tab to serve the visualisation at http://127.0.0.1:4141/. Explore the visualisation at your leisure; you can find out more in the Kedro-Viz documentation.
To exit, close the browser tab. To regain control of the terminal, enter ^+c on Mac or Ctrl+c on Windows or Linux machines.
Checkpoint
This is an excellent time to pause and summarise what you have seen in the example so far.

Photo by Malte Helmhold on Unsplash
How to create a new Kedro project from a starter and install its dependencies
How to add three datasets to the project and set up the Kedro Data Catalog
How to create a data processing pipeline with three nodes to transform and merge the input datasets and create a model input table
How to persist the output from a pipeline by registering those datasets to the Data Catalog
How to visualise the project
The next step is to create the data science pipeline for spaceflight price prediction.