管道对象¶
我们之前介绍了节点作为表示任务的基础构建块,可以在管道中组合以构建您的工作流。管道组织节点集合的依赖关系和执行顺序,并在保持代码模块化的同时连接输入和输出。管道通过解析依赖关系来确定节点执行顺序,并不一定按照传入节点的顺序运行它们。
利用Kedro自动依赖解析的优势在于,您可以将节点串联成Pipeline,这是一个使用共享变量集的节点列表。该类可以通过pipeline方法创建,基于节点或其他管道(在这种情况下将使用该管道中的所有节点)。
以下部分将解释如何创建和使用Kedro管道:
如何构建管道¶
在以下示例中,我们构建了一个计算一组数字方差的简单管道。实际应用中,管道可以使用更复杂的节点定义,且它们使用的变量通常对应于完整的数据集:
from kedro.pipeline import pipeline, node
def mean(xs, n):
return sum(xs) / n
def mean_sos(xs, n):
return sum(x**2 for x in xs) / n
def variance(m, m2):
return m2 - m * m
variance_pipeline = pipeline(
[
node(len, "xs", "n"),
node(mean, ["xs", "n"], "m", name="mean_node"),
node(mean_sos, ["xs", "n"], "m2", name="mean_sos"),
node(variance, ["m", "m2"], "v", name="variance_node"),
]
)
Kedro根据为每个节点指定的输入和输出来确定这些节点的执行顺序。在这个示例中:
第一个节点计算
xs的长度并将其输出为n。第二个节点使用
xs和n计算平均值,并将其输出为m。第三个节点使用
xs和n计算平方和的平均值(mean_sos),并将其输出为m2。第四个节点使用均值(
m)和均方和(m2)计算方差,并将其输出为v。
Kedro的依赖解析算法确保每个节点仅在其所需输入从先前节点的输出中可用后才会运行。通过这种方式,节点会根据定义的依赖关系自动按正确顺序执行。
如何使用describe来发现管道中包含哪些节点¶
你可以使用describe方法来获取管道中节点及其执行顺序的概览。
print(variance_pipeline.describe())
输出如下:
#### Pipeline execution order ####
Name: None
Inputs: xs
len([xs]) -> [n]
mean_node
mean_sos
variance_node
Outputs: v
##################################
如何合并多个管道¶
您可以像下面这样合并多个流水线。请注意,在这种情况下,pipeline_de 和 pipeline_ds 会被展开为它们的基础节点列表,然后这些节点会被合并在一起:
pipeline_de = pipeline([node(len, "xs", "n"), node(mean, ["xs", "n"], "m")])
pipeline_ds = pipeline(
[node(mean_sos, ["xs", "n"], "m2"), node(variance, ["m", "m2"], "v")]
)
last_node = node(print, "v", None)
pipeline_all = pipeline([pipeline_de, pipeline_ds, last_node])
print(pipeline_all.describe())
输出如下:
#### Pipeline execution order ####
Name: None
Inputs: xs
len([xs]) -> [n]
mean([n,xs]) -> [m]
mean_sos([n,xs]) -> [m2]
variance([m,m2]) -> [v]
print([v]) -> None
Outputs: None
##################################
如何获取管道中节点的信息¶
管道以拓扑顺序提供对其节点的访问,以实现自定义功能,例如管道可视化。每个节点都包含有关其输入和输出的信息:
nodes = variance_pipeline.nodes
nodes
输出如下:
[
Node(len, "xs", "n", None),
Node(mean, ["xs", "n"], "m", "mean_node"),
Node(mean_sos, ["xs", "n"], "m2", "mean_sos"),
Node(variance, ["m", "m2"], "v", "variance node"),
]
了解输入内容:
nodes[0].inputs
您应该会看到以下内容:
["xs"]
如何获取关于管道输入和输出的信息¶
与上述方法类似,您可以使用inputs()和outputs()来检查管道的输入和输出:
variance_pipeline.inputs()
提供以下内容:
Out[7]: {'xs'}
variance_pipeline.outputs()
显示输出:
Out[8]: {'v'}
如何标记一个pipeline¶
您还可以通过提供tags参数来标记您的流水线,这将标记流水线的所有节点。在以下示例中,两个节点都被标记为pipeline_tag。
pipeline = pipeline(
[node(..., name="node1"), node(..., name="node2")], tags="pipeline_tag"
)
你可以将流水线标记与节点标记结合使用。在以下示例中,node1和node2被标记为pipeline_tag,而node2还拥有一个node_tag。
pipeline = pipeline(
[node(..., name="node1"), node(..., name="node2", tags="node_tag")],
tags="pipeline_tag",
)
如何避免创建不良管道¶
管道通常能够轻松解析其依赖关系。但在某些情况下,依赖解析无法完成。此时,该管道就属于结构不良的管道。
包含问题节点的Pipeline¶
在这种情况下,我们有一个由单个节点组成的管道,该节点没有输入和输出:
try:
pipeline([node(lambda: print("!"), None, None)])
except Exception as e:
print(e)
输出如下:
Invalid Node definition: it must have some `inputs` or `outputs`.
Format should be: node(function, inputs, outputs)
具有循环依赖的Pipeline¶
对于每两个变量,如果第一个依赖于第二个,那么必须不存在第二种方式使得第二个也依赖于第一个,否则循环依赖将阻止我们编译管道。
第一个节点捕获了如何从x计算y的关系,第二个节点捕获了在已知y的情况下如何计算x的关系。这对节点不能同时存在于同一个管道中:
try:
pipeline(
[
node(lambda x: x + 1, "x", "y", name="first node"),
node(lambda y: y - 1, "y", "x", name="second node"),
]
)
except Exception as e:
print(e)
输出如下:
Circular dependencies exist among these items: ['first node: <lambda>([x]) -> [y]', 'second node: <lambda>([y]) -> [x]']
使用点符号命名的Pipeline节点¶
使用点符号命名的节点可能会出现异常行为。
pipeline([node(lambda x: x, inputs="input1kedro", outputs="output1.kedro")])
创建节点时,如果输入或输出名称包含.,可能会导致管道断开连接或Kedro结构格式不正确。
这是因为.在内部具有特殊含义,表示命名空间管道。在这个例子中,输出部分应该断开连接,因为名称暗示存在一个"output1"命名空间管道。输入没有命名空间化,但输出通过点符号进行了命名空间化。这导致Kedro分别处理每个部分。对于这个例子,更好的方法是将两者都写成input1_kedro和output1_kedro。
我们建议使用类似_的字符作为名称分隔符,而不是.。
如何在kedro项目中存储管道代码¶
在管理您的Kedro项目时,我们建议将相关任务分组到独立的流水线中,以实现模块化。一个项目通常包含许多任务,将频繁执行的任务组织到单独的流水线中有助于保持秩序和效率。理想情况下,每个流水线应该组织在自己的文件夹中,便于在项目内轻松复制和重用。简而言之:一个流水线对应一个文件夹。为了帮助实现这一点,Kedro引入了模块化流水线的概念,这将在下一节中详细描述。