运行一个管道

运行器

运行器是用于执行管道的执行机制。它们都继承自AbstractRunner

SequentialRunner

使用SequentialRunner根据节点依赖关系逐个执行流水线节点。

我们建议在以下情况下使用SequentialRunner

  • 流水线的分支有限

  • 流水线运行速度快

  • 资源消耗大的步骤需要占用大部分稀缺资源(例如大量RAM、磁盘内存或CPU)

Kedro默认使用SequentialRunner,因此要按顺序执行管道:

kedro run

你也可以显式地使用SequentialRunner,如下所示:

kedro run --runner=SequentialRunner

ParallelRunner

多进程处理

你也可以选择使用ParallelRunner来并行运行管道中的节点,如下所示:

kedro run --runner=ParallelRunner

多线程

虽然ParallelRunner使用多进程处理,但您也可以通过指定ThreadRunner来使用多线程并发执行管道,如下所示:

kedro run --runner=ThreadRunner

注意

SparkDataset 无法与 ParallelRunner 正常工作。若要在使用 SparkDataset 的管道中实现并发,必须使用 ThreadRunner

如需了解更多关于如何在使用Kedro与PySpark时最大化并发性的信息,请阅读我们的指南如何使用PySpark构建Kedro流水线

自定义运行器

如果Kedro内置的运行器无法满足您的需求,您也可以在项目中自定义运行器。例如,您可能需要添加一个dry runner(空运行器),它可以列出将要运行但不会实际执行的节点:

Click to expand
# in src/<package_name>/runner.py
from typing import Any, Dict
from kedro.io import AbstractDataset, DataCatalog, MemoryDataset
from kedro.pipeline import Pipeline
from kedro.runner.runner import AbstractRunner
from pluggy import PluginManager


class DryRunner(AbstractRunner):
    """``DryRunner`` is an ``AbstractRunner`` implementation. It can be used to list which
    nodes would be run without actually executing anything. It also checks if all the
    necessary data exists.
    """

    def __init__(self, is_async: bool = False, extra_dataset_patterns: Dict[str, Dict[str, Any]] = None):
        """Instantiates the runner class.

        Args:
            is_async: If True, the node inputs and outputs are loaded and saved
                asynchronously with threads. Defaults to False.
            extra_dataset_patterns: Extra dataset factory patterns to be added to the DataCatalog
                during the run. This is used to set the default datasets.
        """
        default_dataset_pattern = {"{default}": {"type": "MemoryDataset"}}
        self._extra_dataset_patterns = extra_dataset_patterns or default_dataset_pattern
        super().__init__(is_async=is_async, extra_dataset_patterns=self._extra_dataset_patterns)

    def _run(
        self,
        pipeline: Pipeline,
        catalog: DataCatalog,
        hook_manager: PluginManager = None,
        session_id: str = None,
    ) -> None:
        """The method implementing dry pipeline running.
        Example logs output using this implementation:

            kedro.runner.dry_runner - INFO - Actual run would execute 3 nodes:
            node3: identity([A]) -> [B]
            node2: identity([C]) -> [D]
            node1: identity([D]) -> [E]

        Args:
            pipeline: The ``Pipeline`` to run.
            catalog: The ``DataCatalog`` from which to fetch data.
            hook_manager: The ``PluginManager`` to activate hooks.
            session_id: The id of the session.

        """
        nodes = pipeline.nodes
        self._logger.info(
            "Actual run would execute %d nodes:\n%s",
            len(nodes),
            pipeline.describe(),
        )
        self._logger.info("Checking inputs...")
        input_names = pipeline.inputs()

        missing_inputs = [
            input_name
            for input_name in input_names
            if not catalog._get_dataset(input_name).exists()
        ]
        if missing_inputs:
            raise KeyError(f"Datasets {missing_inputs} not found.")

并通过--runner标志与kedro run一起使用:

$ kedro run --runner=<package_name>.runner.DryRunner

异步加载与保存

注意

ThreadRunner 不支持异步加载输入或保存输出操作。

在处理节点时,SequentialRunnerParallelRunner都会按顺序执行以下步骤:

  1. 基于节点输入加载数据

  2. 使用输入执行节点函数

  3. 保存输出结果

如果一个节点有多个输入或输出(例如node(func, ["a", "b", "c"], ["d", "e", "f"])),可以通过使用异步模式来减少负载并节省时间。您可以通过在运行命令中传递--async标志来启用它,如下所示:

$ kedro run --async
...
2020-03-24 09:20:01,482 - kedro.runner.sequential_runner - INFO - Asynchronous mode is enabled for loading and saving data
...

注意

为了确保异步加载/保存能正常工作,运行中使用的所有数据集都必须具备线程安全特性。

按名称运行管道

要通过名称运行流水线,您需要将新流水线添加到src//pipeline_registry.py文件中的register_pipelines()函数:

Click to expand
def register_pipelines():
    """Register the project's pipelines.

    Returns:
        A mapping from pipeline names to ``Pipeline`` objects.
    """
    pipelines = find_pipelines()
    pipelines["__default__"] = sum(pipelines.values())
    my_pipeline = pipeline(
        [
            # your definition goes here
        ]
    )
    pipelines["my_pipeline"] = my_pipeline
    return pipelines

然后,在命令行中执行以下操作:

kedro run --pipeline=my_pipeline

注意

如果不指定--pipeline选项而直接运行kedro run,它将执行register_pipelines()返回的字典中的__default__流水线。

有关kedro run的更多信息,请参阅Kedro CLI文档

使用IO运行管道

上述管道定义仅适用于不与外界交互的无状态或"纯"管道。在实际应用中,我们通常需要与API、数据库、文件和其他数据源进行交互。通过将IO与管道相结合,我们可以处理这些更复杂的用例。

通过使用IO模块中的DataCatalog,我们仍然能够编写处理数据的纯函数,并将文件保存和加载操作外包给DataCatalog

通过DataCatalog,我们可以控制输入数据的加载来源、中间变量的持久化位置以及最终输出变量的写入位置。

在一个简单的示例中,我们定义了一个名为xsMemoryDataset来存储输入数据,将输入列表[1, 2, 3]保存到xs中,然后实例化SequentialRunner并调用其run方法,传入管道和数据目录实例:

Click to expand
io = DataCatalog(dict(xs=MemoryDataset()))
io.list()

输出:

Out[10]: ['xs']
io.save("xs", [1, 2, 3])
SequentialRunner().run(pipeline, catalog=io)

输出:

Out[11]: {'v': 0.666666666666667}

配置 kedro run 参数

Kedro CLI文档列出了kedro run可用的命令行选项。您也可以提供一个包含kedro run参数的配置文件。

以下是一个名为config.yml的示例文件,但您可以为该文件选择任意名称:

$ kedro run --config=config.yml

其中 config.yml 的格式如下(示例):

run:
  tags: tag1, tag2, tag3
  pipeline: pipeline1
  runner: ParallelRunner
  node_names: node1, node2
  env: env1

使用CLI时的选项语法与配置文件不同。在CLI中需要使用短横线,例如kedro run --from-nodes=...,但在配置文件中必须使用下划线:

run:
  from_nodes: ...

这是因为配置文件由Click(一个用于处理命令行界面的Python包)进行解析。Click将配置文件中定义的选项传递给Python函数,这些选项名称必须与函数中的参数名称相匹配。

Python中的变量名和参数只能包含字母数字字符和下划线,因此在使用配置文件时,选项名称中不能包含短横线。

注意

如果您同时提供了配置文件以及与配置文件冲突的CLI选项,CLI选项将优先生效。