Create a data science pipeline

This section explains the following:

- How to add a second Kedro pipeline for data science code that extends the default project pipeline
- How to 'slice' the project to run only part of the whole pipeline
- (Optional) How to make a modular pipeline
- (Optional) How to specify the way the pipeline nodes are run: sequentially or in parallel
Data science nodes

The data science pipeline uses the LinearRegression implementation from the scikit-learn library.

The data science pipeline is made up of the following:

- Two python files within src/spaceflights/pipelines/data_science: nodes.py (the node functions that make up the data science pipeline) and pipeline.py (to build the pipeline)
- A yaml file: conf/base/parameters_data_science.yml to define the parameters used when running the pipeline
- __init__.py files in the required folders to ensure that Python can import the pipeline

First, take a look at the functions for the data science nodes in src/spaceflights/pipelines/data_science/nodes.py:
```python
import logging
from typing import Any, Tuple

import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split


def split_data(data: pd.DataFrame, parameters: dict[str, Any]) -> Tuple:
    """Splits data into features and targets training and test sets.

    Args:
        data: Data containing features and target.
        parameters: Parameters defined in parameters_data_science.yml.
    Returns:
        Split data.
    """
    X = data[parameters["features"]]
    y = data["price"]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=parameters["test_size"], random_state=parameters["random_state"]
    )
    return X_train, X_test, y_train, y_test


def train_model(X_train: pd.DataFrame, y_train: pd.Series) -> LinearRegression:
    """Trains the linear regression model.

    Args:
        X_train: Training data of independent features.
        y_train: Training data for price.

    Returns:
        Trained model.
    """
    regressor = LinearRegression()
    regressor.fit(X_train, y_train)
    return regressor


def evaluate_model(
    regressor: LinearRegression, X_test: pd.DataFrame, y_test: pd.Series
):
    """Calculates and logs the coefficient of determination.

    Args:
        regressor: Trained model.
        X_test: Testing data of independent features.
        y_test: Testing data for price.
    """
    y_pred = regressor.predict(X_test)
    score = r2_score(y_test, y_pred)
    logger = logging.getLogger(__name__)
    logger.info("Model has a coefficient R^2 of %.3f on test data.", score)
```
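To see these functions' logic in isolation, here is a minimal, self-contained sketch that runs the same train_test_split / fit / r2_score sequence on a tiny synthetic table. The toy data and parameter values are invented for illustration; because the toy relationship is perfectly linear, the score comes out as 1.0, unlike the real dataset.

```python
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split

# Toy stand-in for model_input_table: price is an exact linear function of engines.
data = pd.DataFrame({
    "engines": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
    "price": [10, 20, 30, 40, 50, 60, 70, 80, 90, 100],
})
# Same shape as the model_options dictionary in parameters_data_science.yml.
parameters = {"features": ["engines"], "test_size": 0.2, "random_state": 3}

X = data[parameters["features"]]
y = data["price"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=parameters["test_size"], random_state=parameters["random_state"]
)
regressor = LinearRegression().fit(X_train, y_train)
score = round(r2_score(y_test, regressor.predict(X_test)), 3)
print(score)  # → 1.0, since the toy data is perfectly linear
```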
Input parameter configuration

Parameters that are used by the DataCatalog when the pipeline executes are stored in conf/base/parameters_data_science.yml:

```yaml
model_options:
  test_size: 0.2
  random_state: 3
  features:
    - engines
    - passenger_capacity
    - crew
    - d_check_complete
    - moon_clearance_complete
    - iata_approved
    - company_rating
    - review_scores_rating
```
Here, the parameters test_size and random_state are used as part of the train-test split, and features gives the names of columns in the model input table to use as features.

More information about parameters is available in the configuration documentation.
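To make the mapping concrete: when a node lists params:model_options among its inputs, Kedro passes the node the model_options sub-dictionary from this file. A minimal sketch of that lookup, in plain Python with no Kedro required (the dictionary below just mirrors the yaml above):

```python
# The parameters file, as Kedro would load it into a dictionary.
parameters = {
    "model_options": {
        "test_size": 0.2,
        "random_state": 3,
        "features": ["engines", "passenger_capacity", "crew"],
    }
}

# A node declared with inputs=[..., "params:model_options"] receives this sub-dict,
# which is what split_data's `parameters` argument indexes into.
model_options = parameters["model_options"]
print(model_options["test_size"])  # → 0.2
```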
Model registration

The following definition in conf/base/catalog.yml registers the dataset that saves the trained model:

```yaml
regressor:
  type: pickle.PickleDataset
  filepath: data/06_models/regressor.pickle
  versioned: true
```
By setting versioned to true, versioning is enabled for regressor. This means that the pickled output of the regressor is saved every time the pipeline runs, which stores the history of the models built using this pipeline. You can learn more in the later section about versioning datasets and ML models.
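As an illustration of what versioning does on disk: each run saves the pickle under a timestamped subdirectory of the configured filepath, e.g. data/06_models/regressor.pickle/&lt;timestamp&gt;/regressor.pickle. The helper below is only a sketch of that layout, not Kedro's internal code:

```python
from datetime import datetime, timezone
from pathlib import PurePosixPath


def versioned_path(filepath: str, version: str) -> PurePosixPath:
    """Sketch of the versioned layout: <filepath>/<version>/<filename>."""
    p = PurePosixPath(filepath)
    return p / version / p.name


# Each run generates a fresh timestamp-style version string.
version = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H.%M.%S.%fZ")
print(versioned_path("data/06_models/regressor.pickle", version))
```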
Data science pipeline

The data science pipeline is defined in src/spaceflights/pipelines/data_science/pipeline.py:

```python
from kedro.pipeline import Pipeline, node, pipeline

from .nodes import evaluate_model, split_data, train_model


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=split_data,
                inputs=["model_input_table", "params:model_options"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                name="split_data_node",
            ),
            node(
                func=train_model,
                inputs=["X_train", "y_train"],
                outputs="regressor",
                name="train_model_node",
            ),
            node(
                func=evaluate_model,
                inputs=["regressor", "X_test", "y_test"],
                outputs=None,
                name="evaluate_model_node",
            ),
        ]
    )
```
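The order in which a runner executes these three nodes is determined entirely by their declared inputs and outputs, not by the order they appear in the list. The following sketch (not Kedro's actual implementation) shows how that ordering can be derived from the declarations above:

```python
# Node declarations copied from pipeline.py: name -> (inputs, outputs).
nodes = {
    "split_data_node": {
        "inputs": {"model_input_table", "params:model_options"},
        "outputs": {"X_train", "X_test", "y_train", "y_test"},
    },
    "train_model_node": {
        "inputs": {"X_train", "y_train"},
        "outputs": {"regressor"},
    },
    "evaluate_model_node": {
        "inputs": {"regressor", "X_test", "y_test"},
        "outputs": set(),
    },
}

# Free inputs come from the catalog and parameters, not from any node.
available = {"model_input_table", "params:model_options"}

# Repeatedly run any node whose inputs are all available (a simple topological sort).
order = []
while len(order) < len(nodes):
    for name, spec in nodes.items():
        if name not in order and spec["inputs"] <= available:
            order.append(name)
            available |= spec["outputs"]

print(order)  # → ['split_data_node', 'train_model_node', 'evaluate_model_node']
```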
Test the pipeline

When you created your project with kedro new, one of the files generated was src/, which constructs a __default__ pipeline that includes every pipeline in the project.

This means you do not need to manually instruct Kedro to run each pipeline, but can execute the default pipeline, which consists of the data processing pipeline followed by the data science pipeline:

```bash
kedro run
```

You should see output similar to the following:
```text
INFO Loading data from 'companies' (CSVDataset)... data_catalog.py:343
INFO Running node: preprocess_companies_node: node.py:327
preprocess_companies([companies]) -> [preprocessed_companies]
INFO Saving data to 'preprocessed_companies' (MemoryDataset)... data_catalog.py:382
INFO Completed 1 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'shuttles' (ExcelDataset)... data_catalog.py:343
[08/09/22 16:56:15] INFO Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) node.py:327
-> [preprocessed_shuttles]
INFO Saving data to 'preprocessed_shuttles' (MemoryDataset)... data_catalog.py:382
INFO Completed 2 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'preprocessed_shuttles' (MemoryDataset)... data_catalog.py:343
INFO Loading data from 'preprocessed_companies' (MemoryDataset)... data_catalog.py:343
INFO Loading data from 'reviews' (CSVDataset)... data_catalog.py:343
INFO Running node: create_model_input_table_node: node.py:327
create_model_input_table([preprocessed_shuttles,preprocessed_companies,
reviews]) -> [model_input_table]
[08/09/22 16:56:18] INFO Saving data to 'model_input_table' (MemoryDataset)... data_catalog.py:382
[08/09/22 16:56:19] INFO Completed 3 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'model_input_table' (MemoryDataset)... data_catalog.py:343
INFO Loading data from 'params:model_options' (MemoryDataset)... data_catalog.py:343
INFO Running node: split_data_node: node.py:327
split_data([model_input_table,params:model_options]) ->
[X_train,X_test,y_train,y_test]
INFO Saving data to 'X_train' (MemoryDataset)... data_catalog.py:382
INFO Saving data to 'X_test' (MemoryDataset)... data_catalog.py:382
INFO Saving data to 'y_train' (MemoryDataset)... data_catalog.py:382
INFO Saving data to 'y_test' (MemoryDataset)... data_catalog.py:382
INFO Completed 4 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'X_train' (MemoryDataset)... data_catalog.py:343
INFO Loading data from 'y_train' (MemoryDataset)... data_catalog.py:343
INFO Running node: train_model_node: train_model([X_train,y_train]) -> node.py:327
[regressor]
[08/09/22 16:56:20] INFO Saving data to 'regressor' (PickleDataset)... data_catalog.py:382
INFO Completed 5 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'regressor' (PickleDataset)... data_catalog.py:343
INFO Loading data from 'X_test' (MemoryDataset)... data_catalog.py:343
INFO Loading data from 'y_test' (MemoryDataset)... data_catalog.py:343
INFO Running node: evaluate_model_node: node.py:327
evaluate_model([regressor,X_test,y_test]) -> None
INFO Model has a coefficient R^2 of 0.462 on test data. nodes.py:55
INFO Completed 6 out of 6 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully. runner.py:89
```
As you can see, the data_processing and data_science pipelines ran successfully, generated a model and evaluated it.

Slice a pipeline

There may be occasions when you want to run just part of the default pipeline. For example, you could skip data_processing execution and run only the data_science pipeline to tune the hyperparameters of the price prediction model.

You can 'slice' the pipeline with the --pipeline option to specify only the portion you want to run. For example, to run only the pipeline named data_science (as registered automatically in register_pipelines), execute the following command:

```bash
kedro run --pipeline=data_science
```
Modular pipelines

In many typical Kedro projects, a single ("main") pipeline increases in complexity as the project evolves. To keep your project fit for purpose, we recommend that you create modular pipelines, which are logically isolated and can be reused. You can instantiate a modular pipeline multiple times as a "template" pipeline that can run with different inputs/outputs/parameters.

Modular pipelines are easier to develop, test and maintain. They are reusable within the same codebase, and also portable across projects via micro-packaging, which makes them a scalable way of working with Kedro pipelines.
Optional: Extend the project with namespacing and a modular pipeline

This is optional code, so it is not provided in the spaceflights starter. If you want to see it in action, you need to copy and paste the code as instructed.

First, add a namespace to the modelling component of the data science pipeline to instantiate it as a template with different parameters for an active_modelling_pipeline and a candidate_modelling_pipeline, to test how the model performs with different combinations of features.

Update your catalog to add namespaces to the outputs of each instance. In conf/base/catalog.yml, replace the regressor key with the following two new dataset keys:
```yaml
active_modelling_pipeline.regressor:
  type: pickle.PickleDataset
  filepath: data/06_models/regressor_active.pickle
  versioned: true

candidate_modelling_pipeline.regressor:
  type: pickle.PickleDataset
  filepath: data/06_models/regressor_candidate.pickle
  versioned: true
```
Update the parameters file for the data science pipeline, conf/base/parameters_data_science.yml, to replace the existing contents for model_options with the following configuration for two instances of the template pipeline:

```yaml
active_modelling_pipeline:
  model_options:
    test_size: 0.2
    random_state: 3
    features:
      - engines
      - passenger_capacity
      - crew
      - d_check_complete
      - moon_clearance_complete
      - iata_approved
      - company_rating
      - review_scores_rating

candidate_modelling_pipeline:
  model_options:
    test_size: 0.2
    random_state: 8
    features:
      - engines
      - passenger_capacity
      - crew
      - review_scores_rating
```
Replace the code in pipelines/data_science/pipeline.py with the following snippet:

```python
from kedro.pipeline import Pipeline, node, pipeline

from .nodes import evaluate_model, split_data, train_model


def create_pipeline(**kwargs) -> Pipeline:
    pipeline_instance = pipeline(
        [
            node(
                func=split_data,
                inputs=["model_input_table", "params:model_options"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                name="split_data_node",
            ),
            node(
                func=train_model,
                inputs=["X_train", "y_train"],
                outputs="regressor",
                name="train_model_node",
            ),
            node(
                func=evaluate_model,
                inputs=["regressor", "X_test", "y_test"],
                outputs=None,
                name="evaluate_model_node",
            ),
        ]
    )
    ds_pipeline_1 = pipeline(
        pipe=pipeline_instance,
        inputs="model_input_table",
        namespace="active_modelling_pipeline",
    )
    ds_pipeline_2 = pipeline(
        pipe=pipeline_instance,
        inputs="model_input_table",
        namespace="candidate_modelling_pipeline",
    )
    return ds_pipeline_1 + ds_pipeline_2
```
Execute kedro run in the terminal. You should see output similar to the following:
```text
[11/02/22 10:41:08] INFO Loading data from 'companies' (CSVDataset)... data_catalog.py:343
INFO Running node: preprocess_companies_node: preprocess_companies([companies]) -> node.py:327
[preprocessed_companies]
INFO Saving data to 'preprocessed_companies' (ParquetDataset)... data_catalog.py:382
INFO Completed 1 out of 9 tasks sequential_runner.py:85
INFO Loading data from 'shuttles' (ExcelDataset)... data_catalog.py:343
[11/02/22 10:41:13] INFO Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) -> node.py:327
[preprocessed_shuttles]
INFO Saving data to 'preprocessed_shuttles' (ParquetDataset)... data_catalog.py:382
INFO Completed 2 out of 9 tasks sequential_runner.py:85
INFO Loading data from 'preprocessed_shuttles' (ParquetDataset)... data_catalog.py:343
INFO Loading data from 'preprocessed_companies' (ParquetDataset)... data_catalog.py:343
INFO Loading data from 'reviews' (CSVDataset)... data_catalog.py:343
INFO Running node: create_model_input_table_node: node.py:327
create_model_input_table([preprocessed_shuttles,preprocessed_companies,reviews]) ->
[model_input_table]
[11/02/22 10:41:14] INFO Saving data to 'model_input_table' (ParquetDataset)... data_catalog.py:382
[11/02/22 10:41:15] INFO Completed 3 out of 9 tasks sequential_runner.py:85
INFO Loading data from 'model_input_table' (ParquetDataset)... data_catalog.py:343
INFO Loading data from 'params:active_modelling_pipeline.model_options' (MemoryDataset)... data_catalog.py:343
INFO Running node: split_data_node: node.py:327
split_data([model_input_table,params:active_modelling_pipeline.model_options]) ->
[active_modelling_pipeline.X_train,active_modelling_pipeline.X_test,active_modelling_pipeline.y_t
rain,active_modelling_pipeline.y_test]
INFO Saving data to 'active_modelling_pipeline.X_train' (MemoryDataset)... data_catalog.py:382
INFO Saving data to 'active_modelling_pipeline.X_test' (MemoryDataset)... data_catalog.py:382
INFO Saving data to 'active_modelling_pipeline.y_train' (MemoryDataset)... data_catalog.py:382
INFO Saving data to 'active_modelling_pipeline.y_test' (MemoryDataset)... data_catalog.py:382
INFO Completed 4 out of 9 tasks sequential_runner.py:85
INFO Loading data from 'model_input_table' (ParquetDataset)... data_catalog.py:343
INFO Loading data from 'params:candidate_modelling_pipeline.model_options' (MemoryDataset)... data_catalog.py:343
INFO Running node: split_data_node: node.py:327
split_data([model_input_table,params:candidate_modelling_pipeline.model_options]) ->
[candidate_modelling_pipeline.X_train,candidate_modelling_pipeline.X_test,candidate_modelling_pip
eline.y_train,candidate_modelling_pipeline.y_test]
INFO Saving data to 'candidate_modelling_pipeline.X_train' (MemoryDataset)... data_catalog.py:382
INFO Saving data to 'candidate_modelling_pipeline.X_test' (MemoryDataset)... data_catalog.py:382
INFO Saving data to 'candidate_modelling_pipeline.y_train' (MemoryDataset)... data_catalog.py:382
INFO Saving data to 'candidate_modelling_pipeline.y_test' (MemoryDataset)... data_catalog.py:382
INFO Completed 5 out of 9 tasks sequential_runner.py:85
INFO Loading data from 'active_modelling_pipeline.X_train' (MemoryDataset)... data_catalog.py:343
INFO Loading data from 'active_modelling_pipeline.y_train' (MemoryDataset)... data_catalog.py:343
INFO Running node: train_model_node: node.py:327
train_model([active_modelling_pipeline.X_train,active_modelling_pipeline.y_train]) ->
[active_modelling_pipeline.regressor]
INFO Saving data to 'active_modelling_pipeline.regressor' (PickleDataset)... data_catalog.py:382
INFO Completed 6 out of 9 tasks sequential_runner.py:85
INFO Loading data from 'candidate_modelling_pipeline.X_train' (MemoryDataset)... data_catalog.py:343
INFO Loading data from 'candidate_modelling_pipeline.y_train' (MemoryDataset)... data_catalog.py:343
INFO Running node: train_model_node: node.py:327
train_model([candidate_modelling_pipeline.X_train,candidate_modelling_pipeline.y_train]) ->
[candidate_modelling_pipeline.regressor]
INFO Saving data to 'candidate_modelling_pipeline.regressor' (PickleDataset)... data_catalog.py:382
INFO Completed 7 out of 9 tasks sequential_runner.py:85
INFO Loading data from 'active_modelling_pipeline.regressor' (PickleDataset)... data_catalog.py:343
INFO Loading data from 'active_modelling_pipeline.X_test' (MemoryDataset)... data_catalog.py:343
INFO Loading data from 'active_modelling_pipeline.y_test' (MemoryDataset)... data_catalog.py:343
INFO Running node: evaluate_model_node: node.py:327
evaluate_model([active_modelling_pipeline.regressor,active_modelling_pipeline.X_test,active_model
ling_pipeline.y_test]) -> None
INFO Model has a coefficient R^2 of 0.462 on test data. nodes.py:60
INFO Completed 8 out of 9 tasks sequential_runner.py:85
INFO Loading data from 'candidate_modelling_pipeline.regressor' (PickleDataset)... data_catalog.py:343
INFO Loading data from 'candidate_modelling_pipeline.X_test' (MemoryDataset)... data_catalog.py:343
INFO Loading data from 'candidate_modelling_pipeline.y_test' (MemoryDataset)... data_catalog.py:343
INFO Running node: evaluate_model_node: node.py:327
evaluate_model([candidate_modelling_pipeline.regressor,candidate_modelling_pipeline.X_test,candid
ate_modelling_pipeline.y_test]) -> None
INFO Model has a coefficient R^2 of 0.449 on test data. nodes.py:60
INFO Completed 9 out of 9 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully.
```
How it works: the modular pipeline() wrapper

The import you added to the code introduces the pipeline wrapper, which enables you to instantiate multiple instances of pipelines with static structure but dynamic inputs/outputs/parameters:

```python
from kedro.pipeline import pipeline
```

The pipeline() wrapper method takes the following arguments:
| Keyword argument | Description |
|---|---|
| pipe | The Pipeline you want to wrap |
| inputs | Inputs provided to the underlying wrapped Pipeline |
| outputs | Outputs provided to the underlying wrapped Pipeline |
| parameters | Parameters provided to the underlying wrapped Pipeline |
| namespace | The namespace that will be encapsulated by this pipeline instance |
Think of this snippet as the portion of code you added to the example:

```python
...
ds_pipeline_1 = pipeline(
    pipe=pipeline_instance,
    inputs="model_input_table",
    namespace="active_modelling_pipeline",
)
ds_pipeline_2 = pipeline(
    pipe=pipeline_instance,
    inputs="model_input_table",
    namespace="candidate_modelling_pipeline",
)
```
The code instantiates the template pipeline twice but passes in different parameters. The pipeline_instance variable is the template pipeline, and ds_pipeline_1 and ds_pipeline_2 are the two separately parameterised instances.
How do namespaces affect parameters?

All inputs and outputs within the nodes of ds_pipeline_1 have the active_modelling_pipeline prefix:

- params:model_options turns into params:active_modelling_pipeline.model_options
- X_train turns into active_modelling_pipeline.X_train
- X_test turns into active_modelling_pipeline.X_test, and so on

There is a separate set of parameters for ds_pipeline_2, prefixed with candidate_modelling_pipeline:

- params:model_options turns into params:candidate_modelling_pipeline.model_options
- X_train turns into candidate_modelling_pipeline.X_train
- X_test turns into candidate_modelling_pipeline.X_test, and so on

However, model_input_table is not parameterised, as it needs to be shared between the instances, so it is frozen outside the scope of the namespace wrappers.
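The renaming rule can be sketched in a few lines of plain Python. This is only an illustration of the behaviour described above, not Kedro's implementation:

```python
def apply_namespace(name, namespace, frozen):
    """Sketch of namespace prefixing for dataset and parameter names."""
    # Shared (frozen) names, such as those passed via `inputs`, keep their
    # original name so they are shared across pipeline instances.
    if name in frozen:
        return name
    # Parameters keep the "params:" scheme; the key itself gets the prefix.
    if name.startswith("params:"):
        return "params:" + namespace + "." + name[len("params:"):]
    # Every other dataset name is simply prefixed.
    return namespace + "." + name


frozen = {"model_input_table"}
for name in ["model_input_table", "params:model_options", "X_train"]:
    print(apply_namespace(name, "active_modelling_pipeline", frozen))
```

Running this prints model_input_table unchanged, while the parameter and dataset names gain the active_modelling_pipeline prefix, matching the run log above.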
This renders as follows when you use the kedro viz run command (hover over the datasets to see their full path):

Optional: Kedro runners

There are three different Kedro runners that can run the pipeline:

- SequentialRunner - runs nodes sequentially; once a node has completed its task then the next one starts.
- ParallelRunner - runs nodes in parallel; independent nodes are able to run at the same time, which is more efficient when there are independent branches in your pipeline and enables you to take advantage of multiple CPU cores.
- ThreadRunner - runs nodes in parallel, similarly to ParallelRunner, but uses multithreading instead of multiprocessing.

By default, Kedro uses a SequentialRunner, which is instantiated when you execute kedro run from the terminal. If you decide to use ParallelRunner, ThreadRunner or a custom runner, you can do so through the --runner flag as follows:

```bash
kedro run --runner=ParallelRunner
kedro run --runner=ThreadRunner
kedro run --runner=module.path.to.my.runner
```
ParallelRunner performs task parallelisation via multiprocessing, while ThreadRunner is intended for use with remote execution engines such as Spark and Dask.
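To illustrate why parallel execution helps when branches are independent, here is a small stdlib-only sketch (unrelated to Kedro's runner code) that runs two independent I/O-bound "nodes" sequentially and then with a thread pool:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_node(name):
    time.sleep(0.2)  # simulate I/O-bound work such as loading a dataset
    return name


# Sequential execution, like SequentialRunner: total time is the sum of both nodes.
start = time.perf_counter()
results = [slow_node("a"), slow_node("b")]
sequential = time.perf_counter() - start

# Threaded execution, like ThreadRunner: independent nodes overlap in time.
start = time.perf_counter()
with ThreadPoolExecutor() as pool:
    results = list(pool.map(slow_node, ["a", "b"]))
threaded = time.perf_counter() - start

print(threaded < sequential)  # → True: the independent nodes ran concurrently
```

Note this only pays off when nodes are genuinely independent; nodes connected by inputs/outputs must still run in dependency order.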
You can find out more about the runners Kedro provides, and how to create your own, in the pipeline documentation about runners.