模块化管道¶
在许多典型的Kedro项目中,随着项目发展,单一("主")管道的复杂性会不断增加。为使项目保持适用性,我们建议将代码拆分为逻辑隔离且可复用的不同管道(模块)。理想情况下,每个管道应组织在独立的文件夹中,便于在项目内或跨项目进行复制和重用。简而言之:一个管道对应一个文件夹。
Kedro通过以下工具支持模块化管道的概念:
如何使用kedro pipeline create命令创建新的空白管道¶
要创建一个新的模块化管道,请使用以下命令:
kedro pipeline create <pipeline_name>
运行此命令后,项目中将会创建一个包含样板文件夹和文件的新管道。为方便使用,Kedro提供了管道专用的nodes.py、pipeline.py、参数文件以及相应的tests测试结构。同时还会自动添加对应的__init__.py文件。下方展示了生成的文件夹结构:
├── conf
│ └── base
│ └── parameters_{{pipeline_name}}.yml <-- Pipeline-specific parameters
└── src
├── my_project
│ ├── __init__.py
│ └── pipelines
│ ├── __init__.py
│ └── {{pipeline_name}} <-- This folder defines the modular pipeline
│ ├── __init__.py <-- So that Python treats this pipeline as a module
│ ├── nodes.py <-- To declare your nodes
│ └── pipeline.py <-- To structure the pipeline itself
└── tests
├── __init__.py
└── pipelines
├── __init__.py
└── {{pipeline_name}} <-- Pipeline-specific tests
├── __init__.py
└── test_pipeline.py
如果想删除现有的流水线,可以使用kedro pipeline delete 命令来实现。
注意
要查看所有可用的CLI选项列表,可以运行kedro pipeline create --help。
如何构建您的流水线创建¶
在使用kedro pipeline create创建管道后,您将在pipeline.py中找到模板代码,您需要用实际的管道代码填充这些模板:
# src/my_project/pipelines/{{pipeline_name}}/pipeline.py
from kedro.pipeline import Pipeline, pipeline
def create_pipeline(**kwargs) -> Pipeline:
return pipeline([])
在这里,您正在创建一个create_pipeline()函数,该函数借助pipeline函数返回一个Pipeline类实例。您应该保持函数名为create_pipeline(),因为这允许kedro自动发现管道。否则,管道需要手动注册。
在填充pipeline.py节点之前,我们建议将所有节点函数存储在nodes.py中。根据前面的示例,我们应该将函数mean()、mean_sos()和variance()添加到nodes.py中:
# src/my_project/pipelines/{{pipeline_name}}/nodes.py
def mean(xs, n):
return sum(xs) / n
def mean_sos(xs, n):
return sum(x**2 for x in xs) / n
def variance(m, m2):
return m2 - m * m
然后我们可以按照以下方式将这些节点组装成一个管道:
# src/my_project/pipelines/{{pipeline_name}}/pipelines.py
from kedro.pipeline import Pipeline, pipeline, node
from .nodes import mean, mean_sos, variance
# Import node functions from nodes.py located in the same folder
def create_pipeline(**kwargs) -> Pipeline:
return pipeline(
[
node(len, "xs", "n"),
node(mean, ["xs", "n"], "m", name="mean_node", tags="tag1"),
node(mean_sos, ["xs", "n"], "m2", name="mean_sos", tags=["tag1", "tag2"]),
node(variance, ["m", "m2"], "v", name="variance_node"),
], # A list of nodes and pipelines combined into a new pipeline
tags="tag3", # Optional, each pipeline node will be tagged
namespace="", # Optional
inputs={}, # Optional
outputs={}, # Optional
parameters={}, # Optional
)
这里展示了流水线创建函数有几个可选参数,你可以使用:
在流水线级别添加标签,以便将其应用于流水线内的所有节点
命名空间、输入、输出和参数以复用流水线。更多相关信息请参阅使用命名空间复用流水线
如何使用自定义的新管道模板¶
如果想使用自定义的Cookiecutter模板生成管道,可以将其保存在目录下。
执行kedro pipeline create命令时,系统会自动将项目中的自定义模板作为默认选项。您也可以通过--template参数指定自定义Cookiecutter管道模板的路径,示例如下:
kedro pipeline create <pipeline_name> --template <path_to_template>
通过--template参数传递给kedro pipeline create的模板文件夹将优先于任何本地模板。
Kedro支持在项目中只保留一个流水线模板。如果需要多个流水线模板,建议将它们保存在单独的文件夹中,并通过--template标志指向它们。
创建自定义流水线模板¶
您需要负责为自定义流水线创建功能性的Cookiecutter模板。请确保您理解流水线的基本结构。您的模板应渲染为有效的、可导入的Python模块,其中包含一个顶层create_pipeline函数,该函数返回一个Pipeline对象。您还需要准备适当的config和tests子目录,这些子目录将在创建流水线时被复制到项目的config和tests目录中。config和tests目录需要遵循与默认模板相同的布局且不能自定义,但参数内容和实际测试文件可以更改。除此之外,文件和文件夹的名称或结构无关紧要,可以根据您的需求进行自定义。您可以使用Kedro的默认模板作为起点。
流水线模板使用Cookiecutter渲染,且必须包含一个cookiecutter.json文件。
参考Kedro默认模板中的cookiecutter.json文件作为示例。
需要注意的是,若将自定义流水线模板嵌入Kedro启动模板中,必须告知Cookiecutter在从启动模板创建新项目时不渲染该模板。为此,
需在启动模板的cookiecutter.json文件中添加_copy_without_render: ["templates"]配置项,
而非在流水线模板的cookiecutter.json中添加。
提供特定于管道的依赖项¶
一个管道可能在本地
requirements.txt文件中指定了外部依赖项。在微打包过程中会收集特定于管道的依赖项。
这些依赖项需要使用
pip手动安装:
pip install -r requirements.txt