Create a data science pipeline

This section explains the following:

  • How to add a second Kedro pipeline for data science code that extends the default project pipeline

  • How to 'slice' the project to run only part of the full pipeline

  • (Optional) How to make a modular pipeline

  • (Optional) How to specify the way pipeline nodes are run: sequentially or in parallel

Data science nodes

The data science pipeline uses the LinearRegression implementation from the scikit-learn library.

The data science pipeline is made up of the following:

  • Two python files within src/spaceflights/pipelines/data_science

    • nodes.py (for the node functions that make up the pipeline)

    • pipeline.py (to build the pipeline)

  • A yaml file, conf/base/parameters_data_science.yml, to define the parameters used when the pipeline runs

  • __init__.py files in the required folders to ensure that Python can import the pipeline

First, take a look at the functions for the data science nodes in src/spaceflights/pipelines/data_science/nodes.py:

import logging
from typing import Any, Tuple

import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split


def split_data(data: pd.DataFrame, parameters: dict[str, Any]) -> Tuple:
    """Splits data into features and targets training and test sets.

    Args:
        data: Data containing features and target.
        parameters: Parameters defined in parameters_data_science.yml.
    Returns:
        Split data.
    """
    X = data[parameters["features"]]
    y = data["price"]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=parameters["test_size"], random_state=parameters["random_state"]
    )
    return X_train, X_test, y_train, y_test


def train_model(X_train: pd.DataFrame, y_train: pd.Series) -> LinearRegression:
    """Trains the linear regression model.

    Args:
        X_train: Training data of independent features.
        y_train: Training data for price.

    Returns:
        Trained model.
    """
    regressor = LinearRegression()
    regressor.fit(X_train, y_train)
    return regressor


def evaluate_model(
    regressor: LinearRegression, X_test: pd.DataFrame, y_test: pd.Series
):
    """Calculates and logs the coefficient of determination.

    Args:
        regressor: Trained model.
        X_test: Testing data of independent features.
        y_test: Testing data for price.
    """
    y_pred = regressor.predict(X_test)
    score = r2_score(y_test, y_pred)
    logger = logging.getLogger(__name__)
    logger.info("Model has a coefficient R^2 of %.3f on test data.", score)

Input parameter configuration

The parameters used by the DataCatalog when the pipeline is executed are stored in conf/base/parameters_data_science.yml:

model_options:
  test_size: 0.2
  random_state: 3
  features:
    - engines
    - passenger_capacity
    - crew
    - d_check_complete
    - moon_clearance_complete
    - iata_approved
    - company_rating
    - review_scores_rating

Here, the parameters test_size and random_state are used for the train-test split, while features gives the names of the columns in the model input table to use as features.
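To see how these parameters flow into a node, here is a minimal standalone sketch of the same split logic as split_data() in nodes.py, called with a dict mirroring model_options (the data and the trimmed feature list below are synthetic, for illustration only):

```python
import pandas as pd
from sklearn.model_selection import train_test_split

# Mirrors conf/base/parameters_data_science.yml, trimmed to two features.
parameters = {
    "test_size": 0.2,
    "random_state": 3,
    "features": ["engines", "crew"],
}

# Synthetic stand-in for the model input table.
data = pd.DataFrame({
    "engines": [1, 2, 3, 4, 5, 1, 2, 3, 4, 5],
    "crew": [2, 4, 6, 8, 10, 3, 5, 7, 9, 11],
    "price": [10.0, 20.0, 30.0, 40.0, 50.0, 12.0, 22.0, 32.0, 42.0, 52.0],
})

# Same logic as split_data() in nodes.py: select feature columns,
# take "price" as the target, and split using the parameter values.
X = data[parameters["features"]]
y = data["price"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=parameters["test_size"], random_state=parameters["random_state"]
)

print(len(X_train), len(X_test))  # 8 2
```

With ten rows and test_size: 0.2, two rows are held out for testing and eight are used for training.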

More information about parameters can be found in the configuration documentation.

Model registration

The following definition in conf/base/catalog.yml registers the dataset that saves the trained model:

regressor:
  type: pickle.PickleDataset
  filepath: data/06_models/regressor.pickle
  versioned: true

By setting versioned to true, versioning is enabled for regressor. This means that the pickled output of the regressor is saved every time the pipeline runs, which stores the history of the models built using this pipeline. There is more detail in the later section about versioning datasets and ML models.
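In practice, a versioned dataset writes each run's output under a timestamped subdirectory of the configured filepath instead of overwriting a single file. After two runs the layout looks roughly like this (the timestamps are illustrative):

```text
data/06_models/regressor.pickle/
├── 2024-01-15T10.30.00.000Z/
│   └── regressor.pickle
└── 2024-01-16T09.05.00.000Z/
    └── regressor.pickle
```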

Data science pipeline

The data science pipeline is defined in src/spaceflights/pipelines/data_science/pipeline.py:

from kedro.pipeline import Pipeline, node, pipeline

from .nodes import evaluate_model, split_data, train_model


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=split_data,
                inputs=["model_input_table", "params:model_options"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                name="split_data_node",
            ),
            node(
                func=train_model,
                inputs=["X_train", "y_train"],
                outputs="regressor",
                name="train_model_node",
            ),
            node(
                func=evaluate_model,
                inputs=["regressor", "X_test", "y_test"],
                outputs=None,
                name="evaluate_model_node",
            ),
        ]
    )
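Note that the pipeline() call above only declares nodes with their inputs and outputs; Kedro works out the execution order from those declarations. The toy resolver below (plain Python, not Kedro's actual implementation) sketches the idea: a node becomes runnable once all of its inputs are available:

```python
# Toy dependency resolver: each node declares input and output dataset
# names, mirroring the three nodes in pipeline.py above.
nodes = [
    {"name": "evaluate_model_node",
     "inputs": {"regressor", "X_test", "y_test"}, "outputs": set()},
    {"name": "train_model_node",
     "inputs": {"X_train", "y_train"}, "outputs": {"regressor"}},
    {"name": "split_data_node",
     "inputs": {"model_input_table", "params:model_options"},
     "outputs": {"X_train", "X_test", "y_train", "y_test"}},
]

# Datasets that exist before the pipeline starts (catalog entries / params).
available = {"model_input_table", "params:model_options"}
order = []
pending = list(nodes)
while pending:
    # Pick any node whose inputs are all available, run it, record outputs.
    ready = next(n for n in pending if n["inputs"] <= available)
    order.append(ready["name"])
    available |= ready["outputs"]
    pending.remove(ready)

print(order)
# split_data_node runs first, then train_model_node, then evaluate_model_node
```

This is why the nodes in pipeline.py can be listed in any order: the declared dataset names, not the list position, determine when each node runs.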

Test the pipeline

When you created your project with kedro new, one of the files generated was src//pipeline_registry.py, which constructs a __default__ pipeline that includes every pipeline in the project.

This means you do not need to instruct Kedro to run each pipeline individually; instead, you can execute the default pipeline, which consists of the data processing pipeline followed by the data science pipeline:

kedro run

You should see output similar to the following:

                    INFO     Loading data from 'companies' (CSVDataset)...                   data_catalog.py:343
                    INFO     Running node: preprocess_companies_node:                                node.py:327
                             preprocess_companies([companies]) -> [preprocessed_companies]
                    INFO     Saving data to 'preprocessed_companies' (MemoryDataset)...      data_catalog.py:382
                    INFO     Completed 1 out of 6 tasks                                  sequential_runner.py:85
                    INFO     Loading data from 'shuttles' (ExcelDataset)...                  data_catalog.py:343
[08/09/22 16:56:15] INFO     Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) node.py:327
                             -> [preprocessed_shuttles]
                    INFO     Saving data to 'preprocessed_shuttles' (MemoryDataset)...       data_catalog.py:382
                    INFO     Completed 2 out of 6 tasks                                  sequential_runner.py:85
                    INFO     Loading data from 'preprocessed_shuttles' (MemoryDataset)...    data_catalog.py:343
                    INFO     Loading data from 'preprocessed_companies' (MemoryDataset)...   data_catalog.py:343
                    INFO     Loading data from 'reviews' (CSVDataset)...                     data_catalog.py:343
                    INFO     Running node: create_model_input_table_node:                            node.py:327
                             create_model_input_table([preprocessed_shuttles,preprocessed_companies,
                             reviews]) -> [model_input_table]
[08/09/22 16:56:18] INFO     Saving data to 'model_input_table' (MemoryDataset)...           data_catalog.py:382
[08/09/22 16:56:19] INFO     Completed 3 out of 6 tasks                                  sequential_runner.py:85
                    INFO     Loading data from 'model_input_table' (MemoryDataset)...        data_catalog.py:343
                    INFO     Loading data from 'params:model_options' (MemoryDataset)...     data_catalog.py:343
                    INFO     Running node: split_data_node:                                          node.py:327
                             split_data([model_input_table,params:model_options]) ->
                             [X_train,X_test,y_train,y_test]
                    INFO     Saving data to 'X_train' (MemoryDataset)...                     data_catalog.py:382
                    INFO     Saving data to 'X_test' (MemoryDataset)...                      data_catalog.py:382
                    INFO     Saving data to 'y_train' (MemoryDataset)...                     data_catalog.py:382
                    INFO     Saving data to 'y_test' (MemoryDataset)...                      data_catalog.py:382
                    INFO     Completed 4 out of 6 tasks                                  sequential_runner.py:85
                    INFO     Loading data from 'X_train' (MemoryDataset)...                  data_catalog.py:343
                    INFO     Loading data from 'y_train' (MemoryDataset)...                  data_catalog.py:343
                    INFO     Running node: train_model_node: train_model([X_train,y_train]) ->       node.py:327
                             [regressor]
[08/09/22 16:56:20] INFO     Saving data to 'regressor' (PickleDataset)...                   data_catalog.py:382
                    INFO     Completed 5 out of 6 tasks                                  sequential_runner.py:85
                    INFO     Loading data from 'regressor' (PickleDataset)...                data_catalog.py:343
                    INFO     Loading data from 'X_test' (MemoryDataset)...                   data_catalog.py:343
                    INFO     Loading data from 'y_test' (MemoryDataset)...                   data_catalog.py:343
                    INFO     Running node: evaluate_model_node:                                      node.py:327
                             evaluate_model([regressor,X_test,y_test]) -> None
                    INFO     Model has a coefficient R^2 of 0.462 on test data.                      nodes.py:55
                    INFO     Completed 6 out of 6 tasks                                  sequential_runner.py:85
                    INFO     Pipeline execution completed successfully.                             runner.py:89

As you can see, the data_processing and data_science pipelines ran successfully, generated a model and evaluated it.

Slice a pipeline

There are instances when you want to run only part of the default pipeline. For example, you could skip data_processing execution and run only the data_science pipeline to tune the hyperparameters of the price prediction model.

You can 'slice' the pipeline and specify just the part you want to run by using the --pipeline option. For example, to run only the pipeline named data_science (as labelled automatically in register_pipelines), execute the following command:

kedro run --pipeline=data_science

There are a range of options to run sections of the default pipeline, as described in the pipeline slicing documentation and the kedro run CLI documentation.
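Beyond --pipeline, the kedro run CLI also accepts node-level slicing flags. The flag spellings below follow recent Kedro versions; check kedro run --help for your installed version:

```bash
# Run from a given node to the end of the pipeline
kedro run --from-nodes=split_data_node

# Run from the start of the pipeline up to and including a given node
kedro run --to-nodes=create_model_input_table_node

# Run a single named node (its inputs must already be persisted in the catalog)
kedro run --nodes=train_model_node
```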

Modular pipelines

In many typical Kedro projects, a single ("main") pipeline increases in complexity as the project evolves. To keep your project fit for purpose, we recommend that you create modular pipelines, which are logically isolated and can be reused. You can instantiate a modular pipeline multiple times as a "template" pipeline that can run with different inputs/outputs/parameters.

Modular pipelines are easier to develop, test and maintain. They are reusable within the same codebase, and portable across projects via micro-packaging, which makes them a scalable way to work with Kedro pipelines.

Optional: Extend the project with namespacing and a modular pipeline

This is optional code, so it is not provided in the spaceflights starter project. If you want to see it in action, you need to copy and paste the code as instructed.

First, add a namespace to the modelling component of the data science pipeline to instantiate it as a template with different parameters for an active_modelling_pipeline and a candidate_modelling_pipeline, to test the model using a different combination of features.

  1. Update your catalog to add namespaces to the outputs of each instance. In conf/base/catalog.yml, replace the regressor key with the following two new dataset keys:

active_modelling_pipeline.regressor:
  type: pickle.PickleDataset
  filepath: data/06_models/regressor_active.pickle
  versioned: true

candidate_modelling_pipeline.regressor:
  type: pickle.PickleDataset
  filepath: data/06_models/regressor_candidate.pickle
  versioned: true

  2. Update the parameters file for the data science pipeline, conf/base/parameters_data_science.yml, to replace the existing contents for model_options with the following configuration for the two instances of the template pipeline:

active_modelling_pipeline:
    model_options:
      test_size: 0.2
      random_state: 3
      features:
        - engines
        - passenger_capacity
        - crew
        - d_check_complete
        - moon_clearance_complete
        - iata_approved
        - company_rating
        - review_scores_rating

candidate_modelling_pipeline:
    model_options:
      test_size: 0.2
      random_state: 8
      features:
        - engines
        - passenger_capacity
        - crew
        - review_scores_rating

  3. Replace the code in pipelines/data_science/pipeline.py with the following snippet:

from kedro.pipeline import Pipeline, node, pipeline

from .nodes import evaluate_model, split_data, train_model


def create_pipeline(**kwargs) -> Pipeline:
    pipeline_instance = pipeline(
        [
            node(
                func=split_data,
                inputs=["model_input_table", "params:model_options"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                name="split_data_node",
            ),
            node(
                func=train_model,
                inputs=["X_train", "y_train"],
                outputs="regressor",
                name="train_model_node",
            ),
            node(
                func=evaluate_model,
                inputs=["regressor", "X_test", "y_test"],
                outputs=None,
                name="evaluate_model_node",
            ),
        ]
    )
    ds_pipeline_1 = pipeline(
        pipe=pipeline_instance,
        inputs="model_input_table",
        namespace="active_modelling_pipeline",
    )
    ds_pipeline_2 = pipeline(
        pipe=pipeline_instance,
        inputs="model_input_table",
        namespace="candidate_modelling_pipeline",
    )

    return ds_pipeline_1 + ds_pipeline_2

  4. Execute kedro run in the terminal. You should see output similar to the following:

[11/02/22 10:41:08] INFO     Loading data from 'companies' (CSVDataset)...                                             data_catalog.py:343
                    INFO     Running node: preprocess_companies_node: preprocess_companies([companies]) ->                     node.py:327
                             [preprocessed_companies]
                    INFO     Saving data to 'preprocessed_companies' (ParquetDataset)...                               data_catalog.py:382
                    INFO     Completed 1 out of 9 tasks                                                            sequential_runner.py:85
                    INFO     Loading data from 'shuttles' (ExcelDataset)...                                            data_catalog.py:343
[11/02/22 10:41:13] INFO     Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) ->                        node.py:327
                             [preprocessed_shuttles]
                    INFO     Saving data to 'preprocessed_shuttles' (ParquetDataset)...                                data_catalog.py:382
                    INFO     Completed 2 out of 9 tasks                                                            sequential_runner.py:85
                    INFO     Loading data from 'preprocessed_shuttles' (ParquetDataset)...                             data_catalog.py:343
                    INFO     Loading data from 'preprocessed_companies' (ParquetDataset)...                            data_catalog.py:343
                    INFO     Loading data from 'reviews' (CSVDataset)...                                               data_catalog.py:343
                    INFO     Running node: create_model_input_table_node:                                                      node.py:327
                             create_model_input_table([preprocessed_shuttles,preprocessed_companies,reviews]) ->
                             [model_input_table]
[11/02/22 10:41:14] INFO     Saving data to 'model_input_table' (ParquetDataset)...                                    data_catalog.py:382
[11/02/22 10:41:15] INFO     Completed 3 out of 9 tasks                                                            sequential_runner.py:85
                    INFO     Loading data from 'model_input_table' (ParquetDataset)...                                 data_catalog.py:343
                    INFO     Loading data from 'params:active_modelling_pipeline.model_options' (MemoryDataset)...     data_catalog.py:343
                    INFO     Running node: split_data_node:                                                                    node.py:327
                             split_data([model_input_table,params:active_modelling_pipeline.model_options]) ->
                             [active_modelling_pipeline.X_train,active_modelling_pipeline.X_test,active_modelling_pipeline.y_t
                             rain,active_modelling_pipeline.y_test]
                    INFO     Saving data to 'active_modelling_pipeline.X_train' (MemoryDataset)...                     data_catalog.py:382
                    INFO     Saving data to 'active_modelling_pipeline.X_test' (MemoryDataset)...                      data_catalog.py:382
                    INFO     Saving data to 'active_modelling_pipeline.y_train' (MemoryDataset)...                     data_catalog.py:382
                    INFO     Saving data to 'active_modelling_pipeline.y_test' (MemoryDataset)...                      data_catalog.py:382
                    INFO     Completed 4 out of 9 tasks                                                            sequential_runner.py:85
                    INFO     Loading data from 'model_input_table' (ParquetDataset)...                                 data_catalog.py:343
                    INFO     Loading data from 'params:candidate_modelling_pipeline.model_options' (MemoryDataset)...  data_catalog.py:343
                    INFO     Running node: split_data_node:                                                                    node.py:327
                             split_data([model_input_table,params:candidate_modelling_pipeline.model_options]) ->
                             [candidate_modelling_pipeline.X_train,candidate_modelling_pipeline.X_test,candidate_modelling_pip
                             eline.y_train,candidate_modelling_pipeline.y_test]
                    INFO     Saving data to 'candidate_modelling_pipeline.X_train' (MemoryDataset)...                  data_catalog.py:382
                    INFO     Saving data to 'candidate_modelling_pipeline.X_test' (MemoryDataset)...                   data_catalog.py:382
                    INFO     Saving data to 'candidate_modelling_pipeline.y_train' (MemoryDataset)...                  data_catalog.py:382
                    INFO     Saving data to 'candidate_modelling_pipeline.y_test' (MemoryDataset)...                   data_catalog.py:382
                    INFO     Completed 5 out of 9 tasks                                                            sequential_runner.py:85
                    INFO     Loading data from 'active_modelling_pipeline.X_train' (MemoryDataset)...                  data_catalog.py:343
                    INFO     Loading data from 'active_modelling_pipeline.y_train' (MemoryDataset)...                  data_catalog.py:343
                    INFO     Running node: train_model_node:                                                                   node.py:327
                             train_model([active_modelling_pipeline.X_train,active_modelling_pipeline.y_train]) ->
                             [active_modelling_pipeline.regressor]
                    INFO     Saving data to 'active_modelling_pipeline.regressor' (PickleDataset)...                   data_catalog.py:382
                    INFO     Completed 6 out of 9 tasks                                                            sequential_runner.py:85
                    INFO     Loading data from 'candidate_modelling_pipeline.X_train' (MemoryDataset)...               data_catalog.py:343
                    INFO     Loading data from 'candidate_modelling_pipeline.y_train' (MemoryDataset)...               data_catalog.py:343
                    INFO     Running node: train_model_node:                                                                   node.py:327
                             train_model([candidate_modelling_pipeline.X_train,candidate_modelling_pipeline.y_train]) ->
                             [candidate_modelling_pipeline.regressor]
                    INFO     Saving data to 'candidate_modelling_pipeline.regressor' (PickleDataset)...                data_catalog.py:382
                    INFO     Completed 7 out of 9 tasks                                                            sequential_runner.py:85
                    INFO     Loading data from 'active_modelling_pipeline.regressor' (PickleDataset)...                data_catalog.py:343
                    INFO     Loading data from 'active_modelling_pipeline.X_test' (MemoryDataset)...                   data_catalog.py:343
                    INFO     Loading data from 'active_modelling_pipeline.y_test' (MemoryDataset)...                   data_catalog.py:343
                    INFO     Running node: evaluate_model_node:                                                                node.py:327
                             evaluate_model([active_modelling_pipeline.regressor,active_modelling_pipeline.X_test,active_model
                             ling_pipeline.y_test]) -> None
                    INFO     Model has a coefficient R^2 of 0.462 on test data.                                                nodes.py:60
                    INFO     Completed 8 out of 9 tasks                                                            sequential_runner.py:85
                    INFO     Loading data from 'candidate_modelling_pipeline.regressor' (PickleDataset)...             data_catalog.py:343
                    INFO     Loading data from 'candidate_modelling_pipeline.X_test' (MemoryDataset)...                data_catalog.py:343
                    INFO     Loading data from 'candidate_modelling_pipeline.y_test' (MemoryDataset)...                data_catalog.py:343
                    INFO     Running node: evaluate_model_node:                                                                node.py:327
                             evaluate_model([candidate_modelling_pipeline.regressor,candidate_modelling_pipeline.X_test,candid
                             ate_modelling_pipeline.y_test]) -> None
                    INFO     Model has a coefficient R^2 of 0.449 on test data.                                                nodes.py:60
                    INFO     Completed 9 out of 9 tasks                                                            sequential_runner.py:85
                    INFO     Pipeline execution completed successfully.

How it works: the modular pipeline() wrapper

The import you added to your code introduces the pipeline wrapper, which enables you to instantiate multiple instances of pipelines with static structure but dynamic inputs/outputs/parameters:

from kedro.pipeline import pipeline

The pipeline() wrapper method takes the following arguments:

  • pipe: the Pipeline object you want to wrap

  • inputs: any overrides provided to this instance of the underlying wrapped Pipeline object

  • outputs: any overrides provided to this instance of the underlying wrapped Pipeline object

  • parameters: any overrides provided to this instance of the underlying wrapped Pipeline object

  • namespace: the namespace that will be encapsulated by this pipeline instance

You can see this snippet as part of the code you added to the example earlier:

...

ds_pipeline_1 = pipeline(
    pipe=pipeline_instance,
    inputs="model_input_table",
    namespace="active_modelling_pipeline",
)

ds_pipeline_2 = pipeline(
    pipe=pipeline_instance,
    inputs="model_input_table",
    namespace="candidate_modelling_pipeline",
)

The code instantiates the template pipeline twice, but with different parameters. The pipeline_instance variable is the template pipeline, and ds_pipeline_1 and ds_pipeline_2 are the two separately parameterised instances.

How do namespaces affect parameters?

All inputs and outputs within the nodes of ds_pipeline_1 are given the active_modelling_pipeline prefix:

  • params:model_options turns into params:active_modelling_pipeline.model_options

  • X_train turns into active_modelling_pipeline.X_train

  • X_test turns into active_modelling_pipeline.X_test, and so on

There is a separate set of parameters for ds_pipeline_2 with the candidate_modelling_pipeline prefix:

  • params:model_options turns into params:candidate_modelling_pipeline.model_options

  • X_train turns into candidate_modelling_pipeline.X_train

  • X_test turns into candidate_modelling_pipeline.X_test, and so on

However, model_input_table does not get parameterised, as it needs to be shared between the instances, so it is frozen outside the scope of the namespace wrappers.
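The renaming rule can be sketched in plain Python (this illustrates the behaviour visible in the run logs above; it is not Kedro's implementation). Every dataset name gets the namespace as a prefix unless it is listed as a shared input or output:

```python
def apply_namespace(names, namespace, frozen):
    """Prefix dataset names with a namespace, except frozen (shared) ones.

    Parameter names are prefixed after the 'params:' marker, matching the
    'params:active_modelling_pipeline.model_options' form in the run logs.
    """
    renamed = []
    for name in names:
        if name in frozen:
            renamed.append(name)  # shared dataset: left untouched
        elif name.startswith("params:"):
            renamed.append(f"params:{namespace}.{name[len('params:'):]}")
        else:
            renamed.append(f"{namespace}.{name}")
    return renamed


inputs = ["model_input_table", "params:model_options"]
outputs = ["X_train", "X_test", "y_train", "y_test"]
frozen = {"model_input_table"}  # shared between both pipeline instances

print(apply_namespace(inputs, "active_modelling_pipeline", frozen))
# ['model_input_table', 'params:active_modelling_pipeline.model_options']
print(apply_namespace(outputs, "candidate_modelling_pipeline", frozen))
```

Because model_input_table is in the frozen set, both instances read the same table, while every other dataset name is kept separate per namespace.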

Executing the kedro viz run command renders the following (hover over a dataset to see its full path):

(Figure: Kedro-Viz rendering of the two namespaced modular pipelines)

Optional: Kedro runners

There are three different Kedro runners that can run the pipeline:

  • SequentialRunner: runs the nodes in sequence; once one node has completed its task, the next one starts.

  • ParallelRunner: runs the nodes in parallel; independent nodes are able to run at the same time, which is more efficient when there are independent branches in your pipeline and allows you to take advantage of multiple CPU cores.

  • ThreadRunner: runs the nodes in parallel, similarly to ParallelRunner, but uses multithreading instead of multiprocessing.

By default, Kedro uses a SequentialRunner, which is instantiated when you execute kedro run from the terminal. If you decide to use ParallelRunner, ThreadRunner or a custom runner, you can do so through the --runner flag as follows:

kedro run --runner=ParallelRunner
kedro run --runner=ThreadRunner
kedro run --runner=module.path.to.my.runner

ParallelRunner parallelises tasks via multiprocessing, while ThreadRunner is intended for use with remote execution engines such as Spark and Dask.
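The difference between the runners can be sketched with the standard library alone (an analogy, not Kedro's code). Independent branches, such as the two namespaced modelling pipelines above, can be dispatched to a pool instead of being run one after another:

```python
from concurrent.futures import ThreadPoolExecutor


def run_branch(name: str) -> str:
    # Stand-in for an independent pipeline branch (e.g. the active and
    # candidate modelling pipelines, which share no intermediate datasets).
    return f"{name}: trained"


branches = ["active_modelling_pipeline", "candidate_modelling_pipeline"]

# SequentialRunner analogy: one branch at a time.
sequential_results = [run_branch(b) for b in branches]

# ThreadRunner analogy: independent branches dispatched concurrently.
with ThreadPoolExecutor() as pool:
    threaded_results = list(pool.map(run_branch, branches))

# Same results either way; only the scheduling differs.
assert sequential_results == threaded_results
print(threaded_results)
```

Either scheduling strategy must respect the dataset dependencies between nodes; parallelism only helps where the pipeline graph actually has independent branches.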

You can find out more about the runners Kedro provides, and how to create your own, in the pipeline documentation about runners.