切片管道¶
有时需要运行管道节点的一个子集或"切片"。主要有两种方法可以实现这一点:
通过Kedro-Viz可视化操作: 这种方法允许您直观地选择和切分流水线节点,随后会生成运行命令用于在Kedro项目中执行切分后的部分。具体操作步骤详见Kedro-Viz文档:Slice a Pipeline。

通过Kedro CLI以编程方式实现。 您也可以使用Kedro CLI向
kedro run命令传递参数并切分流水线。本页将展示Kedro提供的编程选项。
让我们再次回顾管道介绍文档中的示例管道,它用于计算一组数字的方差:
Click to expand
def mean(xs, n):
return sum(xs) / n
def mean_sos(xs, n):
return sum(x**2 for x in xs) / n
def variance(m, m2):
return m2 - m * m
full_pipeline = pipeline(
[
node(len, "xs", "n"),
node(mean, ["xs", "n"], "m", name="mean_node", tags="mean"),
node(mean_sos, ["xs", "n"], "m2", name="mean_sos", tags=["mean", "variance"]),
node(variance, ["m", "m2"], "v", name="variance_node", tags="variance"),
]
)
Pipeline.describe() 方法返回以下输出:
Click to expand
#### Pipeline execution order ####
Name: None
Inputs: xs
len([xs]) -> [n]
mean_node
mean_sos
variance_node
Outputs: v
##################################
通过提供输入来切片管道¶
一种对流水线进行切片的方法是提供一组预先计算好的输入,作为流水线的起点。例如,若要从输入m2开始向下游切片运行流水线,可以这样指定:
Click to expand
print(full_pipeline.from_inputs("m2").describe())
输出:
#### Pipeline execution order ####
Name: None
Inputs: m, m2
variance_node
Outputs: v
##################################
从输入 m 和 xs 对管道进行切片会生成以下管道:
Click to expand
print(full_pipeline.from_inputs("m", "xs").describe())
输出:
#### Pipeline execution order ####
Name: None
Inputs: xs
len([xs]) -> [n]
mean_node
mean_sos
variance_node
Outputs: v
##################################
如您所见,在from_inputs列表中添加m并不能保证当指定了另一个输入如xs时它不会被重新计算。
通过指定节点切片管道¶
另一种切分流水线的方式是指定作为新流水线起点的节点。例如:
Click to expand
print(full_pipeline.from_nodes("mean_node").describe())
输出:
#### Pipeline execution order ####
Name: None
Inputs: m2, n, xs
mean_node
variance_node
Outputs: v
##################################
如您所见,这将切片管道并从指定节点运行到所有下游节点。
您可以通过在终端窗口中运行以下命令来执行生成的管道切片:
kedro run --from-nodes=mean_node
通过指定最终节点来切片管道¶
同样,您可以指定用于终止管道的节点。例如:
Click to expand
print(full_pipeline.to_nodes("mean_node").describe())
输出:
#### Pipeline execution order ####
Name: None
Inputs: xs
len([xs]) -> [n]
mean_node
Outputs: m
##################################
如您所见,这将切片管道,使其从开始运行并结束于指定节点:
kedro run --to-nodes=mean_node
你也可以通过指定起始和结束节点来对流水线进行切片,从而确定要包含在流水线切片中的节点集合:
kedro run --from-nodes=A --to-nodes=Z
或者,当指定多个节点时:
kedro run --from-nodes=A,D --to-nodes=X,Y,Z
通过标记节点切片管道¶
您还可以根据节点上附加的特定标签对管道进行切片。例如,对于同时具有标签mean和标签variance的节点,可以运行以下命令:
Click to expand
print(full_pipeline.only_nodes_with_tags("mean", "variance").describe())
输出:
#### Pipeline execution order ####
Inputs: n, xs
mean_sos
Outputs: m2
##################################
要从具有标签 mean 或 标签 variance 的节点中切分管道:
Click to expand
sliced_pipeline = full_pipeline.only_nodes_with_tags(
"mean"
) + full_pipeline.only_nodes_with_tags("variance")
print(sliced_pipeline.describe())
输出:
#### Pipeline execution order ####
Inputs: n, xs
mean
mean_sos
variance
Outputs: v
##################################
通过运行指定节点来切片管道¶
有时您可能只需要运行管道中的部分节点,如下所示:
Click to expand
print(full_pipeline.only_nodes("mean_node", "mean_sos").describe())
输出:
#### Pipeline execution order ####
Name: None
Inputs: n, xs
mean_node
mean_sos
Outputs: m, m2
##################################
这将创建一个切片化的流水线,由您在方法调用中指定的节点组成。
注意
指定节点所需的所有输入必须存在,即已在数据目录中生成或存在。
如何重新生成缺失的输出¶
Kedro 可以自动从现有节点输出生成切片管道。如果您想避免重新运行耗时较长的节点,这会很有帮助:
Click to expand
print(full_pipeline.describe())
输出:
#### Pipeline execution order ####
Name: None
Inputs: xs
len([xs]) -> [n]
mean_node
mean_sos
variance_node
Outputs: v
##################################
为了演示这一点,让我们使用JSONDataset保存中间输出n。
Click to expand
from kedro_datasets.pandas import JSONDataset
from kedro.io import DataCatalog, MemoryDataset
n_json = JSONDataset(filepath="./data/07_model_output/len.json")
io = DataCatalog(dict(xs=MemoryDataset([1, 2, 3]), n=n_json))
由于之前未保存n,检查其是否存在将返回False:
Click to expand
io.exists("n")
输出:
Out[15]: False
运行管道计算n并将结果保存到磁盘:
Click to expand
SequentialRunner().run(full_pipeline, io)
输出:
Out[16]: {'v': 0.666666666666667}
io.exists("n")
输出:
Out[17]: True
我们可以通过使用Runner.run_only_missing方法来避免重新计算n(以及所有其他已保存的结果)。请注意,原始管道的第一个节点(len([xs]) -> [n])尚未运行:
Click to expand
SequentialRunner().run_only_missing(full_pipeline, io)
输出:
Out[18]: {'v': 0.666666666666667}
try:
os.remove("./data/07_model_output/len.json")
except FileNotFoundError:
pass