Shortcuts

Kubeflow 管道

TorchX 提供了一个适配器,可以将 TorchX 组件作为 Kubeflow Pipelines 的一部分运行。请参阅 KubeFlow Pipelines 示例

../_images/kfp_diagram.jpg

torchx.pipelines.kfp

../_images/pipeline_kfp_diagram.png

该模块包含用于将TorchX组件转换为KubeFlow管道组件的适配器。

当前的KFP适配器仅支持单节点(1个角色和1个副本)组件。

torchx.pipelines.kfp.adapter.container_from_app(app: AppDef, *args: object, ui_metadata: Optional[Mapping[str, object]] = None, **kwargs: object) ContainerOp[source]

container_from_app 将应用程序转换为 KFP 组件并返回相应的 ContainerOp 实例。

有关参数的描述,请参见 component_from_app。任何未指定的参数都会传递给 KFP 容器工厂方法。

>>> import kfp
>>> from torchx import specs
>>> from torchx.pipelines.kfp.adapter import container_from_app
>>> app_def = specs.AppDef(
...     name="trainer",
...     roles=[specs.Role("trainer", image="foo:latest")],
... )
>>> def pipeline():
...     trainer = container_from_app(app_def)
...     print(trainer)
>>> kfp.compiler.Compiler().compile(
...     pipeline_func=pipeline,
...     package_path="/tmp/pipeline.yaml",
... )
{'ContainerOp': {... 'name': 'trainer-trainer', ...}}
torchx.pipelines.kfp.adapter.resource_from_app(app: AppDef, queue: str, service_account: Optional[str] = None) ResourceOp[source]

resource_from_app 从提供的应用程序生成一个 KFP ResourceOp,该应用程序使用 Kubernetes 上的 Volcano 作业调度器来运行分布式应用程序。有关 Volcano 的更多信息以及如何安装,请参阅 https://volcano.sh/en/docs/

Parameters:
  • app – 要适配的 torchx AppDef。

  • queue – 用于调度操作符的Volcano队列。

>>> import kfp
>>> from torchx import specs
>>> from torchx.pipelines.kfp.adapter import resource_from_app
>>> app_def = specs.AppDef(
...     name="trainer",
...     roles=[specs.Role("trainer", image="foo:latest", num_replicas=3)],
... )
>>> def pipeline():
...     trainer = resource_from_app(app_def, queue="test")
...     print(trainer)
>>> kfp.compiler.Compiler().compile(
...     pipeline_func=pipeline,
...     package_path="/tmp/pipeline.yaml",
... )
{'ResourceOp': {... 'name': 'trainer-0', ... 'name': 'trainer-1', ... 'name': 'trainer-2', ...}}
torchx.pipelines.kfp.adapter.component_from_app(app: AppDef, ui_metadata: Optional[Mapping[str, object]] = None) ContainerFactory[source]

component_from_app 接收一个 TorchX 组件/AppDef 并返回一个 KFP ContainerOp 工厂。这相当于 kfp.components.load_component_from_* 方法。

Parameters:
>>> from torchx import specs
>>> from torchx.pipelines.kfp.adapter import component_from_app
>>> app_def = specs.AppDef(
...     name="trainer",
...     roles=[specs.Role("trainer", image="foo:latest")],
... )
>>> component_from_app(app_def)
<function component_from_app...>
torchx.pipelines.kfp.adapter.component_spec_from_app(app: AppDef) Tuple[str, 角色][source]

component_spec_from_app 接收一个 TorchX 组件并为其生成 yaml 规范。值得注意的是,这不应用资源或 port_maps,因为这些必须在运行时应用,这就是为什么它也返回角色规范。

>>> from torchx import specs
>>> from torchx.pipelines.kfp.adapter import component_spec_from_app
>>> app_def = specs.AppDef(
...     name="trainer",
...     roles=[specs.Role("trainer", image="foo:latest")],
... )
>>> component_spec_from_app(app_def)
('description: ...', Role(...))
class torchx.pipelines.kfp.adapter.ContainerFactory(*args, **kwargs)[source]

ContainerFactory 是一个协议,表示一个函数,当调用时会产生一个 kfp.dsl.ContainerOp。