Shortcuts

高级KubeFlow管道示例

这是一个使用仅由TorchX组件构建的KubeFlow Pipelines的示例管道。

KFP适配器可用于直接将TorchX组件转换为可以在KFP中使用的内容。

输入参数

首先让我们为管道定义一些参数。

import argparse
import os.path
import sys
from typing import Dict

import kfp
import torchx
from torchx import specs
from torchx.components.dist import ddp as dist_ddp
from torchx.components.serve import torchserve
from torchx.components.utils import copy as utils_copy, python as utils_python
from torchx.pipelines.kfp.adapter import container_from_app


parser = argparse.ArgumentParser(description="example kfp pipeline")

TorchX组件是围绕图像构建的。根据你使用的调度器不同,这可能会有所不同,但对于KFP来说,这些图像被指定为docker容器。我们有一个容器用于示例应用程序,另一个用于标准内置应用程序。如果你修改了torchx示例代码,你需要在KFP上启动之前重新构建容器。

parser.add_argument(
    "--image",
    type=str,
    help="docker image to use for the examples apps",
    default=torchx.IMAGE,
)

大多数TorchX组件使用 fsspec 来抽象处理远程文件系统。这使得组件可以接受像 s3:// 这样的路径,以便轻松使用云存储提供商。

parser.add_argument(
    "--output_path",
    type=str,
    help="path to place the data",
    required=True,
)
parser.add_argument("--load_path", type=str, help="checkpoint path to load from")

此示例使用torchserve进行推理,因此我们需要指定一些选项。这假设您在默认命名空间中有一个名为torchserve的服务在同一个Kubernetes集群中运行。

查看 https://github.com/pytorch/serve/blob/master/kubernetes/README.md 获取有关如何设置 TorchServe 的信息。

parser.add_argument(
    "--management_api",
    type=str,
    help="path to the torchserve management API",
    default="http://torchserve.default.svc.cluster.local:8081",
)
parser.add_argument(
    "--model_name",
    type=str,
    help="the name of the inference model",
    default="tiny_image_net",
)

笔记本。

if "NOTEBOOK" in globals():
    argv = [
        "--output_path",
        "/tmp/output",
    ]
else:
    argv = sys.argv[1:]

args: argparse.Namespace = parser.parse_args(argv)

创建组件

第一步是将数据下载到我们可以处理的地方。为此,我们可以使用内置的复制组件。该组件接受两个有效的fsspec路径,并将它们从一个路径复制到另一个路径。在这种情况下,我们使用http作为源,并将output_path下的文件作为输出。

data_path: str = os.path.join(args.output_path, "tiny-imagenet-200.zip")
copy_app: specs.AppDef = utils_copy(
    "http://cs231n.stanford.edu/tiny-imagenet-200.zip",
    data_path,
    image=args.image,
)

下一个组件用于数据预处理。它接收来自前一个操作符的原始数据,并对其进行一些转换,以便与训练器一起使用。

datapreproc 将数据输出到指定的 fsspec 路径。这些路径都是预先指定的,因此我们有一个完全静态的管道。

processed_data_path: str = os.path.join(args.output_path, "processed")
datapreproc_app: specs.AppDef = utils_python(
    "--output_path",
    processed_data_path,
    "--input_path",
    data_path,
    "--limit",
    "100",
    image=args.image,
    m="torchx.examples.apps.datapreproc.datapreproc",
    cpu=1,
    memMB=1024,
)

接下来,我们将创建训练器组件,该组件接收来自之前的数据预处理组件的训练数据。我们通常会在一个单独的组件文件中定义这个组件。

拥有一个单独的组件文件允许您通过torchx run从TorchX CLI启动您的训练器以进行快速迭代,也可以以自动化方式从管道中运行它。

# make sure examples is on the path
if "__file__" in globals():
    sys.path.append(os.path.join(os.path.dirname(__file__), "..", "..", ".."))


logs_path: str = os.path.join(args.output_path, "logs")
models_path: str = os.path.join(args.output_path, "models")

trainer_app: specs.AppDef = dist_ddp(
    *(
        "--output_path",
        models_path,
        "--load_path",
        args.load_path or "",
        "--log_path",
        logs_path,
        "--data_path",
        processed_data_path,
        "--epochs",
        str(1),
    ),
    image=args.image,
    m="torchx.examples.apps.lightning.train",
    j="1x1",
    # per node resource settings
    cpu=1,
    memMB=3000,
)

为了让tensorboard路径显示在KFPs的用户界面中,我们需要一些元数据,以便KFP知道从哪里获取指标。

这将在我们创建KFP容器时使用。

ui_metadata: Dict[str, object] = {
    "outputs": [
        {
            "type": "tensorboard",
            "source": os.path.join(logs_path, "lightning_logs"),
        }
    ]
}

对于推理,我们正在利用一个内置的TorchX组件。该组件接收一个模型并将其上传到TorchServe管理API端点。

serve_app: specs.AppDef = torchserve(
    model_path=os.path.join(models_path, "model.mar"),
    management_api=args.management_api,
    image=args.image,
    params={
        "model_name": args.model_name,
        # set this to allocate a worker
        # "initial_workers": 1,
    },
)

为了模型的可解释性,我们利用了一个存储在其自身组件文件中的自定义组件。该组件接收来自datapreproc和train组件的输出,并生成带有集成梯度结果的图像。

interpret_path: str = os.path.join(args.output_path, "interpret")
interpret_app: specs.AppDef = utils_python(
    *(
        "--load_path",
        os.path.join(models_path, "last.ckpt"),
        "--data_path",
        processed_data_path,
        "--output_path",
        interpret_path,
    ),
    image=args.image,
    m="torchx.examples.apps.lightning.interpret",
)

管道定义

最后一步是通过KFP适配器使用torchx组件定义实际的管道,并导出可以上传到KFP集群的管道包。

KFP适配器目前不跟踪输入和输出,因此容器需要通过.after()指定它们的依赖关系。

我们调用.set_tty()以使组件的日志在示例中更具响应性。

def pipeline() -> None:
    # container_from_app creates a KFP container from the TorchX app
    # definition.
    copy = container_from_app(copy_app)
    copy.container.set_tty()

    datapreproc = container_from_app(datapreproc_app)
    datapreproc.container.set_tty()
    datapreproc.after(copy)

    # For the trainer we want to log that UI metadata so you can access
    # tensorboard from the UI.
    trainer = container_from_app(trainer_app, ui_metadata=ui_metadata)
    trainer.container.set_tty()
    trainer.after(datapreproc)

    if False:
        serve = container_from_app(serve_app)
        serve.container.set_tty()
        serve.after(trainer)

    if False:
        # Serve and interpret only require the trained model so we can run them
        # in parallel to each other.
        interpret = container_from_app(interpret_app)
        interpret.container.set_tty()
        interpret.after(trainer)


kfp.compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="pipeline.yaml",
)

with open("pipeline.yaml", "rt") as f:
    print(f.read())

一旦所有这些运行完毕,你应该会有一个管道文件(通常是pipeline.yaml),你可以通过UI或kfp.Client上传到你的KFP集群。

# sphinx_gallery_thumbnail_path = '_static/img/gallery-kfp.png'

脚本总运行时间: ( 0 分钟 0.000 秒)

Gallery generated by Sphinx-Gallery