Test a Kedro project
Testing your Kedro project is important for validating that your nodes and pipelines run as expected. In this section we look at some example tests for the spaceflights project.
This section explains the following:
How to test a Kedro node
How to test a Kedro pipeline
Testing best practices
This section does not cover:
Writing tests for Kedro nodes: Unit testing
Kedro expects node functions to be pure functions; a pure function is one whose output is determined only by its inputs, with no observable side effects. Testing these functions checks that a node behaves as expected: for a given set of input values, the node produces the expected output. Such tests are known as unit tests.
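As a minimal illustration of the distinction (a hypothetical example, not taken from the spaceflights project), compare a pure function with one that has a side effect:

import pandas as pd

def add_total_column(df: pd.DataFrame) -> pd.DataFrame:
    # Pure: the output depends only on the input, and the input is not mutated
    return df.assign(total=df["price"] * df["quantity"])

def add_total_column_and_save(df: pd.DataFrame) -> pd.DataFrame:
    # Not pure: writing to disk is an observable side effect,
    # which makes the function harder to test in isolation
    result = df.assign(total=df["price"] * df["quantity"])
    result.to_csv("totals.csv")
    return result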
Let's explore this with a practical example, using the split_data node function defined in the data science pipeline:
def split_data(data: pd.DataFrame, parameters: dict[str, Any]) -> Tuple:
"""Splits data into features and targets training and test sets.
Args:
data: Data containing features and target.
parameters: Parameters defined in parameters_data_science.yml.
Returns:
Split data.
"""
X = data[parameters["features"]]
y = data["price"]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=parameters["test_size"], random_state=parameters["random_state"]
)
return X_train, X_test, y_train, y_test
This function takes a pandas DataFrame and a dictionary of parameters as input, and splits the input data into four separate data objects according to the parameters provided. We recommend following pytest's test structure of breaking a test into four steps: arrange, act, assert, and cleanup. For this particular function, these steps are:
Arrange: Prepare the inputs data and parameters.
Act: Call split_data and capture the outputs as X_train, X_test, y_train, and y_test.
Assert: Ensure that the lengths of the outputs match the expected lengths.
The cleanup step becomes necessary when any of the previous steps in a test make modifications that may affect other tests, for example by modifying a file that is used as the input for several tests. That is not the case for the example tests below, so the cleanup step is omitted.
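For instance, a hypothetical test that writes an intermediate file could use pytest's built-in tmp_path fixture, so that anything it creates lives in a temporary directory that pytest discards after the test; the sketch below illustrates that pattern and is not part of the spaceflights project:

import pandas as pd

def test_save_split_report(tmp_path):
    # Arrange: build a path inside pytest's temporary directory
    report_path = tmp_path / "split_report.csv"
    # Act: write to it (a stand-in for a step with a side effect)
    pd.DataFrame({"rows": [2, 1]}).to_csv(report_path, index=False)
    # Assert
    assert report_path.exists()
    # Cleanup: nothing to do manually; pytest removes tmp_path for us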
Remember to import the function being tested and any necessary modules at the top of the file.
When we put these steps together, we have the following test:
# NOTE: This example test is yet to be refactored.
# A complete version is available under the testing best practices section.
import pandas as pd
from spaceflights.pipelines.data_science.nodes import split_data
def test_split_data():
# Arrange
dummy_data = pd.DataFrame(
{
"engines": [1, 2, 3],
"crew": [4, 5, 6],
"passenger_capacity": [5, 6, 7],
"price": [120, 290, 30],
}
)
dummy_parameters = {
"model_options": {
"test_size": 0.2,
"random_state": 3,
"features": ["engines", "passenger_capacity", "crew"],
}
}
# Act
X_train, X_test, y_train, y_test = split_data(dummy_data, dummy_parameters["model_options"])
# Assert
assert len(X_train) == 2
assert len(y_train) == 2
assert len(X_test) == 1
assert len(y_test) == 1
This test is an example of positive testing: it verifies that valid input produces the expected output. The inverse, checking that invalid input is correctly rejected, is called negative testing and is just as important.
Following the same steps as above, we can write the following test to check that an error is raised when price data is not available:
# NOTE: This example test is yet to be refactored.
# A complete version is available under the testing best practices section.
import pandas as pd
import pytest
from spaceflights.pipelines.data_science.nodes import split_data
def test_split_data_missing_price():
# Arrange
dummy_data = pd.DataFrame(
{
"engines": [1, 2, 3],
"crew": [4, 5, 6],
"passenger_capacity": [5, 6, 7],
# Note the missing price data
}
)
dummy_parameters = {
"model_options": {
"test_size": 0.2,
"random_state": 3,
"features": ["engines", "passenger_capacity", "crew"],
}
}
with pytest.raises(KeyError) as e_info:
# Act
X_train, X_test, y_train, y_test = split_data(dummy_data, dummy_parameters["model_options"])
# Assert
assert "price" in str(e_info.value) # checks that the error is about the missing price data
Writing tests for Kedro pipelines: Integration testing
Writing tests for each node ensures that every node behaves as expected when run on its own. However, we also need to consider how the nodes in a pipeline interact with one another; this is called integration testing. Integration testing combines the individual units into a whole and checks that they communicate, share data, and work together as expected. Let's see this in practice.
Consider the data science pipeline as a whole:
from kedro.pipeline import Pipeline, node, pipeline
from .nodes import evaluate_model, split_data, train_model
def create_pipeline(**kwargs) -> Pipeline:
return pipeline(
[
node(
func=split_data,
inputs=["model_input_table", "params:model_options"],
outputs=["X_train", "X_test", "y_train", "y_test"],
name="split_data_node",
),
node(
func=train_model,
inputs=["X_train", "y_train"],
outputs="regressor",
name="train_model_node",
),
node(
func=evaluate_model,
inputs=["regressor", "X_test", "y_test"],
outputs=None,
name="evaluate_model_node",
),
]
)
This pipeline takes a pandas DataFrame and a dictionary of parameters as input, splits the data in accordance with the parameters, and uses it to train and evaluate a regression model. With an integration test we can validate that this sequence of nodes runs as expected.
From earlier in the tutorial we know that a successful pipeline run ends with the message Pipeline execution completed successfully. being logged. To validate that this message is logged in our test, we use pytest's caplog feature to capture the logs generated during the execution.
As we did for our unit tests, we break this down into steps:
Arrange: Set up the runner and its inputs, pipeline and catalog, as well as any additional test configuration.
Act: Run the pipeline.
Assert: Ensure that the successful run message was logged.
When we put this together, we get the following test:
# NOTE: This example test is yet to be refactored.
# A complete version is available under the testing best practices section.
import logging
import pandas as pd
from kedro.io import DataCatalog
from kedro.runner import SequentialRunner
from spaceflights.pipelines.data_science import create_pipeline as create_ds_pipeline
def test_data_science_pipeline(caplog): # Note: caplog is passed as an argument
# Arrange pipeline
pipeline = create_ds_pipeline()
# Arrange data catalog
catalog = DataCatalog()
dummy_data = pd.DataFrame(
{
"engines": [1, 2, 3],
"crew": [4, 5, 6],
"passenger_capacity": [5, 6, 7],
"price": [120, 290, 30],
}
)
dummy_parameters = {
"model_options": {
"test_size": 0.2,
"random_state": 3,
"features": ["engines", "passenger_capacity", "crew"],
}
}
catalog.add_feed_dict(
{
"model_input_table" : dummy_data,
"params:model_options": dummy_parameters["model_options"],
}
)
# Arrange the log testing setup
caplog.set_level(logging.DEBUG, logger="kedro") # Ensure all logs produced by Kedro are captured
successful_run_msg = "Pipeline execution completed successfully."
# Act
SequentialRunner().run(pipeline, catalog)
# Assert
assert successful_run_msg in caplog.text
Testing best practices
Where to write your tests
We recommend creating a tests directory at the root of your project. Its structure should mirror the structure of /src/spaceflights:
src
│ ...
└───spaceflights
│ └───pipelines
│ └───data_science
│ │ __init__.py
│ │ nodes.py
│ │ pipeline.py
│
tests
│ ...
└───pipelines
│ └───data_science
│ │ test_data_science_pipeline.py
Using fixtures
In our tests, we can see that dummy_data and dummy_parameters have been defined three times with (mostly) the same values. Instead, we can define them once outside of our tests as pytest fixtures:
import pandas as pd
import pytest
@pytest.fixture
def dummy_data():
return pd.DataFrame(
{
"engines": [1, 2, 3],
"crew": [4, 5, 6],
"passenger_capacity": [5, 6, 7],
"price": [120, 290, 30],
}
)
@pytest.fixture
def dummy_parameters():
parameters = {
"model_options": {
"test_size": 0.2,
"random_state": 3,
"features": ["engines", "passenger_capacity", "crew"],
}
}
return parameters
We can then access these in our tests by adding them as arguments:
def test_split_data(dummy_data, dummy_parameters):
...
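If several test files need the same fixtures, pytest's standard convention is to move them into a conftest.py file inside the tests directory, where they are discovered automatically without being imported; the layout below is a sketch of that convention, not something the spaceflights starter generates for you:

# tests/conftest.py
import pandas as pd
import pytest

@pytest.fixture
def dummy_data():
    return pd.DataFrame(
        {
            "engines": [1, 2, 3],
            "crew": [4, 5, 6],
            "passenger_capacity": [5, 6, 7],
            "price": [120, 290, 30],
        }
    )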
Pipeline slicing
In the test test_data_science_pipeline we validated that the data science pipeline, as currently defined, runs successfully. However, this test is not robust, because the pipeline is not static. Instead, we should be explicit about the part of the pipeline our test covers; we do this with pipeline slicing, which specifies the pipeline's start and end points:
def test_data_science_pipeline():
# Arrange pipeline
pipeline = create_pipeline().from_nodes("split_data_node").to_nodes("evaluate_model_node")
...
This ensures that the test will execute as designed, even if more nodes are added to the pipeline.
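Other slicing methods on Pipeline, such as from_inputs, to_outputs, or only_nodes, can scope a test in the same way. For example, a hypothetical test concerned only with the split step could slice the pipeline down to a single node:

def test_split_node_is_present():
    # Slice the pipeline down to the one node under test
    pipeline = create_pipeline().only_nodes("split_data_node")
    assert len(pipeline.nodes) == 1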
After incorporating these testing practices, our test file test_data_science_pipeline.py becomes:
# tests/pipelines/test_data_science_pipeline.py
import logging
import pandas as pd
import pytest
from kedro.io import DataCatalog
from kedro.runner import SequentialRunner
from spaceflights.pipelines.data_science import create_pipeline as create_ds_pipeline
from spaceflights.pipelines.data_science.nodes import split_data
@pytest.fixture
def dummy_data():
return pd.DataFrame(
{
"engines": [1, 2, 3],
"crew": [4, 5, 6],
"passenger_capacity": [5, 6, 7],
"price": [120, 290, 30],
}
)
@pytest.fixture
def dummy_parameters():
parameters = {
"model_options": {
"test_size": 0.2,
"random_state": 3,
"features": ["engines", "passenger_capacity", "crew"],
}
}
return parameters
def test_split_data(dummy_data, dummy_parameters):
X_train, X_test, y_train, y_test = split_data(
dummy_data, dummy_parameters["model_options"]
)
assert len(X_train) == 2
assert len(y_train) == 2
assert len(X_test) == 1
assert len(y_test) == 1
def test_split_data_missing_price(dummy_data, dummy_parameters):
dummy_data_missing_price = dummy_data.drop(columns="price")
with pytest.raises(KeyError) as e_info:
X_train, X_test, y_train, y_test = split_data(dummy_data_missing_price, dummy_parameters["model_options"])
assert "price" in str(e_info.value)
def test_data_science_pipeline(caplog, dummy_data, dummy_parameters):
pipeline = (
create_ds_pipeline()
.from_nodes("split_data_node")
.to_nodes("evaluate_model_node")
)
catalog = DataCatalog()
catalog.add_feed_dict(
{
"model_input_table" : dummy_data,
"params:model_options": dummy_parameters["model_options"],
}
)
caplog.set_level(logging.DEBUG, logger="kedro")
successful_run_msg = "Pipeline execution completed successfully."
SequentialRunner().run(pipeline, catalog)
assert successful_run_msg in caplog.text
Run your tests
First, confirm that your project has been installed locally. You can do this by navigating to your project root and running the following command:
pip install -e .
This step allows pytest to resolve the imports in your test files accurately.
Note: The -e option installs an editable version of your project, allowing you to make changes to the project files without having to re-install them each time.
Make sure you have pytest installed. See our automated testing documentation for more information on getting set up with pytest.
To run your tests, run the pytest command from within your project root directory:
cd <project_root>
pytest tests/pipelines/test_data_science_pipeline.py
You should see the following output in your terminal:
============================= test session starts ==============================
...
collected 3 items
tests/pipelines/test_data_science_pipeline.py ... [100%]
============================== 3 passed in 4.38s ===============================
This output indicates that all tests in the file tests/pipelines/test_data_science_pipeline.py ran without errors.
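To run every test in the project rather than a single file, you can point pytest at the whole tests directory (standard pytest usage):

pytest tests/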