管道注册表

使用Kedro 0.17.2或更高版本生成的项目会在src//pipeline_registry.py中定义其管道。该文件会填充project中的pipelines变量,Kedro CLI和插件通过该变量访问项目管道。pipeline_registry模块必须包含一个顶层register_pipelines()函数,该函数返回从管道名称到Pipeline对象的映射。例如,已完成的spaceflights教程的Kedro起始模板中的管道注册表可以定义以下register_pipelines()函数,该函数暴露数据处理管道、数据科学管道以及第三个默认管道(该管道组合了前述两个管道):

import spaceflights.pipelines.data_processing as dp
import spaceflights.pipelines.data_science as ds


def register_pipelines() -> Dict[str, Pipeline]:
    """Register the project's pipelines.

    Returns:
        A mapping from pipeline names to ``Pipeline`` objects.
    """
    data_processing_pipeline = dp.create_pipeline()
    data_science_pipeline = ds.create_pipeline()

    return {
        "__default__": data_processing_pipeline + data_science_pipeline,
        "data_processing": data_processing_pipeline,
        "data_science": data_science_pipeline,
    }

提醒一下,运行kedro run时不带--pipeline选项会执行默认管道

注意

将各个流水线(pipeline)组合在一起的顺序并不重要(data_science_pipeline + data_processing_pipeline会产生相同结果),因为Kedro会自动检测最终流水线中所有节点(node)以数据为中心的执行顺序。

管道自动发现

在上面的示例中,每当创建一个应作为项目流水线返回的流水线时,您都需要更新register_pipelines()函数。从Kedro 0.18.3开始,您可以使用 find_pipelines()以更少的代码实现相同的结果。更新后的流水线注册表不包含项目特定代码:

def register_pipelines() -> Dict[str, Pipeline]:
    """Register the project's pipelines.

    Returns:
        A mapping from pipeline names to ``Pipeline`` objects.
    """
    pipelines = find_pipelines()
    pipelines["__default__"] = sum(pipelines.values())
    return pipelines

在底层实现中,find_pipelines()函数会遍历src//pipelines/目录,并返回一个从管道目录名到Pipeline对象的映射关系,具体通过以下方式实现:

  1. 导入 .pipelines. 模块

  2. 调用由.pipelines.模块暴露的create_pipeline()函数

  3. 验证构建的对象是一个Pipeline

默认情况下,如果这些步骤中的任何一个失败,find_pipelines()(或find_pipelines(raise_errors=False))会发出适当的警告并跳过当前流水线但继续遍历。在开发过程中,这使您能够运行项目中部分可用的流水线,即使其他流水线存在问题或仍在开发中。

如果指定find_pipelines(raise_errors=True),自动发现过程将在首次出现错误时失败。在生产环境中,这可以确保错误被提前捕获,避免流水线被意外排除。

find_pipelines()返回的映射关系可以被修改,这意味着你不局限于使用上述每个create_pipeline()函数返回的流水线。例如,要添加一个不属于默认流水线的数据工程流水线,可以在构建默认流水线之后将其添加到字典中:

def register_pipelines() -> Dict[str, Pipeline]:
    """Register the project's pipelines.

    Returns:
        A mapping from pipeline names to ``Pipeline`` objects.
    """
    pipelines = find_pipelines()
    pipelines["__default__"] = sum(pipelines.values())
    pipelines["data_engineering"] = pipeline(
        pipelines["data_processing"], tags="data_engineering"
    )
    return pipelines

注意

在上述情况下,kedro run --tags data_engineering 不会运行数据工程流水线,因为它不属于默认流水线。要运行数据工程流水线,您需要指定 kedro run --pipeline data_engineering --tags data_engineering

另一方面,您也可以在将管道分配到默认管道之前进行修改,例如通过pipelines["__default__"] = sum(pipelines.values())将其包含在默认管道中。举例来说,您可以在pipeline_registry.py中使用data_engineering标签更新data_processing管道,并将此变更也包含在默认管道中:

def register_pipelines() -> Dict[str, Pipeline]:
    """Register the project's pipelines.

    Returns:
        A mapping from pipeline names to ``Pipeline`` objects.
    """
    pipelines = find_pipelines()
    pipelines["data_processing"] = pipeline(
        pipelines["data_processing"], tags="data_engineering"
    )
    pipelines["__default__"] = sum(pipelines.values())
    return pipelines