使用特定平台的功能
概述
KFP的一个好处是跨平台的可移植性。 KFP SDK将管道定义编译为 IR YAML,可以被不同的后端读取和执行,包括Kubeflow Pipelines 开源后端 和 Vertex AI Pipelines。
对于特性无法跨平台移植的情况,用户可以通过 KFP SDK 平台特定的插件库编写具有平台特定功能的管道。一般来说,平台特定的插件库提供的功能类似于 KFP SDK 直接提供的 任务级配置方法。
kfp-kubernetes
目前,唯一的KFP SDK平台特定插件库是 kfp-kubernetes,它由Kubeflow Pipelines 开源后端支持,能够直接访问某些Kubernetes资源和功能。
有关更多信息,请参阅 kfp-kubernetes 文档。
Kubernetes 持久卷声明
在这个例子中,我们将使用 kfp-kubernetes 来创建一个 持久卷声明 (PVC),使用PVC在任务之间传递数据,然后删除PVC。
我们假设你对Kubernetes中的PersistentVolume和PersistentVolumeClaim资源有基本的了解,以及对KFP中的编写组件和编写管道有基本的了解。
步骤 1: 安装 kfp-kubernetes 库
运行以下命令安装 kfp-kubernetes 库:
pip install kfp[kubernetes]
步骤 2: 创建读取/写入挂载路径的组件
创建两个简单的组件,能够在/data目录中读写文件。
在后续步骤中,我们将把一个PVC卷挂载到/data目录。
from kfp import dsl
@dsl.component
def producer() -> str:
with open('/data/file.txt', 'w') as file:
file.write('Hello world')
with open('/data/file.txt', 'r') as file:
content = file.read()
print(content)
return content
@dsl.component
def consumer() -> str:
with open('/data/file.txt', 'r') as file:
content = file.read()
print(content)
return content
步骤 3: 使用 CreatePVC 动态创建 PVC
现在我们有了组件,我们可以开始构建一个管道。
我们需要一个 PVC 来挂载,所以我们将使用 kubernetes.CreatePVC 预先构建的组件来创建一个:
from kfp import kubernetes
@dsl.pipeline
def my_pipeline():
pvc1 = kubernetes.CreatePVC(
# can also use pvc_name instead of pvc_name_suffix to use a pre-existing PVC
pvc_name_suffix='-my-pvc',
access_modes=['ReadWriteMany'],
size='5Gi',
storage_class_name='standard',
)
该组件从存储类'standard'提供一个5GB的PVC,使用ReadWriteMany 访问模式。
该PVC将以创建它的底层Argo工作流的名称命名,并附加后缀-my-pvc。CreatePVC组件将这个名称作为输出'name'返回。
步骤 4: 读取和写入数据到 PVC
接下来,我们将使用 mount_pvc 任务修饰符与 producer 和 consumer 组件。
我们将 task2 安排在 task1 之后运行,以便组件不同时读取和写入 PVC。
# write to the PVC
task1 = producer()
kubernetes.mount_pvc(
task1,
pvc_name=pvc1.outputs['name'],
mount_path='/data',
)
# read to the PVC
task2 = consumer()
kubernetes.mount_pvc(
task2,
pvc_name=pvc1.outputs['name'],
mount_path='/reused_data',
)
task2.after(task1)
步骤 5: 删除 PVC
最后,我们可以在 task2 完成后安排删除 PVC,以清理我们创建的 Kubernetes 资源。
delete_pvc1 = kubernetes.DeletePVC(
pvc_name=pvc1.outputs['name']
).after(task2)
有关完整的管道和更多信息,请参阅类似示例在kfp-kubernetes 文档中。