构建基于Python的组件

使用Python构建您自己的轻量级管道组件

Kubeflow Pipelines 组件是一组自包含的代码,执行您机器学习工作流中的一步。管道组件由以下部分组成:

  • 组件代码,实施执行您机器学习工作流中步骤所需的逻辑。

  • 组件规范,定义了以下内容:

    • 组件的元数据,包括其名称和描述。
    • 组件的接口,组件的输入和输出。
    • 组件的实现、运行的Docker容器镜像、如何将输入传递给组件代码,以及如何获取组件的输出。

基于Python函数的组件通过让您将组件代码构建为Python函数并为您生成组件规范,使得快速迭代变得更容易。本文档描述了如何构建基于Python函数的组件并在您的管道中使用它们。

开始之前

  1. 运行以下命令来安装 Kubeflow Pipelines SDK。如果您在 Jupyter notebook 中运行此命令,请在安装 SDK 后重启内核。
$ pip install kfp==1.8
  1. 导入 kfp 包。
import kfp
from kfp.components import create_component_from_func
  1. 按照使用SDK客户端连接到Kubeflow Pipelines中的步骤创建kfp.Client的实例。
client = kfp.Client() # change arguments accordingly

有关 Kubeflow Pipelines SDK 的更多信息,请参阅 SDK 参考指南

开始使用基于Python的组件

本节演示如何通过创建一个简单组件的过程来开始构建基于Python的函数组件。

  1. 将您的组件代码定义为一个 独立的 Python 函数。在这个例子中,函数将两个浮点数相加并返回两个参数的和。
def add(a: float, b: float) -> float:
  '''Calculates sum of two arguments'''
  return a + b
  1. 使用 kfp.components.create_component_from_func 生成组件规范 YAML,并返回一个工厂函数,您可以使用该函数为您的管道创建 kfp.dsl.ContainerOp 类实例。组件规范 YAML 是您组件的可重用和可共享的定义。
add_op = create_component_from_func(
    add, output_component_file='add_component.yaml')
  1. 创建并运行你的管道。 了解有关创建和运行管道的更多信息
import kfp.dsl as dsl
@dsl.pipeline(
  name='Addition pipeline',
  description='An example pipeline that performs addition calculations.'
)
def add_pipeline(
  a='1',
  b='7',
):
  # Passes a pipeline parameter and a constant value to the `add_op` factory
  # function.
  first_add_task = add_op(a, 4)
  # Passes an output reference from `first_add_task` and a pipeline parameter
  # to the `add_op` factory function. For operations with a single return
  # value, the output reference can be accessed as `task.output` or
  # `task.outputs['output_name']`.
  second_add_task = add_op(first_add_task.output, b)

# Specify argument values for your pipeline run.
arguments = {'a': '7', 'b': '8'}

# Create a pipeline run, using the client you initialized in a prior step.
client.create_run_from_pipeline_func(add_pipeline, arguments=arguments)

构建基于Python的组件

使用以下说明构建一个基于Python的组件:

  1. 定义一个独立的Python函数。该函数必须满足以下要求:

  2. Kubeflow Pipelines 使用您函数的输入和输出来定义您的组件接口。了解更多关于在组件之间传递数据的信息。您函数的输入和输出必须满足以下要求:

  3. (可选。) 如果你的函数有复杂的依赖关系,请选择或构建一个容器镜像,以便你的 Python 函数在其中运行。 了解更多关于选择或构建组件的容器镜像.

  4. 调用 kfp.components.create_component_from_func(func) 将您的函数转换为管道组件。

    • func: 要转换的Python函数。
    • base_image:(可选。)指定要在其中运行此函数的Docker容器镜像。了解有关选择或构建容器镜像的更多信息
    • output_component_file: (可选。)将您的组件定义写入文件。您可以使用此文件与同事共享组件或在不同的管道中重用它。
    • packages_to_install: (可选。)在运行您的函数之前,要安装的版本化的Python包列表。

使用和安装Python包

当 Kubeflow Pipelines 运行您的管道时,每个组件在 Kubernetes Pod 上的 Docker 容器镜像中运行。要加载您的 Python 函数所依赖的包,必须满足以下条件之一:

选择或构建容器镜像

当前,如果您不指定容器镜像,您的基于Python函数的组件将使用python:3.7容器镜像。如果您的函数具有复杂的依赖关系,您可能会受益于使用预先安装了依赖关系的容器镜像,或构建自定义容器镜像。预先安装依赖关系可以减少组件运行所需的时间,因为您的组件在每次运行时不需要下载和安装软件包。

许多框架,如 TensorFlowPyTorch,以及云服务提供商提供了预构建的容器镜像,这些镜像已经安装了常见的依赖项。

如果没有预构建的容器,您可以使用您的 Python 函数的依赖项构建自定义容器映像。有关构建自定义容器的更多信息,请阅读Docker 文档中的 Dockerfile 参考指南

如果您构建或选择一个容器镜像,而不是使用默认的容器镜像,则该容器镜像必须使用Python 3.5或更高版本。

了解数据是如何在组件之间传递的

当 Kubeflow Pipelines 运行您的组件时,会在 Kubernetes Pod 中启动一个容器镜像,并将您的组件输入作为命令行参数传递。当您的组件完成后,组件的输出将作为文件返回。

基于Python函数的组件使得构建管道组件变得更加容易,通过为您构建组件规范。基于Python函数的组件还处理将输入传递到您的组件以及将您函数的输出传回管道的复杂性。

以下章节描述如何通过值和文件传递参数。

  • 按值传递的参数包括数字、布尔值和短字符串。Kubeflow Pipelines通过按值传递参数,将值作为命令行参数传递给您的组件。
  • 通过文件传递的参数包括 CSV、图像和复杂类型。 这些文件存储在可被您的组件访问的位置, 例如持久卷声明或云存储服务。 Kubeflow Pipelines 通过 文件将参数传递给您的组件,方法是将它们的路径作为命令行参数传递。

输入和输出参数名称

当您使用 Kubeflow Pipelines SDK 将您的 Python 函数转换为管道组件时,Kubeflow Pipelines SDK 使用该函数的接口以以下方式定义您组件的接口:

  • 一些参数定义输入参数。
  • 一些参数定义输出参数。
  • 该函数的返回值被用作输出参数。如果返回值是一个 collections.namedtuple,则命名元组用于返回多个小值。

由于您可以将参数作为值或路径在组件之间传递,因此 Kubeflow Pipelines SDK 会去除泄露组件预期实现的常见参数后缀。例如,一个基于 Python 函数的组件接收数据并输出 CSV 数据,可能会有一个输出参数定义为 csv_path: comp.OutputPath(str)。在这种情况下,输出是 CSV 数据,而不是路径。因此,Kubeflow Pipelines SDK 将输出名称简化为 csv

Kubeflow Pipelines SDK 使用以下规则来定义组件接口中的输入和输出参数名称:

  • 如果参数名称以 _path 结尾,并且参数被注释为 kfp.components.InputPathkfp.components.OutputPath,则参数名称为去掉后缀 _path 的参数名称。
  • 如果参数名称以 _file 结尾,则参数名称是去掉尾部 _file 的参数名称。
  • 如果您使用 return 语句从您的组件返回一个单一的小值,则输出参数被命名为 output
  • 如果您的组件返回多个小值,通过返回一个 collections.namedtuple,Kubeflow Pipelines SDK 将使用元组的字段名称作为输出参数名称。

否则,Kubeflow Pipelines SDK 将参数名称用作参数名称。

按值传递参数

基于Python的组件使得通过值在组件之间传递参数变得更容易(例如数字、布尔值和短字符串),通过让您通过注解您的Python函数来定义组件的接口。支持的类型有 intfloatboolstr。您还可以通过值传递 listdict 实例,如果它们包含小的值,例如 intfloatboolstr 值。如果您没有注解您的函数,这些输入参数将作为字符串传递。

如果您的组件通过值返回多个输出,请使用 typing.NamedTuple 类型提示对您的函数进行注释,并使用 collections.namedtuple 函数将您的函数的输出作为 tuple 的新子类返回。

您还可以从您的函数中返回元数据和指标。

以下示例演示如何通过值返回多个输出,包括组件元数据和指标。

from typing import NamedTuple
def multiple_return_values_example(a: float, b: float) -> NamedTuple(
  'ExampleOutputs',
  [
    ('sum', float),
    ('product', float),
    ('mlpipeline_ui_metadata', 'UI_metadata'),
    ('mlpipeline_metrics', 'Metrics')
  ]):
  """Example function that demonstrates how to return multiple values."""  
  sum_value = a + b
  product_value = a * b

  # Export a sample tensorboard
  metadata = {
    'outputs' : [{
      'type': 'tensorboard',
      'source': 'gs://ml-pipeline-dataset/tensorboard-train',
    }]
  }

  # Export two metrics
  metrics = {
    'metrics': [
      {
        'name': 'sum',
        'numberValue':  float(sum_value),
      },{
        'name': 'product',
        'numberValue':  float(product_value),
      }
    ]  
  }

  from collections import namedtuple
  example_output = namedtuple(
      'ExampleOutputs',
      ['sum', 'product', 'mlpipeline_ui_metadata', 'mlpipeline_metrics'])
  return example_output(sum_value, product_value, metadata, metrics)

通过文件传递参数

基于Python的组件使您更容易将文件传递给您的组件,或者从您的组件返回文件,因为它让您可以注释您的Python函数的参数,以指定哪些参数指向文件。您的Python函数的参数可以指向输入文件或输出文件。如果您的参数是输出文件,Kubeflow Pipelines会将您的函数传递一个路径或流,您可以用它来存储您的输出文件。

以下示例接受一个文件作为输入,并返回两个文件作为输出。

def split_text_lines(
    source_path: comp.InputPath(str),
    odd_lines_path: comp.OutputPath(str),
    even_lines_path: comp.OutputPath(str)):
    """Splits a text file into two files, with even lines going to one file
    and odd lines to the other."""

    with open(source_path, 'r') as reader:
        with open(odd_lines_path, 'w') as odd_writer:
            with open(even_lines_path, 'w') as even_writer:
                while True:
                    line = reader.readline()
                    if line == "":
                        break
                    odd_writer.write(line)
                    line = reader.readline()
                    if line == "":
                        break
                    even_writer.write(line)

在这个例子中,输入和输出被定义为split_text_lines函数的参数。这允许Kubeflow Pipelines将源数据文件的路径和输出数据文件的路径传递给该函数。

要接受文件作为输入参数,请使用以下类型注释之一:

要将文件作为输出返回,请使用以下类型注释之一:

示例 Python 基于函数的组件

本节演示如何构建一个基于 Python 函数的组件,该组件使用导入、辅助函数,并产生多个输出。

  1. 定义您的函数。这个示例函数使用 numpy 包来计算给定被除数和除数的商和余数,在一个辅助函数中。除了商和余数,函数还返回用于可视化的元数据和两个指标。
from typing import NamedTuple

def my_divmod(
  dividend: float,
  divisor: float) -> NamedTuple(
    'MyDivmodOutput',
    [
      ('quotient', float),
      ('remainder', float),
      ('mlpipeline_ui_metadata', 'UI_metadata'),
      ('mlpipeline_metrics', 'Metrics')
    ]):
    '''Divides two numbers and calculate  the quotient and remainder'''

    # Import the numpy package inside the component function
    import numpy as np

    # Define a helper function
    def divmod_helper(dividend, divisor):
        return np.divmod(dividend, divisor)

    (quotient, remainder) = divmod_helper(dividend, divisor)

    from tensorflow.python.lib.io import file_io
    import json

    # Export a sample tensorboard
    metadata = {
      'outputs' : [{
        'type': 'tensorboard',
        'source': 'gs://ml-pipeline-dataset/tensorboard-train',
      }]
    }

    # Export two metrics
    metrics = {
      'metrics': [{
          'name': 'quotient',
          'numberValue':  float(quotient),
        },{
          'name': 'remainder',
          'numberValue':  float(remainder),
        }]}

    from collections import namedtuple
    divmod_output = namedtuple('MyDivmodOutput',
        ['quotient', 'remainder', 'mlpipeline_ui_metadata',
         'mlpipeline_metrics'])
    return divmod_output(quotient, remainder, json.dumps(metadata),
                         json.dumps(metrics))
  1. 直接运行你的函数,或使用单元测试来测试它。
my_divmod(100, 7)
  1. 这应该返回一个类似以下的结果:

    MyDivmodOutput(quotient=14, remainder=2, mlpipeline_ui_metadata='{"outputs": [{"type": "tensorboard", "source": "gs://ml-pipeline-dataset/tensorboard-train"}]}', mlpipeline_metrics='{"metrics": [{"name": "quotient", "numberValue": 14.0}, {"name": "remainder", "numberValue": 2.0}]}')
    
  2. 使用 kfp.components.create_component_from_func 返回一个工厂函数,您可以使用该函数为您的管道创建 kfp.dsl.ContainerOp 类实例。此示例还指定了运行此函数的基础容器镜像。

divmod_op = comp.create_component_from_func(
    my_divmod, base_image='tensorflow/tensorflow:1.11.0-py3')
  1. 定义你的管道。这个例子使用了之前例子中的 divmod_op 工厂函数和 add_op 工厂函数。
import kfp.dsl as dsl
@dsl.pipeline(
   name='Calculation pipeline',
   description='An example pipeline that performs arithmetic calculations.'
)
def calc_pipeline(
   a='1',
   b='7',
   c='17',
):
    # Passes a pipeline parameter and a constant value as operation arguments.
    add_task = add_op(a, 4) # The add_op factory function returns
                            # a dsl.ContainerOp class instance. 

    # Passes the output of the add_task and a pipeline parameter as operation
    # arguments. For an operation with a single return value, the output
    # reference is accessed using `task.output` or
    # `task.outputs['output_name']`.
    divmod_task = divmod_op(add_task.output, b)

    # For an operation with multiple return values, output references are
    # accessed as `task.outputs['output_name']`.
    result_task = add_op(divmod_task.outputs['quotient'], c)
  1. 编译并运行您的管道。 了解更多关于编译和运行管道的信息
# Specify pipeline argument values
arguments = {'a': '7', 'b': '8'}

# Submit a pipeline run
client.create_run_from_pipeline_func(calc_pipeline, arguments=arguments)

反馈

此页面有帮助吗?