测试一个Kedro项目

测试我们的Kedro项目非常重要,可以验证节点和管道是否按预期运行。本节我们将查看spaceflights项目的一些测试示例。

本节将解释以下内容:

  • 如何测试一个Kedro节点

  • 如何测试Kedro管道

  • 测试最佳实践

本节内容不包含:

为Kedro节点编写测试:单元测试

Kedro要求节点函数是纯函数;纯函数是指其输出仅由输入决定,没有任何可观察的副作用。测试这些函数可以验证节点是否按预期运行——对于给定的输入值集合,节点将产生预期的输出。这些测试被称为单元测试。

让我们通过实际案例来探索这一过程。以数据科学流水线中定义的节点函数split_data为例:

Click to expand
def split_data(data: pd.DataFrame, parameters: dict[str, Any]) -> Tuple:
    """Splits data into features and targets training and test sets.

    Args:
        data: Data containing features and target.
        parameters: Parameters defined in parameters_data_science.yml.
    Returns:
        Split data.
    """
    X = data[parameters["features"]]
    y = data["price"]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=parameters["test_size"], random_state=parameters["random_state"]
    )
    return X_train, X_test, y_train, y_test

该函数接收一个pandas DataFrame和参数字典作为输入,并根据提供的参数将输入数据分割成四个不同的数据对象。我们建议遵循pytest的测试结构,将测试分解为四个步骤:准备(arrange)、执行(act)、断言(assert)和清理(cleanup)。对于这个特定函数,这些步骤将是:

  1. 安排:准备输入数据 data 和参数 parameters

  2. 执行:调用split_data并捕获输出结果到X_trainX_testY_trainY_test

  3. 断言:确保输出的长度与预期长度一致

当测试中的任何先前步骤进行了可能影响其他测试的修改时(例如修改了作为多个测试输入使用的文件),清理步骤就变得必要。下面的示例测试不属于这种情况,因此省略了清理步骤。

记得在文件顶部导入要测试的函数以及任何必要的模块。

当我们把这些步骤整合在一起时,就形成了以下测试:

Click to expand
# NOTE: This example test is yet to be refactored.
# A complete version is available under the testing best practices section.

import pandas as pd
from spaceflights.pipelines.data_science.nodes import split_data

def test_split_data():
    # Arrange
    dummy_data = pd.DataFrame(
        {
            "engines": [1, 2, 3],
            "crew": [4, 5, 6],
            "passenger_capacity": [5, 6, 7],
            "price": [120, 290, 30],
        }
    )

    dummy_parameters = {
        "model_options": {
            "test_size": 0.2,
            "random_state": 3,
            "features": ["engines", "passenger_capacity", "crew"],
        }
    }

    # Act
    X_train, X_test, y_train, y_test = split_data(dummy_data, dummy_parameters["model_options"])

    # Assert
    assert len(X_train) == 2
    assert len(y_train) == 2
    assert len(X_test) == 1
    assert len(y_test) == 1

这个测试是正向测试的一个示例——它验证有效输入能产生预期输出。相反,测试无效输入会被正确拒绝的情况称为负向测试,其重要性同样不可忽视。

按照上述相同的步骤,我们可以编写以下测试来验证当价格数据不可用时是否抛出错误:

Click to expand
# NOTE: This example test is yet to be refactored.
# A complete version is available under the testing best practices section.

import pandas as pd
from spaceflights.pipelines.data_science.nodes import split_data

def test_split_data_missing_price():
    # Arrange
    dummy_data = pd.DataFrame(
        {
            "engines": [1, 2, 3],
            "crew": [4, 5, 6],
            "passenger_capacity": [5, 6, 7],
            # Note the missing price data
        }
    )

    dummy_parameters = {
        "model_options": {
            "test_size": 0.2,
            "random_state": 3,
            "features": ["engines", "passenger_capacity", "crew"],
        }
    }

    with pytest.raises(KeyError) as e_info:
        # Act
        X_train, X_test, y_train, y_test = split_data(dummy_data, dummy_parameters["model_options"])

    # Assert
    assert "price" in str(e_info.value) # checks that the error is about the missing price data

为Kedro管道编写测试:集成测试

为每个节点编写测试可以确保节点在单独运行时表现符合预期。然而,我们还需要考虑管道中各节点之间的交互情况——这被称为集成测试。集成测试将各个单元组合成一个整体,检查它们是否按预期进行通信、共享数据并协同工作。让我们通过实践来理解这一点。

将数据科学流程视为一个整体:

Click to expand
from kedro.pipeline import Pipeline, node, pipeline
from .nodes import evaluate_model, split_data, train_model


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=split_data,
                inputs=["model_input_table", "params:model_options"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                name="split_data_node",
            ),
            node(
                func=train_model,
                inputs=["X_train", "y_train"],
                outputs="regressor",
                name="train_model_node",
            ),
            node(
                func=evaluate_model,
                inputs=["regressor", "X_test", "y_test"],
                outputs=None,
                name="evaluate_model_node",
            ),
        ]
    )

该流水线接收一个pandas DataFrame和参数字典作为输入,根据参数拆分数据,并用于训练和评估回归模型。通过集成测试,我们可以验证这一系列节点是否按预期运行。

在本教程前面的部分中,我们知道成功的流水线运行会以记录Pipeline execution completed successfully.消息作为结束。为了验证测试中是否记录了这条消息,我们使用了pytest的caplog功能来捕获执行期间生成的日志。

正如我们对单元测试所做的那样,我们将此分解为几个步骤:

  1. 准备阶段:设置运行器及其输入参数 pipelinecatalog,以及任何额外的测试配置。

  2. 执行:运行管道。

  3. 断言:确保成功运行消息已被记录。

当我们把这些整合在一起时,就得到了以下测试:

Click to expand
# NOTE: This example test is yet to be refactored.
# A complete version is available under the testing best practices section.

import logging
import pandas as pd
from kedro.io import DataCatalog
from kedro.runner import SequentialRunner
from spaceflights.pipelines.data_science import create_pipeline as create_ds_pipeline

def test_data_science_pipeline(caplog):    # Note: caplog is passed as an argument
    # Arrange pipeline
    pipeline = create_ds_pipeline()

    # Arrange data catalog
    catalog = DataCatalog()

    dummy_data = pd.DataFrame(
        {
            "engines": [1, 2, 3],
            "crew": [4, 5, 6],
            "passenger_capacity": [5, 6, 7],
            "price": [120, 290, 30],
        }
    )

    duummy_parameters = {
        "model_options": {
            "test_size": 0.2,
            "random_state": 3,
            "features": ["engines", "passenger_capacity", "crew"],
        }
    }

    catalog.add_feed_dict(
        {
            "model_input_table" : dummy_data,
            "params:model_options": dummy_parameters["model_options"],
        }
    )

    # Arrange the log testing setup
    caplog.set_level(logging.DEBUG, logger="kedro") # Ensure all logs produced by Kedro are captured
    successful_run_msg = "Pipeline execution completed successfully."

    # Act
    SequentialRunner().run(pipeline, catalog)

    # Assert
    assert successful_run_msg in caplog.text

测试最佳实践

在哪里编写你的测试

我们建议在项目根目录下创建一个tests目录。该目录结构应与/src/spaceflights的目录结构保持一致:

src
│   ...
└───spaceflights
│   └───pipelines
│       └───data_science
│           │   __init__.py
│           │   nodes.py
│           │   pipeline.py
│
tests
|   ...
└───pipelines
│   └───data_science
│       │   test_data_science_pipeline.py

使用fixtures

在我们的测试中,可以看到dummy_datadummy_parameters被定义了三次(大部分情况下)使用相同的值。相反,我们可以将这些定义在测试外部作为pytest fixtures

Click to expand
import pytest

@pytest.fixture
def dummy_data():
    return pd.DataFrame(
        {
            "engines": [1, 2, 3],
            "crew": [4, 5, 6],
            "passenger_capacity": [5, 6, 7],
            "price": [120, 290, 30],
        }
    )

@pytest.fixture
def dummy_parameters():
    parameters = {
        "model_options": {
            "test_size": 0.2,
            "random_state": 3,
            "features": ["engines", "passenger_capacity", "crew"],
        }
    }
    return parameters

然后我们可以通过这些测试参数来访问它们。

def test_split_data(dummy_data, dummy_parameters):
        ...

流水线切片

在测试test_data_science_pipeline中,我们验证当前定义的数据科学流水线能否成功运行。然而,由于流水线并非静态不变,该测试并不具备鲁棒性。相反,我们应该明确定义待测流水线的具体范围;为此我们使用pipeline slicing来指定流水线的起点和终点:

def test_data_science_pipeline(self):
    # Arrange pipeline
    pipeline = create_pipeline().from_nodes("split_data_node").to_nodes("evaluate_model_node")
    ...

这确保了即使向流水线添加更多节点,测试仍将按设计执行。

在整合这些测试实践后,我们的测试文件test_data_science_pipeline.py变为:

# tests/pipelines/test_data_science_pipeline.py

import logging
import pandas as pd
import pytest

from kedro.io import DataCatalog
from kedro.runner import SequentialRunner
from spaceflights.pipelines.data_science import create_pipeline as create_ds_pipeline
from spaceflights.pipelines.data_science.nodes import split_data

@pytest.fixture
def dummy_data():
    return pd.DataFrame(
        {
            "engines": [1, 2, 3],
            "crew": [4, 5, 6],
            "passenger_capacity": [5, 6, 7],
            "price": [120, 290, 30],
        }
    )

@pytest.fixture
def dummy_parameters():
    parameters = {
        "model_options": {
            "test_size": 0.2,
            "random_state": 3,
            "features": ["engines", "passenger_capacity", "crew"],
        }
    }
    return parameters


def test_split_data(dummy_data, dummy_parameters):
    X_train, X_test, y_train, y_test = split_data(
        dummy_data, dummy_parameters["model_options"]
    )
    assert len(X_train) == 2
    assert len(y_train) == 2
    assert len(X_test) == 1
    assert len(y_test) == 1

def test_split_data_missing_price(dummy_data, dummy_parameters):
    dummy_data_missing_price = dummy_data.drop(columns="price")
    with pytest.raises(KeyError) as e_info:
        X_train, X_test, y_train, y_test = split_data(dummy_data_missing_price, dummy_parameters["model_options"])

    assert "price" in str(e_info.value)

def test_data_science_pipeline(caplog, dummy_data, dummy_parameters):
    pipeline = (
        create_ds_pipeline()
        .from_nodes("split_data_node")
        .to_nodes("evaluate_model_node")
    )
    catalog = DataCatalog()
    catalog.add_feed_dict(
        {
            "model_input_table" : dummy_data,
            "params:model_options": dummy_parameters["model_options"],
        }
    )

    caplog.set_level(logging.DEBUG, logger="kedro")
    successful_run_msg = "Pipeline execution completed successfully."

    SequentialRunner().run(pipeline, catalog)

    assert successful_run_msg in caplog.text

运行你的测试

首先,确认您的项目已在本地安装完成。可以通过导航至项目根目录并运行以下命令来实现:

pip install -e .

此步骤允许pytest准确解析测试文件中的导入语句。

注意: 选项 -e 会安装项目的可编辑版本,这样您可以在不每次重新安装的情况下修改项目文件。

请确保已安装pytest。有关使用pytest进行设置的更多信息,请参阅我们的自动化测试文档

要运行测试,请从项目根目录下执行pytest命令。

cd <project_root>
pytest tests/pipelines/test_data_science_pipeline.py

你应该在终端中看到以下输出。

============================= test session starts ==============================
...
collected 2 items

tests/pipelines/test_data_science_pipeline.py ..                                                  [100%]

============================== 2 passed in 4.38s ===============================

该输出表明文件tests/pipelines/test_data_science_pipeline.py中的所有测试均无错误运行。