运行一个管道¶
运行器¶
运行器是用于执行管道的执行机制。它们都继承自AbstractRunner。
SequentialRunner¶
使用SequentialRunner根据节点依赖关系逐个执行流水线节点。
我们建议在以下情况下使用SequentialRunner:
流水线的分支有限
流水线运行速度快
资源消耗大的步骤需要占用大部分稀缺资源(例如大量RAM、磁盘内存或CPU)
Kedro默认使用SequentialRunner,因此要按顺序执行管道:
kedro run
你也可以显式地使用SequentialRunner,如下所示:
kedro run --runner=SequentialRunner
ParallelRunner¶
多进程处理¶
你也可以选择使用ParallelRunner来并行运行管道中的节点,如下所示:
kedro run --runner=ParallelRunner
多线程¶
虽然ParallelRunner使用多进程处理,但您也可以通过指定ThreadRunner来使用多线程并发执行管道,如下所示:
kedro run --runner=ThreadRunner
注意
SparkDataset 无法与 ParallelRunner 正常工作。若要在使用 SparkDataset 的管道中实现并发,必须使用 ThreadRunner。
如需了解更多关于如何在使用Kedro与PySpark时最大化并发性的信息,请阅读我们的指南如何使用PySpark构建Kedro流水线。
自定义运行器¶
如果Kedro内置的运行器无法满足您的需求,您也可以在项目中自定义运行器。例如,您可能需要添加一个dry runner(空运行器),它可以列出将要运行但不会实际执行的节点:
Click to expand
# in src/<package_name>/runner.py
from typing import Any, Dict
from kedro.io import AbstractDataset, DataCatalog, MemoryDataset
from kedro.pipeline import Pipeline
from kedro.runner.runner import AbstractRunner
from pluggy import PluginManager
class DryRunner(AbstractRunner):
"""``DryRunner`` is an ``AbstractRunner`` implementation. It can be used to list which
nodes would be run without actually executing anything. It also checks if all the
necessary data exists.
"""
def __init__(self, is_async: bool = False, extra_dataset_patterns: Dict[str, Dict[str, Any]] = None):
"""Instantiates the runner class.
Args:
is_async: If True, the node inputs and outputs are loaded and saved
asynchronously with threads. Defaults to False.
extra_dataset_patterns: Extra dataset factory patterns to be added to the DataCatalog
during the run. This is used to set the default datasets.
"""
default_dataset_pattern = {"{default}": {"type": "MemoryDataset"}}
self._extra_dataset_patterns = extra_dataset_patterns or default_dataset_pattern
super().__init__(is_async=is_async, extra_dataset_patterns=self._extra_dataset_patterns)
def _run(
self,
pipeline: Pipeline,
catalog: DataCatalog,
hook_manager: PluginManager = None,
session_id: str = None,
) -> None:
"""The method implementing dry pipeline running.
Example logs output using this implementation:
kedro.runner.dry_runner - INFO - Actual run would execute 3 nodes:
node3: identity([A]) -> [B]
node2: identity([C]) -> [D]
node1: identity([D]) -> [E]
Args:
pipeline: The ``Pipeline`` to run.
catalog: The ``DataCatalog`` from which to fetch data.
hook_manager: The ``PluginManager`` to activate hooks.
session_id: The id of the session.
"""
nodes = pipeline.nodes
self._logger.info(
"Actual run would execute %d nodes:\n%s",
len(nodes),
pipeline.describe(),
)
self._logger.info("Checking inputs...")
input_names = pipeline.inputs()
missing_inputs = [
input_name
for input_name in input_names
if not catalog._get_dataset(input_name).exists()
]
if missing_inputs:
raise KeyError(f"Datasets {missing_inputs} not found.")
并通过--runner标志与kedro run一起使用:
$ kedro run --runner=<package_name>.runner.DryRunner
异步加载与保存¶
注意
ThreadRunner 不支持异步加载输入或保存输出操作。
在处理节点时,SequentialRunner和ParallelRunner都会按顺序执行以下步骤:
基于节点输入加载数据
使用输入执行节点函数
保存输出结果
如果一个节点有多个输入或输出(例如node(func, ["a", "b", "c"], ["d", "e", "f"])),可以通过使用异步模式来减少负载并节省时间。您可以通过在运行命令中传递--async标志来启用它,如下所示:
$ kedro run --async
...
2020-03-24 09:20:01,482 - kedro.runner.sequential_runner - INFO - Asynchronous mode is enabled for loading and saving data
...
注意
为了确保异步加载/保存能正常工作,运行中使用的所有数据集都必须具备线程安全特性。
按名称运行管道¶
要通过名称运行流水线,您需要将新流水线添加到src/文件中的register_pipelines()函数:
Click to expand
def register_pipelines():
"""Register the project's pipelines.
Returns:
A mapping from pipeline names to ``Pipeline`` objects.
"""
pipelines = find_pipelines()
pipelines["__default__"] = sum(pipelines.values())
my_pipeline = pipeline(
[
# your definition goes here
]
)
pipelines["my_pipeline"] = my_pipeline
return pipelines
然后,在命令行中执行以下操作:
kedro run --pipeline=my_pipeline
注意
如果不指定--pipeline选项而直接运行kedro run,它将执行register_pipelines()返回的字典中的__default__流水线。
有关kedro run的更多信息,请参阅Kedro CLI文档。
使用IO运行管道¶
上述管道定义仅适用于不与外界交互的无状态或"纯"管道。在实际应用中,我们通常需要与API、数据库、文件和其他数据源进行交互。通过将IO与管道相结合,我们可以处理这些更复杂的用例。
通过使用IO模块中的DataCatalog,我们仍然能够编写处理数据的纯函数,并将文件保存和加载操作外包给DataCatalog。
通过DataCatalog,我们可以控制输入数据的加载来源、中间变量的持久化位置以及最终输出变量的写入位置。
在一个简单的示例中,我们定义了一个名为xs的MemoryDataset来存储输入数据,将输入列表[1, 2, 3]保存到xs中,然后实例化SequentialRunner并调用其run方法,传入管道和数据目录实例:
Click to expand
io = DataCatalog(dict(xs=MemoryDataset()))
io.list()
输出:
Out[10]: ['xs']
io.save("xs", [1, 2, 3])
SequentialRunner().run(pipeline, catalog=io)
输出:
Out[11]: {'v': 0.666666666666667}
配置 kedro run 参数¶
Kedro CLI文档列出了kedro run可用的命令行选项。您也可以提供一个包含kedro run参数的配置文件。
以下是一个名为config.yml的示例文件,但您可以为该文件选择任意名称:
$ kedro run --config=config.yml
其中 config.yml 的格式如下(示例):
run:
tags: tag1, tag2, tag3
pipeline: pipeline1
runner: ParallelRunner
node_names: node1, node2
env: env1
使用CLI时的选项语法与配置文件不同。在CLI中需要使用短横线,例如kedro run --from-nodes=...,但在配置文件中必须使用下划线:
run:
from_nodes: ...
这是因为配置文件由Click(一个用于处理命令行界面的Python包)进行解析。Click将配置文件中定义的选项传递给Python函数,这些选项名称必须与函数中的参数名称相匹配。
Python中的变量名和参数只能包含字母数字字符和下划线,因此在使用配置文件时,选项名称中不能包含短横线。
注意
如果您同时提供了配置文件以及与配置文件冲突的CLI选项,CLI选项将优先生效。