管道注册表¶
使用Kedro 0.17.2或更高版本生成的项目会在src/中定义其管道。该文件会填充project中的pipelines变量,Kedro CLI和插件通过该变量访问项目管道。pipeline_registry模块必须包含一个顶层register_pipelines()函数,该函数返回从管道名称到Pipeline对象的映射。例如,已完成的spaceflights教程的Kedro起始模板中的管道注册表可以定义以下register_pipelines()函数,该函数暴露数据处理管道、数据科学管道以及第三个默认管道(该管道组合了前述两个管道):
import spaceflights.pipelines.data_processing as dp
import spaceflights.pipelines.data_science as ds
def register_pipelines() -> Dict[str, Pipeline]:
"""Register the project's pipelines.
Returns:
A mapping from pipeline names to ``Pipeline`` objects.
"""
data_processing_pipeline = dp.create_pipeline()
data_science_pipeline = ds.create_pipeline()
return {
"__default__": data_processing_pipeline + data_science_pipeline,
"data_processing": data_processing_pipeline,
"data_science": data_science_pipeline,
}
提醒一下,运行kedro run时不带--pipeline选项会执行默认管道。
注意
将各个流水线(pipeline)组合在一起的顺序并不重要(data_science_pipeline + data_processing_pipeline会产生相同结果),因为Kedro会自动检测最终流水线中所有节点(node)以数据为中心的执行顺序。
管道自动发现¶
在上面的示例中,每当创建一个应作为项目流水线返回的流水线时,您都需要更新register_pipelines()函数。从Kedro 0.18.3开始,您可以使用 find_pipelines()以更少的代码实现相同的结果。更新后的流水线注册表不包含项目特定代码:
def register_pipelines() -> Dict[str, Pipeline]:
"""Register the project's pipelines.
Returns:
A mapping from pipeline names to ``Pipeline`` objects.
"""
pipelines = find_pipelines()
pipelines["__default__"] = sum(pipelines.values())
return pipelines
在底层实现中,find_pipelines()函数会遍历src/目录,并返回一个从管道目录名到Pipeline对象的映射关系,具体通过以下方式实现:
导入
模块.pipelines. 调用由
模块暴露的.pipelines. create_pipeline()函数验证构建的对象是一个
Pipeline
默认情况下,如果这些步骤中的任何一个失败,find_pipelines()(或find_pipelines(raise_errors=False))会发出适当的警告并跳过当前流水线但继续遍历。在开发过程中,这使您能够运行项目中部分可用的流水线,即使其他流水线存在问题或仍在开发中。
如果指定find_pipelines(raise_errors=True),自动发现过程将在首次出现错误时失败。在生产环境中,这可以确保错误被提前捕获,避免流水线被意外排除。
由find_pipelines()返回的映射关系可以被修改,这意味着你不局限于使用上述每个create_pipeline()函数返回的流水线。例如,要添加一个不属于默认流水线的数据工程流水线,可以在构建默认流水线之后将其添加到字典中:
def register_pipelines() -> Dict[str, Pipeline]:
"""Register the project's pipelines.
Returns:
A mapping from pipeline names to ``Pipeline`` objects.
"""
pipelines = find_pipelines()
pipelines["__default__"] = sum(pipelines.values())
pipelines["data_engineering"] = pipeline(
pipelines["data_processing"], tags="data_engineering"
)
return pipelines
注意
在上述情况下,kedro run --tags data_engineering 不会运行数据工程流水线,因为它不属于默认流水线。要运行数据工程流水线,您需要指定 kedro run --pipeline data_engineering --tags data_engineering。
另一方面,您也可以在将管道分配到默认管道之前进行修改,例如通过pipelines["__default__"] = sum(pipelines.values())将其包含在默认管道中。举例来说,您可以在pipeline_registry.py中使用data_engineering标签更新data_processing管道,并将此变更也包含在默认管道中:
def register_pipelines() -> Dict[str, Pipeline]:
"""Register the project's pipelines.
Returns:
A mapping from pipeline names to ``Pipeline`` objects.
"""
pipelines = find_pipelines()
pipelines["data_processing"] = pipeline(
pipelines["data_processing"], tags="data_engineering"
)
pipelines["__default__"] = sum(pipelines.values())
return pipelines