在本地执行 KFP 管道
概述
KFP 支持在本地执行组件和管道,使您在远程运行代码之前能够进行紧密的开发循环。
在本地执行组件和管道很简单。只需使用 local.init() 初始化本地会话,然后像正常的 Python 函数一样调用组件或管道。
KFP 将记录关于执行的信息。
一旦执行完成,您可以像组合管道时一样访问任务输出;唯一的区别是输出现在是具体的值,而不是对未来输出的引用。
限制
本地执行旨在帮助快速在远程环境测试之前,测试组件和管道。
本地执行有几个限制:
- 本地执行不具备优化和附加功能,如缓存、重试等。虽然这些功能对于生产流水线很重要,但对于本地测试环境来说则不那么关键。你会发现任务方法如
.set_retry、.set_caching_options等在本地没有效果。 - 本地执行对您机器上可用的资源做出简单假设。 本地执行不支持指定与内存、核心、加速器等相关的资源请求/限制/亲和性。 您会发现像
.set_memory_limit、.set_memory_request、.set_accelerator_type等任务方法在本地没有效果。 - 本地执行不支持身份验证机制。如果您的组件与云资源交互或需要其他特权操作,则必须在云中测试您的管道。
- 虽然本地管道执行对顺序和嵌套管道具有完全支持,但尚不支持
dsl.Condition、dsl.ParallelFor或dsl.ExitHandler。
基本示例
在下面的示例中,我们使用 DockerRunner 类型,运行器类型将在下面详细介绍。
from kfp import local
from kfp import dsl
local.init(runner=local.DockerRunner())
@dsl.component
def add(a: int, b: int) -> int:
return a + b
# run a single component
task = add(a=1, b=2)
assert task.output == 3
# or run it in a pipeline
@dsl.pipeline
def math_pipeline(x: int, y: int, z: int) -> int:
t1 = add(a=x, b=y)
t2 = add(a=t1.output, b=z)
return t2.output
pipeline_task = math_pipeline(x=1, y=2, z=3)
assert pipeline_task.output == 6
同样,您可以创建工件并读取内容:
from kfp import local
from kfp import dsl
from kfp.dsl import Output, Artifact
import json
local.init(runner=local.SubprocessRunner())
@dsl.component
def add(a: int, b: int, out_artifact: Output[Artifact]):
import json
result = json.dumps(a + b)
with open(out_artifact.path, 'w') as f:
f.write(result)
out_artifact.metadata['operation'] = 'addition'
task = add(a=1, b=2)
# can read artifact contents
with open(task.outputs['out_artifact'].path) as f:
contents = f.read()
assert json.loads(contents) == 3
assert task.outputs['out_artifact'].metadata['operation'] == 'addition'
默认情况下,如果您的组件以失败状态退出,KFP 将抛出异常。您可以使用 raise_on_error 切换此行为。您还可以使用 pipeline_root 指定一个新的本地“管道根”。这是写入组件输出(包括工件)的本地目录。
local.init(runner=...,
raise_on_error=False,
pipeline_root='~/my/component/outputs')
运行器类型
Kubeflow pipelines 有两个本地运行器,您可以使用它们在本地执行您的组件和管道: DockerRunner 和 SubprocessRunner。
我们强烈建议尽可能使用 DockerRunner。
运行器: DockerRunner
这个 DockerRunner 需要 安装 Docker,但基本上不需要任何使用 Docker 的知识。
例如,要使用 DockerRunner:
from kfp import local
local.init(runner=local.DockerRunner())
由于本地 DockerRunner 在一个单独的容器中执行每个任务,DockerRunner:
- 提供最强的本地运行时环境隔离
- 最忠实于远程运行环境
- 允许执行所有组件类型:Lightweight Python Component、Containerized Python Components 和 Container Components
当你使用 DockerRunner 时,KFP 会将你的本地管道根目录挂载到容器中,以便在容器外写入输出。这意味着即使在容器退出后,你的组件输出仍然可以检查。
运行器: SubprocessRunner
只有在无法安装Docker的情况下,例如在某些笔记本环境中,才推荐使用SubprocessRunner。
例如,要使用 SubprocessRunner:
from kfp import local
local.init(runner=local.SubprocessRunner())
由于 SubprocessRunner 在子进程中运行您的代码,SubprocessRunner:
- 提供的本地运行时环境隔离性低于
DockerRunner - 不支持自定义镜像或轻松支持具有复杂环境依赖的任务
- 仅允许执行 Lightweight Python Component
提示
默认情况下,SubprocessRunner 将把你的依赖项安装到一个虚拟环境中。
这被推荐,但是可以通过设置 use_venv=False 来禁用:
from kfp import local
local.init(runner=local.SubprocessRunner(use_venv=False))