切片管道

有时需要运行管道节点的一个子集或"切片"。主要有两种方法可以实现这一点:

  1. 通过Kedro-Viz可视化操作: 这种方法允许您直观地选择和切分流水线节点,随后会生成运行命令用于在Kedro项目中执行切分后的部分。具体操作步骤详见Kedro-Viz文档:Slice a Pipeline

  1. 通过Kedro CLI以编程方式实现。 您也可以使用Kedro CLI向kedro run命令传递参数并切分流水线。本页将展示Kedro提供的编程选项。

让我们再次回顾管道介绍文档中的示例管道,它用于计算一组数字的方差:

Click to expand
def mean(xs, n):
    return sum(xs) / n


def mean_sos(xs, n):
    return sum(x**2 for x in xs) / n


def variance(m, m2):
    return m2 - m * m


full_pipeline = pipeline(
    [
        node(len, "xs", "n"),
        node(mean, ["xs", "n"], "m", name="mean_node", tags="mean"),
        node(mean_sos, ["xs", "n"], "m2", name="mean_sos", tags=["mean", "variance"]),
        node(variance, ["m", "m2"], "v", name="variance_node", tags="variance"),
    ]
)

Pipeline.describe() 方法返回以下输出:

Click to expand
#### Pipeline execution order ####
Name: None
Inputs: xs

len([xs]) -> [n]
mean_node
mean_sos
variance_node

Outputs: v
##################################

通过提供输入来切片管道

一种对流水线进行切片的方法是提供一组预先计算好的输入,作为流水线的起点。例如,若要从输入m2开始向下游切片运行流水线,可以这样指定:

Click to expand
print(full_pipeline.from_inputs("m2").describe())

输出:

#### Pipeline execution order ####
Name: None
Inputs: m, m2

variance_node

Outputs: v
##################################

从输入 mxs 对管道进行切片会生成以下管道:

Click to expand
print(full_pipeline.from_inputs("m", "xs").describe())

输出:

#### Pipeline execution order ####
Name: None
Inputs: xs

len([xs]) -> [n]
mean_node
mean_sos
variance_node

Outputs: v
##################################

如您所见,在from_inputs列表中添加m并不能保证当指定了另一个输入如xs时它不会被重新计算。

通过指定节点切片管道

另一种切分流水线的方式是指定作为新流水线起点的节点。例如:

Click to expand
print(full_pipeline.from_nodes("mean_node").describe())

输出:

#### Pipeline execution order ####
Name: None
Inputs: m2, n, xs

mean_node
variance_node

Outputs: v
##################################

如您所见,这将切片管道并从指定节点运行到所有下游节点。

您可以通过在终端窗口中运行以下命令来执行生成的管道切片:

kedro run --from-nodes=mean_node

通过指定最终节点来切片管道

同样,您可以指定用于终止管道的节点。例如:

Click to expand
print(full_pipeline.to_nodes("mean_node").describe())

输出:

#### Pipeline execution order ####
Name: None
Inputs: xs

len([xs]) -> [n]
mean_node

Outputs: m
##################################

如您所见,这将切片管道,使其从开始运行并结束于指定节点:

kedro run --to-nodes=mean_node

你也可以通过指定起始和结束节点来对流水线进行切片,从而确定要包含在流水线切片中的节点集合:

kedro run --from-nodes=A --to-nodes=Z

或者,当指定多个节点时:

kedro run --from-nodes=A,D --to-nodes=X,Y,Z

通过标记节点切片管道

您还可以根据节点上附加的特定标签对管道进行切片。例如,对于同时具有标签mean标签variance的节点,可以运行以下命令:

Click to expand
print(full_pipeline.only_nodes_with_tags("mean", "variance").describe())

输出:

#### Pipeline execution order ####
Inputs: n, xs

mean_sos

Outputs: m2
##################################

要从具有标签 mean 标签 variance 的节点中切分管道:

Click to expand
sliced_pipeline = full_pipeline.only_nodes_with_tags(
    "mean"
) + full_pipeline.only_nodes_with_tags("variance")
print(sliced_pipeline.describe())

输出:

#### Pipeline execution order ####
Inputs: n, xs

mean
mean_sos
variance

Outputs: v
##################################

通过运行指定节点来切片管道

有时您可能只需要运行管道中的部分节点,如下所示:

Click to expand
print(full_pipeline.only_nodes("mean_node", "mean_sos").describe())

输出:

#### Pipeline execution order ####
Name: None
Inputs: n, xs

mean_node
mean_sos

Outputs: m, m2
##################################

这将创建一个切片化的流水线,由您在方法调用中指定的节点组成。

注意

指定节点所需的所有输入必须存在,即已在数据目录中生成或存在。

如何重新生成缺失的输出

Kedro 可以自动从现有节点输出生成切片管道。如果您想避免重新运行耗时较长的节点,这会很有帮助:

Click to expand
print(full_pipeline.describe())

输出:

#### Pipeline execution order ####
Name: None
Inputs: xs

len([xs]) -> [n]
mean_node
mean_sos
variance_node

Outputs: v
##################################

为了演示这一点,让我们使用JSONDataset保存中间输出n

Click to expand
from kedro_datasets.pandas import JSONDataset
from kedro.io import DataCatalog, MemoryDataset

n_json = JSONDataset(filepath="./data/07_model_output/len.json")
io = DataCatalog(dict(xs=MemoryDataset([1, 2, 3]), n=n_json))

由于之前未保存n,检查其是否存在将返回False

Click to expand
io.exists("n")

输出:

Out[15]: False

运行管道计算n并将结果保存到磁盘:

Click to expand
SequentialRunner().run(full_pipeline, io)

输出:

Out[16]: {'v': 0.666666666666667}
io.exists("n")

输出:

Out[17]: True

我们可以通过使用Runner.run_only_missing方法来避免重新计算n(以及所有其他已保存的结果)。请注意,原始管道的第一个节点(len([xs]) -> [n])尚未运行:

Click to expand
SequentialRunner().run_only_missing(full_pipeline, io)

输出:

Out[18]: {'v': 0.666666666666667}
try:
    os.remove("./data/07_model_output/len.json")
except FileNotFoundError:
    pass