模块化管道

在许多典型的Kedro项目中,随着项目发展,单一("主")管道的复杂性会不断增加。为使项目保持适用性,我们建议将代码拆分为逻辑隔离且可复用的不同管道(模块)。理想情况下,每个管道应组织在独立的文件夹中,便于在项目内或跨项目进行复制和重用。简而言之:一个管道对应一个文件夹。

Kedro通过以下工具支持模块化管道的概念:

如何使用kedro pipeline create命令创建新的空白管道

要创建一个新的模块化管道,请使用以下命令:

kedro pipeline create <pipeline_name>

运行此命令后,项目中将会创建一个包含样板文件夹和文件的新管道。为方便使用,Kedro提供了管道专用的nodes.pypipeline.py、参数文件以及相应的tests测试结构。同时还会自动添加对应的__init__.py文件。下方展示了生成的文件夹结构:

├── conf
│   └── base
│       └── parameters_{{pipeline_name}}.yml  <-- Pipeline-specific parameters
└── src
    ├── my_project
    │   ├── __init__.py
    │   └── pipelines
    │       ├── __init__.py
    │       └── {{pipeline_name}}      <-- This folder defines the modular pipeline
    │           ├── __init__.py        <-- So that Python treats this pipeline as a module
    │           ├── nodes.py           <-- To declare your nodes
    │           └── pipeline.py        <-- To structure the pipeline itself
    └── tests
        ├── __init__.py
        └── pipelines
            ├── __init__.py
            └── {{pipeline_name}}      <-- Pipeline-specific tests
                ├── __init__.py
                └── test_pipeline.py

如果想删除现有的流水线,可以使用kedro pipeline delete 命令来实现。

注意

要查看所有可用的CLI选项列表,可以运行kedro pipeline create --help

如何构建您的流水线创建

在使用kedro pipeline create创建管道后,您将在pipeline.py中找到模板代码,您需要用实际的管道代码填充这些模板:

# src/my_project/pipelines/{{pipeline_name}}/pipeline.py
from kedro.pipeline import Pipeline, pipeline

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline([])

在这里,您正在创建一个create_pipeline()函数,该函数借助pipeline函数返回一个Pipeline类实例。您应该保持函数名为create_pipeline(),因为这允许kedro自动发现管道。否则,管道需要手动注册

在填充pipeline.py节点之前,我们建议将所有节点函数存储在nodes.py中。根据前面的示例,我们应该将函数mean()mean_sos()variance()添加到nodes.py中:

# src/my_project/pipelines/{{pipeline_name}}/nodes.py
def mean(xs, n):
    return sum(xs) / n

def mean_sos(xs, n):
    return sum(x**2 for x in xs) / n

def variance(m, m2):
    return m2 - m * m

然后我们可以按照以下方式将这些节点组装成一个管道:

# src/my_project/pipelines/{{pipeline_name}}/pipelines.py
from kedro.pipeline import Pipeline, pipeline, node

from .nodes import mean, mean_sos, variance
# Import node functions from nodes.py located in the same folder

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(len, "xs", "n"),
            node(mean, ["xs", "n"], "m", name="mean_node", tags="tag1"),
            node(mean_sos, ["xs", "n"], "m2", name="mean_sos", tags=["tag1", "tag2"]),
            node(variance, ["m", "m2"], "v", name="variance_node"),
        ],  # A list of nodes and pipelines combined into a new pipeline
        tags="tag3",  # Optional, each pipeline node will be tagged
        namespace="",  # Optional
        inputs={},  # Optional
        outputs={},  # Optional
        parameters={},  # Optional
    )

这里展示了流水线创建函数有几个可选参数,你可以使用:

  • 在流水线级别添加标签,以便将其应用于流水线内的所有节点

  • 命名空间、输入、输出和参数以复用流水线。更多相关信息请参阅使用命名空间复用流水线

如何使用自定义的新管道模板

如果想使用自定义的Cookiecutter模板生成管道,可以将其保存在/templates/pipeline目录下。 执行kedro pipeline create命令时,系统会自动将项目中的自定义模板作为默认选项。您也可以通过--template参数指定自定义Cookiecutter管道模板的路径,示例如下:

kedro pipeline create <pipeline_name> --template <path_to_template>

通过--template参数传递给kedro pipeline create的模板文件夹将优先于任何本地模板。 Kedro支持在项目中只保留一个流水线模板。如果需要多个流水线模板,建议将它们保存在单独的文件夹中,并通过--template标志指向它们。

创建自定义流水线模板

您需要负责为自定义流水线创建功能性的Cookiecutter模板。请确保您理解流水线的基本结构。您的模板应渲染为有效的、可导入的Python模块,其中包含一个顶层create_pipeline函数,该函数返回一个Pipeline对象。您还需要准备适当的configtests子目录,这些子目录将在创建流水线时被复制到项目的configtests目录中。configtests目录需要遵循与默认模板相同的布局且不能自定义,但参数内容和实际测试文件可以更改。除此之外,文件和文件夹的名称或结构无关紧要,可以根据您的需求进行自定义。您可以使用Kedro的默认模板作为起点。

流水线模板使用Cookiecutter渲染,且必须包含一个cookiecutter.json文件。 参考Kedro默认模板中的cookiecutter.json文件作为示例。 需要注意的是,若将自定义流水线模板嵌入Kedro启动模板中,必须告知Cookiecutter在从启动模板创建新项目时不渲染该模板。为此, 需在启动模板的cookiecutter.json文件中添加_copy_without_render: ["templates"]配置项, 而非在流水线模板的cookiecutter.json中添加。

提供特定于管道的依赖项

  • 一个管道可能在本地requirements.txt文件中指定了外部依赖项。

  • 微打包过程中会收集特定于管道的依赖项。

  • 这些依赖项需要使用 pip 手动安装:

pip install -r requirements.txt

如何分享你的管道

警告: Micro-packaging 已弃用,将在 Kedro 1.0.0 版本中移除。

通过微包可以在不同Kedro代码库之间共享流水线,但必须遵循几条规则以确保可移植性:

  • 您想要共享的流水线需要在其文件夹结构上进行分离。kedro pipeline create 命令使这一操作变得简单。

  • 流水线不应依赖于主Python包,否则会破坏其向其他项目的可移植性。

  • 在共享/使用流水线时,目录引用不会被打包,即catalog.yml文件不会被包含在内。

  • Kedro 只会查找 conf/ 目录下的顶层配置;在流水线文件夹内放置配置文件夹不会产生任何效果。

  • 我们建议您在本地README.md文件中记录所需的配置(参数和目录),以供任何下游使用者参考。