使用特定平台的功能

学习如何在Kubeflow Pipelines中使用特定于平台的功能。

概述

KFP的一个好处是跨平台的可移植性。 KFP SDK将管道定义编译为 IR YAML,可以被不同的后端读取和执行,包括Kubeflow Pipelines 开源后端Vertex AI Pipelines

对于特性无法跨平台移植的情况,用户可以通过 KFP SDK 平台特定的插件库编写具有平台特定功能的管道。一般来说,平台特定的插件库提供的功能类似于 KFP SDK 直接提供的 任务级配置方法

kfp-kubernetes

目前,唯一的KFP SDK平台特定插件库是 kfp-kubernetes,它由Kubeflow Pipelines 开源后端支持,能够直接访问某些Kubernetes资源和功能。

有关更多信息,请参阅 kfp-kubernetes 文档

Kubernetes 持久卷声明

在这个例子中,我们将使用 kfp-kubernetes 来创建一个 持久卷声明 (PVC),使用PVC在任务之间传递数据,然后删除PVC。

我们假设你对Kubernetes中的PersistentVolumePersistentVolumeClaim资源有基本的了解,以及对KFP中的编写组件编写管道有基本的了解。

步骤 1: 安装 kfp-kubernetes

运行以下命令安装 kfp-kubernetes 库:

pip install kfp[kubernetes]

步骤 2: 创建读取/写入挂载路径的组件

创建两个简单的组件,能够在/data目录中读写文件。

在后续步骤中,我们将把一个PVC卷挂载到/data目录。

from kfp import dsl

@dsl.component
def producer() -> str:
    with open('/data/file.txt', 'w') as file:
        file.write('Hello world')
    with open('/data/file.txt', 'r') as file:
        content = file.read()
    print(content)
    return content

@dsl.component
def consumer() -> str:
    with open('/data/file.txt', 'r') as file:
        content = file.read()
    print(content)
    return content

步骤 3: 使用 CreatePVC 动态创建 PVC

现在我们有了组件,我们可以开始构建一个管道。

我们需要一个 PVC 来挂载,所以我们将使用 kubernetes.CreatePVC 预先构建的组件来创建一个:

from kfp import kubernetes

@dsl.pipeline
def my_pipeline():
    pvc1 = kubernetes.CreatePVC(
        # can also use pvc_name instead of pvc_name_suffix to use a pre-existing PVC
        pvc_name_suffix='-my-pvc',
        access_modes=['ReadWriteMany'],
        size='5Gi',
        storage_class_name='standard',
    )

该组件从存储类'standard'提供一个5GB的PVC,使用ReadWriteMany 访问模式。 该PVC将以创建它的底层Argo工作流的名称命名,并附加后缀-my-pvcCreatePVC组件将这个名称作为输出'name'返回。

步骤 4: 读取和写入数据到 PVC

接下来,我们将使用 mount_pvc 任务修饰符与 producerconsumer 组件。

我们将 task2 安排在 task1 之后运行,以便组件不同时读取和写入 PVC。

    # write to the PVC
    task1 = producer()
    kubernetes.mount_pvc(
        task1,
        pvc_name=pvc1.outputs['name'],
        mount_path='/data',
    )

    # read to the PVC
    task2 = consumer()
    kubernetes.mount_pvc(
        task2,
        pvc_name=pvc1.outputs['name'],
        mount_path='/reused_data',
    )
    task2.after(task1)

步骤 5: 删除 PVC

最后,我们可以在 task2 完成后安排删除 PVC,以清理我们创建的 Kubernetes 资源。

    delete_pvc1 = kubernetes.DeletePVC(
        pvc_name=pvc1.outputs['name']
    ).after(task2)

有关完整的管道和更多信息,请参阅类似示例kfp-kubernetes 文档中。

反馈

此页面有帮助吗?